{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Parallelise CPU work across processes\n",
    "\n",
    "**The question.** You have a CPU-bound function (it crunches numbers, parses, hashes, resizes images) and a big pile of inputs. It pins one core while the rest sit idle. You want to use them all.\n",
    "\n",
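    "The answer: `concurrent.futures.ProcessPoolExecutor`. Each worker is a separate process with its own interpreter and its own GIL, so CPU-bound work runs truly in parallel across cores. The interface matches `ThreadPoolExecutor`; the differences are that the worker function, its arguments and its results must be picklable (in practice, define the function at module top level, not as a lambda or nested function), the pool must be created under an `if __name__ == \"__main__\"` guard, and small tasks should be batched with `chunksize` so the inter-process overhead doesn't swamp the work.\n",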
    "\n",
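    "> Spawns real processes: save as a `.py` file and run it locally. It won't run in the browser, and `ProcessPoolExecutor` doesn't work from the interactive interpreter, because worker processes must be able to import the `__main__` module."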
   ]
  },
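  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because both pools implement the same `concurrent.futures.Executor` interface, code written against one usually runs unchanged on the other. A minimal sketch, assuming a toy `square` worker and a hypothetical `run_all` helper (neither comes from the library): the same function drives a thread pool and a process pool, and only the class passed in changes. Both lines should print `[0, 1, 4, 9, 16]`."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "# save as swap_pools.py and run:  python swap_pools.py\n",
    "from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor\n",
    "\n",
    "def square(n):                        # hypothetical toy worker; top-level so it pickles\n",
    "    return n * n\n",
    "\n",
    "def run_all(executor_cls, fn, items):\n",
    "    # hypothetical helper: one code path for either pool class\n",
    "    if not issubclass(executor_cls, Executor):\n",
    "        raise TypeError('expected an Executor subclass')\n",
    "    with executor_cls(max_workers=4) as pool:\n",
    "        return list(pool.map(fn, items))\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    print(run_all(ThreadPoolExecutor, square, range(5)))   # threads share one GIL (standard CPython build)\n",
    "    print(run_all(ProcessPoolExecutor, square, range(5)))  # each worker process has its own GIL"
   ]
  },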
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The pattern\n",
    "\n",
    "Define the worker at module top level, create the pool inside the `__main__` guard, and `map` your inputs across it."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "# save as primes.py and run:  python primes.py\n",
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import time\n",
    "\n",
    "def is_prime(n):                      # top-level so it can be pickled to workers\n",
    "    if n < 2:\n",
    "        return False\n",
    "    for i in range(2, int(n ** 0.5) + 1):\n",
    "        if n % i == 0:\n",
    "            return False\n",
    "    return True\n",
    "\n",
    "def main():\n",
    "    numbers = range(2, 200_000)\n",
    "    start = time.perf_counter()\n",
    "    with ProcessPoolExecutor() as pool:           # one worker per logical CPU by default\n",
    "        flags = pool.map(is_prime, numbers, chunksize=2000)\n",
    "        count = sum(flags)\n",
    "    print(f'{count} primes in {time.perf_counter() - start:.2f}s')\n",
    "\n",
    "if __name__ == '__main__':            # mandatory: stops workers re-running main()\n",
    "    main()"
   ]
  },
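  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Whether a missing guard actually bites depends on the start method. `ProcessPoolExecutor` takes an `mp_context` argument, so you can force `spawn` (the default on Windows and macOS) on any OS to check that your script is safe. A minimal sketch, assuming a hypothetical `worker_pid` function that reports which process ran each task: under `spawn`, every worker starts a fresh interpreter and imports this file, which is exactly when an unguarded `main()` would run again."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "# save as spawn_check.py and run:  python spawn_check.py\n",
    "import multiprocessing\n",
    "import os\n",
    "from concurrent.futures import ProcessPoolExecutor\n",
    "\n",
    "def worker_pid(_):                    # hypothetical worker: reports which process ran it\n",
    "    return os.getpid()\n",
    "\n",
    "def main():\n",
    "    print('default start method:', multiprocessing.get_start_method())\n",
    "    ctx = multiprocessing.get_context('spawn')   # the default on Windows and macOS\n",
    "    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:\n",
    "        pids = set(pool.map(worker_pid, range(8)))\n",
    "    print(f'parent {os.getpid()}, workers {sorted(pids)}')   # worker PIDs differ from the parent's\n",
    "\n",
    "if __name__ == '__main__':            # without this, each spawned worker would re-run main() on import\n",
    "    main()"
   ]
  },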
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Why `chunksize` matters\n",
    "\n",
    "`ProcessPoolExecutor.map` defaults to `chunksize=1` (unlike `multiprocessing.Pool.map`, which picks a chunk size for you), so without it each number is pickled and shipped to a worker as its own task: roughly 200,000 tiny messages whose overhead swamps the actual work. `chunksize=2000` hands each worker a batch of 2000 inputs at a time, cutting that to 100 batches, about three orders of magnitude fewer messages. For large iterables of cheap-to-medium tasks, tuning `chunksize` is often a bigger win than adding cores.\n",
    "\n",
    "A rough guide: aim for each chunk to take tens of milliseconds or more of compute. Too small and overhead dominates; too large and the cores finish unevenly (one worker stuck on the last fat chunk while others idle)."
   ]
  },
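  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before you have timings, one reasonable starting point is the rule `multiprocessing.Pool.map` applies when `chunksize` is omitted: aim for about four chunks per worker. A sketch of that heuristic as a hypothetical `pick_chunksize` helper; it counts items rather than measuring compute, so still check the result against the tens-of-milliseconds guide."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "def pick_chunksize(n_items, n_workers=None, chunks_per_worker=4):\n",
    "    \"\"\"Hypothetical helper mirroring Pool.map's default: ~chunks_per_worker chunks per worker.\"\"\"\n",
    "    if n_workers is None:\n",
    "        n_workers = os.cpu_count() or 1   # os.cpu_count() can return None\n",
    "    chunksize, extra = divmod(n_items, n_workers * chunks_per_worker)\n",
    "    if extra:                         # round up so everything fits in that many chunks\n",
    "        chunksize += 1\n",
    "    return max(1, chunksize)          # map() rejects chunksize < 1\n",
    "\n",
    "print(pick_chunksize(199_998, n_workers=8))   # 6250, i.e. 32 chunks for 8 workers"
   ]
  },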
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Returning richer results\n",
    "\n",
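    "`map` returns an iterator over whatever the worker returns, in input order, even when workers finish out of order; if a call raised an exception, it is re-raised when you reach that result. To keep the input alongside the result, return a tuple, and remember the return value must be picklable too."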
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
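    "# save as classify.py next to primes.py and run:  python classify.py\n",
    "from concurrent.futures import ProcessPoolExecutor\n",
    "from primes import is_prime            # safe: primes.py only runs main() under its guard\n",
    "\n",
    "def classify(n):\n",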
    "    return (n, is_prime(n))           # tuple of (input, result)\n",
    "\n",
    "def main():\n",
    "    with ProcessPoolExecutor() as pool:\n",
    "        for n, prime in pool.map(classify, range(2, 12)):\n",
    "            print(f'{n}: {\"prime\" if prime else \"composite\"}')\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    main()"
   ]
  },
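  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Everything that crosses the process boundary (the function, each argument, each result) goes through `pickle`, so calling `pickle.dumps` in the parent catches most of these errors before the pool does. A minimal sketch with a hypothetical `is_picklable` helper. Functions pickle by reference (module plus qualified name), which is why a top-level function works while a lambda or nested function fails."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import math\n",
    "import pickle\n",
    "import threading\n",
    "\n",
    "def is_picklable(obj):\n",
    "    \"\"\"Hypothetical helper: True if obj can be serialised for a worker process.\"\"\"\n",
    "    try:\n",
    "        pickle.dumps(obj)\n",
    "    except (pickle.PicklingError, AttributeError, TypeError):\n",
    "        return False                  # e.g. lambdas, nested functions, locks, open files\n",
    "    return True\n",
    "\n",
    "print(is_picklable((7, True)))          # True: a tuple of plain values\n",
    "print(is_picklable(math.sqrt))          # True: functions pickle by module + name\n",
    "print(is_picklable(lambda n: n * n))    # False: a lambda has no importable name\n",
    "print(is_picklable(threading.Lock()))   # False: locks cannot be pickled"
   ]
  },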
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Measure the speedup honestly\n",
    "\n",
    "Process pools have real startup and pickling overhead, and Amdahl's law caps the overall speedup at `1 / (1 - p)`, where `p` is the fraction of run time that can be parallelised: if 10% of the program stays serial, no number of cores takes you past 10x. Always time both ways on the *actual* workload before declaring victory; for small inputs the sequential version often wins."
   ]
  },
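  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Written out, Amdahl's law says that with a parallelisable fraction `p` spread over `n` workers the best possible speedup is `1 / ((1 - p) + p / n)`, which tends to `1 / (1 - p)` as `n` grows. A small sketch (`amdahl` is just an illustrative name) shows the diminishing returns: with `p = 0.9`, eight workers give at most about 4.7x. It ignores pool start-up and pickling, so real speedups land below it."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "def amdahl(p, n):\n",
    "    \"\"\"Upper bound on speedup when fraction p of the run time is spread over n workers.\"\"\"\n",
    "    if not 0.0 <= p <= 1.0 or n < 1:\n",
    "        raise ValueError('need 0 <= p <= 1 and n >= 1')\n",
    "    return 1.0 / ((1.0 - p) + p / n)\n",
    "\n",
    "for n in (2, 4, 8, 64):\n",
    "    print(f'p=0.9, {n:>2} workers: at most {amdahl(0.9, n):.2f}x')"
   ]
  },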
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
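    "# save as compare.py next to primes.py and run:  python compare.py\n",
    "import time\n",
    "from concurrent.futures import ProcessPoolExecutor\n",
    "from primes import is_prime\n",
    "\n",
    "def main():\n",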
    "    numbers = range(2, 200_000)\n",
    "\n",
    "    start = time.perf_counter()\n",
    "    seq = sum(is_prime(n) for n in numbers)\n",
    "    seq_time = time.perf_counter() - start\n",
    "\n",
    "    start = time.perf_counter()\n",
    "    with ProcessPoolExecutor() as pool:\n",
    "        par = sum(pool.map(is_prime, numbers, chunksize=2000))\n",
    "    par_time = time.perf_counter() - start\n",
    "    assert seq == par, 'parallel and sequential counts differ'\n",
    "\n",
    "    print(f'sequential {seq_time:.2f}s | parallel {par_time:.2f}s | {seq_time/par_time:.1f}x')\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    main()"
   ]
  },
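  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A single run of each is noisy (CPU frequency scaling, caches, other processes), so it can pay to repeat each measurement and keep the fastest. A sketch using the standard library's `timeit.repeat`, which accepts a callable; `best_time` is a hypothetical helper name, and `number=1` is used because a single pass over the workload is already long enough to time. Timing the parallel version this way includes pool start-up in every run, which is the honest cost if your real program creates one pool per batch."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import timeit\n",
    "\n",
    "def best_time(fn, repeat=3):\n",
    "    \"\"\"Hypothetical helper: fastest of `repeat` single runs of fn(), in seconds.\"\"\"\n",
    "    return min(timeit.repeat(fn, repeat=repeat, number=1))\n",
    "\n",
    "# stand-in workload; in the comparison above you would pass the sequential and parallel versions\n",
    "print(f'{best_time(lambda: sum(i * i for i in range(1_000_000))):.3f}s')"
   ]
  },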
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Gotchas to expect\n",
    "\n",
    "- **`PicklingError` / `AttributeError` on a lambda or local function** — the worker function must be importable at module top level.\n",
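    "- **Programs that spawn endlessly or error on start** (often a `RuntimeError` mentioning the bootstrapping phase, or a `BrokenProcessPool`): you forgot the `if __name__ == \"__main__\"` guard. This bites under the `spawn` start method, the default on Windows and macOS.\n",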
    "- **No speedup, or a slowdown** — tasks are too small (chunk them), the work isn't really CPU-bound (use threads/async), or large data is being copied to and from workers.\n",
    "\n",
    "For the lower-level `multiprocessing` primitives (shared `Value`/`Array`, `Queue`, `Pool`), see the [threading and multiprocessing reference](https://agilearn.co.uk/guides/concurrency/reference/threading-and-multiprocessing-reference)."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}