Pipeline Tutorial

A step-by-step guide to T’s pipeline execution model

Pipelines are T’s core execution model. They let you define named computation steps (nodes) that are automatically ordered by their dependencies, executed deterministically, and cached for re-use.


1. Your First Pipeline

A pipeline is a block of named expressions enclosed in pipeline { ... }:

p = pipeline {
  x = 10
  y = 20
  total = x + y
}

This creates a pipeline with three nodes: x, y, and total. Each node is computed once, and the results are cached. Access any node’s value with dot notation:

p.x      -- 10
p.y      -- 20
p.total  -- 30

The pipeline itself displays as:

Pipeline(3 nodes: [x, y, total])

2. Explicit Node Configuration

In addition to bare assignments, you can explicitly configure nodes using the node() function. This lets you define the execution environment (like the runtime) and custom serialization methods for when a pipeline is materialized by Nix:

p = pipeline {
  data = node(command = read_csv("data.csv"), runtime = T)
  
  -- Running a Python node that trains a model using the pyn wrapper
  model = pyn(
    command = <{
        from sklearn.linear_model import LinearRegression
        fit = LinearRegression().fit(X, y)
        fit
    }>,
    serializer = "pmml"
  )
}

Bare syntax (like x = 10) is automatically desugared to x = node(command = 10, runtime = T, serializer = default, deserializer = default). You can also use pyn(), rn(), and shn() as shortcuts for Python, R, and shell runtimes. T enforces cross-runtime safety: if a node with a non-T runtime depends on a T node, or vice versa, you should specify an explicit serializer/deserializer.

Using the script Argument

Instead of inlining code with command, you can point a node to an external source file using the script argument. This works with node(), pyn(), rn(), and shn(). The script and command arguments are mutually exclusive.

p = pipeline {
  -- Execute an external R script
  model = rn(script = "train_model.R", serializer = "pmml")

  -- Execute an external Python script
  predictions = pyn(script = "predict.py", deserializer = "pmml")

  -- Execute an external shell script
  report = shn(script = "postprocess.sh")

  -- node() auto-detects the runtime from the file extension
  summary = node(script = "summarise.R", serializer = "json")
}

When using script, the runtime is auto-detected from the file extension (.R → R, .py → Python, .sh → sh) if not explicitly set via the runtime argument. T reads the script file to extract identifier references, allowing the pipeline dependency graph to be built correctly from variables referenced in the external file.

Shell / Bash nodes with shn()

Use shn() for pipeline steps that are easiest to express as shell or CLI commands. It is a convenience wrapper around node(runtime = sh, ...), just like rn() and pyn() wrap node() for R and Python.

p = pipeline {
  -- Exec-style shell node: command + positional argv
  fields = shn(
    command = "printf",
    args = ["first line\\nsecond line\\n"]
  )

  -- Script-style shell node: inline shell source executed with `sh`
  report = shn(command = <{
#!/bin/sh
set -eu

# Dependencies for T's lexical pipeline analysis: summary_r summary_py
printf 'R summary: %s\n' "$T_NODE_summary_r/artifact"
printf 'Python summary: %s\n' "$T_NODE_summary_py/artifact"
  }>)
}

There are two useful modes:

Shell nodes default to serializer = text, which makes them a good fit for reports, command output, and glue code between other pipeline nodes. For a full end-to-end example that mixes T, R, Python, and sh, see tests/pipeline/polyglot_shell_pipeline.t and .github/workflows/polyglot-shell-pipeline.yml.


3. Cross-Language Integration

T is designed to orchestrate code across multiple languages. The pipeline runner manages the serialization and deserialization of data between R, Python, and T using industry-standard formats.

Supported Interchange Formats

Format Option Best For Requirement
T Native "default" T-to-T communication None
Arrow "arrow" Large DataFrames pyarrow (Py), arrow (R)
PMML "pmml" Predictive Models sklearn2pmml (Py), r2pmml (R)
JSON "json" Simple lists/dicts jsonlite (R)

Example: Training in R, Predicting in T

You can train a model in R and use T’s native OCaml model evaluator to make predictions without leaving the T runtime:

p = pipeline {
  -- Node 1: Train model in R using the rn wrapper
  model_r = rn(
    command = <{
      data <- read.csv("data.csv")
      lm(mpg ~ wt + hp, data = data)
    }>,
    serializer = "pmml"
  )
  
  -- Node 2: Predict in T using the R model
  predictions = node(
    command = <{
      test_df = read_csv("new_data.csv")
      predict(test_df, model_r)
    }>,
    runtime = "T",
    deserializer = "pmml"
  )
}

Setting deserializer = "pmml" on the T node tells the pipeline runner to use T’s native PMML parser to convert the R model into a T model object.


4. Automatic Dependency Resolution

Nodes can be declared in any order. T automatically resolves dependencies:

p = pipeline {
  result = x + y   -- depends on x and y
  x = 3            -- defined after result
  y = 7            -- defined after result
}
p.result  -- 10

T builds a dependency graph and executes nodes in topological order, so x and y are computed before result regardless of declaration order.


5. Chained Dependencies

Nodes can depend on other computed nodes, forming chains:

p = pipeline {
  a = 1
  b = a + 1     -- depends on a
  c = b + 1     -- depends on b
  d = c + 1     -- depends on c
}
p.d  -- 4

6. Pipelines with Functions

Nodes can use any T function, including standard library functions:

p = pipeline {
  data = [1, 2, 3, 4, 5]
  total = sum(data)
  count = length(data)
}
p.total  -- 15
p.count  -- 5

7. Pipelines with Pipe Operators

The pipe operator |> works naturally inside pipelines:

double = \(x) x * 2

p = pipeline {
  a = 5
  b = a |> double
}
p.b  -- 10

Error Recovery with Maybe-Pipe

The maybe-pipe ?|> forwards all values — including errors — to the next function. This is useful for building recovery logic into pipelines:

recovery = \(x) if (is_error(x)) 0 else x
double = \(x) x * 2

p = pipeline {
  raw = 1 / 0                    -- Error: division by zero
  safe = raw ?|> recovery        -- forwards error to recovery → 0
  result = safe |> double        -- 0 |> double → 0
}
p.safe    -- 0
p.result  -- 0

Without ?|>, the error from raw would short-circuit at |> and never reach recovery. The maybe-pipe lets you intercept errors and provide fallback values.


8. Data Pipelines

Pipelines are most powerful for data analysis workflows. Here’s a complete example loading, transforming, and summarizing data:

p = pipeline {
  data = read_csv("employees.csv")
  rows = data |> nrow
  cols = data |> ncol
  names = data |> colnames
}

p.rows   -- number of rows
p.cols   -- number of columns
p.names  -- list of column names

Full Data Analysis Pipeline

p = pipeline {
  raw = read_csv("sales.csv")
  filtered = filter(raw, $amount > 100)
  by_region = filtered |> group_by($region)
  summary = by_region |> summarize($total = sum($amount))
}

p.summary  -- DataFrame with regional totals

9. Pipeline Introspection

T provides functions to inspect pipeline structure:

List all nodes

p = pipeline { x = 10; y = 20; total = x + y }
pipeline_nodes(p)  -- ["x", "y", "total"]

View dependency graph

pipeline_deps(p)
-- {`x`: [], `y`: [], `total`: ["x", "y"]}

Access a specific node by name

pipeline_node(p, "total")  -- 30

10. Re-running Pipelines

Use pipeline_run() to re-execute a pipeline:

p2 = pipeline_run(p)
p2.total  -- 30 (re-computed)

Re-running produces the same results — T pipelines are deterministic.


11. Deterministic Execution

Two pipelines with the same definitions always produce the same results:

p1 = pipeline { a = 5; b = a * 2; c = b + 1 }
p2 = pipeline { a = 5; b = a * 2; c = b + 1 }
p1.c == p2.c  -- true

12. Error Handling

Cycle Detection

T detects circular dependencies and reports them:

pipeline {
  a = b
  b = a
}
-- Error(ValueError: "Pipeline has a dependency cycle involving node 'a'")

Error Propagation

If a node fails, the error is captured and reported:

pipeline {
  a = 1 / 0
  b = a + 1
}
-- Error(ValueError: "Pipeline node 'a' failed: ...")

Missing Nodes

Accessing a non-existent node returns a structured error:

p = pipeline { x = 10 }
p.nonexistent
-- Error(KeyError: "node 'nonexistent' not found in Pipeline")

13. Materializing Pipelines

Defining a pipeline with pipeline { ... } evaluates nodes in-memory. To materialize them as reproducible Nix artifacts (potentially using R or Python dependencies you’ve defined in tproject.toml), use populate_pipeline() with the build = true argument:

p = pipeline {
  data = read_csv("sales.csv")
  total = sum(data.$amount)
}

populate_pipeline(p, build = true)

populate_pipeline(p, build = true) is the primary command for materializing a pipeline. It does the following:

  1. Populates the _pipeline/ directory with pipeline.nix and dag.json.
  2. Generates a Nix expression with one derivation per node. Crucially, if you define [r-dependencies] or [py-dependencies] in your tproject.toml, pipeline nodes have access to these language environments!
  3. Triggers a Nix build to materialize each node as a serialized artifact.
  4. Records the build in a timestamped log file (_pipeline/build_log_YYYYMMdd_HHmmss_hash.json).

[!NOTE] build_pipeline(p) is available as a shorthand for populate_pipeline(p, build = true).

Reading built artifacts

After building, use read_node() or load_node() to retrieve materialized values:

read_node("total")   -- reads the serialized artifact for "total"
load_node("total")   -- same as read_node, loads the artifact

These functions look up the node in the latest build log and deserialize the artifact.


14. Orchestrating with populate_pipeline()

For more control over the build process, T provides populate_pipeline(). This function prepares the pipeline infrastructure without necessarily triggering the Nix build immediately.

populate_pipeline(p)                -- Prepares _pipeline/ only
populate_pipeline(p, build = true)  -- Same as build_pipeline(p)

The _pipeline/ directory

T maintains a persistent state directory for your pipeline. When you populate or build, T creates:


15. Build Logs and Time Travel

T keeps a history of your builds in _pipeline/. This enables Time Travel — the ability to read artifacts from specific past versions of your pipeline.

Inspecting logs

Use list_logs() to see available build logs:

logs = list_logs()
-- DataFrame of build logs with filename, modification_time, and size_kb

Use inspect_pipeline() to view the build status of a specific pipeline as a DataFrame (defaults to the latest):

inspect_pipeline()
-- DataFrame(5 rows x 4 cols: [derivation, build_success, path, output])

inspect_pipeline(which_log = "20260221_143022")

Reading from a specific build

Pass the which_log argument to read_node() to specify which build to read from. You can pass a regex pattern or a specific filename:

-- Read the latest version (default)
val = read_node("result")

-- Read from a specific historical build
val_old = read_node("result", which_log = "20260221_143022")

This ensures that even as you update your code and data, you can always recover and compare results from previous runs.


16. Execution Modes

T enforces a clear separation between interactive and non-interactive execution:

Non-interactive (t run)

Scripts executed with t run must call populate_pipeline() (or build_pipeline()). This ensures that non-interactive execution always produces reproducible Nix artifacts.

# ✅ This works — script defines and populates a pipeline
$ t run my_pipeline.t

# ❌ This is rejected — script doesn't call populate_pipeline()
$ t run my_script.t
# Error: non-interactive execution requires a pipeline.
# Scripts run with `t run` must call `populate_pipeline(p, build=true)` or `build_pipeline()`.
# Use the REPL for interactive exploration, or pass --unsafe to override.

A valid pipeline script looks like:

-- my_pipeline.t
p = pipeline {
  data = read_csv("input.csv")
  result = data |> filter($value > 0) |> summarize(total = sum($value))
}

populate_pipeline(p, build = true)

Interactive (REPL)

The REPL is unrestricted — you can run any T code line by line, whether or not it involves pipelines:

$ t repl
T> x = 1 + 2
T> print(x)
3
T> p = pipeline { a = 10 }
T> p.a
10

17. Using Imports in Pipelines

When a pipeline is built with build_pipeline(), each node runs inside a Nix sandbox — an isolated build environment. Import statements from your script are automatically propagated into each sandbox, so imported packages and functions are available to all nodes.

-- pipeline.t
import my_stats
import data_utils[read_clean, normalize]

p = pipeline {
  raw = read_csv("data.csv")
  clean = read_clean(raw)              -- uses imported function
  normed = normalize(clean)            -- uses imported function
  result = weighted_mean(normed.$x, normed.$w)  -- uses imported function
}

build_pipeline(p)

When build_pipeline(p) generates the Nix derivation for each node, it prepends the import statements:

-- Generated node_script.t (inside Nix sandbox)
import my_stats
import data_utils[read_clean, normalize]
raw = deserialize("$T_NODE_raw/artifact.tobj")
result = weighted_mean(raw.$x, raw.$w)
serialize(result, "$out/artifact.tobj")

All three import forms are supported:

Syntax Effect
import "src/helpers.t" Import a local file
import my_stats Import all public functions from a package
import my_stats[foo, bar] Import specific functions
import my_stats[wm=weighted_mean] Import with aliases

18. Using explain() with Pipelines

The explain() function provides structured metadata about pipelines:

p = pipeline {
  x = 10
  y = x + 5
  z = y * 2
}

e = explain(p)
e.kind        -- "pipeline"
e.node_count  -- 3

19. Skipping Nodes

You can explicitly skip a node (and by extension, all nodes that depend on it) by passing the noop = true argument to the node() function.

p = pipeline {
  raw_data = read_csv("raw.csv")
  
  # This node and its dependencies won't trigger a heavy Nix build
  expensive_model = rn(
    command = train(raw_data),
    noop = true
  )

  # This node depends on expensive_model, therefore it becomes a noop as well
  report = rn(command = generate_report(expensive_model))
}

populate_pipeline(p, build = true)

In a Nix sandbox context, noop generates a lightweight stub instead of a real build derivation.


20. Node Metadata

Every node in a pipeline carries structured metadata that you can query and manipulate. The pipeline_to_frame() function converts this metadata into a DataFrame with one row per node.

pipeline_to_frame

p = pipeline { a = 1; b = a + 1; c = b + 1 }
pipeline_to_frame(p)
-- DataFrame(3 rows x 8 cols: [name, runtime, serializer, deserializer, noop, deps, depth, command_type])

The columns returned are:

Column Type Description
name String Unique node identifier
runtime String "T", "R", or "Python"
serializer String e.g. "default", "pmml"
deserializer String e.g. "default", "pmml"
noop Bool Whether the node is a no-op
deps String Comma-separated dependency names
depth Int Topological depth (roots = 0)
command_type String "command" or "script"

pipeline_to_frame is the foundation for inspection: you can use T’s standard filter, select, and arrange verbs on the resulting DataFrame.

pipeline_summary

pipeline_summary(p) is a convenience alias for pipeline_to_frame(p):

pipeline_summary(p)
-- same output as pipeline_to_frame(p)

select_node

select_node returns a DataFrame with only the columns you request, using NSE $field references:

p = pipeline {
  a = 1
  b = node(command = <{ 2 }>, runtime = R, serializer = "pmml")
  c = b + 1
}

p |> select_node($name, $runtime, $depth)
-- DataFrame: name="a", runtime="T", depth=0
--            name="b", runtime="R", depth=0
--            name="c", runtime="T", depth=1

Available fields: $name, $runtime, $serializer, $deserializer, $noop, $deps, $depth, $command_type.


21. Environment Variables

Pipeline nodes can pass environment variables into the Nix build sandbox via the env_vars named argument on node(), py()/pyn(), and rn(). This allows nodes to configure their build-time execution environment without embedding those values directly into the command body.

p = pipeline {
  model = rn(
    command = <{ train_model(data) }>,
    env_vars = [
      MODEL_MODE: "train",
      RETRIES: 2,
      DEBUG: true
    ]
  )
}

Supported Types

The env_vars dictionary supports the following scalar-like values:

Type Example Nix Output
String "train" "train"
Symbol train "train"
Int 2 "2"
Float 3.14 "3.14" (up to 15 significant digits)
Bool true "true"
Null null (Omitted from derivation)

Validation

T performs early validation on environment variables: - env_vars must be a dictionary. - Unsupported types (like Lists or nested Dicts) trigger a structured type error during pipeline construction. - null values are silently omitted from the generated Nix derivation instead of being materialized as empty strings.

These variables are automatically threaded into the generated stdenv.mkDerivation and are available via standard system methods (e.g., Sys.getenv() in R or os.environ in Python) during the Nix build step.


22. Node-Level Operations (_node family)

T provides a set of colcraft-style verbs for operating on pipeline nodes. These mirror the DataFrame API, using NSE $field references for node metadata fields.

filter_node

Returns a new pipeline containing only the nodes where the predicate is true. No DAG validity check is performed — if a retained node references a removed node, that surfaces at build_pipeline time.

p = pipeline {
  load   = read_csv("data.csv")
  model  = rn(command = <{ lm(y ~ x, data = load) }>, serializer = "pmml")
  score  = node(command = predict(model, load), deserializer = "pmml")
}

-- Keep only R nodes
p |> filter_node($runtime == "R") |> pipeline_nodes
-- ["model"]

-- Keep only nodes with no noop flag
p |> filter_node($noop == false) |> pipeline_nodes

-- Keep only shallow nodes (root and depth-1 nodes)
p |> filter_node($depth <= 1) |> pipeline_nodes

mutate_node

Modifies metadata fields on all nodes, or scoped to a subset using the where argument:

-- Mark all nodes as noop
p |> mutate_node($noop = true)

-- Mark only R nodes as noop (useful for skipping heavy computations)
p |> mutate_node($noop = true, where = $runtime == "R")

-- Override serializer for all nodes
p |> mutate_node($serializer = "pmml", where = $runtime == "R")

Mutable metadata fields: noop (Bool), runtime (String), serializer (String), deserializer (String).

rename_node

Renames a single node and automatically rewires all dependency edges that referenced the old name. This is the canonical way to resolve name collisions before set operations like union.

p = pipeline { a = 1; b = a + 1 }

p2 = p |> rename_node("a", "alpha")
pipeline_nodes(p2)   -- ["alpha", "b"]
pipeline_deps(p2)    -- {`alpha`: [], `b`: ["alpha"]}

Attempting to rename to a name that already exists is an error:

p |> rename_node("a", "b")
-- Error(ValueError: "A node named `b` already exists in the Pipeline.")

arrange_node

Returns a new pipeline with nodes sorted by a metadata field. This affects only display/serialization order — the DAG determines execution order.

p = pipeline { z = 1; a = 2; m = 3 }

p |> arrange_node($name) |> pipeline_nodes       -- ["a", "m", "z"]
p |> arrange_node($name, "desc") |> pipeline_nodes -- ["z", "m", "a"]

-- Sort a chain by depth (shallowest first)
p = pipeline { a = 1; b = a + 1; c = b + 1 }
p |> arrange_node($depth) |> pipeline_nodes      -- ["a", "b", "c"]

23. Set Operations

Pipelines can be treated as named sets of nodes. T provides four set operations that combine or subtract pipelines.

Immutability: All set operations return new Pipelines. The original pipelines are never modified.

Lazy validation: Set operations do not check DAG validity. If the result has dangling references, errors surface at build_pipeline or pipeline_run time.

union

Merges two pipelines, including all nodes from both. Errors immediately on any name collision. Use rename_node to resolve collisions first.

p_etl = pipeline {
  raw   = read_csv("data.csv")
  clean = raw |> filter($value > 0)
}

p_model = pipeline {
  fit    = lm(clean, formula = y ~ x)
  report = summary(fit)
}

p_full = p_etl |> union(p_model)
pipeline_nodes(p_full)  -- ["raw", "clean", "fit", "report"]

If both pipelines have a node named clean:

p_etl |> union(p_model)
-- Error(ValueError: "Function `union`: name collision(s) detected: clean. Use `rename_node` to resolve.")

-- Fix: rename before merging
p_model2 = p_model |> rename_node("clean", "clean_model")
p_etl |> union(p_model2)

difference

Removes from the first pipeline all nodes whose names appear in the second pipeline. Nodes in the second pipeline that don’t exist in the first are silently ignored.

p = pipeline { a = 1; b = 2; c = 3; d = 4 }
p_remove = pipeline { b = 0; d = 0 }

p |> difference(p_remove) |> pipeline_nodes  -- ["a", "c"]

intersect

Retains only nodes present by name in both pipelines, using definitions from the first pipeline.

p1 = pipeline { a = 1; b = 2; c = 3 }
p2 = pipeline { b = 99; c = 100; d = 4 }

p1 |> intersect(p2) |> pipeline_nodes  -- ["b", "c"] (p1's definitions)

patch

Like union, but only updates nodes that already exist in the first pipeline — it will not add new nodes from the second pipeline. Ideal for overriding configurations without accidentally importing stray nodes.

p_prod = pipeline {
  load  = read_csv("data.csv")
  model = rn(command = <{ lm(y ~ x, data = load) }>, serializer = "pmml")
}

p_overrides = pipeline {
  model = rn(command = <{ lm(y ~ x + z, data = load) }>, serializer = "pmml")
  extra = 99  -- stray node
}

p_updated = p_prod |> patch(p_overrides)
pipeline_nodes(p_updated)  -- ["load", "model"] — "extra" was not added

24. DAG-Aware Transformations

These operations are structurally aware of the pipeline’s dependency graph and are used to replace node implementations, reroute edges, and extract subgraphs.

swap

Replaces a node’s implementation while preserving its existing dependency edges. The new node is specified as the third argument.

p = pipeline {
  data  = read_csv("data.csv")
  model = rn(command = <{ lm(y ~ x, data = data) }>, serializer = "pmml")
  score = node(command = predict(model, data), deserializer = "pmml")
}

-- Replace the model node with a new implementation; edges to/from model are preserved
new_model = rn(command = <{ glm(y ~ x, data = data, family = binomial) }>, serializer = "pmml")
p2 = p |> swap("model", new_model)

pipeline_deps(p2)
-- `model` still depends on `data`, and `score` still depends on `model`

rewire

Reroutes a node’s declared dependencies. The replace argument maps old dependency names to new ones. Only the named node’s dependency list is updated.

p = pipeline {
  data    = read_csv("data.csv")
  data_v2 = read_csv("data_v2.csv")
  model   = rn(command = <{ lm(y ~ x, data) }>, serializer = "pmml")
}

-- Re-point model to use data_v2 instead of data
p2 = p |> rewire("model", replace = list(data = "data_v2"))
pipeline_deps(p2)
-- {`data`: [], `data_v2`: [], `model`: ["data_v2"]}

prune

Removes all leaf nodes — nodes that nothing else depends on. This is useful for cleaning up intermediate pipelines after filter_node or difference operations that may leave orphaned utility nodes.

p = pipeline { a = 1; b = a + 1; c = 3 }
-- `a` is depended on by `b`, so it is not a leaf.
-- `b` depends on `a` but nothing depends on `b` — it is a leaf.
-- `c` is independent and nothing depends on it — it is also a leaf.

p |> prune |> pipeline_nodes  -- ["a"] (both b and c are leaves, removed)

You can chain difference and prune to strip unwanted branches in one step:

p_partial = p |> difference(p_debug_nodes) |> prune

upstream_of

Returns a new pipeline containing the named node and all its transitive ancestors (everything the node depends on, directly or indirectly).

p = pipeline {
  raw     = read_csv("data.csv")
  clean   = raw |> filter($value > 0)
  model   = rn(command = <{ lm(y ~ x, clean) }>, serializer = "pmml")
  report  = summary(model)
  sidebar = "metadata"
}

-- Everything needed to produce `model`
p |> upstream_of("model") |> pipeline_nodes  -- ["raw", "clean", "model"]
-- sidebar is excluded because model doesn't depend on it

downstream_of

Returns a new pipeline containing the named node and all nodes that transitively depend on it (everything that uses this node, directly or indirectly).

-- Everything that is affected if `clean` changes
p |> downstream_of("clean") |> pipeline_nodes  -- ["clean", "model", "report"]
-- raw and sidebar are excluded

subgraph

Returns the full connected component of a node — the union of its ancestors and descendants.

p = pipeline { a = 1; b = a + 1; c = b + 1; d = 99 }

-- Everything connected to b (upstream and downstream)
p |> subgraph("b") |> pipeline_nodes  -- ["a", "b", "c"] — d is disconnected

25. Pipeline Composition

These higher-level operators combine two complete, separately-defined pipelines into one.

chain

Connects two pipelines where the second pipeline’s nodes reference node names from the first as dependencies. T verifies that at least one such shared reference exists; if the two pipelines are completely disconnected, chain raises an error.

p_etl = pipeline {
  raw   = read_csv("data.csv")
  clean = raw |> filter($value > 0)
}

-- p_model references `clean` from p_etl — this is the wire
p_model = pipeline {
  fit    = lm(clean, formula = y ~ x)
  report = summary(fit)
}

p_full = p_etl |> chain(p_model)
pipeline_nodes(p_full)  -- ["raw", "clean", "fit", "report"]

chain is stricter than union: it requires an intent to connect the pipelines, catching accidental merges where no wiring was meant.

Cross-Pipeline Dependency Tracking: T vs. RawCode

T’s dependency tracking works differently depending on the node’s runtime. This leads to a specific limitation when using chain() with R or Python pipelines.

How T Detects Dependencies

The Limitation

To avoid polluting your build environment with R/Python functions as Nix dependencies, T ignores external references inside RawCode blocks when they are not defined in the current pipeline block.

This means chain() will fail to automatically wire R/Python nodes to nodes in other pipelines.

The Solution: The T-Stub Workaround

If you need an R or Python node to depend on a node from a separate pipeline via chain(), you must “bring” that dependency into the pipeline block using a T-expression stub with an aliased name.

❌ Broken: R node cannot “see” raw_data for chaining

p_data = pipeline { raw_data = read_csv("data.csv") }

p_model = pipeline {
  model = rn(<{ 
    lm(mpg ~ hp, data = raw_data) 
  }>)
}

-- Error: "no shared dependency names found"
p_full = p_data |> chain(p_model)

❌ Also broken: self-referential stub

p_model = pipeline {
  raw_data = raw_data  -- Error: "Self-referential node detected"
  model = rn(<{ lm(mpg ~ hp, data = raw_data) }>)
}

✅ Fixed: Use a T-stub with an aliased name

p_data = pipeline { raw_data = read_csv("data.csv") }

p_model = pipeline {
  -- Aliased T-stub: different name on the left, raw_data on the right.
  -- T can parse the RHS and see `raw_data` as an external dependency.
  data_input = raw_data
  
  model = rn(<{ 
    lm(mpg ~ hp, data = data_input)  -- use the alias name in R
  }>,
  deserializer = "arrow")
}

-- Success! T sees `raw_data` as a dependency of `data_input`, wiring the pipelines.
p_full = p_data |> chain(p_model)

By giving the stub a different name (data_input = raw_data), you avoid a self-reference while still creating a T-expression that references raw_data. T can parse the right-hand side, detect the cross-pipeline dependency, and allow chain() to wire the pipelines together. Note that R/Python code inside the chained node should use the alias name (data_input) as the variable, not the original (raw_data).


26. Parallel Execution

Combines two pipelines that are intended to run independently. No dependency wiring is performed. Errors on name collision.

p_r_model = pipeline {
  r_fit = rn(command = <{ lm(y ~ x, data) }>, serializer = "pmml")
}

p_py_model = pipeline {
  py_fit = pyn(
    command = <{
      from sklearn.linear_model import LinearRegression
      LinearRegression().fit(X, y)
    }>,
    serializer = "pmml"
  )
}

-- Both models will run independently
p_both = parallel(p_r_model, p_py_model)
pipeline_nodes(p_both)  -- ["r_fit", "py_fit"]

27. Extended Inspection API

Beyond pipeline_nodes and pipeline_deps, T provides a complete structural inspection surface for pipelines.

Boundary Nodes

p = pipeline { a = 1; b = a + 1; c = b + 1 }

pipeline_roots(p)   -- ["a"]  — nodes with no dependencies
pipeline_leaves(p)  -- ["c"]  — nodes nothing depends on

Dependency Edges

pipeline_edges returns a list of [from, to] pairs representing every edge in the DAG:

p = pipeline { a = 1; b = a + 1; c = b + 1 }

pipeline_edges(p)  -- [["a", "b"], ["b", "c"]]

This is useful for serializing the graph structure or feeding it to external tools.

Topological Depth

pipeline_depth returns the maximum topological depth across all nodes (root nodes have depth 0):

p = pipeline { a = 1; b = a + 1; c = b + 1 }

pipeline_depth(p)  -- 2

Cycle Detection

pipeline_cycles returns any node names involved in dependency cycles. A correctly formed pipeline always returns an empty list:

p = pipeline { a = 1; b = a + 1 }
pipeline_cycles(p)  -- []

pipeline_print

Prints a human-readable summary of all nodes to stdout, including their runtime, depth, noop status, and dependency list:

p = pipeline {
  a = 1
  b = node(command = <{ 2 }>, runtime = R, serializer = "pmml")
  c = b + 1
}

pipeline_print(p)
-- Pipeline (3 nodes):
--   a                     runtime=T         depth=0  noop=false  deps=[]
--   b                     runtime=R         depth=0  noop=false  deps=[]
--   c                     runtime=T         depth=1  noop=false  deps=[b]

pipeline_dot

Exports the pipeline as a Graphviz DOT string for visualization:

p = pipeline { a = 1; b = a + 1; c = b + 1 }

dot = pipeline_dot(p)
print(dot)
-- digraph pipeline {
--   rankdir=LR;
--   node [shape=box];
--   "a" [label="a\n[T]"];
--   "b" [label="b\n[T]"];
--   "c" [label="c\n[T]"];
--   "a" -> "b";
--   "b" -> "c";
-- }

Pipe the output to dot -Tpng or paste it into https://dreampuf.github.io/GraphvizOnline/ to render a visual dependency graph.


28. Pipeline Validation

By design, T uses lazy validation: structural errors (missing dependencies, cycles) surface at build_pipeline or pipeline_run time, not at operation time. This allows you to compose and transform pipelines freely.

When you want to validate eagerly, T provides opt-in validation utilities.

pipeline_validate

Returns a list of validation error messages. An empty list means the pipeline is structurally valid. This function never throws — it reports problems as data.

p_good = pipeline { a = 1; b = a + 1 }
pipeline_validate(p_good)  -- []

-- Build a broken pipeline manually via difference
p_broken = pipeline { a = 1; b = a + 1 } |> filter_node($name == "b")
-- b now depends on a, but a was filtered out

pipeline_validate(p_broken)
-- ["Node `b` depends on `a` which does not exist in the pipeline."]

Checks performed: 1. All referenced dependencies exist as nodes in the pipeline. 2. No dependency cycles.

pipeline_assert

Like pipeline_validate, but throws the first error found instead of returning a list. Returns the pipeline unchanged if valid. This is useful as a guard at a pipeline’s construction site.

p = pipeline { a = 1; b = a + 1 }
  |> filter_node($depth == 0)    -- keeps a only
  |> pipeline_assert              -- succeeds, returns the pipeline

-- Chaining validation into a construction expression:
safe_pipeline = pipeline { a = 1; b = a + 1 }
  |> mutate_node($noop = true, where = $runtime == "R")
  |> pipeline_assert

If validation fails:

p_broken |> pipeline_assert
-- Error(ValueError: "Node `b` depends on `a` which does not exist in the pipeline.")

Best Practices

  1. Name nodes descriptively: Use names like raw_data, filtered_sales, summary_stats
  2. Keep nodes focused: Each node should do one thing
  3. Use pipes within nodes: Combine pipeline structure with pipe operator for readability
  4. Inspect before consuming: Use pipeline_nodes(), pipeline_deps(), and pipeline_to_frame() to understand pipeline structure
  5. Build incrementally: Start with data loading, add transformations one node at a time
  6. Validate at construction time: Use pipeline_assert at the end of a construction chain to catch structural errors early
  7. Compose with chain over union: When two pipelines are intentionally connected, chain makes the dependency explicit; use union only when combining truly independent pipelines
  8. Use filter_node + upstream_of for partial builds: Trim a large pipeline to just what you need before calling build_pipeline
  9. Resolve collisions with rename_node before set ops: Both union and chain enforce unique names; rename conflicting nodes before merging

Complete Example

-- A full data analysis pipeline
p = pipeline {
  -- Load data
  raw = read_csv("employees.csv")
  
  -- Filter to active engineers
  engineers = raw
    |> filter($dept == "eng")
    |> filter($active == true)
  
  -- Compute statistics
  avg_salary = engineers.salary |> mean
  salary_sd = engineers.salary |> sd
  team_size = engineers |> nrow
  
  -- Sort by performance
  ranked = engineers |> arrange("score", "desc")
}

-- Access results
p.team_size     -- number of active engineers
p.avg_salary    -- mean salary
p.ranked        -- DataFrame sorted by score

Next Steps

Now that you’ve mastered pipelines, learn how to manage reproducible projects and develop T packages:

  1. Project Development — Master T’s project structure and dependency management.
  2. Package Development — Create reusable T libraries.
  3. Reproducibility Guide — Deep dive into T’s commitment to reproducible research.
  4. API Reference — Complete function reference by package.