{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Threads and futures\n",
    "\n",
    "Threads are Python's most direct route to overlapping work. A thread is a separate line of execution inside your process that shares the process's memory with every other thread. Because of CPython's global interpreter lock (GIL), threads won't speed up pure-Python computation, but for **I/O-bound** work, where each task spends its time *waiting*, they're ideal: while one thread waits on the network, the others run.\n",
    "\n",
    "This notebook starts with the low-level `threading.Thread` to show what a thread is, then moves quickly to `concurrent.futures.ThreadPoolExecutor`, which is what you should actually use almost every time.\n",
    "\n",
    "> These examples spawn real OS threads and won't run inside the in-browser sandbox — run them locally to see the timings."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## A raw thread\n",
    "\n",
    "`threading.Thread` takes a `target` function and its arguments (`args` must be a tuple, hence the trailing comma in `('A',)`). `.start()` runs the target on a new thread; `.join()` blocks until it finishes."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import threading\n",
    "import time\n",
    "\n",
    "def worker(name):\n",
    "    print(f'{name} starting')\n",
    "    time.sleep(0.5)        # stand-in for a blocking I/O call\n",
    "    print(f'{name} done')\n",
    "\n",
    "t = threading.Thread(target=worker, args=('A',))\n",
    "t.start()      # runs worker('A') on a new thread\n",
    "print('main thread keeps going while A works')\n",
    "t.join()       # wait for the thread to finish\n",
    "print('A has finished')"
   ]
  },
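  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A quick way to see what `.join()` does is to check `Thread.is_alive()` before and after it. This is a small sketch; `slow_task` and its 0.2s sleep are illustrative stand-ins, not part of any API."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import threading\n",
    "import time\n",
    "\n",
    "def slow_task():\n",
    "    time.sleep(0.2)   # hypothetical blocking work\n",
    "\n",
    "t = threading.Thread(target=slow_task)\n",
    "t.start()\n",
    "alive_before = t.is_alive()   # almost certainly True: the sleep is still running\n",
    "t.join()                      # blocks here until slow_task returns\n",
    "alive_after = t.is_alive()    # always False once join() has returned\n",
    "print('alive before join:', alive_before)\n",
    "print('alive after join: ', alive_after)"
   ]
  },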
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run several at once and the waits overlap. Three threads each sleeping 0.5s finish in about 0.5s total, not 1.5s — that's the whole point."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "threads = [threading.Thread(target=worker, args=(f'T{i}',)) for i in range(3)]\n",
    "\n",
    "start = time.perf_counter()\n",
    "for t in threads:\n",
    "    t.start()      # start all three\n",
    "for t in threads:\n",
    "    t.join()       # then wait for all three\n",
    "print(f'elapsed: {time.perf_counter() - start:.2f}s')"
   ]
  },
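  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For contrast, calling the same kind of function three times in a row on the main thread makes the waits add up instead of overlapping. A minimal sketch; `quiet_worker` is a hypothetical variant of `worker` without the prints."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "def quiet_worker(name):\n",
    "    time.sleep(0.5)   # same 0.5s stand-in for blocking I/O\n",
    "\n",
    "start = time.perf_counter()\n",
    "for i in range(3):\n",
    "    quiet_worker(f'T{i}')   # each call must finish before the next starts\n",
    "sequential = time.perf_counter() - start\n",
    "print(f'sequential elapsed: {sequential:.2f}s')   # roughly 3 x 0.5s = 1.5s"
   ]
  },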
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This works, but it's clumsy for real use. Two problems jump out: there's no clean way to get a **return value** back from `worker`, and managing lists of threads by hand gets fiddly fast. Both are solved by the thread pool."
   ]
  },
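  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see why return values are awkward with raw threads, here is one common workaround: each thread writes its result into a shared dict that the main thread reads after `join()`. This is a sketch for illustration; `fetch_into` and the `results` dict are hypothetical names. It works, but every caller needs this extra plumbing, and an exception raised inside a thread is not passed back to the main thread at all."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import threading\n",
    "import time\n",
    "\n",
    "results = {}   # shared by all threads; each writes only its own key\n",
    "\n",
    "def fetch_into(name, out):\n",
    "    time.sleep(0.1)              # stand-in for blocking I/O\n",
    "    out[name] = f'{name} -> ok'  # no return value reaches the caller, so store it\n",
    "\n",
    "threads = [threading.Thread(target=fetch_into, args=(f'T{i}', results)) for i in range(3)]\n",
    "for t in threads:\n",
    "    t.start()\n",
    "for t in threads:\n",
    "    t.join()   # only after join() is it safe to read every entry\n",
    "\n",
    "print(sorted(results.items()))"
   ]
  },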
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `ThreadPoolExecutor`: the tool you'll actually use\n",
    "\n",
    "`concurrent.futures.ThreadPoolExecutor` manages a pool of up to `max_workers` worker threads for you and reuses them across tasks. You hand it work; it gives you back a **`Future`**, an object representing a result that isn't ready yet. Call `.result()` on the future to get the value (blocking until it's ready); any exception raised in the worker is re-raised there.\n",
    "\n",
    "Use it as a context manager so the pool is cleaned up automatically: leaving the `with` block calls `pool.shutdown(wait=True)`, which waits for every submitted task to finish."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "def fetch(url):\n",
    "    time.sleep(0.3)              # pretend this is a network request\n",
    "    return f'{url} -> 200 OK'\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=4) as pool:\n",
    "    future = pool.submit(fetch, 'https://example.com')   # returns immediately\n",
    "    print('submitted; doing other things...')\n",
    "    print(future.result())                               # blocks until ready"
   ]
  },
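  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The same flow without the `with` block makes the lifecycle explicit: `Future.done()` reports whether the task has finished, and `shutdown(wait=True)` is the cleanup the context manager would otherwise do for you. A sketch; `slow_fetch` is a hypothetical stand-in for a real request."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import time\n",
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "def slow_fetch(url):\n",
    "    time.sleep(0.3)   # pretend network request\n",
    "    return f'{url} -> 200 OK'\n",
    "\n",
    "pool = ThreadPoolExecutor(max_workers=4)\n",
    "try:\n",
    "    future = pool.submit(slow_fetch, 'https://example.com')\n",
    "    done_before = future.done()   # almost certainly False: the task is still sleeping\n",
    "    value = future.result()       # blocks until the task finishes\n",
    "    done_after = future.done()    # True once a result is available\n",
    "finally:\n",
    "    pool.shutdown(wait=True)      # what leaving a `with` block does for you\n",
    "\n",
    "print(done_before, done_after, value)"
   ]
  },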
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### `submit` many, collect results\n",
    "\n",
    "`submit` returns one future per call. To run many tasks, submit them all (they start running straight away, up to `max_workers` at a time), then collect the results."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "urls = [f'https://site/{i}' for i in range(8)]\n",
    "\n",
    "start = time.perf_counter()\n",
    "with ThreadPoolExecutor(max_workers=4) as pool:\n",
    "    futures = [pool.submit(fetch, u) for u in urls]\n",
    "    results = [f.result() for f in futures]   # results in submission order\n",
    "\n",
    "print(f'{len(results)} fetched in {time.perf_counter() - start:.2f}s')\n",
    "print(results[0])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Eight tasks of 0.3s each, four at a time: two waves, about 0.6s — versus 2.4s sequentially. The pool size caps how many run concurrently, which matters when you don't want to open 10,000 connections at once."
   ]
  },
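  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The wave arithmetic generalises: with `n` tasks of duration `d` and `w` workers, expect roughly `ceil(n / w) * d`. A rough sketch that checks this for a few pool sizes, using a shorter 0.1s sleep so it runs quickly; `task` is a hypothetical stand-in, and measured times will sit slightly above the estimate because of thread start-up overhead."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import math\n",
    "import time\n",
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "def task(i):\n",
    "    time.sleep(0.1)   # each task takes about 0.1s\n",
    "    return i\n",
    "\n",
    "n_tasks, duration = 8, 0.1\n",
    "timings = {}\n",
    "for workers in (1, 2, 4, 8):\n",
    "    start = time.perf_counter()\n",
    "    with ThreadPoolExecutor(max_workers=workers) as pool:\n",
    "        list(pool.map(task, range(n_tasks)))   # collect results; the with block also waits on exit\n",
    "    elapsed = time.perf_counter() - start\n",
    "    timings[workers] = elapsed\n",
    "    predicted = math.ceil(n_tasks / workers) * duration   # number of waves x task duration\n",
    "    print(f'{workers} workers: predicted {predicted:.1f}s, measured {elapsed:.2f}s')"
   ]
  },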
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### `map`: simplest when inputs map to outputs\n",
    "\n",
    "If you're applying one function to many inputs and want the results back **in order**, `executor.map` is the cleanest form. It mirrors the built-in `map`, but runs the calls concurrently."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "with ThreadPoolExecutor(max_workers=4) as pool:\n",
    "    for result in pool.map(fetch, urls):   # yields in input order\n",
    "        print(result)"
   ]
  },
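  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see that `map` keeps input order even when later inputs finish first, here is a sketch where the sleep shrinks as the input grows, so the last input completes first. `delayed_echo` and its sleep formula are illustrative assumptions."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import time\n",
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "def delayed_echo(i):\n",
    "    time.sleep(0.1 * (4 - i))   # input 0 sleeps longest, input 3 shortest\n",
    "    return i\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=4) as pool:\n",
    "    ordered = list(pool.map(delayed_echo, range(4)))\n",
    "\n",
    "print(ordered)   # [0, 1, 2, 3]: input order, even though input 3 finished first"
   ]
  },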
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### `as_completed`: handle results as they arrive\n",
    "\n",
    "Sometimes you don't want to wait for the slowest task to start processing the fast ones. `concurrent.futures.as_completed` yields futures **in the order they finish**, not the order you submitted them. Keep a dict mapping each future back to its input so you know what finished."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "from concurrent.futures import as_completed\n",
    "import random\n",
    "\n",
    "def variable_fetch(url):\n",
    "    time.sleep(random.uniform(0.1, 0.6))   # finishing times will vary\n",
    "    return f'{url} done'\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=4) as pool:\n",
    "    future_to_url = {pool.submit(variable_fetch, u): u for u in urls}\n",
    "    for future in as_completed(future_to_url):\n",
    "        url = future_to_url[future]\n",
    "        print(f'{url}: {future.result()}')   # printed as each finishes"
   ]
  },
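  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With the same shrinking-sleep trick, `as_completed` shows the opposite behaviour: futures come back in finishing order. A sketch with a hypothetical `delayed_echo` helper; the 0.1s steps are far enough apart that the order is stable on an idle machine, but it is still timing-dependent."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import time\n",
    "from concurrent.futures import ThreadPoolExecutor, as_completed\n",
    "\n",
    "def delayed_echo(i):\n",
    "    time.sleep(0.1 * (4 - i))   # input 3 finishes first, input 0 last\n",
    "    return i\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=4) as pool:\n",
    "    futures = [pool.submit(delayed_echo, i) for i in range(4)]\n",
    "    finished = [f.result() for f in as_completed(futures)]\n",
    "\n",
    "print(finished)   # typically [3, 2, 1, 0]: completion order, not submission order"
   ]
  },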
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exceptions surface at `.result()`\n",
    "\n",
    "An exception raised inside a worker doesn't crash the pool and isn't printed — it's stored on the future and re-raised when you call `.result()`. This means **you must call `.result()` (or otherwise inspect the future) to find out a task failed.** A common bug is firing off tasks and never checking them, so failures vanish silently."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "def flaky(n):\n",
    "    if n == 3:\n",
    "        raise ValueError(f'cannot process {n}')\n",
    "    return n * 10\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=3) as pool:\n",
    "    futures = {pool.submit(flaky, n): n for n in range(5)}\n",
    "    for future in as_completed(futures):\n",
    "        n = futures[future]\n",
    "        try:\n",
    "            print(f'{n} -> {future.result()}')\n",
    "        except ValueError as exc:\n",
    "            print(f'{n} failed: {exc}')   # handle per-task failure"
   ]
  },
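  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The silent-failure bug is easy to reproduce: submit tasks, never look at their futures, and nothing reports the error. `Future.exception()` is one way to inspect a finished future without raising; it returns the exception, or `None` if the call succeeded. A sketch; `fragile` is a hypothetical function."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "def fragile(n):\n",
    "    if n == 3:\n",
    "        raise ValueError(f'cannot process {n}')\n",
    "    return n * 10\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=3) as pool:\n",
    "    futures = [pool.submit(fragile, n) for n in range(5)]\n",
    "# The pool shut down cleanly and nothing was printed, even though one task failed.\n",
    "\n",
    "errors = [f.exception() for f in futures]   # None for tasks that succeeded\n",
    "failures = [e for e in errors if e is not None]\n",
    "print(failures)   # [ValueError('cannot process 3')]"
   ]
  },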
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Shared state is the catch\n",
    "\n",
    "Threads share memory, which is convenient and dangerous. When two threads modify the same object, operations that *look* atomic often aren't. `counter += 1` is really read, add, write — three steps — and two threads can interleave so an update is lost. This is a **race condition**."
   ]
  },
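  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can see the separate steps behind `counter += 1` with the standard-library `dis` module, which lists the bytecode instructions the interpreter runs. Exact instruction names vary between Python versions (the add step is `INPLACE_ADD` on 3.10 and `BINARY_OP` from 3.11), but there is always a separate load, add and store; a thread switch between the load and the store is what can lose an update."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import dis\n",
    "\n",
    "# Compile the statement on its own and list its bytecode instruction names.\n",
    "ops = [ins.opname for ins in dis.get_instructions('counter += 1')]\n",
    "print(ops)\n",
    "# Roughly: load counter, load the constant 1, add, store counter (plus version-specific extras)."
   ]
  },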
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "counter = 0\n",
    "\n",
    "def increment():\n",
    "    global counter\n",
    "    for _ in range(100_000):\n",
    "        counter += 1        # NOT atomic: read, +1, write\n",
    "\n",
    "threads = [threading.Thread(target=increment) for _ in range(4)]\n",
    "for t in threads: t.start()\n",
    "for t in threads: t.join()\n",
    "\n",
    "print('expected:', 4 * 100_000)\n",
    "print('got:     ', counter)   # may be LESS if updates were lost; recent CPython versions often hide the race, but the code is still unsafe"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The fix is a **`Lock`**: a flag that only one thread can hold at a time. Wrap the critical section in `with lock:` so updates can't interleave."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "counter = 0\n",
    "lock = threading.Lock()\n",
    "\n",
    "def safe_increment():\n",
    "    global counter\n",
    "    for _ in range(100_000):\n",
    "        with lock:          # only one thread in here at a time\n",
    "            counter += 1\n",
    "\n",
    "threads = [threading.Thread(target=safe_increment) for _ in range(4)]\n",
    "for t in threads: t.start()\n",
    "for t in threads: t.join()\n",
    "\n",
    "print('got:', counter)      # now exactly 400000"
   ]
  },
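  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`with lock:` is shorthand for acquiring the lock and guaranteeing its release, even if the body raises. Written out by hand, the same critical section looks roughly like this; it is a sketch for comparison, and the `with` form is the one to prefer. `explicit_increment` is a hypothetical name."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "import threading\n",
    "\n",
    "counter = 0\n",
    "lock = threading.Lock()\n",
    "\n",
    "def explicit_increment():\n",
    "    global counter\n",
    "    for _ in range(100_000):\n",
    "        lock.acquire()       # blocks until no other thread holds the lock\n",
    "        try:\n",
    "            counter += 1     # critical section\n",
    "        finally:\n",
    "            lock.release()   # always release, even if the body raises\n",
    "\n",
    "threads = [threading.Thread(target=explicit_increment) for _ in range(4)]\n",
    "for t in threads:\n",
    "    t.start()\n",
    "for t in threads:\n",
    "    t.join()\n",
    "\n",
    "print('got:', counter)   # 400000"
   ]
  },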
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Locks fix correctness but cost speed — threads queue for the lock. The best defence is to **avoid shared mutable state**: have each task return a value and combine the results in the main thread (as the `submit`/`map` examples do), rather than having threads write into shared objects."
   ]
  },
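  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here is the counter example redesigned along those lines: each task counts in its own local variable and returns the total, and only the main thread combines the results, so no lock is needed. A sketch; `count_up` is a hypothetical name."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "def count_up(n):\n",
    "    local = 0             # local variable: no other thread can see it\n",
    "    for _ in range(n):\n",
    "        local += 1\n",
    "    return local          # hand the result back instead of mutating shared state\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=4) as pool:\n",
    "    partials = list(pool.map(count_up, [100_000] * 4))\n",
    "\n",
    "total = sum(partials)     # combined on the main thread only\n",
    "print('total:', total)    # 400000"
   ]
  },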
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Recap\n",
    "\n",
    "- Threads overlap **I/O-bound** work; the GIL stops them helping CPU-bound work.\n",
    "- Prefer `ThreadPoolExecutor` over raw `Thread`. `submit` returns a `Future`; `.result()` gives the value and re-raises exceptions.\n",
    "- `map` for ordered results, `as_completed` for results as they finish.\n",
    "- **Always retrieve results** or failures stay hidden.\n",
    "- Shared mutable state needs a `Lock` — or, better, design it away.\n",
    "\n",
    "Next: [Processes and parallelism](https://agilearn.co.uk/guides/concurrency/learn/03-processes-and-parallelism), for the CPU-bound work threads can't accelerate."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}