A step-by-step guide to T’s pipeline execution model
Pipelines are T’s core execution model. They let you define named computation steps (nodes) that are automatically ordered by their dependencies, executed deterministically, and cached for re-use.
A pipeline is a block of named expressions enclosed in
pipeline { ... }:
p = pipeline {
x = 10
y = 20
total = x + y
}
This creates a pipeline with three nodes: x,
y, and total. Each node is computed once, and
the results are cached. Access any node’s value with dot notation:
p.x -- 10
p.y -- 20
p.total -- 30
The pipeline itself displays as:
Pipeline(3 nodes: [x, y, total])
In addition to bare assignments, you can explicitly configure nodes
using the node() function. This lets you define the
execution environment (like the runtime) and custom
serialization methods for when a pipeline is materialized by Nix:
p = pipeline {
data = node(command = read_csv("data.csv"), runtime = T)
-- Running a Python node that trains a model using the pyn wrapper
model = pyn(
command = <{
from sklearn.linear_model import LinearRegression
fit = LinearRegression().fit(X, y)
fit
}>,
serializer = "pmml"
)
}
Bare syntax (like x = 10) is automatically desugared to
x = node(command = 10, runtime = T, serializer = default, deserializer = default).
You can also use pyn(), rn(), and
shn() as shortcuts for Python, R, and shell runtimes. T
enforces cross-runtime safety: if a node with a non-T
runtime depends on a T node, or vice versa, you must
specify an explicit
serializer/deserializer.
When an R node returns a ggplot2 object, or a Python
node returns a matplotlib / plotnine plot
object, T now preserves lightweight plot metadata for REPL inspection.
Reading or printing those artifacts shows a structured summary with the
plot class (ggplot, matplotlib, or
plotnine), runtime backend (R or
Python), title, aesthetic mappings, labels, and layer
information instead of a raw runtime-specific object dump.
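As a sketch of what this looks like in practice (the summary line shown is illustrative — the exact layout may differ, but the fields are those listed above):

```
p = pipeline {
  plot = rn(command = <{
    library(ggplot2)
    ggplot(mtcars, aes(x = wt, y = mpg)) + geom_point() + ggtitle("Weight vs MPG")
  }>)
}
p.plot
-- Plot(ggplot via R): title="Weight vs MPG", aes=[x=wt, y=mpg], layers=[geom_point]
```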
### The script Argument

Instead of inlining code with command, you can point a
node to an external source file using the script argument.
This works with node(), pyn(),
rn(), and shn(). The script and
command arguments are mutually exclusive.
p = pipeline {
-- Execute an external R script
model = rn(script = "train_model.R", serializer = "pmml")
-- Execute an external Python script
predictions = pyn(script = "predict.py", deserializer = "pmml")
-- Execute an external shell script
report = shn(script = "postprocess.sh")
-- node() auto-detects the runtime from the file extension
summary = node(script = "summarise.R", serializer = "json")
}
When using script, the runtime is auto-detected from the
file extension (.R → R, .py → Python,
.sh → sh) if not explicitly set via the
runtime argument. T reads the script file to extract
identifier references, allowing the pipeline dependency graph to be
built correctly from variables referenced in the external file.
### shn()

Use shn() for pipeline steps that are easiest to express
as shell or CLI commands. It is a convenience wrapper around
node(runtime = sh, ...), just like rn() and
pyn() wrap node() for R and Python.
p = pipeline {
-- Exec-style shell node: command + positional argv
fields = shn(
command = "printf",
args = ["first line\\nsecond line\\n"]
)
-- Script-style shell node: inline shell source executed with `sh`
report = shn(command = <{
#!/bin/sh
set -eu
# Dependencies for T's lexical pipeline analysis: summary_r summary_py
printf 'R summary: %s\n' "$T_NODE_summary_r/artifact"
printf 'Python summary: %s\n' "$T_NODE_summary_py/artifact"
}>)
}
There are two useful modes:

- Exec style: command plus args = [...] to run a program directly with positional arguments.
- Script style: command = <{ ... }> or a .sh script, optionally overriding the interpreter with shell = "bash" and shell_args = ["-lc"] when you need Bash-specific syntax.

Shell nodes default to serializer = text, which makes
them a good fit for reports, command output, and glue code between other
pipeline nodes. For a full end-to-end example that mixes T, R, Python,
and sh, see
tests/pipeline/polyglot_shell_pipeline.t and
.github/workflows/polyglot-shell-pipeline.yml.
T is designed to orchestrate code across multiple languages. The pipeline runner manages the serialization and deserialization of data between R, Python, and T using a first-class serializer system. For a deep dive into how T handles data interchange, see the Serializers Documentation.
| Format | Option | Best For | Requirement |
|---|---|---|---|
| T Native | "default" | T-to-T communication | None |
| Arrow | "arrow" | Large DataFrames | pyarrow (Py), arrow (R) |
| PMML | "pmml" | Predictive Models | sklearn2pmml (Py), r2pmml (R) |
| JSON | "json" | Simple lists/dicts | jsonlite (R) |
You can train a model in R and use T’s native OCaml model evaluator to make predictions without leaving the T runtime:
p = pipeline {
-- Node 1: Train model in R using the rn wrapper
model_r = rn(
command = <{
data <- read.csv("data.csv")
lm(mpg ~ wt + hp, data = data)
}>,
serializer = "pmml"
)
-- Node 2: Predict in T using the R model
predictions = node(
command = <{
test_df = read_csv("new_data.csv")
predict(test_df, model_r)
}>,
runtime = "T",
deserializer = "pmml"
)
}
Setting deserializer = "pmml" on the T node tells the
pipeline runner to use T’s native PMML parser to convert the R model
into a T model object.
Nodes can be declared in any order. T automatically resolves dependencies:
p = pipeline {
result = x + y -- depends on x and y
x = 3 -- defined after result
y = 7 -- defined after result
}
p.result -- 10
T builds a dependency graph and executes nodes in topological order,
so x and y are computed before
result regardless of declaration order.
Nodes can depend on other computed nodes, forming chains:
p = pipeline {
a = 1
b = a + 1 -- depends on a
c = b + 1 -- depends on b
d = c + 1 -- depends on c
}
p.d -- 4
Nodes can use any T function, including standard library functions:
p = pipeline {
data = [1, 2, 3, 4, 5]
total = sum(data)
count = length(data)
}
p.total -- 15
p.count -- 5
The pipe operator |> works naturally inside
pipelines:
double = \(x) x * 2
p = pipeline {
a = 5
b = a |> double
}
p.b -- 10
The maybe-pipe ?|> forwards all values — including
errors — to the next function. This is useful for building recovery
logic into pipelines:
recovery = \(x) if (is_error(x)) 0 else x
double = \(x) x * 2
p = pipeline {
raw = 1 / 0 -- Error: division by zero
safe = raw ?|> recovery -- forwards error to recovery → 0
result = safe |> double -- 0 |> double → 0
}
p.safe -- 0
p.result -- 0
Without ?|>, the error from raw would
short-circuit at |> and never reach
recovery. The maybe-pipe lets you intercept errors and
provide fallback values.
Pipelines are most powerful for data analysis workflows. Here’s a complete example loading, transforming, and summarizing data:
p = pipeline {
data = read_csv("employees.csv")
rows = data |> nrow
cols = data |> ncol
names = data |> colnames
}
p.rows -- number of rows
p.cols -- number of columns
p.names -- list of column names
p = pipeline {
raw = read_csv("sales.csv")
filtered = filter(raw, $amount > 100)
by_region = filtered |> group_by($region)
summary = by_region |> summarize($total = sum($amount))
}
p.summary -- DataFrame with regional totals
T provides functions to inspect pipeline structure:
p = pipeline { x = 10; y = 20; total = x + y }
pipeline_nodes(p) -- ["x", "y", "total"]
pipeline_deps(p)
-- {`x`: [], `y`: [], `total`: ["x", "y"]}
pipeline_node(p, "total") -- 30
Use pipeline_run() to re-execute a pipeline:
p2 = pipeline_run(p)
p2.total -- 30 (re-computed)
Re-running produces the same results — T pipelines are deterministic.
Two pipelines with the same definitions always produce the same results:
p1 = pipeline { a = 5; b = a * 2; c = b + 1 }
p2 = pipeline { a = 5; b = a * 2; c = b + 1 }
p1.c == p2.c -- true
In T, errors are first-class values. By default,
evaluation is resilient: if a node fails, it produces
an Error value instead of crashing the pipeline. This
allows other independent nodes to continue building, giving you a full
picture of which parts of your DAG are healthy.
p = pipeline {
a = 1 / 0 -- Produces an Error(DivisionByZero)
b = 1 + 1 -- Still succeeds! (2)
c = a + 1 -- Fails because 'a' is an error (Error)
}
When you print or build this pipeline, T provides a summary of which nodes succeeded and which failed.
### The --failfast Flag

If you prefer the conventional behaviour where evaluation stops
immediately at the first error, you can use the --failfast
flag:
$ t run --failfast src/pipeline.t

In your T scripts, you can also opt in to this behavior via
t_make():
t_make(failfast = true)
T detects circular dependencies and reports them at construction time, before any nodes are executed:
pipeline {
a = b
b = a
}
-- Error(ValueError: "Pipeline has a dependency cycle involving node 'a'")
Accessing a non-existent node returns a structured error:
p = pipeline { x = 10 }
p.nonexistent
-- Error(KeyError: "node 'nonexistent' not found in Pipeline")
Defining a pipeline with pipeline { ... } evaluates
nodes in-memory. To materialize them as reproducible
Nix artifacts (potentially using R or Python dependencies you’ve defined
in tproject.toml), use populate_pipeline()
with the build = true argument:
p = pipeline {
data = read_csv("sales.csv")
total = sum(data.$amount)
}
populate_pipeline(p, build = true)
populate_pipeline(p, build = true) is the primary
command for materializing a pipeline. It does the following:

- Creates a _pipeline/ directory with pipeline.nix and dag.json.
- If you have defined [r-dependencies] or [py-dependencies] in your tproject.toml, pipeline nodes have access to these language environments.
- Records the build in a build log (_pipeline/build_log_YYYYMMdd_HHmmss_hash.json).

> [!NOTE]
> build_pipeline(p) is available as a shorthand for populate_pipeline(p, build = true).
After building, use read_node() or
load_node() to retrieve materialized values:
read_node("total") -- reads the serialized artifact for "total"
load_node("total") -- same as read_node, loads the artifact
These functions look up the node in the latest build log and deserialize the artifact.
For more control over the build process, T provides
populate_pipeline(). This function prepares the pipeline
infrastructure without necessarily triggering the Nix build
immediately.
populate_pipeline(p) -- Prepares _pipeline/ only
populate_pipeline(p, build = true) -- Same as build_pipeline(p)
### The _pipeline/ directory

T maintains a persistent state directory for your pipeline. When you populate or build, T creates:

- _pipeline/pipeline.nix: The generated Nix expression for your pipeline nodes.
- _pipeline/dag.json: A machine-readable dependency graph of your pipeline.
- _pipeline/build_log_*.json: History of previous successful builds.

T keeps a history of your builds in _pipeline/. This
enables Time Travel — the ability to read artifacts
from specific past versions of your pipeline.
Use list_logs() to see available build logs:
logs = list_logs()
-- DataFrame of build logs with filename, modification_time, and size_kb
Use inspect_pipeline() to view the build status of a
specific pipeline as a DataFrame (defaults to the latest):
inspect_pipeline()
-- DataFrame(5 rows x 4 cols: [derivation, build_success, path, output])
inspect_pipeline(which_log = "20260221_143022")
Pass the which_log argument to read_node()
to specify which build to read from. You can pass a regex pattern or a
specific filename:
-- Read the latest version (default)
val = read_node("result")
-- Read from a specific historical build
val_old = read_node("result", which_log = "20260221_143022")
This ensures that even as you update your code and data, you can always recover and compare results from previous runs.
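The two forms compose naturally. A minimal sketch comparing the current artifact against a historical build:

```
-- Compare the latest result against a specific past build
val_new = read_node("result")
val_old = read_node("result", which_log = "20260221_143022")
val_new == val_old -- false if the node's inputs or code changed between builds
```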
T enforces a clear separation between interactive and non-interactive execution:
### Script Mode (t run)

Scripts executed with t run must call
populate_pipeline() (or build_pipeline()).
This ensures that non-interactive execution always produces reproducible
Nix artifacts. By default, t run operates in
resilient mode, continuing past errors to provide a
full diagnostic summary. Use --failfast to change this.
# ✅ This works — resilient by default
$ t run my_pipeline.t
# 🛑 This fails immediately on the first error
$ t run --failfast my_pipeline.t
# ❌ This is rejected — script doesn't call populate_pipeline()
$ t run my_script.t
# Error: non-interactive execution requires a pipeline.
# Use --unsafe to override.

A valid pipeline script looks like:
-- my_pipeline.t
p = pipeline {
data = read_csv("input.csv")
result = data |> filter($value > 0) |> summarize(total = sum($value))
}
populate_pipeline(p, build = true)
The REPL is unrestricted — you can run any T code line by line, whether or not it involves pipelines:
$ t repl
T> x = 1 + 2
T> print(x)
3
T> p = pipeline { a = 10 }
T> p.a
10

When a pipeline is built with build_pipeline(), each
node runs inside a Nix sandbox — an isolated build
environment. Import statements from your script are
automatically propagated into each sandbox, so imported
packages and functions are available to all nodes.
-- pipeline.t
import my_stats
import data_utils[read_clean, normalize]
p = pipeline {
raw = read_csv("data.csv")
clean = read_clean(raw) -- uses imported function
normed = normalize(clean) -- uses imported function
result = weighted_mean(normed.$x, normed.$w) -- uses imported function
}
build_pipeline(p)
When build_pipeline(p) generates the Nix derivation for
each node, it prepends the import statements:
-- Generated node_script.t (inside Nix sandbox)
import my_stats
import data_utils[read_clean, normalize]
raw = deserialize("$T_NODE_raw/artifact.tobj")
result = weighted_mean(raw.$x, raw.$w)
serialize(result, "$out/artifact.tobj")
All three import forms are supported:
| Syntax | Effect |
|---|---|
import "src/helpers.t" |
Import a local file |
import my_stats |
Import all public functions from a package |
import my_stats[foo, bar] |
Import specific functions |
import my_stats[wm=weighted_mean] |
Import with aliases |
The explain() function provides structured metadata
about pipelines:
p = pipeline {
x = 10
y = x + 5
z = y * 2
}
e = explain(p)
e.kind -- "pipeline"
e.node_count -- 3
You can explicitly skip a node (and by extension, all nodes that
depend on it) by passing the noop = true argument to the
node() function.
p = pipeline {
raw_data = read_csv("raw.csv")
-- This node and its dependents won't trigger a heavy Nix build
expensive_model = rn(
command = train(raw_data),
noop = true
)
-- This node depends on expensive_model, therefore it becomes a noop as well
report = rn(command = generate_report(expensive_model))
}
populate_pipeline(p, build = true)
In a Nix sandbox context, noop generates a lightweight
stub instead of a real build derivation.
Every node in a pipeline carries structured metadata that you can
query and manipulate. The pipeline_to_frame() function
converts this metadata into a DataFrame with one row per node.
### pipeline_to_frame

p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_to_frame(p)
-- DataFrame(3 rows x 8 cols: [name, runtime, serializer, deserializer, noop, deps, depth, command_type])
The columns returned are:
| Column | Type | Description |
|---|---|---|
| name | String | Unique node identifier |
| runtime | String | "T", "R", or "Python" |
| serializer | String | e.g. "default", "pmml" |
| deserializer | String | e.g. "default", "pmml" |
| noop | Bool | Whether the node is a no-op |
| deps | String | Comma-separated dependency names |
| depth | Int | Topological depth (roots = 0) |
| command_type | String | "command" or "script" |
pipeline_to_frame is the foundation for inspection: you
can use T’s standard filter, select, and
arrange verbs on the resulting DataFrame.
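For example, a sketch combining the verbs named above (assuming the DataFrame arrange verb accepts the same "desc" flag as arrange_node):

```
-- Find non-T nodes, deepest first
pipeline_to_frame(p)
  |> filter($runtime != "T")
  |> select($name, $runtime, $depth)
  |> arrange($depth, "desc")
```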
### pipeline_summary

pipeline_summary(p) is a convenience alias for
pipeline_to_frame(p):
pipeline_summary(p)
-- same output as pipeline_to_frame(p)
### select_node

select_node returns a DataFrame with only the columns
you request, using NSE $field references:
p = pipeline {
a = 1
b = node(command = <{ 2 }>, runtime = R, serializer = "pmml")
c = b + 1
}
p |> select_node($name, $runtime, $depth)
-- DataFrame: name="a", runtime="T", depth=0
-- name="b", runtime="R", depth=0
-- name="c", runtime="T", depth=1
Available fields: $name, $runtime,
$serializer, $deserializer,
$noop, $deps, $depth,
$command_type.
Pipeline nodes can pass environment variables into the Nix build
sandbox via the env_vars named argument on
node(), py()/pyn(), and
rn(). This allows nodes to configure their build-time
execution environment without embedding those values directly into the
command body.
p = pipeline {
model = rn(
command = <{ train_model(data) }>,
env_vars = [
MODEL_MODE: "train",
RETRIES: 2,
DEBUG: true
]
)
}
The env_vars dictionary supports the following
scalar-like values:
| Type | Example | Nix Output |
|---|---|---|
| String | "train" |
"train" |
| Symbol | train |
"train" |
| Int | 2 |
"2" |
| Float | 3.14 |
"3.14" (up to 15 significant digits) |
| Bool | true |
"true" |
| NA | NA |
(Omitted from derivation) |
T performs early validation on environment variables:

- env_vars must be a dictionary.
- Unsupported types (like Lists or nested Dicts) trigger a structured type error during pipeline construction.
- NA values are silently omitted from the generated Nix derivation instead of being materialized as empty strings.
These variables are automatically threaded into the generated
stdenv.mkDerivation and are available via standard system
methods (e.g., Sys.getenv() in R or os.environ
in Python) during the Nix build step.
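A minimal sketch of a node consuming its own env_vars (train_model here is a hypothetical R helper, not part of T or R):

```
p = pipeline {
  model = rn(
    command = <{
      mode <- Sys.getenv("MODEL_MODE")              # "train"
      retries <- as.integer(Sys.getenv("RETRIES"))  # 2
      train_model(data, mode = mode, retries = retries)
    }>,
    env_vars = [MODEL_MODE: "train", RETRIES: 2]
  )
}
```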
### Node verbs (the _node family)

T provides a set of colcraft-style verbs for operating on pipeline
nodes. These mirror the DataFrame API, using NSE $field
references for node metadata fields.
### filter_node

Returns a new pipeline containing only the nodes where the predicate
is true. No DAG validity check is performed — if a retained node
references a removed node, that surfaces at build_pipeline
time.
p = pipeline {
load = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = load) }>, serializer = "pmml")
score = node(command = predict(model, load), deserializer = "pmml")
}
-- Keep only R nodes
p |> filter_node($runtime == "R") |> pipeline_nodes
-- ["model"]
-- Keep only nodes with no noop flag
p |> filter_node($noop == false) |> pipeline_nodes
-- Keep only shallow nodes (root and depth-1 nodes)
p |> filter_node($depth <= 1) |> pipeline_nodes
### which_nodes

filter_node rewrites the pipeline itself.
which_nodes is the read-only counterpart: it filters the
richer node records you would otherwise have to access manually through
read_pipeline(p).nodes.
This is especially useful for diagnostics queries because each record
includes name, value, and
diagnostics.
p = pipeline {
bad = 1 / 0
ok = 42
downstream = bad + 1
}
-- Keep only nodes with captured errors
which_nodes(p, !is_na(diagnostics.error))
-- Same idea, but return only the node names
which_nodes(p, !is_na(diagnostics.error))
|> map(\(node) node.name)
-- ["bad", "downstream"]
-- Explicit predicate functions still work too
has_error = \(node) !is_na(node.diagnostics.error)
which_nodes(p, has_error)
-- Convenience shortcut for the most common case
errored_nodes(p) |> map(\(node) node.name)
### mutate_node

Modifies metadata fields on all nodes, or scoped to a subset using
the where argument:
-- Mark all nodes as noop
p |> mutate_node($noop = true)
-- Mark only R nodes as noop (useful for skipping heavy computations)
p |> mutate_node($noop = true, where = $runtime == "R")
-- Override serializer for all nodes
p |> mutate_node($serializer = "pmml", where = $runtime == "R")
Mutable metadata fields: noop (Bool),
runtime (String), serializer (String),
deserializer (String).
### rename_node

Renames a single node and automatically rewires all dependency edges
that referenced the old name. This is the canonical way to resolve name
collisions before set operations like union.
p = pipeline { a = 1; b = a + 1 }
p2 = p |> rename_node("a", "alpha")
pipeline_nodes(p2) -- ["alpha", "b"]
pipeline_deps(p2) -- {`alpha`: [], `b`: ["alpha"]}
Attempting to rename to a name that already exists is an error:
p |> rename_node("a", "b")
-- Error(ValueError: "A node named `b` already exists in the Pipeline.")
### arrange_node

Returns a new pipeline with nodes sorted by a metadata field. This affects only display/serialization order — the DAG determines execution order.

p = pipeline { z = 1; a = 2; m = 3 }
p |> arrange_node($name) |> pipeline_nodes -- ["a", "m", "z"]
p |> arrange_node($name, "desc") |> pipeline_nodes -- ["z", "m", "a"]
-- Sort a chain by depth (shallowest first)
p = pipeline { a = 1; b = a + 1; c = b + 1 }
p |> arrange_node($depth) |> pipeline_nodes -- ["a", "b", "c"]

Beyond basic execution, T allows you to treat a Pipeline as a queryable and mutable data structure. This is powerful for meta-programming, automated reporting, and “surgical” updates to large analysis graphs.

In a production setting, you may want to extract the errors from a failed pipeline run to log them or send an alert.

p = build_pipeline(p)
-- Get detailed records for all failed nodes
failed_records = errored_nodes(p)
-- Extract just the names and error messages
errors = map(failed_records, \(n) [name: n.name, msg: n.diagnostics.error])

If you have a massive pipeline but only want to visualize or re-run a
specific subset (e.g., all Python nodes), use
filter_node():

-- Create a subgraph of only Python-based computations
py_pipeline = p |> filter_node($runtime == "Python")
-- Create a subgraph of 'shallow' nodes (roots and their immediate children)
shallow_p = p |> filter_node($depth <= 1)

Lenses allow you to modify a pipeline specification without using the
pipeline { ... } block again. This is useful for “what-if”
analysis or dynamic configuration.

-- 1. Identify a node to skip
noop_l = node_meta_lens("heavy_computation", "noop")
-- 2. Toggle the noop flag surgically
p_fast = p |> set(noop_l, true)
-- 3. Swap a runtime for testing
p_test = p |> set(node_meta_lens("model_train", "runtime"), "R")

If you have a VPipeline object (from
read_pipeline()), you can use lenses to safely extract
values from specific nodes.

p_info = read_pipeline(p)
-- Focus on the 'summary' node's value
summary_l = node_lens("summary")
summary_df = get(p_info, summary_l)
Pipelines can be treated as named sets of nodes. T provides four set operations that combine or subtract pipelines.
Immutability: All set operations return new Pipelines. The original pipelines are never modified.
Lazy validation: Set operations do not check DAG validity. If the result has dangling references, errors surface at
build_pipeline or pipeline_run time.
### union

Merges two pipelines, including all nodes from both. Errors
immediately on any name collision. Use rename_node to
resolve collisions first.
p_etl = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
}
p_model = pipeline {
fit = lm(clean, formula = y ~ x)
report = summary(fit)
}
p_full = p_etl |> union(p_model)
pipeline_nodes(p_full) -- ["raw", "clean", "fit", "report"]
If both pipelines have a node named clean:
p_etl |> union(p_model)
-- Error(ValueError: "Function `union`: name collision(s) detected: clean. Use `rename_node` to resolve.")
-- Fix: rename before merging
p_model2 = p_model |> rename_node("clean", "clean_model")
p_etl |> union(p_model2)
### suppress_warnings

Nodes that produce large numbers of non-terminal warnings (like those
from filter() or complex modeling functions) can be
silenced using the suppress_warnings combinator. This
silences the console output for a node while maintaining the warning
records for auditability.
p = pipeline {
-- High-noise node with suppressed warnings
filtered = dataframe([[x: 1], [x: NA], [x: 3]])
|> filter($x > 1)
|> suppress_warnings
-- Downstream node remains unaffected
count = nrow(filtered)
}
When building or running a pipeline with suppressed nodes, the summary reflects this state:
Pipeline summary: 1 node(s) with warnings, 1 suppressed, 0 error(s)
○ filtered — warnings suppressed by caller (1 NAs ignored)
The ○ symbol indicates a suppressed node. You can still
access the underlying warning objects programmatically via
read_node() or read_pipeline().
res = read_node(p, "filtered")
res.diagnostics.warnings_suppressed -- true
res.diagnostics.warnings -- list of captured warnings
### difference

Removes from the first pipeline all nodes whose names appear in the second pipeline. Nodes in the second pipeline that don’t exist in the first are silently ignored.
p = pipeline { a = 1; b = 2; c = 3; d = 4 }
p_remove = pipeline { b = 0; d = 0 }
p |> difference(p_remove) |> pipeline_nodes -- ["a", "c"]
### intersect

Retains only nodes present by name in both pipelines, using definitions from the first pipeline.
p1 = pipeline { a = 1; b = 2; c = 3 }
p2 = pipeline { b = 99; c = 100; d = 4 }
p1 |> intersect(p2) |> pipeline_nodes -- ["b", "c"] (p1's definitions)
### patch

Like union, but only updates nodes that already exist in
the first pipeline — it will not add new nodes from the second pipeline.
Ideal for overriding configurations without accidentally importing stray
nodes.
p_prod = pipeline {
load = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = load) }>, serializer = "pmml")
}
p_overrides = pipeline {
model = rn(command = <{ lm(y ~ x + z, data = load) }>, serializer = "pmml")
extra = 99 -- stray node
}
p_updated = p_prod |> patch(p_overrides)
pipeline_nodes(p_updated) -- ["load", "model"] — "extra" was not added
These operations are structurally aware of the pipeline’s dependency graph and are used to replace node implementations, reroute edges, and extract subgraphs.
### swap

Replaces a node’s implementation while preserving its existing dependency edges. The new node is specified as the third argument.
p = pipeline {
data = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = data) }>, serializer = "pmml")
score = node(command = predict(model, data), deserializer = "pmml")
}
-- Replace the model node with a new implementation; edges to/from model are preserved
new_model = rn(command = <{ glm(y ~ x, data = data, family = binomial) }>, serializer = "pmml")
p2 = p |> swap("model", new_model)
pipeline_deps(p2)
-- `model` still depends on `data`, and `score` still depends on `model`
### rewire

Reroutes a node’s declared dependencies. The replace
argument maps old dependency names to new ones. Only the named node’s
dependency list is updated.
p = pipeline {
data = read_csv("data.csv")
data_v2 = read_csv("data_v2.csv")
model = rn(command = <{ lm(y ~ x, data) }>, serializer = "pmml")
}
-- Re-point model to use data_v2 instead of data
p2 = p |> rewire("model", replace = list(data = "data_v2"))
pipeline_deps(p2)
-- {`data`: [], `data_v2`: [], `model`: ["data_v2"]}
### prune

Removes all leaf nodes — nodes that nothing else depends on. This is
useful for cleaning up intermediate pipelines after
filter_node or difference operations that may
leave orphaned utility nodes.
p = pipeline { a = 1; b = a + 1; c = 3 }
-- `a` is depended on by `b`, so it is not a leaf.
-- `b` depends on `a` but nothing depends on `b` — it is a leaf.
-- `c` is independent and nothing depends on it — it is also a leaf.
p |> prune |> pipeline_nodes -- ["a"] (both b and c are leaves, removed)
You can chain difference and prune to strip
unwanted branches in one step:
p_partial = p |> difference(p_debug_nodes) |> prune
### upstream_of

Returns a new pipeline containing the named node and all its transitive ancestors (everything the node depends on, directly or indirectly).
p = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
model = rn(command = <{ lm(y ~ x, clean) }>, serializer = "pmml")
report = summary(model)
sidebar = "metadata"
}
-- Everything needed to produce `model`
p |> upstream_of("model") |> pipeline_nodes -- ["raw", "clean", "model"]
-- sidebar is excluded because model doesn't depend on it
### downstream_of

Returns a new pipeline containing the named node and all nodes that transitively depend on it (everything that uses this node, directly or indirectly).
-- Everything that is affected if `clean` changes
p |> downstream_of("clean") |> pipeline_nodes -- ["clean", "model", "report"]
-- raw and sidebar are excluded
### subgraph

Returns the full connected component of a node — the union of its ancestors and descendants.
p = pipeline { a = 1; b = a + 1; c = b + 1; d = 99 }
-- Everything connected to b (upstream and downstream)
p |> subgraph("b") |> pipeline_nodes -- ["a", "b", "c"] — d is disconnected
These higher-level operators combine two complete, separately-defined pipelines into one.
### chain

Connects two pipelines where the second pipeline’s nodes reference
node names from the first as dependencies. T verifies that at least one
such shared reference exists; if the two pipelines are completely
disconnected, chain raises an error.
p_etl = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
}
-- p_model references `clean` from p_etl — this is the wire
p_model = pipeline {
fit = lm(clean, formula = y ~ x)
report = summary(fit)
}
p_full = p_etl |> chain(p_model)
pipeline_nodes(p_full) -- ["raw", "clean", "fit", "report"]
chain is stricter than union: it requires
an intent to connect the pipelines, catching accidental merges
where no wiring was meant.
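A sketch contrasting the two on completely disconnected pipelines (the error text follows the "no shared dependency names found" message this guide shows for chain failures):

```
p_a = pipeline { a = 1 }
p_b = pipeline { b = 2 }
p_a |> union(p_b) |> pipeline_nodes -- ["a", "b"] — merged, no wiring
p_a |> chain(p_b)
-- Error: "no shared dependency names found"
```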
T’s dependency tracking works differently depending on the node’s
runtime. This leads to a specific limitation when using
chain() with R or Python pipelines.
Inside R and Python code blocks, T cannot tell whether a bare identifier is a runtime-native function (like lm()) or a T variable from a different pipeline. To avoid polluting your build environment with R/Python functions as Nix dependencies, T ignores external references inside RawCode blocks when they are not defined in the current pipeline block.
This means chain() will fail to automatically
wire R/Python nodes to nodes in other pipelines.
If you need an R or Python node to depend on a node from a separate
pipeline via chain(), you must “bring” that dependency into
the pipeline block using a T-expression stub with an aliased
name.
❌ Broken: R node cannot “see” raw_data for
chaining
p_data = pipeline { raw_data = read_csv("data.csv") }
p_model = pipeline {
model = rn(<{
lm(mpg ~ hp, data = raw_data)
}>)
}
-- Error: "no shared dependency names found"
p_full = p_data |> chain(p_model)
❌ Also broken: self-referential stub
p_model = pipeline {
raw_data = raw_data -- Error: "Self-referential node detected"
model = rn(<{ lm(mpg ~ hp, data = raw_data) }>)
}
✅ Fixed: Use a T-stub with an aliased name
p_data = pipeline { raw_data = read_csv("data.csv") }
p_model = pipeline {
-- Aliased T-stub: different name on the left, raw_data on the right.
-- T can parse the RHS and see `raw_data` as an external dependency.
data_input = raw_data
model = rn(<{
lm(mpg ~ hp, data = data_input) -- use the alias name in R
}>,
deserializer = "arrow")
}
-- Success! T sees `raw_data` as a dependency of `data_input`, wiring the pipelines.
p_full = p_data |> chain(p_model)
By giving the stub a different name
(data_input = raw_data), you avoid a self-reference while
still creating a T-expression that references raw_data. T
can parse the right-hand side, detect the cross-pipeline dependency, and
allow chain() to wire the pipelines together. Note that
R/Python code inside the chained node should use the alias
name (data_input) as the variable, not the
original (raw_data).
### parallel

Combines two pipelines that are intended to run independently. No dependency wiring is performed. Errors on name collision.
p_r_model = pipeline {
r_fit = rn(command = <{ lm(y ~ x, data) }>, serializer = "pmml")
}
p_py_model = pipeline {
py_fit = pyn(
command = <{
from sklearn.linear_model import LinearRegression
LinearRegression().fit(X, y)
}>,
serializer = "pmml"
)
}
-- Both models will run independently
p_both = parallel(p_r_model, p_py_model)
pipeline_nodes(p_both) -- ["r_fit", "py_fit"]
Beyond pipeline_nodes and pipeline_deps, T
provides a complete structural inspection surface for pipelines.
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_roots(p) -- ["a"] — nodes with no dependencies
pipeline_leaves(p) -- ["c"] — nodes nothing depends on
pipeline_edges returns a list of [from, to]
pairs representing every edge in the DAG:
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_edges(p) -- [["a", "b"], ["b", "c"]]
This is useful for serializing the graph structure or feeding it to external tools.
pipeline_depth returns the maximum topological depth
across all nodes (root nodes have depth 0):
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_depth(p) -- 2
pipeline_cycles returns any node names involved in
dependency cycles. A correctly formed pipeline always returns an empty
list:
p = pipeline { a = 1; b = a + 1 }
pipeline_cycles(p) -- []
pipeline_print
Prints a human-readable summary of all nodes to stdout, including each node's runtime, depth, noop status, and dependency list:
p = pipeline {
a = 1
b = node(command = <{ 2 }>, runtime = R, serializer = "pmml")
c = b + 1
}
pipeline_print(p)
-- Pipeline (3 nodes):
-- a runtime=T depth=0 noop=false deps=[]
-- b runtime=R depth=0 noop=false deps=[]
-- c runtime=T depth=1 noop=false deps=[b]
pipeline_dot
Exports the pipeline as a Graphviz DOT string for visualization:
p = pipeline { a = 1; b = a + 1; c = b + 1 }
dot = pipeline_dot(p)
print(dot)
-- digraph pipeline {
-- rankdir=LR;
-- node [shape=box];
-- "a" [label="a\n[T]"];
-- "b" [label="b\n[T]"];
-- "c" [label="c\n[T]"];
-- "a" -> "b";
-- "b" -> "c";
-- }
Pipe the output to dot -Tpng or paste it into
https://dreampuf.github.io/GraphvizOnline/ to render a visual dependency
graph.
By design, T uses lazy validation: structural errors (missing dependencies, cycles) surface at build_pipeline or pipeline_run time, not when individual pipeline operations are applied. This lets you compose and transform pipelines freely.
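For instance, filtering away a node that others depend on raises no error at the point of filtering; the problem only surfaces when the pipeline is built or run. This is a sketch, and the error text (mirroring the pipeline_validate message) is illustrative:
p = pipeline { a = 1; b = a + 1 } |> filter_node($name == "b")
-- No error yet: `b` now has a dangling dependency on `a`, but composition is lazy.
pipeline_run(p)
-- Error(ValueError: "Node `b` depends on `a` which does not exist in the pipeline.")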
When you want to validate eagerly, T provides opt-in validation utilities.
pipeline_validate
Returns a list of validation error messages. An empty list means the pipeline is structurally valid. This function never throws; it reports problems as data.
p_good = pipeline { a = 1; b = a + 1 }
pipeline_validate(p_good) -- []
-- Build a broken pipeline by filtering out a dependency
p_broken = pipeline { a = 1; b = a + 1 } |> filter_node($name == "b")
-- b now depends on a, but a was filtered out
pipeline_validate(p_broken)
-- ["Node `b` depends on `a` which does not exist in the pipeline."]
Checks performed:
1. All referenced dependencies exist as nodes in the pipeline.
2. No dependency cycles.
pipeline_assert
Like pipeline_validate, but throws the first error found instead of returning a list. Returns the pipeline unchanged if valid. This is useful as a guard at a pipeline's construction site.
p = pipeline { a = 1; b = a + 1 }
|> filter_node($depth == 0) -- keeps a only
|> pipeline_assert -- succeeds, returns the pipeline
-- Chaining validation into a construction expression:
safe_pipeline = pipeline { a = 1; b = a + 1 }
|> mutate_node($noop = true, where = $runtime == "R")
|> pipeline_assert
If validation fails:
p_broken |> pipeline_assert
-- Error(ValueError: "Node `b` depends on `a` which does not exist in the pipeline.")
T-Lang uses a lexical analyzer to automatically detect dependencies between nodes by scanning the code for variable names that match other node names. While this is convenient, there are cases where automatic detection is insufficient or may produce false positives.
Sometimes a node's code contains a word that matches another node name but is only part of a comment or a string, not a real dependency. To prevent such matches from creating spurious dependencies (and even cycles, as in the example below), T automatically strips standard comments starting with -- or # within foreign code blocks (<{ ... }>) before analyzing the code.
p = pipeline {
data = read_csv("input.csv")
-- The analyzer will IGNORE the string 'results' because it's in a comment.
-- This prevents an accidental dependency on the 'results' node.
process = pyn(command = <{
# We will save the processed results to a file
import pandas as pd
df = data.dropna()
df
}>)
results = node(command = process |> head)
}
deps
In some runtimes, like sh (shell), T cannot always reliably infer dependencies from the command string. Similarly, you may want to explicitly declare a dependency that isn't directly referenced in the code (e.g., a file produced by another node that your script reads via a hardcoded path).
For these cases, you can use the deps argument in node
definitions to manually declare one or more dependencies:
p = pipeline {
raw_file = shn(command = <{ curl -o data.csv https://example.com/data.csv }>)
-- This shell node reads data.csv, which is created by raw_file.
-- We use the `deps` argument to ensure raw_file executes first.
summary = shn(
command = <{ cat data.csv | wc -l }>,
deps = [raw_file],
serializer = "text"
)
}
Key features of deps:
- First-class syntax: deps is an optional argument available in node(), rn(), pyn(), and shn().
- Bare identifiers: you can list direct node names as bare identifiers (e.g., deps = [node1, node2]).
- Manual override: the listed nodes are added to the dependency graph even if they aren't parsed from the command or script body.
- Strict validation: T validates that all listed dependencies exist within the same pipeline.
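A sketch of that validation: listing a nonexistent node in deps is reported as a structural error. The exact message shown is illustrative:
p_bad = pipeline {
  fetch = shn(command = <{ echo hi }>, deps = [missing_node])
}
pipeline_validate(p_bad)
-- e.g. ["Node `fetch` depends on `missing_node` which does not exist in the pipeline."]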
Best practices:
- Use descriptive node names (e.g., raw_data, filtered_sales, summary_stats)
- Use pipeline_nodes(), pipeline_deps(), and pipeline_to_frame() to understand pipeline structure
- Add pipeline_assert at the end of a construction chain to catch structural errors early
- Prefer chain over union: when two pipelines are intentionally connected, chain makes the dependency explicit; use union only when combining truly independent pipelines
- Use filter_node + upstream_of for partial builds: trim a large pipeline to just what you need before calling build_pipeline
- Apply rename_node before set ops: both union and chain enforce unique names, so rename conflicting nodes before merging
-- A full data analysis pipeline
p = pipeline {
-- Load data
raw = read_csv("employees.csv")
-- Filter to active engineers
engineers = raw
|> filter($dept == "eng")
|> filter($active == true)
-- Compute statistics
avg_salary = engineers.salary |> mean
salary_sd = engineers.salary |> sd
team_size = engineers |> nrow
-- Sort by performance
ranked = engineers |> arrange("score", "desc")
}
-- Access results
p.team_size -- number of active engineers
p.avg_salary -- mean salary
p.ranked -- DataFrame sorted by score
When nodes are executed within a Nix-managed sandbox (via
populate_pipeline(p, build = true)), they are isolated from
each other. However, T provides a built-in mechanism for nodes to access
the serialized artifacts of their dependencies.
For every dependency dep that a node has, the pipeline
runner automatically injects an environment variable named
T_NODE_<dep> into the sandbox. This variable contains
the path to the Nix store directory where that dependency’s artifact is
stored.
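As a sketch, a shell node can use the injected variable directly. The artifact's filename inside that directory is an assumption here; for T nodes, node_lens below is the preferred route:
p = pipeline {
  data = shn(command = <{ curl -o data.csv https://example.com/data.csv }>)
  count = shn(
    command = <{
      # $T_NODE_data holds the Nix store path of `data`'s artifact directory
      wc -l "$T_NODE_data"/*
    }>,
    deps = [data],
    serializer = "text"
  )
}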
node_lens
The canonical way to access a sibling node's artifact is node_lens combined with the single-argument get() function. This is preferred over manual environment variable lookup because:
1. It is portable: T handles the path resolution and deserialization automatically.
2. It is integrated: it uses the same deserializer system as the rest of the pipeline.
p = pipeline {
node_a = node(command = 100, serializer = "json")
-- This node retrieves node_a's value from its Nix artifact
dynamic_access = node(
command = {
-- Using get(node_lens("...")) for cross-node access
val = get(node_lens("node_a"))
val * 2
},
runtime = "T"
)
}
When dynamic_access runs inside the Nix sandbox:
1. T sees the node_lens("node_a") call and looks up the T_NODE_node_a environment variable.
2. It locates the artifact file within that path.
3. It detects the artifact class (e.g., Int from JSON) and deserializes it back into a T value.
This pattern is essential for polyglot pipelines
where data is passed between T, R, and Python nodes through files, and
for dynamic access nodes where the target of a
retrieval is determined at runtime (e.g.,
target = "A"; get(node_lens(target))).
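A minimal sketch of such a dynamic-access node. Because the target name is a runtime string, the lexical analyzer cannot see it, so the dependencies are declared explicitly with deps:
p = pipeline {
  a = node(command = 10, serializer = "json")
  b = node(command = 20, serializer = "json")
  pick = node(
    command = {
      target = "a" -- chosen at runtime in a real pipeline
      get(node_lens(target))
    },
    runtime = "T",
    deps = [a, b]
  )
}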
Now that you’ve mastered pipelines, learn how to manage reproducible projects and develop T packages: