

Pipeline Tutorial

A step-by-step guide to T’s pipeline execution model

Pipelines are T’s core execution model. They let you define named computation steps (nodes) that are automatically ordered by their dependencies, executed deterministically, and cached for re-use.


1. Your First Pipeline

A pipeline is a block of named expressions enclosed in pipeline { ... }:

p = pipeline {
  x = 10
  y = 20
  total = x + y
}

This creates a pipeline with three nodes: x, y, and total. Each node is computed once, and the results are cached. Access any node’s value with dot notation:

p.x      -- 10
p.y      -- 20
p.total  -- 30
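
Because results are cached, repeated access returns the stored value instead of recomputing the node:

p.total  -- 30 (served from the cache, not recomputed)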

The pipeline itself displays as:

Pipeline(3 nodes: [x, y, total])

2. Automatic Dependency Resolution

Nodes can be declared in any order. T automatically resolves dependencies:

p = pipeline {
  result = x + y   -- depends on x and y
  x = 3            -- defined after result
  y = 7            -- defined after result
}
p.result  -- 10

T builds a dependency graph and executes nodes in topological order, so x and y are computed before result regardless of declaration order.
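
You can see this graph directly with pipeline_deps() (covered in section 7 below; output sketched here, exact ordering may differ):

pipeline_deps(p)
-- {"result": ["x", "y"], "x": [], "y": []}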


3. Chained Dependencies

Nodes can depend on other computed nodes, forming chains:

p = pipeline {
  a = 1
  b = a + 1     -- depends on a
  c = b + 1     -- depends on b
  d = c + 1     -- depends on c
}
p.d  -- 4
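
The chain is visible in the dependency graph:

pipeline_deps(p)
-- {"a": [], "b": ["a"], "c": ["b"], "d": ["c"]}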

4. Pipelines with Functions

Nodes can use any T function, including standard library functions:

p = pipeline {
  data = [1, 2, 3, 4, 5]
  total = sum(data)
  count = length(data)
}
p.total  -- 15
p.count  -- 5
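
Nodes can also reference other computed nodes, so derived statistics stay in the same pipeline. A small extension of the example above (the avg node is added here for illustration):

p = pipeline {
  data = [1, 2, 3, 4, 5]
  total = sum(data)
  count = length(data)
  avg = total / count   -- depends on two computed nodes
}
p.avg  -- 3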

5. Pipelines with Pipe Operator

The pipe operator |> works naturally inside pipelines:

double = \(x) x * 2

p = pipeline {
  a = 5
  b = a |> double
}
p.b  -- 10
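
Pipes also chain within a single node. A sketch reusing the same double lambda:

p = pipeline {
  a = 5
  b = a |> double |> double   -- applied twice
}
p.b  -- 20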

6. Data Pipelines

Pipelines are most powerful for data analysis workflows. Start by loading a dataset and inspecting its shape; a full load-transform-summarize pipeline follows below:

p = pipeline {
  data = read_csv("employees.csv")
  rows = data |> nrow
  cols = data |> ncol
  names = data |> colnames
}

p.rows   -- number of rows
p.cols   -- number of columns
p.names  -- list of column names

Full Data Analysis Pipeline

p = pipeline {
  raw = read_csv("sales.csv")
  filtered = filter(raw, \(row) row.amount > 100)
  by_region = filtered |> group_by("region")
  summary = by_region |> summarize("total", \(g) sum(g.amount))
}

p.summary  -- DataFrame with regional totals
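
Because intermediate nodes are cached like any other node, each stage of the analysis can be inspected after the run:

p.filtered |> nrow  -- rows that passed the amount filter
p.by_region         -- the grouped intermediate result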

7. Pipeline Introspection

T provides functions to inspect pipeline structure:

List all nodes

p = pipeline { x = 10; y = 20; total = x + y }
pipeline_nodes(p)  -- ["x", "y", "total"]

View dependency graph

pipeline_deps(p)
-- {"x": [], "y": [], "total": ["x", "y"]}

Access a specific node by name

pipeline_node(p, "total")  -- 30

8. Re-running Pipelines

Use pipeline_run() to re-execute a pipeline:

p2 = pipeline_run(p)
p2.total  -- 30 (re-computed)

Re-running produces the same results, because T pipelines are deterministic.


9. Deterministic Execution

Two pipelines with the same definitions always produce the same results:

p1 = pipeline { a = 5; b = a * 2; c = b + 1 }
p2 = pipeline { a = 5; b = a * 2; c = b + 1 }
p1.c == p2.c  -- true

10. Error Handling

Cycle Detection

T detects circular dependencies and reports them:

pipeline {
  a = b
  b = a
}
-- Error(ValueError: "Pipeline has a dependency cycle involving node 'a'")
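
Indirect cycles spanning several nodes are detected the same way (a sketch; the exact node named in the error may vary):

pipeline {
  a = c
  b = a
  c = b
}
-- Error(ValueError: "Pipeline has a dependency cycle involving node 'a'")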

Error Propagation

If a node fails, the error is captured and reported:

pipeline {
  a = 1 / 0
  b = a + 1
}
-- Error(ValueError: "Pipeline node 'a' failed: ...")

Missing Nodes

Accessing a non-existent node returns a structured error:

p = pipeline { x = 10 }
p.nonexistent
-- Error(KeyError: "node 'nonexistent' not found in Pipeline")

11. Using explain() with Pipelines

The explain() function provides structured metadata about pipelines:

p = pipeline {
  x = 10
  y = x + 5
  z = y * 2
}

e = explain(p)
e.kind        -- "pipeline"
e.node_count  -- 3

Best Practices

  1. Name nodes descriptively: Use names like raw_data, filtered_sales, summary_stats
  2. Keep nodes focused: Each node should do one thing
  3. Use pipes within nodes: Combine pipeline structure with pipe operator for readability
  4. Inspect before consuming: Use pipeline_nodes() and pipeline_deps() to understand pipeline structure
  5. Build incrementally: Start with data loading, add transformations one node at a time

Complete Example

-- A full data analysis pipeline
p = pipeline {
  -- Load data
  raw = read_csv("employees.csv")
  
  -- Filter to active engineers
  engineers = raw
    |> filter(\(row) row.dept == "eng")
    |> filter(\(row) row.active == true)
  
  -- Compute statistics
  avg_salary = engineers.salary |> mean
  salary_sd = engineers.salary |> sd
  team_size = engineers |> nrow
  
  -- Sort by performance
  ranked = engineers |> arrange("score", "desc")
}

-- Access results
p.team_size     -- number of active engineers
p.avg_salary    -- mean salary
p.ranked        -- DataFrame sorted by score
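
To see how the stages relate, inspect the dependency graph (output sketched here; exact key order and formatting may differ):

pipeline_deps(p)
-- {"raw": [], "engineers": ["raw"], "avg_salary": ["engineers"],
--  "salary_sd": ["engineers"], "team_size": ["engineers"], "ranked": ["engineers"]}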
