{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Combine generators into a pipeline\n",
    "\n",
    "**The question.** You have a multi-step data transformation — filter these records, reshape them, summarise the result — and you want it to stay lazy so a 10 GB input still runs in constant memory. Each stage should be testable on its own.\n",
    "\n",
    "The answer: write each stage as a small generator function, then compose them by nesting. Nothing runs until a consumer (a `for` loop, `sum`, `list`, writing to a file) pulls values through.\n"
   ]
  },
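  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A quick way to check the laziness claim for yourself: the sketch below uses a hypothetical `shout` stage that records in a list when its body starts running. Calling the function only builds a generator object. The body runs once a consumer such as `list` pulls values through.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hypothetical stage for illustration: records when its body starts running.\n",
    "log = []\n",
    "\n",
    "def shout(words):\n",
    "    log.append('started')\n",
    "    for w in words:\n",
    "        yield w.upper()\n",
    "\n",
    "gen = shout(['hi', 'there'])   # builds a generator object; the body has not run\n",
    "print(log)                     # []\n",
    "print(list(gen))               # the consumer drives it: ['HI', 'THERE']\n",
    "print(log)                     # ['started']\n"
   ]
  },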
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# A pipeline of four stages: source -> filter -> transform -> aggregate.\n",
    "# Each stage is a named generator; composition is nesting.\n",
    "import random\n",
    "from itertools import groupby, islice\n",
    "from statistics import mean\n",
    "\n",
    "# Source: produces values. In real code this might read from a file or socket.\n",
    "def read_readings(n, seed=0):\n",
    "    rnd = random.Random(seed)\n",
    "    sensors = ['s1', 's2', 's3']\n",
    "    for i in range(n):\n",
    "        yield (i, rnd.choice(sensors), rnd.gauss(10, 2))\n",
    "\n",
    "# Filter: subset of input, same shape.\n",
    "def above(readings, threshold):\n",
    "    for r in readings:\n",
    "        if r[2] > threshold:\n",
    "            yield r\n",
    "\n",
    "# Transform: one value out per value in.\n",
    "def bucketise(readings, size):\n",
    "    for ts, sensor, value in readings:\n",
    "        yield ((ts // size) * size, sensor, value)\n",
    "\n",
    "# Aggregate: yields FEWER values than it consumed.\n",
    "def average_per_bucket(readings):\n",
    "    keyed = sorted(((r[0], r[1]), r[2]) for r in readings)   # groupby needs adjacency\n",
    "    for key, group in groupby(keyed, key=lambda p: p[0]):\n",
    "        yield (*key, mean(v for _, v in group))\n",
    "\n",
    "# Wire it up by nesting. The order reads inside-out: source at the core.\n",
    "pipeline = average_per_bucket(\n",
    "    bucketise(above(read_readings(200, seed=1), 8.0), 50)\n",
    ")\n",
    "\n",
    "# The consumer: nothing happens until you iterate.\n",
    "# islice stops after six rows instead of copying every result into a list first.\n",
    "for row in islice(pipeline, 6):\n",
    "    print(row)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Variant: compose stages from a list with `reduce`\n",
    "\n",
    "When the stages are decided at runtime (driven by user config, say), you can't hard-code the call chain. A one-line `compose` helper applies the stages in order, and `functools.partial` binds any extra arguments (such as a threshold) so that each stage takes just the iterable.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from functools import partial, reduce\n",
    "\n",
    "def compose(source, *stages):\n",
    "    '''Apply each stage to the running iterable, in order.'''\n",
    "    return reduce(lambda it, stage: stage(it), stages, source)\n",
    "\n",
    "def square(xs):\n",
    "    for x in xs:\n",
    "        yield x * x\n",
    "\n",
    "def above_value(xs, threshold):\n",
    "    for x in xs:\n",
    "        if x > threshold:\n",
    "            yield x\n",
    "\n",
    "stages = [partial(above_value, threshold=2), square]\n",
    "print(list(compose(range(6), *stages)))\n"
   ]
  },
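  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A sketch of the runtime-driven case, assuming a hypothetical config format: a list of `(stage_name, keyword_arguments)` pairs, plus a hypothetical `REGISTRY` dict that maps names to the generator functions from the cell above. The cell repeats those definitions so it runs on its own. Order matters: squaring before filtering gives a different result from the previous cell.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from functools import partial, reduce\n",
    "\n",
    "def compose(source, *stages):\n",
    "    '''Apply each stage to the running iterable, in order.'''\n",
    "    return reduce(lambda it, stage: stage(it), stages, source)\n",
    "\n",
    "def square(xs):\n",
    "    for x in xs:\n",
    "        yield x * x\n",
    "\n",
    "def above_value(xs, threshold):\n",
    "    for x in xs:\n",
    "        if x > threshold:\n",
    "            yield x\n",
    "\n",
    "# Hypothetical registry: stage names as they might appear in a config file.\n",
    "REGISTRY = {'above': above_value, 'square': square}\n",
    "\n",
    "def build_stages(config):\n",
    "    '''Turn (name, kwargs) pairs into one-argument stages via partial.'''\n",
    "    return [partial(REGISTRY[name], **kwargs) for name, kwargs in config]\n",
    "\n",
    "# Hypothetical config: square first, then keep values above 2.\n",
    "config = [('square', {}), ('above', {'threshold': 2})]\n",
    "print(list(compose(range(6), *build_stages(config))))   # [4, 9, 16, 25]\n"
   ]
  },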
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Variant: tap a stream for debugging\n",
    "\n",
    "A `tap` stage passes each value through unchanged but also prints (or logs) it. You can insert it anywhere in the chain without breaking the flow, and remove it when you're done.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tap(xs, label):\n",
    "    for x in xs:\n",
    "        print(f'[{label}] {x}')\n",
    "        yield x\n",
    "\n",
    "# Insert anywhere to inspect what's flowing past.\n",
    "result = list(tap(range(5), label='after-source'))\n"
   ]
  },
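  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Placed between two stages, `tap` also makes the laziness visible: each value travels through the whole chain before the next one is read from the source. The sketch below adds a `log` parameter (an assumption, not part of the `tap` above) so the messages can be collected in a list instead of printed straight away.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tap(xs, label, log=print):\n",
    "    '''Pass values through unchanged, reporting each one via log.'''\n",
    "    for x in xs:\n",
    "        log(f'[{label}] {x}')\n",
    "        yield x\n",
    "\n",
    "def square(xs):\n",
    "    for x in xs:\n",
    "        yield x * x\n",
    "\n",
    "events = []\n",
    "result = list(\n",
    "    tap(square(tap(range(3), 'source', events.append)), 'squared', events.append)\n",
    ")\n",
    "for line in events:\n",
    "    print(line)\n",
    "# Values interleave: [source] 0, [squared] 0, [source] 1, [squared] 1, ...\n"
   ]
  },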
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Why this works\n",
    "\n",
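    "Every stage speaks the iterator protocol: the source produces an iterator, and each later stage takes an iterable, yields values, and finishes when its input runs out. Because a streaming stage holds only one record at a time, memory stays constant regardless of stream size; the sort inside `average_per_bucket` is the exception, covered under Trade-offs. Because each stage is a small, named function, you can unit-test it with a tiny in-memory list, with no file and no network.\n",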
    "\n",
    "The four shapes cover almost all pipelines: a **source** produces values; a **filter** yields a subset; a **transform** yields one-out-per-in; an **aggregate** yields fewer values (often one). Most pipelines are source → filter → transform → ... → consumer, with as many stages as you need.\n",
    "\n",
    "Debugging a deep pipeline relies on the same property: insert a `tap` stage between any two stages and it prints each value as it passes, so you see exactly what the next stage receives without changing the results.\n"
   ]
  },
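  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the testing claim concrete, here is a minimal sketch of two stage tests written as plain functions with bare `assert`s; a runner such as pytest would collect them by their `test_` prefix. The stage definitions are copied from the first cell so the snippet runs on its own, and the sample rows are made up.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Stage definitions copied from the first cell so this cell runs on its own.\n",
    "def above(readings, threshold):\n",
    "    for r in readings:\n",
    "        if r[2] > threshold:\n",
    "            yield r\n",
    "\n",
    "def bucketise(readings, size):\n",
    "    for ts, sensor, value in readings:\n",
    "        yield ((ts // size) * size, sensor, value)\n",
    "\n",
    "# Made-up sample rows: (timestamp, sensor, value).\n",
    "def test_above_keeps_only_values_over_threshold():\n",
    "    rows = [(0, 's1', 7.5), (1, 's2', 8.5), (2, 's1', 9.0)]\n",
    "    assert list(above(rows, 8.0)) == [(1, 's2', 8.5), (2, 's1', 9.0)]\n",
    "\n",
    "def test_bucketise_floors_timestamps_to_bucket_start():\n",
    "    rows = [(0, 's1', 1.0), (49, 's1', 2.0), (50, 's2', 3.0)]\n",
    "    assert list(bucketise(rows, 50)) == [(0, 's1', 1.0), (0, 's1', 2.0), (50, 's2', 3.0)]\n",
    "\n",
    "# In a notebook, call them directly; pytest would discover them by name.\n",
    "test_above_keeps_only_values_over_threshold()\n",
    "test_bucketise_floors_timestamps_to_bucket_start()\n",
    "print('stage tests passed')\n"
   ]
  },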
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Trade-offs\n",
    "\n",
    "Small one-line stages don't need a named function: a generator expression such as `(r for r in readings if r[2] > 8.0)` reads fine inline. Named functions earn their place when the logic is non-trivial, when you want to test the stage in isolation, or when you'll reuse the stage elsewhere. Real pipelines mix both freely.\n",
    "\n",
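    "Sort-then-group (used in `average_per_bucket` above) breaks the constant-memory property, because `sorted` materialises the entire stream as a list before the first group is yielded. If your aggregate can be streaming (a running total, a max, a `Counter` whose size grows with the number of distinct keys rather than the number of records), keep it streaming. If it genuinely needs a sort, accept the memory cost and document the assumption that the stream fits in memory.\n",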
    "\n",
    "The commonest pipeline bug is forgetting to consume: calling a generator function only creates a generator object, so `square([1, 2])` on its own does nothing, while `list(square([1, 2]))` actually runs it.\n"
   ]
  },
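  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When the stream already arrives in key order, the sort can often be dropped. The sketch below assumes readings are ordered by bucket, which holds for `bucketise` output when the source yields increasing timestamps, as `read_readings` does. `groupby` then sees each bucket as one contiguous run, and only that bucket's per-sensor totals are held in memory. The name `average_per_bucket_streaming` is hypothetical, and `total / count` may differ from `statistics.mean` in the last floating-point digits.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from itertools import groupby\n",
    "\n",
    "def average_per_bucket_streaming(readings):\n",
    "    '''Average per (bucket, sensor), holding one bucket at a time.\n",
    "\n",
    "    ASSUMPTION: readings arrive ordered by bucket; out-of-order input\n",
    "    would yield the same bucket more than once.\n",
    "    '''\n",
    "    for bucket, rows in groupby(readings, key=lambda r: r[0]):\n",
    "        totals = {}   # sensor -> [running sum, count], for this bucket only\n",
    "        for _, sensor, value in rows:\n",
    "            acc = totals.setdefault(sensor, [0.0, 0])\n",
    "            acc[0] += value\n",
    "            acc[1] += 1\n",
    "        for sensor in sorted(totals):   # same ordering as the sorted version\n",
    "            total, count = totals[sensor]\n",
    "            yield (bucket, sensor, total / count)\n",
    "\n",
    "rows = [(0, 's1', 9.0), (0, 's2', 10.0), (0, 's1', 11.0), (50, 's2', 12.0)]\n",
    "print(list(average_per_bucket_streaming(rows)))\n",
    "# [(0, 's1', 10.0), (0, 's2', 10.0), (50, 's2', 12.0)]\n"
   ]
  },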
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Related reading\n",
    "\n",
    "- [Process a large file lazily](https://agilearn.co.uk/guides/iterators-and-generators/recipes/process-a-large-file-lazily) — the canonical source stage, plus multi-line records and binary chunks.\n",
    "- [Chain and group iterables](https://agilearn.co.uk/guides/iterators-and-generators/recipes/chain-and-group-iterables) — `chain` and `groupby` as ready-made stages.\n",
    "- [Avoid common iterator mistakes](https://agilearn.co.uk/guides/iterators-and-generators/recipes/avoid-common-iterator-mistakes) — late binding, consuming twice, and the forgotten consumer.\n",
    "- [itertools cheatsheet](https://agilearn.co.uk/guides/iterators-and-generators/reference/itertools-cheatsheet) — stages that ship with the standard library.\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}