A step-by-step guide to T’s pipeline execution model
Pipelines are T’s core execution model. They let you define named computation steps (nodes) that are automatically ordered by their dependencies, executed deterministically, and cached for re-use.
A pipeline is a block of named expressions enclosed in
pipeline { ... }:
p = pipeline {
x = 10
y = 20
total = x + y
}
This creates a pipeline with three nodes: x,
y, and total. Each node is computed once, and
the results are cached. Access any node’s value with dot notation:
p.x -- 10
p.y -- 20
p.total -- 30
The pipeline itself displays as:
Pipeline(3 nodes: [x, y, total])
In addition to bare assignments, you can explicitly configure nodes
using the node() function. This lets you define the
execution environment (like the runtime) and custom
serialization methods for when a pipeline is materialized by Nix:
p = pipeline {
data = node(command = read_csv("data.csv"), runtime = T)
-- Running a Python node that trains a model using the pyn wrapper
model = pyn(
command = <{
from sklearn.linear_model import LinearRegression
fit = LinearRegression().fit(X, y)
fit
}>,
serializer = "pmml"
)
}
Bare syntax (like x = 10) is automatically desugared to
x = node(command = 10, runtime = T, serializer = default, deserializer = default).
You can also use pyn(), rn(), and
shn() as shortcuts for Python, R, and shell runtimes. T
enforces cross-runtime safety: if a node with a non-T
runtime depends on a T node, or vice versa, you should
specify an explicit
serializer/deserializer.
script
ArgumentInstead of inlining code with command, you can point a
node to an external source file using the script argument.
This works with node(), pyn(),
rn(), and shn(). The script and
command arguments are mutually exclusive.
p = pipeline {
-- Execute an external R script
model = rn(script = "train_model.R", serializer = "pmml")
-- Execute an external Python script
predictions = pyn(script = "predict.py", deserializer = "pmml")
-- Execute an external shell script
report = shn(script = "postprocess.sh")
-- node() auto-detects the runtime from the file extension
summary = node(script = "summarise.R", serializer = "json")
}
When using script, the runtime is auto-detected from the
file extension (.R → R, .py → Python,
.sh → sh) if not explicitly set via the
runtime argument. T reads the script file to extract
identifier references, allowing the pipeline dependency graph to be
built correctly from variables referenced in the external file.
shn()Use shn() for pipeline steps that are easiest to express
as shell or CLI commands. It is a convenience wrapper around
node(runtime = sh, ...), just like rn() and
pyn() wrap node() for R and Python.
p = pipeline {
-- Exec-style shell node: command + positional argv
fields = shn(
command = "printf",
args = ["first line\\nsecond line\\n"]
)
-- Script-style shell node: inline shell source executed with `sh`
report = shn(command = <{
#!/bin/sh
set -eu
# Dependencies for T's lexical pipeline analysis: summary_r summary_py
printf 'R summary: %s\n' "$T_NODE_summary_r/artifact"
printf 'Python summary: %s\n' "$T_NODE_summary_py/artifact"
}>)
}
There are two useful modes:
command
plus args = [...] to run a program directly with positional
arguments.<{ ... }> or a .sh script,
optionally overriding the interpreter with shell = "bash"
and shell_args = ["-lc"] when you need Bash-specific
syntax.Shell nodes default to serializer = text, which makes
them a good fit for reports, command output, and glue code between other
pipeline nodes. For a full end-to-end example that mixes T, R, Python,
and sh, see
tests/pipeline/polyglot_shell_pipeline.t and
.github/workflows/polyglot-shell-pipeline.yml.
T is designed to orchestrate code across multiple languages. The pipeline runner manages the serialization and deserialization of data between R, Python, and T using industry-standard formats.
| Format | Option | Best For | Requirement |
|---|---|---|---|
| T Native | "default" |
T-to-T communication | None |
| Arrow | "arrow" |
Large DataFrames | pyarrow (Py), arrow (R) |
| PMML | "pmml" |
Predictive Models | sklearn2pmml (Py), r2pmml (R) |
| JSON | "json" |
Simple lists/dicts | jsonlite (R) |
You can train a model in R and use T’s native OCaml model evaluator to make predictions without leaving the T runtime:
p = pipeline {
-- Node 1: Train model in R using the rn wrapper
model_r = rn(
command = <{
data <- read.csv("data.csv")
lm(mpg ~ wt + hp, data = data)
}>,
serializer = "pmml"
)
-- Node 2: Predict in T using the R model
predictions = node(
command = <{
test_df = read_csv("new_data.csv")
predict(test_df, model_r)
}>,
runtime = "T",
deserializer = "pmml"
)
}
Setting deserializer = "pmml" on the T node tells the
pipeline runner to use T’s native PMML parser to convert the R model
into a T model object.
Nodes can be declared in any order. T automatically resolves dependencies:
p = pipeline {
result = x + y -- depends on x and y
x = 3 -- defined after result
y = 7 -- defined after result
}
p.result -- 10
T builds a dependency graph and executes nodes in topological order,
so x and y are computed before
result regardless of declaration order.
Nodes can depend on other computed nodes, forming chains:
p = pipeline {
a = 1
b = a + 1 -- depends on a
c = b + 1 -- depends on b
d = c + 1 -- depends on c
}
p.d -- 4
Nodes can use any T function, including standard library functions:
p = pipeline {
data = [1, 2, 3, 4, 5]
total = sum(data)
count = length(data)
}
p.total -- 15
p.count -- 5
The pipe operator |> works naturally inside
pipelines:
double = \(x) x * 2
p = pipeline {
a = 5
b = a |> double
}
p.b -- 10
The maybe-pipe ?|> forwards all values — including
errors — to the next function. This is useful for building recovery
logic into pipelines:
recovery = \(x) if (is_error(x)) 0 else x
double = \(x) x * 2
p = pipeline {
raw = 1 / 0 -- Error: division by zero
safe = raw ?|> recovery -- forwards error to recovery → 0
result = safe |> double -- 0 |> double → 0
}
p.safe -- 0
p.result -- 0
Without ?|>, the error from raw would
short-circuit at |> and never reach
recovery. The maybe-pipe lets you intercept errors and
provide fallback values.
Pipelines are most powerful for data analysis workflows. Here’s a complete example loading, transforming, and summarizing data:
p = pipeline {
data = read_csv("employees.csv")
rows = data |> nrow
cols = data |> ncol
names = data |> colnames
}
p.rows -- number of rows
p.cols -- number of columns
p.names -- list of column names
p = pipeline {
raw = read_csv("sales.csv")
filtered = filter(raw, $amount > 100)
by_region = filtered |> group_by($region)
summary = by_region |> summarize($total = sum($amount))
}
p.summary -- DataFrame with regional totals
T provides functions to inspect pipeline structure:
p = pipeline { x = 10; y = 20; total = x + y }
pipeline_nodes(p) -- ["x", "y", "total"]
pipeline_deps(p)
-- {`x`: [], `y`: [], `total`: ["x", "y"]}
pipeline_node(p, "total") -- 30
Use pipeline_run() to re-execute a pipeline:
p2 = pipeline_run(p)
p2.total -- 30 (re-computed)
Re-running produces the same results — T pipelines are deterministic.
Two pipelines with the same definitions always produce the same results:
p1 = pipeline { a = 5; b = a * 2; c = b + 1 }
p2 = pipeline { a = 5; b = a * 2; c = b + 1 }
p1.c == p2.c -- true
T detects circular dependencies and reports them:
pipeline {
a = b
b = a
}
-- Error(ValueError: "Pipeline has a dependency cycle involving node 'a'")
If a node fails, the error is captured and reported:
pipeline {
a = 1 / 0
b = a + 1
}
-- Error(ValueError: "Pipeline node 'a' failed: ...")
Accessing a non-existent node returns a structured error:
p = pipeline { x = 10 }
p.nonexistent
-- Error(KeyError: "node 'nonexistent' not found in Pipeline")
Defining a pipeline with pipeline { ... } evaluates
nodes in-memory. To materialize them as reproducible
Nix artifacts (potentially using R or Python dependencies you’ve defined
in tproject.toml), use populate_pipeline()
with the build = true argument:
p = pipeline {
data = read_csv("sales.csv")
total = sum(data.$amount)
}
populate_pipeline(p, build = true)
populate_pipeline(p, build = true) is the primary
command for materializing a pipeline. It does the following:
_pipeline/ directory
with pipeline.nix and dag.json.[r-dependencies] or
[py-dependencies] in your tproject.toml,
pipeline nodes have access to these language environments!_pipeline/build_log_YYYYMMdd_HHmmss_hash.json).[!NOTE]
build_pipeline(p)is available as a shorthand forpopulate_pipeline(p, build = true).
After building, use read_node() or
load_node() to retrieve materialized values:
read_node("total") -- reads the serialized artifact for "total"
load_node("total") -- same as read_node, loads the artifact
These functions look up the node in the latest build log and deserialize the artifact.
For more control over the build process, T provides
populate_pipeline(). This function prepares the pipeline
infrastructure without necessarily triggering the Nix build
immediately.
populate_pipeline(p) -- Prepares _pipeline/ only
populate_pipeline(p, build = true) -- Same as build_pipeline(p)
_pipeline/
directoryT maintains a persistent state directory for your pipeline. When you populate or build, T creates:
_pipeline/pipeline.nix: The generated
Nix expression for your pipeline nodes._pipeline/dag.json: A machine-readable
dependency graph of your pipeline._pipeline/build_log_*.json: History of
previous successful builds.T keeps a history of your builds in _pipeline/. This
enables Time Travel — the ability to read artifacts
from specific past versions of your pipeline.
Use list_logs() to see available build logs:
logs = list_logs()
-- DataFrame of build logs with filename, modification_time, and size_kb
Use inspect_pipeline() to view the build status of a
specific pipeline as a DataFrame (defaults to the latest):
inspect_pipeline()
-- DataFrame(5 rows x 4 cols: [derivation, build_success, path, output])
inspect_pipeline(which_log = "20260221_143022")
Pass the which_log argument to read_node()
to specify which build to read from. You can pass a regex pattern or a
specific filename:
-- Read the latest version (default)
val = read_node("result")
-- Read from a specific historical build
val_old = read_node("result", which_log = "20260221_143022")
This ensures that even as you update your code and data, you can always recover and compare results from previous runs.
T enforces a clear separation between interactive and non-interactive execution:
t run)Scripts executed with t run must call
populate_pipeline() (or build_pipeline()).
This ensures that non-interactive execution always produces reproducible
Nix artifacts.
# ✅ This works — script defines and populates a pipeline
$ t run my_pipeline.t
# ❌ This is rejected — script doesn't call populate_pipeline()
$ t run my_script.t
# Error: non-interactive execution requires a pipeline.
# Scripts run with `t run` must call `populate_pipeline(p, build=true)` or `build_pipeline()`.
# Use the REPL for interactive exploration, or pass --unsafe to override.A valid pipeline script looks like:
-- my_pipeline.t
p = pipeline {
data = read_csv("input.csv")
result = data |> filter($value > 0) |> summarize(total = sum($value))
}
populate_pipeline(p, build = true)
The REPL is unrestricted — you can run any T code line by line, whether or not it involves pipelines:
$ t repl
T> x = 1 + 2
T> print(x)
3
T> p = pipeline { a = 10 }
T> p.a
10When a pipeline is built with build_pipeline(), each
node runs inside a Nix sandbox — an isolated build
environment. Import statements from your script are
automatically propagated into each sandbox, so imported
packages and functions are available to all nodes.
-- pipeline.t
import my_stats
import data_utils[read_clean, normalize]
p = pipeline {
raw = read_csv("data.csv")
clean = read_clean(raw) -- uses imported function
normed = normalize(clean) -- uses imported function
result = weighted_mean(normed.$x, normed.$w) -- uses imported function
}
build_pipeline(p)
When build_pipeline(p) generates the Nix derivation for
each node, it prepends the import statements:
-- Generated node_script.t (inside Nix sandbox)
import my_stats
import data_utils[read_clean, normalize]
raw = deserialize("$T_NODE_raw/artifact.tobj")
result = weighted_mean(raw.$x, raw.$w)
serialize(result, "$out/artifact.tobj")
All three import forms are supported:
| Syntax | Effect |
|---|---|
import "src/helpers.t" |
Import a local file |
import my_stats |
Import all public functions from a package |
import my_stats[foo, bar] |
Import specific functions |
import my_stats[wm=weighted_mean] |
Import with aliases |
The explain() function provides structured metadata
about pipelines:
p = pipeline {
x = 10
y = x + 5
z = y * 2
}
e = explain(p)
e.kind -- "pipeline"
e.node_count -- 3
You can explicitly skip a node (and by extension, all nodes that
depend on it) by passing the noop = true argument to the
node() function.
p = pipeline {
raw_data = read_csv("raw.csv")
# This node and its dependencies won't trigger a heavy Nix build
expensive_model = rn(
command = train(raw_data),
noop = true
)
# This node depends on expensive_model, therefore it becomes a noop as well
report = rn(command = generate_report(expensive_model))
}
populate_pipeline(p, build = true)
In a Nix sandbox context, noop generates a lightweight
stub instead of a real build derivation.
Every node in a pipeline carries structured metadata that you can
query and manipulate. The pipeline_to_frame() function
converts this metadata into a DataFrame with one row per node.
pipeline_to_framep = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_to_frame(p)
-- DataFrame(3 rows x 8 cols: [name, runtime, serializer, deserializer, noop, deps, depth, command_type])
The columns returned are:
| Column | Type | Description |
|---|---|---|
name |
String | Unique node identifier |
runtime |
String | "T", "R", or "Python" |
serializer |
String | e.g. "default", "pmml" |
deserializer |
String | e.g. "default", "pmml" |
noop |
Bool | Whether the node is a no-op |
deps |
String | Comma-separated dependency names |
depth |
Int | Topological depth (roots = 0) |
command_type |
String | "command" or "script" |
pipeline_to_frame is the foundation for inspection: you
can use T’s standard filter, select, and
arrange verbs on the resulting DataFrame.
pipeline_summarypipeline_summary(p) is a convenience alias for
pipeline_to_frame(p):
pipeline_summary(p)
-- same output as pipeline_to_frame(p)
select_nodeselect_node returns a DataFrame with only the columns
you request, using NSE $field references:
p = pipeline {
a = 1
b = node(command = <{ 2 }>, runtime = R, serializer = "pmml")
c = b + 1
}
p |> select_node($name, $runtime, $depth)
-- DataFrame: name="a", runtime="T", depth=0
-- name="b", runtime="R", depth=0
-- name="c", runtime="T", depth=1
Available fields: $name, $runtime,
$serializer, $deserializer,
$noop, $deps, $depth,
$command_type.
Pipeline nodes can pass environment variables into the Nix build
sandbox via the env_vars named argument on
node(), py()/pyn(), and
rn(). This allows nodes to configure their build-time
execution environment without embedding those values directly into the
command body.
p = pipeline {
model = rn(
command = <{ train_model(data) }>,
env_vars = [
MODEL_MODE: "train",
RETRIES: 2,
DEBUG: true
]
)
}
The env_vars dictionary supports the following
scalar-like values:
| Type | Example | Nix Output |
|---|---|---|
| String | "train" |
"train" |
| Symbol | train |
"train" |
| Int | 2 |
"2" |
| Float | 3.14 |
"3.14" (up to 15 significant digits) |
| Bool | true |
"true" |
| Null | null |
(Omitted from derivation) |
T performs early validation on environment variables: -
env_vars must be a dictionary. - Unsupported types (like
Lists or nested Dicts) trigger a structured type error during pipeline
construction. - null values are silently omitted from the
generated Nix derivation instead of being materialized as empty
strings.
These variables are automatically threaded into the generated
stdenv.mkDerivation and are available via standard system
methods (e.g., Sys.getenv() in R or os.environ
in Python) during the Nix build step.
_node family)T provides a set of colcraft-style verbs for operating on pipeline
nodes. These mirror the DataFrame API, using NSE $field
references for node metadata fields.
filter_nodeReturns a new pipeline containing only the nodes where the predicate
is true. No DAG validity check is performed — if a retained node
references a removed node, that surfaces at build_pipeline
time.
p = pipeline {
load = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = load) }>, serializer = "pmml")
score = node(command = predict(model, load), deserializer = "pmml")
}
-- Keep only R nodes
p |> filter_node($runtime == "R") |> pipeline_nodes
-- ["model"]
-- Keep only nodes with no noop flag
p |> filter_node($noop == false) |> pipeline_nodes
-- Keep only shallow nodes (root and depth-1 nodes)
p |> filter_node($depth <= 1) |> pipeline_nodes
mutate_nodeModifies metadata fields on all nodes, or scoped to a subset using
the where argument:
-- Mark all nodes as noop
p |> mutate_node($noop = true)
-- Mark only R nodes as noop (useful for skipping heavy computations)
p |> mutate_node($noop = true, where = $runtime == "R")
-- Override serializer for all nodes
p |> mutate_node($serializer = "pmml", where = $runtime == "R")
Mutable metadata fields: noop (Bool),
runtime (String), serializer (String),
deserializer (String).
rename_nodeRenames a single node and automatically rewires all dependency edges
that referenced the old name. This is the canonical way to resolve name
collisions before set operations like union.
p = pipeline { a = 1; b = a + 1 }
p2 = p |> rename_node("a", "alpha")
pipeline_nodes(p2) -- ["alpha", "b"]
pipeline_deps(p2) -- {`alpha`: [], `b`: ["alpha"]}
Attempting to rename to a name that already exists is an error:
p |> rename_node("a", "b")
-- Error(ValueError: "A node named `b` already exists in the Pipeline.")
arrange_nodeReturns a new pipeline with nodes sorted by a metadata field. This affects only display/serialization order — the DAG determines execution order.
p = pipeline { z = 1; a = 2; m = 3 }
p |> arrange_node($name) |> pipeline_nodes -- ["a", "m", "z"]
p |> arrange_node($name, "desc") |> pipeline_nodes -- ["z", "m", "a"]
-- Sort a chain by depth (shallowest first)
p = pipeline { a = 1; b = a + 1; c = b + 1 }
p |> arrange_node($depth) |> pipeline_nodes -- ["a", "b", "c"]
Pipelines can be treated as named sets of nodes. T provides four set operations that combine or subtract pipelines.
Immutability: All set operations return new Pipelines. The original pipelines are never modified.
Lazy validation: Set operations do not check DAG validity. If the result has dangling references, errors surface at
build_pipelineorpipeline_runtime.
unionMerges two pipelines, including all nodes from both. Errors
immediately on any name collision. Use rename_node to
resolve collisions first.
p_etl = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
}
p_model = pipeline {
fit = lm(clean, formula = y ~ x)
report = summary(fit)
}
p_full = p_etl |> union(p_model)
pipeline_nodes(p_full) -- ["raw", "clean", "fit", "report"]
If both pipelines have a node named clean:
p_etl |> union(p_model)
-- Error(ValueError: "Function `union`: name collision(s) detected: clean. Use `rename_node` to resolve.")
-- Fix: rename before merging
p_model2 = p_model |> rename_node("clean", "clean_model")
p_etl |> union(p_model2)
differenceRemoves from the first pipeline all nodes whose names appear in the second pipeline. Nodes in the second pipeline that don’t exist in the first are silently ignored.
p = pipeline { a = 1; b = 2; c = 3; d = 4 }
p_remove = pipeline { b = 0; d = 0 }
p |> difference(p_remove) |> pipeline_nodes -- ["a", "c"]
intersectRetains only nodes present by name in both pipelines, using definitions from the first pipeline.
p1 = pipeline { a = 1; b = 2; c = 3 }
p2 = pipeline { b = 99; c = 100; d = 4 }
p1 |> intersect(p2) |> pipeline_nodes -- ["b", "c"] (p1's definitions)
patchLike union, but only updates nodes that already exist in
the first pipeline — it will not add new nodes from the second pipeline.
Ideal for overriding configurations without accidentally importing stray
nodes.
p_prod = pipeline {
load = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = load) }>, serializer = "pmml")
}
p_overrides = pipeline {
model = rn(command = <{ lm(y ~ x + z, data = load) }>, serializer = "pmml")
extra = 99 -- stray node
}
p_updated = p_prod |> patch(p_overrides)
pipeline_nodes(p_updated) -- ["load", "model"] — "extra" was not added
These operations are structurally aware of the pipeline’s dependency graph and are used to replace node implementations, reroute edges, and extract subgraphs.
swapReplaces a node’s implementation while preserving its existing dependency edges. The new node is specified as the third argument.
p = pipeline {
data = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = data) }>, serializer = "pmml")
score = node(command = predict(model, data), deserializer = "pmml")
}
-- Replace the model node with a new implementation; edges to/from model are preserved
new_model = rn(command = <{ glm(y ~ x, data = data, family = binomial) }>, serializer = "pmml")
p2 = p |> swap("model", new_model)
pipeline_deps(p2)
-- `model` still depends on `data`, and `score` still depends on `model`
rewireReroutes a node’s declared dependencies. The replace
argument maps old dependency names to new ones. Only the named node’s
dependency list is updated.
p = pipeline {
data = read_csv("data.csv")
data_v2 = read_csv("data_v2.csv")
model = rn(command = <{ lm(y ~ x, data) }>, serializer = "pmml")
}
-- Re-point model to use data_v2 instead of data
p2 = p |> rewire("model", replace = list(data = "data_v2"))
pipeline_deps(p2)
-- {`data`: [], `data_v2`: [], `model`: ["data_v2"]}
pruneRemoves all leaf nodes — nodes that nothing else depends on. This is
useful for cleaning up intermediate pipelines after
filter_node or difference operations that may
leave orphaned utility nodes.
p = pipeline { a = 1; b = a + 1; c = 3 }
-- `a` is depended on by `b`, so it is not a leaf.
-- `b` depends on `a` but nothing depends on `b` — it is a leaf.
-- `c` is independent and nothing depends on it — it is also a leaf.
p |> prune |> pipeline_nodes -- ["a"] (both b and c are leaves, removed)
You can chain difference and prune to strip
unwanted branches in one step:
p_partial = p |> difference(p_debug_nodes) |> prune
upstream_ofReturns a new pipeline containing the named node and all its transitive ancestors (everything the node depends on, directly or indirectly).
p = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
model = rn(command = <{ lm(y ~ x, clean) }>, serializer = "pmml")
report = summary(model)
sidebar = "metadata"
}
-- Everything needed to produce `model`
p |> upstream_of("model") |> pipeline_nodes -- ["raw", "clean", "model"]
-- sidebar is excluded because model doesn't depend on it
downstream_ofReturns a new pipeline containing the named node and all nodes that transitively depend on it (everything that uses this node, directly or indirectly).
-- Everything that is affected if `clean` changes
p |> downstream_of("clean") |> pipeline_nodes -- ["clean", "model", "report"]
-- raw and sidebar are excluded
subgraphReturns the full connected component of a node — the union of its ancestors and descendants.
p = pipeline { a = 1; b = a + 1; c = b + 1; d = 99 }
-- Everything connected to b (upstream and downstream)
p |> subgraph("b") |> pipeline_nodes -- ["a", "b", "c"] — d is disconnected
These higher-level operators combine two complete, separately-defined pipelines into one.
chainConnects two pipelines where the second pipeline’s nodes reference
node names from the first as dependencies. T verifies that at least one
such shared reference exists; if the two pipelines are completely
disconnected, chain raises an error.
p_etl = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
}
-- p_model references `clean` from p_etl — this is the wire
p_model = pipeline {
fit = lm(clean, formula = y ~ x)
report = summary(fit)
}
p_full = p_etl |> chain(p_model)
pipeline_nodes(p_full) -- ["raw", "clean", "fit", "report"]
chain is stricter than union: it requires
an intent to connect the pipelines, catching accidental merges
where no wiring was meant.
T’s dependency tracking works differently depending on the node’s
runtime. This leads to a specific limitation when using
chain() with R or Python pipelines.
lm()) and a T variable from a different
pipeline.To avoid polluting your build environment with R/Python functions as Nix dependencies, T ignores external references inside RawCode blocks when they are not defined in the current pipeline block.
This means chain() will fail to automatically
wire R/Python nodes to nodes in other pipelines.
If you need an R or Python node to depend on a node from a separate
pipeline via chain(), you must “bring” that dependency into
the pipeline block using a T-expression stub with an aliased
name.
❌ Broken: R node cannot “see” raw_data for
chaining
p_data = pipeline { raw_data = read_csv("data.csv") }
p_model = pipeline {
model = rn(<{
lm(mpg ~ hp, data = raw_data)
}>)
}
-- Error: "no shared dependency names found"
p_full = p_data |> chain(p_model)
❌ Also broken: self-referential stub
p_model = pipeline {
raw_data = raw_data -- Error: "Self-referential node detected"
model = rn(<{ lm(mpg ~ hp, data = raw_data) }>)
}
✅ Fixed: Use a T-stub with an aliased name
p_data = pipeline { raw_data = read_csv("data.csv") }
p_model = pipeline {
-- Aliased T-stub: different name on the left, raw_data on the right.
-- T can parse the RHS and see `raw_data` as an external dependency.
data_input = raw_data
model = rn(<{
lm(mpg ~ hp, data = data_input) -- use the alias name in R
}>,
deserializer = "arrow")
}
-- Success! T sees `raw_data` as a dependency of `data_input`, wiring the pipelines.
p_full = p_data |> chain(p_model)
By giving the stub a different name
(data_input = raw_data), you avoid a self-reference while
still creating a T-expression that references raw_data. T
can parse the right-hand side, detect the cross-pipeline dependency, and
allow chain() to wire the pipelines together. Note that
R/Python code inside the chained node should use the alias
name (data_input) as the variable, not the
original (raw_data).
Combines two pipelines that are intended to run independently. No dependency wiring is performed. Errors on name collision.
p_r_model = pipeline {
r_fit = rn(command = <{ lm(y ~ x, data) }>, serializer = "pmml")
}
p_py_model = pipeline {
py_fit = pyn(
command = <{
from sklearn.linear_model import LinearRegression
LinearRegression().fit(X, y)
}>,
serializer = "pmml"
)
}
-- Both models will run independently
p_both = parallel(p_r_model, p_py_model)
pipeline_nodes(p_both) -- ["r_fit", "py_fit"]
Beyond pipeline_nodes and pipeline_deps, T
provides a complete structural inspection surface for pipelines.
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_roots(p) -- ["a"] — nodes with no dependencies
pipeline_leaves(p) -- ["c"] — nodes nothing depends on
pipeline_edges returns a list of [from, to]
pairs representing every edge in the DAG:
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_edges(p) -- [["a", "b"], ["b", "c"]]
This is useful for serializing the graph structure or feeding it to external tools.
pipeline_depth returns the maximum topological depth
across all nodes (root nodes have depth 0):
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_depth(p) -- 2
pipeline_cycles returns any node names involved in
dependency cycles. A correctly formed pipeline always returns an empty
list:
p = pipeline { a = 1; b = a + 1 }
pipeline_cycles(p) -- []
pipeline_printPrints a human-readable summary of all nodes to stdout, including their runtime, depth, noop status, and dependency list:
p = pipeline {
a = 1
b = node(command = <{ 2 }>, runtime = R, serializer = "pmml")
c = b + 1
}
pipeline_print(p)
-- Pipeline (3 nodes):
-- a runtime=T depth=0 noop=false deps=[]
-- b runtime=R depth=0 noop=false deps=[]
-- c runtime=T depth=1 noop=false deps=[b]
pipeline_dotExports the pipeline as a Graphviz DOT string for visualization:
p = pipeline { a = 1; b = a + 1; c = b + 1 }
dot = pipeline_dot(p)
print(dot)
-- digraph pipeline {
-- rankdir=LR;
-- node [shape=box];
-- "a" [label="a\n[T]"];
-- "b" [label="b\n[T]"];
-- "c" [label="c\n[T]"];
-- "a" -> "b";
-- "b" -> "c";
-- }
Pipe the output to dot -Tpng or paste it into
https://dreampuf.github.io/GraphvizOnline/ to render a visual dependency
graph.
By design, T uses lazy validation: structural errors
(missing dependencies, cycles) surface at build_pipeline or
pipeline_run time, not at operation time. This allows you
to compose and transform pipelines freely.
When you want to validate eagerly, T provides opt-in validation utilities.
pipeline_validateReturns a list of validation error messages. An empty list means the pipeline is structurally valid. This function never throws — it reports problems as data.
p_good = pipeline { a = 1; b = a + 1 }
pipeline_validate(p_good) -- []
-- Build a broken pipeline manually via difference
p_broken = pipeline { a = 1; b = a + 1 } |> filter_node($name == "b")
-- b now depends on a, but a was filtered out
pipeline_validate(p_broken)
-- ["Node `b` depends on `a` which does not exist in the pipeline."]
Checks performed: 1. All referenced dependencies exist as nodes in the pipeline. 2. No dependency cycles.
pipeline_assertLike pipeline_validate, but throws the
first error found instead of returning a list. Returns the pipeline
unchanged if valid. This is useful as a guard at a pipeline’s
construction site.
p = pipeline { a = 1; b = a + 1 }
|> filter_node($depth == 0) -- keeps a only
|> pipeline_assert -- succeeds, returns the pipeline
-- Chaining validation into a construction expression:
safe_pipeline = pipeline { a = 1; b = a + 1 }
|> mutate_node($noop = true, where = $runtime == "R")
|> pipeline_assert
If validation fails:
p_broken |> pipeline_assert
-- Error(ValueError: "Node `b` depends on `a` which does not exist in the pipeline.")
raw_data, filtered_sales,
summary_statspipeline_nodes(), pipeline_deps(), and
pipeline_to_frame() to understand pipeline structurepipeline_assert at the end of a construction chain to catch
structural errors earlychain over
union: When two pipelines are intentionally
connected, chain makes the dependency explicit; use
union only when combining truly independent pipelinesfilter_node + upstream_of for
partial builds: Trim a large pipeline to just what you need
before calling build_pipelinerename_node before set
ops: Both union and chain enforce
unique names; rename conflicting nodes before merging-- A full data analysis pipeline
p = pipeline {
-- Load data
raw = read_csv("employees.csv")
-- Filter to active engineers
engineers = raw
|> filter($dept == "eng")
|> filter($active == true)
-- Compute statistics
avg_salary = engineers.salary |> mean
salary_sd = engineers.salary |> sd
team_size = engineers |> nrow
-- Sort by performance
ranked = engineers |> arrange("score", "desc")
}
-- Access results
p.team_size -- number of active engineers
p.avg_salary -- mean salary
p.ranked -- DataFrame sorted by score
Now that you’ve mastered pipelines, learn how to manage reproducible projects and develop T packages: