Pipeline Tutorial

A step-by-step guide to T’s pipeline execution model

Pipelines are T’s core execution model. They let you define named computation steps (nodes) that are automatically ordered by their dependencies, executed deterministically, and cached for re-use.


1. Your First Pipeline

A pipeline is a block of named expressions enclosed in pipeline { ... }:

p = pipeline {
  x = 10
  y = 20
  total = x + y
}

This creates a pipeline with three nodes: x, y, and total. Each node is computed once, and the results are cached. Access any node’s value with dot notation:

p.x      -- 10
p.y      -- 20
p.total  -- 30

The pipeline itself displays as:

Pipeline(3 nodes: [x, y, total])

1.1 The Interactive Development Loop

A productive way to build pipelines is incrementally in the REPL — start small, build, inspect, extend:

p = pipeline { raw_data = [1, 2, 3, 4, 5] }
build_pipeline(p)
p.raw_data     -- [1, 2, 3, 4, 5]
read_node(p.raw_data)  -- verify serialized artifact
show_plot(p)   -- visualize the DAG

Then add a node and rebuild:

p = pipeline {
  raw_data = [1, 2, 3, 4, 5]
  total = sum(raw_data)
}
build_pipeline(p)
p.total        -- 15
show_plot(p)   -- updated DAG with raw_data → total

Because raw_data was already built and cached, Nix skips rebuilding it and only computes the new total node. Run build_pipeline(p, dry_run = true) to preview cache hits vs rebuilds.

This Build → Verify → Plot → Extend loop lets you grow complex pipelines reliably, one node at a time.


2. Execution Modes

T enforces a clear separation between interactive and non-interactive execution.

Interactive (REPL)

The REPL is unrestricted — you can run any T code line by line, whether or not it involves pipelines:

$ t repl
T> x = 1 + 2
T> p = pipeline { a = 10 }
T> p.a
10

Non-interactive (t run)

Scripts executed with t run must call populate_pipeline() (or build_pipeline()). This ensures that non-interactive execution always produces reproducible Nix artifacts.

$ t run my_pipeline.t
-- my_pipeline.t
p = pipeline {
  data = read_csv("input.csv")
  result = data |> filter($value > 0) |> summarize(total = sum($value))
}
populate_pipeline(p, build = true)

By default, t run operates in resilient mode, continuing past errors. Use --failfast to stop at the first error.


3. Pipeline Function Quick Reference

A consolidated index of all pipeline reading, inspecting, and build-log functions. Use this as a cheatsheet to find the right tool for the job.

Reading Node Artifacts

Function Parameters Returns What it does
read_node(node) ComputedNode deserialized value + diagnostics Read in-scope pipeline node artifact
read_past_node(p.name, which_log) NSE-captured node, String (required) deserialized value + diagnostics Read from historical build log without pipeline in scope
read_pipeline(p) Pipeline Dict Per-node values + diagnostics + aggregated summary
pipeline_node(p, name) Pipeline, String Any Value of a specific node by name

Build Logs & History

Function Parameters Returns What it does
build_log(p, which_log?) Pipeline, optional String BuildLog Structured build log for latest (or specified) build
build_log_to_frame(log) BuildLog DataFrame Build log as DataFrame (name, status, duration, path)
build_log_history(p, n?, pattern?) Pipeline, optional Int, String DataFrame History of all builds matching pipeline’s node signature
list_logs() DataFrame All log files in _pipeline/ (filename, mtime, size, pipeline)
inspect_log(p?, which_log?) optional Pipeline, optional String DataFrame Derivation-level build status (derivation, build_success, path)
read_log(node_name) String String Raw Nix build log text for a specific node

Node Inspection & Diagnostics

Function Parameters Returns What it does
inspect_node(node) ComputedNode Dict Static metadata (runtime, path, class, deps) + structured warnings
warning_msg(node) ComputedNode String Formatted warning message (own + upstream with source prefix)
collect_exceptions(p) Pipeline DataFrame Structured error/warning DataFrame from built pipeline
suppress_warnings(val) Any original value Suppress console warnings for a node; still accessible via warning_msg()
debug_node(node) ComputedNode NA Interactive REPL subshell pre-configured with node environment
rebuild_node(node) ComputedNode ComputedNode Rebuild a single node and return updated artifact path

Pipeline DAG Structure

Function Parameters Returns What it does
pipeline_to_frame(p) Pipeline DataFrame Full node metadata (runtime, serializer, deps, depth, command_type)
pipeline_nodes(p) Pipeline List[String] All node names
pipeline_deps(p) Pipeline Dict Node name → list of dependency names
pipeline_edges(p) Pipeline List[[from, to]] Edge list as dependency pairs
pipeline_roots(p) Pipeline List[String] Nodes with no dependencies
pipeline_leaves(p) Pipeline List[String] Nodes that nothing depends on
pipeline_depth(p) Pipeline Int Maximum topological depth
pipeline_cycles(p) Pipeline List[String] Nodes involved in cycles (empty = valid)
pipeline_validate(p) Pipeline List[String] Validation errors (empty = valid); checks missing deps + cycles
pipeline_assert(p) Pipeline Pipeline Throws first error, or returns pipeline unchanged
pipeline_print(p) Pipeline NA Pretty-print node table to stdout
pipeline_to_dot(p) Pipeline | MetaPipeline String Graphviz DOT representation
pipeline_to_mermaid(p) Pipeline | MetaPipeline String Mermaid flowchart diagram
trace_nodes(p, name?) Pipeline, optional String NA Visual dependency tree printer
pipeline_cache_status(p) Pipeline DataFrame Nix store cache hits per node (cached, store_path)
pipeline_to_drv(p) Pipeline Dict Node → derivation (.drv) path mapping
pipeline_to_store(p) Pipeline Dict Node → Nix store output path mapping

Node-Level Filtering & Diffs

Function Parameters Returns What it does
select_node(p, ...) Pipeline, Symbol DataFrame Column projection from pipeline_to_frame
which_nodes(p, predicate) Pipeline, Function (NSE) List Node records from read_pipeline(p).nodes matching predicate
errored_nodes(p) Pipeline List Convenience wrapper: nodes with non-NA diagnostics.error
node_diff(a, b, log_a?, log_b?) ComputedNode ×2, optional String/Int VDict Compare node artifacts across builds
pipeline_diff(a, b) Pipeline ×2 Dict Structural diff between two pipeline DAGs

Export / Import / GC

Function Parameters Returns What it does
pipeline_copy(node?, target_dir?) optional String, String String Copy artifacts from Nix store to local directory
export_artifacts(p, archive) Pipeline, String String Export cached artifacts to portable archive
import_artifacts(target_or_archive, archive?) Pipeline or String, optional String String Import previously exported archive
inspect_artifacts(archive) String DataFrame Preview archive contents without importing
pipeline_gc(p, dry_run?) Pipeline, optional Bool DataFrame GC pipeline store paths (dry_run=true previews)
t_gc() String Global Nix garbage collection

4. Explicit Node Configuration

In addition to bare assignments, you can explicitly configure nodes using the node() function. This lets you define the execution environment (like the runtime) and custom serialization methods for when a pipeline is materialized by Nix:

p = pipeline {
  data = node(command = read_csv("data.csv"), runtime = T)
  
  -- Running a Python node that trains a model using the pyn wrapper
  model = pyn(
    command = <{
        from sklearn.linear_model import LinearRegression
        fit = LinearRegression().fit(X, y)
        fit
    }>,
    serializer = "pmml"
  )
}

Bare syntax (like x = 10) is automatically desugared to x = node(command = 10, runtime = T, serializer = default, deserializer = default). You can also use pyn(), rn(), and shn() as shortcuts for Python, R, and shell runtimes. T enforces cross-runtime safety: if a node with a non-T runtime depends on a T node, or vice versa, you should specify an explicit serializer/deserializer.

When an R node returns a ggplot2 object, a Python node returns a matplotlib / plotnine plot object, or a Julia node returns a TidierPlots.jl, Plots.jl, or Makie.jl figure object, T preserves lightweight plot metadata for REPL inspection. Reading or printing those artifacts shows a structured summary with the plot class (ggplot, matplotlib, plotnine, tidierplots, plotsjl, or makie), runtime backend (R, Python, or Julia), title, labels, mappings when available, and layer information instead of a raw runtime-specific object dump.

Using the script Argument

Instead of inlining code with command, you can point a node to an external source file using the script argument. This works with node(), pyn(), rn(), and shn(). The script and command arguments are mutually exclusive.

p = pipeline {
  -- Execute an external R script
  model = rn(script = "train_model.R", serializer = "pmml")

  -- Execute an external Python script
  predictions = pyn(script = "predict.py", deserializer = "pmml")

  -- Execute an external shell script
  report = shn(script = "postprocess.sh")

  -- node() auto-detects the runtime from the file extension
  summary = node(script = "summarise.R", serializer = "json")
}

When using script, the runtime is auto-detected from the file extension (.R → R, .py → Python, .sh → sh) if not explicitly set via the runtime argument. T reads the script file to extract identifier references, allowing the pipeline dependency graph to be built correctly from variables referenced in the external file.

Shell / Bash nodes with shn()

Use shn() for pipeline steps that are easiest to express as shell or CLI commands. It is a convenience wrapper around node(runtime = sh, ...), just like rn() and pyn() wrap node() for R and Python.

p = pipeline {
  -- Exec-style shell node: command + positional argv
  fields = shn(
    command = "printf",
    args = ["first line\\nsecond line\\n"]
  )

  -- Script-style shell node: inline shell source executed with `sh`
  report = shn(command = <{
#!/bin/sh
set -eu

# Dependencies for T's lexical pipeline analysis: summary_r summary_py
printf 'R summary: %s\n' "$T_NODE_summary_r/artifact"
printf 'Python summary: %s\n' "$T_NODE_summary_py/artifact"
  }>)
}

There are two useful modes:

Shell nodes default to serializer = text, which makes them a good fit for reports, command output, and glue code between other pipeline nodes. For a full end-to-end example that mixes T, R, Python, and sh, see tests/pipeline/polyglot_shell_pipeline.t and .github/workflows/polyglot-shell-pipeline.yml.


5. Cross-Language Integration

T is designed to orchestrate code across multiple languages. The pipeline runner manages the serialization and deserialization of data between R, Python, and T using a first-class serializer system. For a deep dive into how T handles data interchange, see the Serializers Documentation.

Interchange Formats Comparison

Format Option Best For Requirement
T Native "default" T-to-T communication None
Arrow "arrow" Large DataFrames pyarrow (Py), arrow (R)
PMML "pmml" Predictive Models sklearn2pmml (Py), r2pmml (R)
JSON "json" Simple lists/dicts jsonlite (R)

Example: Training in R, Predicting in T

You can train a model in R and use T’s native OCaml model evaluator to make predictions without leaving the T runtime:

p = pipeline {
  -- Node 1: Train model in R using the rn wrapper
  model_r = rn(
    command = <{
      data <- read.csv("data.csv")
      lm(mpg ~ wt + hp, data = data)
    }>,
    serializer = "pmml"
  )
  
  -- Node 2: Predict in T using the R model
  predictions = node(
    command = <{
      test_df = read_csv("new_data.csv")
      predict(test_df, model_r)
    }>,
    runtime = "T",
    deserializer = "pmml"
  )
}

Setting deserializer = "pmml" on the T node tells the pipeline runner to use T’s native PMML parser to convert the R model into a T model object.


6. Automatic Dependency Resolution

Nodes can be declared in any order. T automatically resolves dependencies:

p = pipeline {
  result = x + y   -- depends on x and y
  x = 3            -- defined after result
  y = 7            -- defined after result
}
p.result  -- 10

T builds a dependency graph and executes nodes in topological order, so x and y are computed before result regardless of declaration order.


7. Chained Dependencies

Nodes can depend on other computed nodes, forming chains:

p = pipeline {
  a = 1
  b = a + 1     -- depends on a
  c = b + 1     -- depends on b
  d = c + 1     -- depends on c
}
p.d  -- 4

8. Pipelines with Functions

Nodes can use any T function, including standard library functions:

p = pipeline {
  data = [1, 2, 3, 4, 5]
  total = sum(data)
  count = length(data)
}
p.total  -- 15
p.count  -- 5

9. Pipelines with Pipe Operators

The pipe operator |> works naturally inside pipelines:

double = \(x) x * 2

p = pipeline {
  a = 5
  b = a |> double
}
p.b  -- 10

Error Recovery with Maybe-Pipe

The maybe-pipe ?|> forwards all values — including errors — to the next function. This is useful for building recovery logic into pipelines:

recovery = \(x) if (is_error(x)) 0 else x
double = \(x) x * 2

p = pipeline {
  raw = 1 / 0                    -- Error: division by zero
  safe = raw ?|> recovery        -- forwards error to recovery → 0
  result = safe |> double        -- 0 |> double → 0
}
p.safe    -- 0
p.result  -- 0

Without ?|>, the error from raw would short-circuit at |> and never reach recovery. The maybe-pipe lets you intercept errors and provide fallback values.


10. Data Pipelines

Pipelines are most powerful for data analysis workflows. Here’s a complete example loading, transforming, and summarizing data:

p = pipeline {
  data = read_csv("employees.csv")
  rows = data |> nrow
  cols = data |> ncol
  names = data |> colnames
}

p.rows   -- number of rows
p.cols   -- number of columns
p.names  -- list of column names

Full Data Analysis Pipeline

p = pipeline {
  raw = read_csv("sales.csv")
  filtered = filter(raw, $amount > 100)
  by_region = filtered |> group_by($region)
  summary = by_region |> summarize($total = sum($amount))
}

p.summary  -- DataFrame with regional totals

11. Pipeline Introspection

↩︎ Quick Reference: Reading Node Artifacts

T provides functions to inspect pipeline structure:

List all nodes

p = pipeline { x = 10; y = 20; total = x + y }
pipeline_nodes(p)  -- ["x", "y", "total"]

View dependency graph

pipeline_deps(p)
-- {`x`: [], `y`: [], `total`: ["x", "y"]}

Access a specific node by name

pipeline_node(p, "total")  -- 30

Explain diagnostics

explain() provides structured metadata about any value, including pipelines and nodes:

p = pipeline { x = 10; y = x + 5; z = y * 2 }

e = explain(p)
e.kind        -- "pipeline"
e.node_count  -- 3

explain(p.x)  -- { `runtime`: "T", `kind`: "node", `name`: "x", ... }

12. Re-running Pipelines

Use pipeline_run() to re-execute a pipeline:

p2 = pipeline_run(p)
p2.total  -- 30 (re-computed)

Re-running produces the same results — T pipelines are deterministic.


13. Deterministic Execution

Two pipelines with the same definitions always produce the same results:

p1 = pipeline { a = 5; b = a * 2; c = b + 1 }
p2 = pipeline { a = 5; b = a * 2; c = b + 1 }
p1.c == p2.c  -- true

14. Error Handling & Resilience

Errors are Values

In T, errors are first-class values. By default, evaluation is resilient: if a node fails, it produces an Error value instead of crashing the pipeline. This allows other independent nodes to continue building, giving you a full picture of which parts of your DAG are healthy.

p = pipeline {
  a = 1 / 0      -- Produces an Error(DivisionByZero)
  b = 1 + 1      -- Still succeeds! (2)
  c = a + 1      -- Fails because 'a' is an error (Error)
}

When you print or build this pipeline, T provides a summary of which nodes succeeded and which failed.

The --failfast Flag

If you prefer the usual, common behaviour where evaluation stops immediately at the first error, you can use the --failfast flag:

$ t run --failfast src/pipeline.t

In your T scripts, you can also opt-in to this behavior via t_make():

t_make(failfast = true)

Cycle Detection

T detects circular dependencies and reports them at construction time, before any nodes are executed:

pipeline {
  a = b
  b = a
}
-- Error(ValueError: "Pipeline has a dependency cycle involving node 'a'")

Missing Nodes

Accessing a non-existent node returns a structured error:

p = pipeline { x = 10 }
p.nonexistent
-- Error(KeyError: "node 'nonexistent' not found in Pipeline")

15. Materializing Pipelines

↩︎ Quick Reference: Reading Node Artifacts

Defining a pipeline with pipeline { ... } evaluates nodes in-memory. To materialize them as reproducible Nix artifacts (potentially using R or Python dependencies you’ve defined in tproject.toml), use populate_pipeline() with the build = true argument:

p = pipeline {
  data = read_csv("sales.csv")
  total = sum(data.$amount)
}

populate_pipeline(p, build = true)

populate_pipeline(p, build = true) is the primary command for materializing a pipeline. It does the following:

  1. Populates the _pipeline/ directory with pipeline.nix and dag.json.
  2. Generates a Nix expression with one derivation per node. Crucially, if you define [r-dependencies] or [py-dependencies] in your tproject.toml, pipeline nodes have access to these language environments!
  3. Triggers a Nix build to materialize each node as a serialized artifact.
  4. Records the build in a timestamped log file (_pipeline/build_log_YYYYMMdd_HHmmss_hash.json).

[!NOTE] build_pipeline(p) is available as a shorthand for populate_pipeline(p, build = true).

Reading built artifacts

After building, use read_node() to retrieve materialized values:

read_node(p.total)   -- reads the serialized artifact for "total"

These functions look up the node in the latest build log and deserialize the artifact.


16. Orchestrating with populate_pipeline()

For more control over the build process, T provides populate_pipeline(). This function prepares the pipeline infrastructure without necessarily triggering the Nix build immediately.

populate_pipeline(p)                -- Prepares _pipeline/ only
populate_pipeline(p, build = true)  -- Same as build_pipeline(p)

[!TIP] For advanced configuration and passing low-level arguments directly to the underlying Nix build system (such as concurrency, targeted nodes, custom binary caches, dry runs, and force rebuilds), see the comprehensive Nix Build Options & Orchestration guide.

The _pipeline/ directory

T maintains a persistent state directory for your pipeline. When you populate or build, T creates:


Best Practices

  1. Name nodes descriptively: Use names like raw_data, filtered_sales, summary_stats
  2. Keep nodes focused: Each node should do one thing
  3. Use pipes within nodes: Combine pipeline structure with pipe operator for readability
  4. Inspect before consuming: Use pipeline_nodes(), pipeline_deps(), and pipeline_to_frame() to understand pipeline structure
  5. Build incrementally: Start with data loading, add transformations one node at a time
  6. Validate at construction time: Use pipeline_assert at the end of a construction chain to catch structural errors early

Complete Example

-- A full data analysis pipeline
p = pipeline {
  -- Load data
  raw = read_csv("employees.csv")
  
  -- Filter to active engineers
  engineers = raw
    |> filter($dept == "eng")
    |> filter($active == true)
  
  -- Compute statistics
  avg_salary = engineers.salary |> mean
  salary_sd = engineers.salary |> sd
  team_size = engineers |> nrow
  
  -- Sort by performance
  ranked = engineers |> arrange("score", "desc")
}

-- Access results
p.team_size     -- number of active engineers
p.avg_salary    -- mean salary
p.ranked        -- DataFrame sorted by score

Next Steps

Now that you’ve mastered pipeline basics, explore advanced topics:

  1. Advanced Pipeline Tutorial — Dynamic branching (pattern expansion), node manipulation, pipeline composition, DAG transformations, CI/CD, and more.
  2. Project Development — Master T’s project structure and dependency management.
  3. Package Development — Create reusable T libraries.
  4. Reproducibility Guide — Deep dive into T’s commitment to reproducible research.
  5. API Reference — Complete function reference by package.