Combine generators into a pipeline
The question. You have a multi-step data transformation — filter these records, reshape them, summarise the result — and you want it to stay lazy so a 10 GB input still runs in constant memory. Each stage should be testable on its own.
The answer: write each stage as a small generator function, then compose them by nesting. Nothing runs until a consumer (a for loop, sum, list, writing to a file) pulls values through.
# A pipeline of four stages: source -> filter -> transform -> aggregate.
# Each stage is a named generator; composition is nesting.
import random
from itertools import groupby
from statistics import mean

# Source: produces values. In real code this might read from a file or socket.
def read_readings(n, seed=0):
    rnd = random.Random(seed)
    sensors = ['s1', 's2', 's3']
    for i in range(n):
        yield (i, rnd.choice(sensors), rnd.gauss(10, 2))

# Filter: subset of input, same shape.
def above(readings, threshold):
    for r in readings:
        if r[2] > threshold:
            yield r

# Transform: one value out per value in.
def bucketise(readings, size):
    for ts, sensor, value in readings:
        yield ((ts // size) * size, sensor, value)

# Aggregate: yields FEWER values than it consumed.
def average_per_bucket(readings):
    keyed = sorted(((r[0], r[1]), r[2]) for r in readings)  # groupby needs adjacency
    for key, group in groupby(keyed, key=lambda p: p[0]):
        yield (*key, mean(v for _, v in group))

# Wire it up by nesting. The order reads inside-out: source at the core.
pipeline = average_per_bucket(
    bucketise(above(read_readings(200, seed=1), 8.0), 50)
)

# The consumer: nothing happens until you iterate.
for row in list(pipeline)[:6]:
    print(row)
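To see the "nothing runs until a consumer pulls" behaviour directly, the following minimal sketch records which source values are actually requested. The names `numbers`, `evens`, `doubled` and `pulled` are hypothetical and exist only for this illustration; `itertools.islice` is used as a consumer that stops early.

```python
from itertools import count, islice

pulled = []  # records every value the source was asked to produce

def numbers():
    """Hypothetical infinite source that logs each value it produces."""
    for n in count():
        pulled.append(n)
        yield n

def evens(xs):
    for x in xs:
        if x % 2 == 0:
            yield x

def doubled(xs):
    for x in xs:
        yield x * 2

pipeline = doubled(evens(numbers()))
print(pulled)        # [] : building the pipeline ran no stage code

first_three = list(islice(pipeline, 3))
print(first_three)   # [0, 4, 8]
print(pulled)        # [0, 1, 2, 3, 4] : only as much input as was needed
```

Note that this early stop only helps when every stage is streaming. A stage that sorts its input has to read the whole stream before it can yield its first value.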
Variant: compose stages from a list with reduce
When the stages are decided at runtime (driven by user config, say), you can't hard-code the call chain. A one-line compose helper applies stages in order; functools.partial supplies extra arguments.
from functools import partial, reduce

def compose(source, *stages):
    '''Apply each stage to the running iterable, in order.'''
    return reduce(lambda it, stage: stage(it), stages, source)

def square(xs):
    for x in xs:
        yield x * x

def above_value(xs, threshold):
    for x in xs:
        if x > threshold:
            yield x

stages = [partial(above_value, threshold=2), square]
print(list(compose(range(6), *stages)))
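One way the runtime-config case might look is sketched below. The `STAGES` registry, the shape of `config`, and the `build_stages` helper are all assumptions made for this example, not part of any library; a real configuration format would likely differ.

```python
from functools import partial, reduce

def compose(source, *stages):
    '''Apply each stage to the running iterable, in order.'''
    return reduce(lambda it, stage: stage(it), stages, source)

def square(xs):
    for x in xs:
        yield x * x

def above_value(xs, threshold):
    for x in xs:
        if x > threshold:
            yield x

# Hypothetical registry: maps names allowed in config to stage functions.
STAGES = {'above_value': above_value, 'square': square}

# Hypothetical config, e.g. the result of parsing a JSON or YAML file.
config = [
    {'stage': 'above_value', 'args': {'threshold': 2}},
    {'stage': 'square', 'args': {}},
]

def build_stages(config):
    '''Turn config entries into one-argument callables via partial.'''
    return [partial(STAGES[step['stage']], **step['args']) for step in config]

result = list(compose(range(6), *build_stages(config)))
print(result)  # [9, 16, 25]
```

Looking stages up in an explicit registry, rather than by arbitrary name, keeps the set of runnable functions under your control when the config comes from users.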
Variant: tap a stream for debugging
A tap stage passes each value through unchanged but also prints (or logs) it. Insert it anywhere in the chain without breaking the flow, and remove it when you're done.
def tap(xs, label):
    for x in xs:
        print(f'[{label}] {x}')
        yield x

# Insert anywhere to inspect what's flowing past.
result = list(tap(range(5), label='after-source'))
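A tap is most informative between stages. The sketch below places two taps in one chain; the optional `log` parameter is an addition made for this example (it is not in the `tap` shown above) so that messages can be collected in a list instead of printed.

```python
def tap(xs, label, log=print):
    """Pass values through unchanged; `log` is a hypothetical extra parameter."""
    for x in xs:
        log(f'[{label}] {x}')
        yield x

def above_value(xs, threshold):
    for x in xs:
        if x > threshold:
            yield x

def square(xs):
    for x in xs:
        yield x * x

events = []  # collected tap messages, in the order they were emitted

filtered = tap(above_value(range(5), threshold=2), 'after-filter', log=events.append)
squared = tap(square(filtered), 'after-square', log=events.append)

result = list(squared)
print(result)  # [9, 16]
for line in events:
    print(line)
# [after-filter] 3
# [after-square] 9
# [after-filter] 4
# [after-square] 16
```

The messages interleave because each value travels through the whole chain before the next one is pulled from the source.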
Why this works
Every stage speaks the iterator protocol: it takes an iterable, yields values, and finishes. As long as each stage holds only one record at a time, memory stays constant regardless of stream size (an aggregate that sorts its whole input, like average_per_bucket, is the exception). Because each stage is a small, named function, you can unit-test it with a tiny in-memory list: no file, no network.
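As an illustration of testing a stage in isolation, here is a minimal sketch that feeds the `above` and `bucketise` stages hand-written lists. The test function names and the sample readings are made up for this example; the functions are written as plain asserts so they run with or without a test runner such as pytest.

```python
def above(readings, threshold):
    for r in readings:
        if r[2] > threshold:
            yield r

def bucketise(readings, size):
    for ts, sensor, value in readings:
        yield ((ts // size) * size, sensor, value)

def test_above_drops_values_at_or_below_threshold():
    readings = [(0, 's1', 7.5), (1, 's2', 8.0), (2, 's1', 9.1)]
    assert list(above(readings, 8.0)) == [(2, 's1', 9.1)]

def test_bucketise_rounds_timestamp_down_to_bucket_start():
    readings = [(0, 's1', 1.0), (49, 's2', 2.0), (50, 's1', 3.0)]
    assert list(bucketise(readings, 50)) == [
        (0, 's1', 1.0),
        (0, 's2', 2.0),
        (50, 's1', 3.0),
    ]

# Run directly when no test runner is available.
test_above_drops_values_at_or_below_threshold()
test_bucketise_rounds_timestamp_down_to_bucket_start()
```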
The four shapes cover almost all pipelines: a source produces values; a filter yields a subset; a transform yields one-out-per-in; an aggregate yields fewer values (often one). Most pipelines are source → filter → transform → ... → consumer, with as many stages as you need.
Debugging a deep pipeline is easy: drop a tap stage that prints each value as it passes through, and you can see what any stage is seeing without breaking the flow.
Trade-offs
Small one-line transforms don't need a named function — a generator expression (r for r in readings if r[2] > 8.0) reads fine inline. Named functions earn their place when the logic is non-trivial, when you want to test the stage in isolation, or when you'll reuse the stage elsewhere. Real pipelines mix both freely.
Sort-then-group (used in average_per_bucket above) breaks the constant-memory property — sorted materialises. If your aggregate can be streaming (a running total, a max, a Counter), keep it streaming. If it genuinely needs a sort, accept the memory cost and document the assumption that the stream fits.
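For the averaging case specifically, a streaming alternative might keep a running sum and count per key instead of sorting every record. The function name `average_per_bucket_streaming` is hypothetical. Memory in this sketch grows with the number of distinct (bucket, sensor) keys rather than with the number of records, and a plain running sum may differ from `statistics.mean` in the last floating-point digits.

```python
from collections import defaultdict

def average_per_bucket_streaming(readings):
    """Hypothetical variant: one pass, no sort of the full record stream."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [running sum, count]
    for bucket, sensor, value in readings:
        entry = totals[(bucket, sensor)]
        entry[0] += value
        entry[1] += 1
    # Only the keys are sorted here, not the records themselves.
    for key in sorted(totals):
        total, n = totals[key]
        yield (*key, total / n)

rows = [(0, 's1', 10.0), (0, 's2', 8.0), (0, 's1', 12.0), (50, 's1', 9.0)]
averages = list(average_per_bucket_streaming(rows))
print(averages)
# [(0, 's1', 11.0), (0, 's2', 8.0), (50, 's1', 9.0)]
```

Like the sorted version, this still cannot yield anything until the input is exhausted, because any later record could change an average.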
The commonest pipeline bug is forgetting to consume: square([1, 2]) on its own only creates a generator object and runs none of the function body, while list(square([1, 2])) actually runs it.
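The bug is easiest to notice when a stage has a side effect. In this sketch, `save_each` is a hypothetical stage that appends to a list as values pass through; calling it without a consumer leaves the list empty.

```python
from collections import deque

def save_each(xs, sink):
    """Hypothetical stage with a side effect: append each value to sink."""
    for x in xs:
        sink.append(x)
        yield x

written = []

save_each([1, 2, 3], written)   # creates a generator and discards it
print(written)                  # [] : the body never ran

for _ in save_each([1, 2, 3], written):
    pass                        # a plain loop is enough to drive the stage
print(written)                  # [1, 2, 3]

# deque with maxlen=0 consumes an iterator without storing its values.
also_written = []
deque(save_each('ab', also_written), maxlen=0)
print(also_written)             # ['a', 'b']
```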
Related reading
- Process a large file lazily — the canonical source stage, plus multi-line records and binary chunks.
- Chain and group iterables — chain and groupby as ready-made stages.
- Avoid common iterator mistakes — late binding, consuming twice, and the forgotten consumer.
- itertools cheatsheet — stages that ship with the standard library.