Dynamic branching, node manipulation, pipeline composition, and beyond
This guide covers advanced pipeline features building on the fundamentals from the Pipeline Tutorial. You should be familiar with basic pipeline concepts (nodes, dependencies, building, and inspecting) before diving in here.
When a pipeline is built with build_pipeline(), each
node runs inside a Nix sandbox — an isolated build
environment. Import statements from your script are
automatically propagated into each sandbox, so imported
packages and functions are available to all nodes.
-- pipeline.t
import my_stats
import data_utils[read_clean, normalize]
p = pipeline {
raw = read_csv("data.csv")
clean = read_clean(raw) -- uses imported function
normed = normalize(clean) -- uses imported function
result = weighted_mean(normed.$x, normed.$w) -- uses imported function
}
build_pipeline(p)
When build_pipeline(p) generates the Nix derivation for
each node, it prepends the import statements:
-- Generated node_script.t (inside Nix sandbox)
import my_stats
import data_utils[read_clean, normalize]
raw = deserialize("$T_NODE_raw/artifact.tobj")
result = weighted_mean(raw.$x, raw.$w)
serialize(result, "$out/artifact.tobj")
All three import forms are supported:
| Syntax | Effect |
|---|---|
import "src/helpers.t" |
Import a local file |
import my_stats |
Import all public functions from a package |
import my_stats[foo, bar] |
Import specific functions |
import my_stats[wm=weighted_mean] |
Import with aliases |
You can explicitly skip a node (and by extension, all nodes that
depend on it) by passing the noop = true argument to the
node() function.
p = pipeline {
raw_data = read_csv("raw.csv")
# This node and its dependencies won't trigger a heavy Nix build
expensive_model = rn(
command = train(raw_data),
noop = true
)
# This node depends on expensive_model, therefore it becomes a noop as well
report = rn(command = generate_report(expensive_model))
}
populate_pipeline(p, build = true)
In a Nix sandbox context, noop generates a lightweight
stub instead of a real build derivation.
Every node in a pipeline carries structured metadata that you can
query and manipulate. The pipeline_to_frame() function
converts this metadata into a DataFrame with one row per node.
pipeline_to_framep = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_to_frame(p)
-- DataFrame(3 rows x 8 cols: [name, runtime, serializer, deserializer, noop, deps, depth, command_type])
The columns returned are:
| Column | Type | Description |
|---|---|---|
name |
String | Unique node identifier |
runtime |
String | "T", "R", "Python", or
"Julia" |
serializer |
String | e.g. "default", "pmml" |
deserializer |
String | e.g. "default", "pmml" |
noop |
Bool | Whether the node is a no-op |
deps |
String | Comma-separated dependency names |
depth |
Int | Topological depth (roots = 0) |
command_type |
String | "command" or "script" |
pipeline_to_frame is the foundation for inspection: you
can use T’s standard filter, select, and
arrange verbs on the resulting DataFrame.
select_nodeselect_node returns a DataFrame with only the columns
you request, using NSE $field references:
p = pipeline {
a = 1
b = node(command = <{ 2 }>, runtime = R, serializer = "pmml")
c = b + 1
}
p |> select_node($name, $runtime, $depth)
-- DataFrame: name="a", runtime="T", depth=0
-- name="b", runtime="R", depth=0
-- name="c", runtime="T", depth=1
Available fields: $name, $runtime,
$serializer, $deserializer,
$noop, $deps, $depth,
$command_type.
Pipeline nodes can pass environment variables into the Nix build
sandbox via the env_vars named argument on
node(), py()/pyn(), and
rn(). This allows nodes to configure their build-time
execution environment without embedding those values directly into the
command body.
p = pipeline {
model = rn(
command = <{ train_model(data) }>,
env_vars = [
MODEL_MODE: "train",
RETRIES: 2,
DEBUG: true
]
)
}
The env_vars dictionary supports the following
scalar-like values:
| Type | Example | Nix Output |
|---|---|---|
| String | "train" |
"train" |
| Symbol | train |
"train" |
| Int | 2 |
"2" |
| Float | 3.14 |
"3.14" (up to 15 significant digits) |
| Bool | true |
"true" |
| NA | NA |
(Omitted from derivation) |
T performs early validation on environment variables: -
env_vars must be a dictionary. - Unsupported types (like
Lists or nested Dicts) trigger a structured type error during pipeline
construction. - NA values are silently omitted from the
generated Nix derivation instead of being materialized as empty
strings.
These variables are automatically threaded into the generated
stdenv.mkDerivation and are available via standard system
methods (e.g., Sys.getenv() in R or os.environ
in Python) during the Nix build step.
_node family)T provides a set of colcraft-style verbs for operating on pipeline
nodes. These mirror the DataFrame API, using NSE $field
references for node metadata fields.
filter_nodeReturns a new pipeline containing only the nodes where the predicate
is true. No DAG validity check is performed — if a retained node
references a removed node, that surfaces at build_pipeline
time.
p = pipeline {
load = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = load) }>, serializer = "pmml")
score = node(command = predict(model, load), deserializer = "pmml")
}
-- Keep only R nodes
p |> filter_node($runtime == "R") |> pipeline_nodes
-- ["model"]
-- Keep only nodes with no noop flag
p |> filter_node($noop == false) |> pipeline_nodes
-- Keep only shallow nodes (root and depth-1 nodes)
p |> filter_node($depth <= 1) |> pipeline_nodes
which_nodesfilter_node rewrites the pipeline itself.
which_nodes is the read-only counterpart: it filters the
richer node records you would otherwise have to access manually through
read_pipeline(p).nodes.
This is especially useful for diagnostics queries because each record
includes name, value, and
diagnostics.
p = pipeline {
bad = 1 / 0
ok = 42
downstream = bad + 1
}
-- Keep only nodes with captured errors
which_nodes(p, !is_na(diagnostics.error))
-- Same idea, but return only the node names
which_nodes(p, !is_na(diagnostics.error))
|> map(\(node) node.name)
-- ["bad", "downstream"]
-- Explicit predicate functions still work too
has_error = \(node) !is_na(node.diagnostics.error)
which_nodes(p, has_error)
-- Convenience shortcut for the most common case
errored_nodes(p) |> map(\(node) node.name)
mutate_nodeModifies metadata fields on all nodes, or scoped to a subset using
the where argument:
-- Mark all nodes as noop
p |> mutate_node($noop = true)
-- Mark only R nodes as noop (useful for skipping heavy computations)
p |> mutate_node($noop = true, where = $runtime == "R")
-- Override serializer for all nodes
p |> mutate_node($serializer = "pmml", where = $runtime == "R")
Mutable metadata fields: noop (Bool),
runtime (String), serializer (String),
deserializer (String).
rename_nodeRenames a single node and automatically rewires all dependency edges
that referenced the old name. This is the canonical way to resolve name
collisions before set operations like union.
p = pipeline { a = 1; b = a + 1 }
p2 = p |> rename_node("a", "alpha")
pipeline_nodes(p2) -- ["alpha", "b"]
pipeline_deps(p2) -- {`alpha`: [], `b`: ["alpha"]}
Attempting to rename to a name that already exists is an error:
p |> rename_node("a", "b")
-- Error(ValueError: "A node named `b` already exists in the Pipeline.")
arrange_nodeReturns a new pipeline with nodes sorted by a metadata field. This affects only display/serialization order — the DAG determines execution order.
Beyond basic execution, T allows you to treat a Pipeline as a queryable and mutable data structure. This is powerful for meta-programming, automated reporting, and “surgical” updates to large analysis graphs.
In a production setting, you may want to extract the errors from a failed pipeline run to log them or send an alert.
p = build_pipeline(p)
-- Get detailed records for all failed nodes
failed_records = errored_nodes(p)
-- Extract just the names and error messages
errors = map(failed_records, \(n) [name: n.name, msg: n.diagnostics.error])
If you have a massive pipeline but only want to visualize or re-run a
specific subset (e.g., all Python nodes), use
filter_node():
-- Create a subgraph of only Python-based computations
py_pipeline = p |> filter_node($runtime == "Python")
-- Create a subgraph of 'shallow' nodes (roots and their immediate children)
shallow_p = p |> filter_node($depth <= 1)
Lenses allow you to modify a pipeline specification without using the
pipeline { ... } block again. This is useful for “what-if”
analysis or dynamic configuration.
-- 1. Identify a node to skip
noop_l = node_meta_lens("heavy_computation", "noop")
-- 2. Toggle the noop flag surgically
p_fast = p |> set(noop_l, true)
-- 3. Swap a runtime for testing
p_test = p |> set(node_meta_lens("model_train", "runtime"), "R")
If you have a VPipeline object (from
read_pipeline()), you can use lenses to safely extract
values from specific nodes.
p_info = read_pipeline(p)
-- Focus on the 'summary' node's value
summary_l = node_lens("summary")
summary_df = get(p_info, summary_l)
p = pipeline { z = 1; a = 2; m = 3 }
p |> arrange_node($name) |> pipeline_nodes -- ["a", "m", "z"]
p |> arrange_node($name, "desc") |> pipeline_nodes -- ["z", "m", "a"]
-- Sort a chain by depth (shallowest first)
p = pipeline { a = 1; b = a + 1; c = b + 1 }
p |> arrange_node($depth) |> pipeline_nodes -- ["a", "b", "c"]
Pipelines can be treated as named sets of nodes. T provides four set operations that combine or subtract pipelines.
Immutability: All set operations return new Pipelines. The original pipelines are never modified.
Lazy validation: Set operations do not check DAG validity. If the result has dangling references, errors surface at
build_pipelineorpipeline_runtime.
unionMerges two pipelines, including all nodes from both. Errors
immediately on any name collision. Use rename_node to
resolve collisions first.
p_etl = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
}
p_model = pipeline {
fit = lm(clean, formula = y ~ x)
report = summary(fit)
}
p_full = p_etl |> union(p_model)
pipeline_nodes(p_full) -- ["raw", "clean", "fit", "report"]
If both pipelines have a node named clean:
p_etl |> union(p_model)
-- Error(ValueError: "Function `union`: name collision(s) detected: clean. Use `rename_node` to resolve.")
-- Fix: rename before merging
p_model2 = p_model |> rename_node("clean", "clean_model")
p_etl |> union(p_model2)
Nodes that produce large numbers of non-terminal warnings (like those
from filter() or complex modeling functions) can be
silenced using the suppress_warnings combinator. This
silences the console output for a node while maintaining the warning
records for auditability.
p = pipeline {
-- High-noise node with suppressed warnings
filtered = to_dataframe([[x: 1], [x: NA], [x: 3]])
|> filter($x > 1)
|> suppress_warnings
-- Downstream node remains unaffected
count = nrow(filtered)
}
When building or running a pipeline with suppressed nodes, the summary reflects this state:
Pipeline summary: 1 node(s) with warnings, 1 suppressed, 0 error(s)
○ filtered — warnings suppressed by caller (1 NAs ignored)
The ○ symbol indicates a suppressed node. You can still
access the underlying warning objects programmatically via
warning_msg() or read_pipeline().
warning_msg(p.filtered) -- Returns the warning message string
read_pipeline(p).diagnostics.summary -- Summary counts
differenceRemoves from the first pipeline all nodes whose names appear in the second pipeline. Nodes in the second pipeline that don’t exist in the first are silently ignored.
p = pipeline { a = 1; b = 2; c = 3; d = 4 }
p_remove = pipeline { b = 0; d = 0 }
p |> difference(p_remove) |> pipeline_nodes -- ["a", "c"]
intersectRetains only nodes present by name in both pipelines, using definitions from the first pipeline.
p1 = pipeline { a = 1; b = 2; c = 3 }
p2 = pipeline { b = 99; c = 100; d = 4 }
p1 |> intersect(p2) |> pipeline_nodes -- ["b", "c"] (p1's definitions)
patchLike union, but only updates nodes that already exist in
the first pipeline — it will not add new nodes from the second pipeline.
Ideal for overriding configurations without accidentally importing stray
nodes.
p_prod = pipeline {
load = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = load) }>, serializer = "pmml")
}
p_overrides = pipeline {
model = rn(command = <{ lm(y ~ x + z, data = load) }>, serializer = "pmml")
extra = 99 -- stray node
}
p_updated = p_prod |> patch(p_overrides)
pipeline_nodes(p_updated) -- ["load", "model"] — "extra" was not added
These operations are structurally aware of the pipeline’s dependency graph and are used to replace node implementations, reroute edges, and extract subgraphs.
swapReplaces a node’s implementation while preserving its existing dependency edges. The new node is specified as the third argument.
p = pipeline {
data = read_csv("data.csv")
model = rn(command = <{ lm(y ~ x, data = data) }>, serializer = "pmml")
score = node(command = predict(model, data), deserializer = "pmml")
}
-- Replace the model node with a new implementation; edges to/from model are preserved
new_model = rn(command = <{ glm(y ~ x, data = data, family = binomial) }>, serializer = "pmml")
p2 = p |> swap("model", new_model)
pipeline_deps(p2)
-- `model` still depends on `data`, and `score` still depends on `model`
rewireReroutes a node’s declared dependencies. The replace
argument maps old dependency names to new ones. Only the named node’s
dependency list is updated.
p = pipeline {
data = read_csv("data.csv")
data_v2 = read_csv("data_v2.csv")
model = rn(command = <{ lm(y ~ x, data) }>, serializer = "pmml")
}
-- Re-point model to use data_v2 instead of data
p2 = p |> rewire("model", replace = list(data = "data_v2"))
pipeline_deps(p2)
-- {`data`: [], `data_v2`: [], `model`: ["data_v2"]}
pruneRemoves all leaf nodes — nodes that nothing else depends on. This is
useful for cleaning up intermediate pipelines after
filter_node or difference operations that may
leave orphaned utility nodes.
p = pipeline { a = 1; b = a + 1; c = 3 }
-- `a` is depended on by `b`, so it is not a leaf.
-- `b` depends on `a` but nothing depends on `b` — it is a leaf.
-- `c` is independent and nothing depends on it — it is also a leaf.
p |> prune |> pipeline_nodes -- ["a"] (both b and c are leaves, removed)
You can chain difference and prune to strip
unwanted branches in one step:
p_partial = p |> difference(p_debug_nodes) |> prune
upstream_ofReturns a new pipeline containing the named node and all its transitive ancestors (everything the node depends on, directly or indirectly).
p = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
model = rn(command = <{ lm(y ~ x, clean) }>, serializer = "pmml")
report = summary(model)
sidebar = "metadata"
}
-- Everything needed to produce `model`
p |> upstream_of("model") |> pipeline_nodes -- ["raw", "clean", "model"]
-- sidebar is excluded because model doesn't depend on it
downstream_ofReturns a new pipeline containing the named node and all nodes that transitively depend on it (everything that uses this node, directly or indirectly).
-- Everything that is affected if `clean` changes
p |> downstream_of("clean") |> pipeline_nodes -- ["clean", "model", "report"]
-- raw and sidebar are excluded
subgraphReturns the full connected component of a node — the union of its ancestors and descendants.
p = pipeline { a = 1; b = a + 1; c = b + 1; d = 99 }
-- Everything connected to b (upstream and downstream)
p |> subgraph("b") |> pipeline_nodes -- ["a", "b", "c"] — d is disconnected
These higher-level operators combine two complete, separately-defined pipelines into one.
chainConnects two pipelines where the second pipeline’s nodes reference
node names from the first as dependencies. T verifies that at least one
such shared reference exists; if the two pipelines are completely
disconnected, chain raises an error.
p_etl = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
}
-- p_model references `clean` from p_etl — this is the wire
p_model = pipeline {
fit = lm(clean, formula = y ~ x)
report = summary(fit)
}
p_full = p_etl |> chain(p_model)
pipeline_nodes(p_full) -- ["raw", "clean", "fit", "report"]
chain is stricter than union: it requires
an intent to connect the pipelines, catching accidental merges
where no wiring was meant.
pipeline_of)For larger projects, you can compose multiple pipelines into a
higher-order DAG using the pipeline_of block. T-Lang
natively understands and automatically flattens meta-pipelines at
execution time, meaning you can pass them directly to built-in commands
like populate_pipeline(), read_node(),
inspect_node(), or inspect_pipeline().
pipeline_of blockDefines a group of sub-pipelines. The nodes within the block bind identifiers to pipeline values.
p_etl = pipeline {
raw = read_csv("data.csv")
clean = raw |> filter($value > 0)
}
p_stats = pipeline {
summary = etl.clean |> mean
}
-- Compose them into a higher-order DAG
meta = pipeline_of {
etl = p_etl
stats = p_stats
}
T-Lang automatically analyzes cross-pipeline references in node
expressions (such as referencing etl.clean in the
stats pipeline) to infer the execution order between
sub-pipelines. The flattening engine automatically wires the root nodes
of a dependent sub-pipeline to depend on the terminal nodes of the
pipeline it references.
When a meta-pipeline is populated, queried, or inspected, T-Lang
automatically flattens it internally. Node names are automatically
namespaced (e.g. etl.raw, etl.clean,
stats.summary) to prevent namespace collisions, and all
internal variable references are rewritten accordingly.
pipeline_nodes(meta)
-- ["etl.raw", "etl.clean", "stats.summary"]
pipeline_deps(meta)
-- {`etl.raw`: [], `etl.clean`: ["etl.raw"], `stats.summary`: ["etl.clean"]}
-- You can build the entire meta-pipeline directly:
populate_pipeline(meta, build = true)
-- You can read individual nodes using nested dot notation:
res = read_node(meta.stats.summary)
T’s dependency tracking works differently depending on the node’s
runtime. This leads to a specific limitation when using
chain() with R or Python pipelines.
lm()) and a T variable from a different
pipeline.To avoid polluting your build environment with R/Python functions as Nix dependencies, T ignores external references inside RawCode blocks when they are not defined in the current pipeline block.
This means chain() will fail to automatically
wire R/Python nodes to nodes in other pipelines.
If you need an R or Python node to depend on a node from a separate
pipeline via chain(), you must “bring” that dependency into
the pipeline block using a T-expression stub with an aliased
name.
❌ Broken: R node cannot “see” raw_data for
chaining
p_data = pipeline { raw_data = read_csv("data.csv") }
p_model = pipeline {
model = rn(<{
lm(mpg ~ hp, data = raw_data)
}>)
}
-- Error: "no shared dependency names found"
p_full = p_data |> chain(p_model)
❌ Also broken: self-referential stub
p_model = pipeline {
raw_data = raw_data -- Error: "Self-referential node detected"
model = rn(<{ lm(mpg ~ hp, data = raw_data) }>)
}
✅ Fixed: Use a T-stub with an aliased name
p_data = pipeline { raw_data = read_csv("data.csv") }
p_model = pipeline {
-- Aliased T-stub: different name on the left, raw_data on the right.
-- T can parse the RHS and see `raw_data` as an external dependency.
data_input = raw_data
model = rn(<{
lm(mpg ~ hp, data = data_input) -- use the alias name in R
}>,
deserializer = "arrow")
}
-- Success! T sees `raw_data` as a dependency of `data_input`, wiring the pipelines.
p_full = p_data |> chain(p_model)
By giving the stub a different name
(data_input = raw_data), you avoid a self-reference while
still creating a T-expression that references raw_data. T
can parse the right-hand side, detect the cross-pipeline dependency, and
allow chain() to wire the pipelines together. Note that
R/Python code inside the chained node should use the alias
name (data_input) as the variable, not the
original (raw_data).
Rather than introducing new complex constructs, T-Lang encourages parameterizing pipelines using standard lambdas. Since lambdas return values and pipelines are first-class values in T-Lang, you can define a lambda that takes configuration parameters and returns a pipeline.
Here is a template lambda that takes a multiplier parameter and returns a pipeline with two nodes:
make_pipeline = \(multiplier: Int -> Pipeline) pipeline {
raw = [1, 2, 3]
computed = raw * multiplier
}
p1 = make_pipeline(10)
p2 = make_pipeline(20)
At execution time, outer variables (like multiplier) are
substituted with their concrete values (like 10 or
20) during compilation, resulting in fully independent
Nix-reproducible pipelines.
Combines two pipelines that are intended to run independently. No dependency wiring is performed. Errors on name collision.
p_r_model = pipeline {
r_fit = rn(command = <{ lm(y ~ x, data) }>, serializer = "pmml")
}
p_py_model = pipeline {
py_fit = pyn(
command = <{
from sklearn.linear_model import LinearRegression
LinearRegression().fit(X, y)
}>,
serializer = "pmml"
)
}
-- Both models will run independently
p_both = parallel(p_r_model, p_py_model)
pipeline_nodes(p_both) -- ["r_fit", "py_fit"]
Beyond pipeline_nodes and pipeline_deps, T
provides a complete structural inspection surface for pipelines.
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_roots(p) -- ["a"] — nodes with no dependencies
pipeline_leaves(p) -- ["c"] — nodes nothing depends on
pipeline_edges returns a list of [from, to]
pairs representing every edge in the DAG:
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_edges(p) -- [["a", "b"], ["b", "c"]]
This is useful for serializing the graph structure or feeding it to external tools.
pipeline_depth returns the maximum topological depth
across all nodes (root nodes have depth 0):
p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_depth(p) -- 2
pipeline_cycles returns any node names involved in
dependency cycles. A correctly formed pipeline always returns an empty
list:
p = pipeline { a = 1; b = a + 1 }
pipeline_cycles(p) -- []
pipeline_printPrints a human-readable summary of all nodes to stdout, including their runtime, depth, noop status, and dependency list:
p = pipeline {
a = 1
b = node(command = <{ 2 }>, runtime = R, serializer = "pmml")
c = b + 1
}
pipeline_print(p)
-- Pipeline (3 nodes):
-- a runtime=T depth=0 noop=false deps=[]
-- b runtime=R depth=0 noop=false deps=[]
-- c runtime=T depth=1 noop=false deps=[b]
pipeline_to_dotExports the pipeline as a Graphviz DOT string for visualization.
Works for both Pipeline and MetaPipeline:
p = pipeline { a = 1; b = a + 1; c = b + 1 }
dot = pipeline_to_dot(p)
print(dot)
-- digraph pipeline {
-- rankdir=LR;
-- node [shape=box];
-- "a" [label="a\n[T]"];
-- "b" [label="b\n[T]"];
-- "c" [label="c\n[T]"];
-- "a" -> "b";
-- "b" -> "c";
-- }
Pipe the output to dot -Tpng or paste it into
https://dreampuf.github.io/GraphvizOnline/ to render a visual dependency
graph.
pipeline_to_mermaidExports the pipeline as a Mermaid flowchart string:
p = pipeline { a = 1; b = a + 1; c = b + 1 }
mermaid = pipeline_to_mermaid(p)
print(mermaid)
-- graph LR
-- a["a [T]"];
-- b["b [T]"];
-- c["c [T]"];
-- a --> b;
-- b --> c;
Render the Mermaid flowchart directly in markdown files or preview using the online Mermaid live editor.
show_plotRather than manually pasting the Mermaid string into an external
editor, you can reuse show_plot() to visualize Mermaid
graphs, pipelines, or meta-pipelines directly in your web browser:
-- Visualize a pipeline directly:
show_plot(p)
-- Or visualize a raw Mermaid string:
show_plot("graph TD\n Start --> Stop")
When you pass a pipeline, meta-pipeline, or a string starting with a
Mermaid keyword (like graph or flowchart) to
show_plot(), T dynamically generates a temporary HTML file
containing the Mermaid JS engine, renders the graph, and opens it using
your configured system viewer/browser.
By design, T uses lazy validation: structural errors
(missing dependencies, cycles) surface at build_pipeline or
pipeline_run time, not at operation time. This allows you
to compose and transform pipelines freely.
When you want to validate eagerly, T provides opt-in validation utilities.
pipeline_validateReturns a list of validation error messages. An empty list means the pipeline is structurally valid. This function never throws — it reports problems as data.
p_good = pipeline { a = 1; b = a + 1 }
pipeline_validate(p_good) -- []
-- Build a broken pipeline manually via difference
p_broken = pipeline { a = 1; b = a + 1 } |> filter_node($name == "b")
-- b now depends on a, but a was filtered out
pipeline_validate(p_broken)
-- ["Node `b` depends on `a` which does not exist in the pipeline."]
Checks performed: 1. All referenced dependencies exist as nodes in the pipeline. 2. No dependency cycles.
pipeline_assertLike pipeline_validate, but throws the
first error found instead of returning a list. Returns the pipeline
unchanged if valid. This is useful as a guard at a pipeline’s
construction site.
p = pipeline { a = 1; b = a + 1 }
|> filter_node($depth == 0) -- keeps a only
|> pipeline_assert -- succeeds, returns the pipeline
-- Chaining validation into a construction expression:
safe_pipeline = pipeline { a = 1; b = a + 1 }
|> mutate_node($noop = true, where = $runtime == "R")
|> pipeline_assert
If validation fails:
p_broken |> pipeline_assert
-- Error(ValueError: "Node `b` depends on `a` which does not exist in the pipeline.")
T-Lang uses a lexical analyzer to automatically detect dependencies between nodes by scanning the code for variable names that match other node names. While this is convenient, there are cases where automatic detection is insufficient or may produce false positives.
Sometimes, a node’s code may contain a word that matches another node
name but is intended to be a comment or a string, not a dependency. To
prevent these from causing unwanted dependency cycles, T automatically
strips standard comments starting with --
or # within foreign code blocks
(<{ ... }>) before analyzing the code.
p = pipeline {
data = read_csv("input.csv")
-- The analyzer will IGNORE the string 'results' because it's in a comment.
-- This prevents an accidental dependency on the 'results' node.
process = pyn(command = <{
# We will save the processed results to a file
import pandas as pd
df = data.dropna()
df
}>)
results = node(command = process |> head)
}
depsIn some runtimes, like sh (shell), T cannot always
reliably infer dependencies from the command string. Similarly, you may
want to explicitly declare a dependency that isn’t directly referenced
in the code (e.g., a file produced by another node that your script
reads via a hardcoded path).
For these cases, you can use the deps argument in node
definitions to manually declare one or more dependencies:
p = pipeline {
raw_file = shn(command = <{ curl -o data.csv https://example.com/data.csv }>)
-- This shell node reads data.csv, which is created by raw_file.
-- We use the `deps` argument to ensure raw_file executes first.
summary = shn(
command = <{ cat data.csv | wc -l }>,
deps = [raw_file],
serializer = "text"
)
}
Key Features of deps: -
First-Class Syntax: deps is an optional
argument available in node(), rn(),
pyn(), and shn(). - Bare
Identifiers: You can list direct node names as bare identifiers
(e.g., deps = [node1, node2]). - Manual
Override: It ensures the specified nodes are added to the
dependency graph even if they aren’t parsed from the command or script
body. - Strict Validation: T validates that all listed
dependencies exist within the same pipeline.
See the Pipeline Tutorial for general pipeline best practices (descriptive names, focused nodes, pipes, inspect, incremental builds, validation).
chain over
union: When two pipelines are intentionally
connected, chain makes the dependency explicit; use
union only when combining truly independent pipelinesfilter_node + upstream_of for
partial builds: Trim a large pipeline to just what you need
before calling build_pipelinerename_node before set
ops: Both union and chain enforce
unique names; rename conflicting nodes before mergingWhen nodes are executed within a Nix-managed sandbox (via
populate_pipeline(p, build = true)), they are isolated from
each other. However, T provides a built-in mechanism for nodes to access
the serialized artifacts of their dependencies.
For every dependency dep that a node has, the pipeline
runner automatically injects an environment variable named
T_NODE_<dep> into the sandbox. This variable contains
the path to the Nix store directory where that dependency’s artifact is
stored.
node_lensThe canonical way to access a sibling node’s artifact is using the
node_lens with the single-argument get()
function. This is preferred over manual environment variable lookup
because: 1. It is portable: T handles the path
resolution and deserialization automatically. 2. It is
integrated: It uses the same deserializer system as the
rest of the pipeline.
p = pipeline {
node_a = node(command = 100, serializer = "json")
-- This node retrieves node_a's value from its Nix artifact
dynamic_access = node(
command = {
-- Using get(node_lens("...")) for cross-node access
val = get(node_lens("node_a"))
val * 2
},
runtime = "T"
)
}
When dynamic_access runs inside the Nix sandbox: 1. T
sees the node_lens("node_a") and looks for the
T_NODE_node_a environment variable. 2. It locates the
artifact file within that path. 3. It detects the artifact
class (e.g., Int from JSON) and deserializes it back into a
T value.
This pattern is essential for polyglot pipelines
where data is passed between T, R, and Python nodes through files, and
for dynamic access nodes where the target of a
retrieval is determined at runtime (e.g.,
target = "A"; get(node_lens(target))).
To optimize large-scale pipelines and manage remote binary caching,
T-Lang includes native Nix orchestration features in
build_pipeline and pipeline_run. These
features map directly to native nix build mechanics,
allowing granular rebuild control, job parallelization, Cachix
integration, and dry-runs.
The functions build_pipeline() and
pipeline_run() accept an optional nix_options
dictionary containing the following keys:
| Key | Type | Description | Nix Command Mapping |
|---|---|---|---|
targets |
String/List/Vector | Specific node(s) or outputs to build (e.g.,
targets: ["model_a"]) |
-A <targets> |
force |
Bool/String/List/Vector | Rebuild nodes even if they already exist in the Nix store. Pass
true to force-rebuild all nodes, or a string/list of
specific node names. |
--check (rebuilds target) |
dry_run |
Bool | Preview build actions without executing them. Returns a structured
DataFrame of planned actions. |
--dry-run |
max_jobs |
Int | Limit parallel compilation/build jobs. | --max-jobs N |
cache |
String | A Cachix binary cache name (e.g., "rstats-on-nix") to
pull/push built artifacts. |
--option extra-substituters ... &
--option extra-trusted-public-keys ... |
builders |
String | Remote builder specification in SSH syntax. | --builders ... |
keep_env |
String/List/Vector | Environment variable names to pass into the Nix sandbox. | --option keep-env ... |
sandbox |
Bool/String | Sandboxing policy: true/"strict",
"relaxed", or false/"none". |
--option sandbox ... |
dry_run for
Build PreviewsIf you set dry_run: true inside
nix_options, T-Lang will invoke Nix in dry-run mode and
return a structured DataFrame detailing the exact actions
Nix plans to take (e.g., fetching from binary caches, building
derivations):
p = pipeline {
a = 1
b = a + 1
}
-- Inspect planned build actions without running them
actions = build_pipeline(p, nix_options = [dry_run: true])
print(actions)
The resulting DataFrame contains the columns: -
node: The name of the pipeline node. - action:
The action planned (e.g., "build",
"substitute", or "noop"). - path:
The absolute store path of the Nix derivation or artifact.
Below is an example showing how to trigger a parallel, cache-backed build targeting a specific node:
p = pipeline {
a = 1
b = a + 1
c = b * 2
}
-- Rebuild only node 'c', with parallel execution, using a Cachix binary cache
build_pipeline(p,
nix_options = [
targets: ["c"],
max_jobs: 4,
cache: "rstats-on-nix",
force: ["c"]
])
For teams working on large projects, T supports exporting
Nix-materialized pipeline cache artifacts into portable archive files
(.nar format). These archives can be transferred between
machines, imported without rebuilding, or inspected without
installing.
To export cached artifacts, use export_artifacts(). In
addition to entire pipelines, you can target specific
sub-structures:
p = pipeline {
a = shn(command = "echo -n 'hello'", capture = "stdout")
b = a |> \(x) x + " world"
}
build_pipeline(p)
-- 1. Export the entire pipeline's artifacts
export_artifacts(p, "full_cache.nar")
-- 2. Granular export: Export a single computed node
export_artifacts(p.a, "node_a.nar")
-- 3. Export a list or vector of nodes/pipelines
export_artifacts([p.a, p.b], "subset.nar")
-- 4. Export nested structures/dictionaries
export_artifacts([first: p.a, second: p.b], "dict_subset.nar")
To restore exported artifacts, use import_artifacts().
It is variadic and supports two calling conventions:
-- Convention 1: Import and verify against a pipeline
import_artifacts(p, "full_cache.nar")
-- Convention 2: Load archive directly into the Nix store
import_artifacts("full_cache.nar")
You can inspect the contents of an artifact archive file without
unpacking it permanently or changing your local store. The
inspect_artifacts() function imports the archive into a
temporary, isolated Nix store, extracts metadata for each path, and
returns a DataFrame.
df = inspect_artifacts("full_cache.nar")
-- View the details of the archive
df
-- DataFrame with columns:
-- - node: The name of the node (if known)
-- - store_path: The Nix store path of the artifact
-- - hash: The SHA-256 hash of the store path
-- - size_bytes: The size of the unpacked artifact in bytes
-- - references: Comma-separated basenames of dependency store paths
For convenience, you can perform a dry-run check directly using the
dry_run = true parameter in
populate_pipeline(). This reports which nodes are already
in the Nix cache and which ones require rebuilding or downloading:
p = pipeline {
a = 1
b = a + 1
}
-- Check cache hit/miss status directly
plan = populate_pipeline(p, dry_run = true)
print(plan)
-- Returns a DataFrame with columns: node, action, and path.
-- "action" will be one of:
-- - "cached": path is already built/cached locally
-- - "build": path must be rebuilt locally
-- - "fetch": path can be retrieved from remote binary substitutes
Over time, your local Nix store can accumulate unused derivations and cache files. T-Lang provides REPL functions to safely clean up OCaml/Nix artifacts directly:
pipeline_gc(p, dry_run = false):
Deletes the store paths of the given pipeline p. By default
(dry_run = true), it queries what would be deleted and
returns a DataFrame showing the node,
store_path, and deleted status. Set
dry_run = false to perform the actual deletion.t_gc(): Performs a global Nix store
garbage collection (nix-store --gc), removing all unused
derivations and freeing up disk space.p = pipeline {
a = 1
}
-- Preview what would be deleted
plan = pipeline_gc(p, dry_run = true)
-- Perform the deletion of the pipeline's nodes
pipeline_gc(p, dry_run = false)
-- Perform global garbage collection
t_gc()
T can generate a complete GitHub Actions workflow YAML for executing
a pipeline via pipeline_to_ga(). The generated
workflow:
t-runs branch
(via nix-store --import)nix develop --command t run <pipeline_script>t-runs
branch-- Write the generated YAML directly to .github/workflows/<name>.yml (uses "src/pipeline.t" by default)
pipeline_to_ga()
-- Write directly to a custom path (e.g. .github/workflows/ci.yml)
pipeline_to_ga("src/run.t", file = ".github/workflows/ci.yml")
-- Get the generated YAML back as a string instead of writing to disk
yaml = pipeline_to_ga(file = "")
print(yaml)
| Parameter | Type | Default | Description |
|---|---|---|---|
pipeline_script |
String |
"src/pipeline.t" |
Path to the pipeline T script. Can be passed as the first positional argument. |
name |
String |
Auto-detected | Project name from tproject.toml. Controls the workflow
display name, job ID, and NAR archive filename. |
file |
String |
".github/workflows/<name>.yml" |
Output file path. Defaults to
.github/workflows/<name>.yml. Set to an empty string
("") to return the YAML workflow as a string without
writing to disk. |
The auto-detected project name comes from the name field
in your project’s tproject.toml. If neither a
name argument nor a tproject.toml is found, an
error is raised prompting you to provide an explicit name.
T lets you dynamically expand a single pipeline node into multiple branches using pattern functions. This is useful when you need to run the same computation over each element of a list, vector, or data frame.
Patterns are automatically expanded when you call
populate_pipeline() or build_pipeline() — you
do not need to call expand_pipeline() explicitly. The
explicit function is available if you want to inspect the expanded
structure before building.
map_pattern — One Branch Per ElementUse map_pattern(dep) to create one branch for each
element of an upstream dependency:
p = pipeline {
x = [10, 20, 30]
y = node(command = <{ x * 2 }>, pattern = map_pattern(x))
}
-- Auto-expansion happens inside build_pipeline:
build_pipeline(p)
-- Or inspect the expanded structure explicitly:
expanded = expand_pipeline(p)
pipeline_nodes(expanded)
-- ["x", "y_branch_1", "y_branch_2", "y_branch_3"]
expanded.y_branch_1 -- 20 (10 * 2)
expanded.y_branch_2 -- 40 (20 * 2)
expanded.y_branch_3 -- 60 (30 * 2)
Multiple dependencies can be mapped simultaneously — all must have
the same length, and branch i receives element
i from each:
p = pipeline {
xs = [1, 2, 3]
ys = [10, 20, 30]
z = node(command = <{ xs + ys }>, pattern = map_pattern(xs, ys))
}
-- build_pipeline(p) auto-expands before building
cross_pattern
— Cartesian ProductUse cross_pattern(sub1, sub2, ...) for a Cartesian
product of multiple map_pattern sub-patterns:
p = pipeline {
a = [1, 2]
b = [10, 20]
c = node(command = <{ a + b }>, pattern = cross_pattern(map_pattern(a), map_pattern(b)))
}
expanded = expand_pipeline(p)
pipeline_nodes(expanded)
-- ["a", "b", "c_branch_1", "c_branch_2", "c_branch_3", "c_branch_4"]
-- Branch order: (a=1,b=10), (a=1,b=20), (a=2,b=10), (a=2,b=20)
When a dependency is a DataFrame, each row becomes one branch element:
df = to_dataframe([[x: 10], [x: 20], [x: 30]])
p = pipeline {
data = df
result = node(command = <{ data }>, pattern = map_pattern(data))
}
expanded = expand_pipeline(p)
pipeline_nodes(expanded)
-- ["data", "result_branch_1", "result_branch_2", "result_branch_3"]
-- Each branch receives a 1-row DataFrame
For finer-grained control over which elements produce branches, use selector patterns. All four take exactly one dependency and an integer parameter, and produce N branches where N is determined by the parameter.
slice_pattern(dep, [i, j, ...]) — Branch on Specific
IndicesSelect specific indices (0-based) from the dependency. Each index in the list becomes one branch. This is useful when you want to recompute only a subset of values, or when you want to reorder branches.
p = pipeline {
x = [10, 20, 30, 40, 50]
-- Only branches for indices 0, 2, 4:
y = node(command = <{ x }>, pattern = slice_pattern(x, [0, 2, 4]))
}
-- expand_pipeline(p) produces:
-- y_branch_1 with x = 10 (index 0)
-- y_branch_2 with x = 30 (index 2)
-- y_branch_3 with x = 50 (index 4)
Indices must be within the dependency’s bounds (0 ≤ i < length). Out-of-range indices return an error at expansion time.
head_pattern(dep, n) — Branch on First N ElementsTake the first n elements of the dependency. Each of the
first N elements becomes one branch. If n exceeds the
dependency length, it is silently capped — you get at most
length(dep) branches.
p = pipeline {
x = [10, 20, 30, 40, 50]
-- First two elements:
y = node(command = <{ x }>, pattern = head_pattern(x, 2))
-- First ten (capped at 5):
z = node(command = <{ x }>, pattern = head_pattern(x, 10))
}
-- y produces 2 branches: y_branch_1 (x=10), y_branch_2 (x=20)
-- z produces 5 branches (one per element, since 10 > 5)
tail_pattern(dep, n) — Branch on Last N ElementsTake the last n elements of the dependency. Branches are
indexed from the end — element at length - n becomes
branch_1, and so on. Like head_pattern, n is
capped to the dependency length if it exceeds it.
p = pipeline {
x = [10, 20, 30, 40, 50]
-- Last two elements:
y = node(command = <{ x }>, pattern = tail_pattern(x, 2))
}
-- y produces 2 branches:
-- y_branch_1 with x = 40 (index 3)
-- y_branch_2 with x = 50 (index 4)
sample_pattern(dep, n) — Randomly Select N ElementsRandomly select n elements from the dependency (without
replacement — no duplicate branches). Uses a Fisher-Yates partial
shuffle seeded deterministically from the node name
(Hashtbl.hash name), so repeated expansions of the same
pipeline always produce the same random draw. Different node names
produce different draws. As with the other selectors, n is capped to the
dependency length.
p = pipeline {
x = [10, 20, 30, 40, 50]
-- Randomly pick 2 elements:
y = node(command = <{ x }>, pattern = sample_pattern(x, 2))
}
-- y produces 2 branches with randomly chosen values from x.
-- The selection is deterministic: calling expand_pipeline(p) again on
-- the same pipeline always picks the same two indices.
| Pattern | Branch count | Branches from |
|---|---|---|
slice_pattern(dep, [i, j, ...]) |
len(indices) |
Values at given indices |
head_pattern(dep, n) |
min(n, length(dep)) |
First n elements |
tail_pattern(dep, n) |
min(n, length(dep)) |
Last n elements |
sample_pattern(dep, n) |
min(n, length(dep)) |
Random n elements |
All four patterns are automatically expanded on
build_pipeline, populate_pipeline, and
composition builtins (chain, parallel,
union, etc.), just like map_pattern and
cross_pattern. They cannot be nested inside
cross_pattern() — only map_pattern is
supported as a sub-pattern of cross_pattern.
Pattern branching works with non-T runtimes (R,
Python, Julia, etc.), but requires explicit
serializer and deserializer configuration so
cross-runtime data interchange works correctly. Each branch runs under
the same runtime as the original patterned node:
p = pipeline {
a = [1, 2, 3]
b = node(
command = <{ a }>,
runtime = R,
serializer = ^json,
deserializer = ^json,
pattern = map_pattern(a)
)
}
build_pipeline(p)
-- Each branch (b_branch_1, b_branch_2, b_branch_3) runs in R
The serializer/deserializer symbols (^json in the
example) must match a supported interchange format on both sides of the
runtime boundary. If serializer and deserializer are omitted, expansion
succeeds but the build will fail — the default serializer cannot produce
runtime-specific artifacts for cross-runtime data interchange.
See §11.8 for a complete polyglot example using
cross_pattern and map_pattern with R
ggplot2.
Pass to_script to write the expanded pipeline as a T
source file for inspection or debugging:
expand_pipeline(p, to_script = "expanded_pipeline.t")
The output file contains the full pipeline { ... }
definition with all branches unrolled.
populate_pipeline(), build_pipeline(),
chain(), parallel(), union(),
intersect(), difference(), and
patch() all automatically expand any unexpanded patterns in
their pipeline inputs before proceeding. You only need to call
expand_pipeline() explicitly when you want to inspect the
branch structure before building.
Even before calling expand_pipeline(), you can inspect
and interact with the branch structure of a patterned pipeline
directly:
List branch names with
pipeline_nodes(p):
p = pipeline {
a = [10, 20, 30]
b = map_pattern(a) ~> a * 2
}
pipeline_nodes(p)
-- Result: ["a", "b_branch_1", "b_branch_2", "b_branch_3"]
Inspect branch structure with
inspect_pipeline(p):
inspect_pipeline(p)
-- DataFrame with one row per branch, including auto-generated branch names
Access a branch by name with dot notation —
p.b_branch_1 lazily synthesizes a
VComputedNode without triggering expansion:
b1 = p.b_branch_1
-- b1 is a computed node that will be resolved when built
Helpful error on the parent node: if you try
read_node(p.b) on a patterned node before building, instead
of a generic “not built yet” error you get a message listing the
available branches:
Error(ValueError): Node `b` has a pattern and expands into b_branch_1, b_branch_2, b_branch_3.
Use read_node(p.<branch_name>) to access individual branches directly.
Reserved naming: node names ending in
_branch_N (e.g. x_branch_1) are reserved for
auto-generated branch nodes. Using such a name at pipeline construction
produces a NameError.
This demo (from t_demos/dynamic_branching_t) combines
cross_pattern, map_pattern, and cross-runtime
(T ↔︎ R) serialization into a single end-to-end pipeline. It generates
spirograph data points in T and plots them with R ggplot2 —
one plot per parameter combination.
You have a list of radii [3, 5, 8] and
[2, 4, 6]. You want all 9 combinations of spirograph curves
drawn as ggplot2 faceted plots. Writing 9 nodes by hand is tedious — use
pattern branching instead.
p = pipeline {
fixed_radii = [3, 5, 8]
cycling_radii = [2, 4, 6]
points = node(
command = <{
import "src/spirograph.t"
spirograph_points(fixed_radii, cycling_radii)
}>,
pattern = cross_pattern(map_pattern(fixed_radii), map_pattern(cycling_radii)),
runtime = T,
serializer = ^json
)
single_plot = node(
command = <{ plot_spirographs(points) }>,
pattern = map_pattern(points),
functions = ["src/spirograph.R"],
runtime = R,
deserializer = ^json
)
}
cross_pattern(map_pattern(fixed_radii), map_pattern(cycling_radii))
— takes the Cartesian product of both lists (3 × 3 = 9). Each branch
calls spirograph_points(r_fixed, r_cycling) from
src/spirograph.t with one specific radius pair, returning a
DataFrame of x, y coordinates.
serializer = ^json — the
points node uses the ^json symbol serializer
to write each branch’s DataFrame as a JSON array of records. Without
this, the R node downstream cannot read the data.
map_pattern(points) — creates one
branch per points output (9 branches total). Each branch
calls plot_spirographs() from
src/spirograph.R.
deserializer = ^json — Tells the
pipeline runner to read the R node’s JSON artifact back so it can be
cached and inspected.
^json symbol syntax — required for
serializer/deserializer values in cross-runtime pipelines (string
literals like "json" are not accepted). This is the
canonical way to declare interchange formats.
src/spirograph.t — a parametric
spirograph function called by each data branch:
spirograph_points = \(fixed_radius, cycling_radius) {
num_points = 10000
max_t = 30 * pi
t_values = float_seq(0, max_t, num_points)
diff = fixed_radius - cycling_radius
ratio = diff / cycling_radius
xs = t_values |> map(\(t) diff * cos(t) + cos(t * ratio))
ys = t_values |> map(\(t) diff * sin(t) - sin(t * ratio))
to_dataframe([x: xs, y: ys,
fixed_radius: fixed_radius,
cycling_radius: cycling_radius])
}
src/spirograph.R — renders a faceted
ggplot for one parameter combination:
library(ggplot2)
plot_spirographs <- function(points) {
label <- "fixed_radius = %s, cycling_radius = %s"
points$parameters <- sprintf(label, points$fixed_radius, points$cycling_radius)
ggplot(points) +
geom_point(aes(x = x, y = y, color = parameters), size = 0.1) +
facet_wrap(~parameters) +
theme_gray(16) +
guides(color = "none")
}Note the functions = ["src/spirograph.R"] argument on
the R node — this propagates the script into the Nix sandbox so it is
available at build time.
When you run build_pipeline(p, verbose = 1), T
automatically expands the patterns before building:
+ points_branch_1 building
+ points_branch_2 building
...
+ points_branch_9 building
+ single_plot_branch_3 building
+ single_plot_branch_5 building
...
+ single_plot_branch_1 building
✓ Pipeline build completed [20 built / 20 nodes]
20 nodes total: 2 root (fixed_radii,
cycling_radii), 9 data branches, 9 plot branches.
Since build_pipeline does not modify the original
pipeline variable p, expanded branch nodes are only visible
in the build log. Use build_log_to_frame() to inspect
them:
res = build_pipeline(p, verbose = 1)
node_frame = build_log_to_frame(res)
points_branches = filter(node_frame, \(r) starts_with(r.name, "points"))
plot_branches = filter(node_frame, \(r) starts_with(r.name, "single_plot"))
assert(nrow(points_branches) == 9, "Expected 9 points branches")
assert(nrow(plot_branches) == 9, "Expected 9 single_plot branches")
assert(length(res.failed_nodes) == 0, "All nodes should succeed")
This pattern — build_log_to_frame(res) +
filter with a lambda — is the recommended way to verify
branched pipeline builds when you need to inspect individual expanded
nodes.
cross_pattern(map_pattern(...), map_pattern(...))
generates a Cartesian product of branches.cross_pattern →
map_pattern) lets you build multi-phase branched
pipelines.^json serializer/deserializer enables
cross-runtime data interchange between T and R nodes.build_log_to_frame()
for post-build queries.map_pattern node can depend on T data branches and run in R
(or Python or Julia), as long as serialization is configured
correctly.T supports conditional node inclusion evaluated at pipeline
construction time, preserving Nix’s static DAG requirement. There are
two functions: node_when and node_fork.
node_when(condition, value)Returns value if condition is truthy,
otherwise returns a null marker that causes the pipeline to exclude the
node entirely. The condition is evaluated before the build.
p = pipeline {
dev_data = read_csv("data/dev.csv")
model = node_when(env("CI") == "1", pyn(script = "train.py"))
deployed = node_when(env("BRANCH") == "main", pyn(script = "deploy.py"))
}
build_pipeline(p)
If CI is not "1", the model
node is excluded and no attempt is made to resolve its dependencies. If
BRANCH is not "main", the
deployed node is similarly excluded.
node_fork(...condition_value_pairs, .default = ...)A multi-way branch: takes condition-value pairs and returns the value
for the first truthy condition. If no condition matches and no
.default is provided, the node is excluded.
p = pipeline {
data = read_csv("data.csv")
model = node_fork(
env("MODEL") == "linear", lm(mpg ~ wt, data),
env("MODEL") == "forest", pyn(script = "rf.py"),
env("MODEL") == "neural", pyn(script = "nn.py"),
.default = lm(mpg ~ wt, data)
)
}
Here, setting MODEL=forest in the environment selects
the random forest node; any other value falls back to the
.default linear model.
node_when and node_fork are only
meaningful as the direct value of a node binding inside a
pipeline { } block. Using the result outside that context
(arithmetic, is_na(), etc.) is unsupported.env() to read environment variables.is_na(). It
exists only to signal node exclusion to the pipeline machinery.Now that you’ve mastered pipelines, learn how to manage reproducible projects and develop T packages: