{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run blocking calls in a thread pool\n",
    "\n",
    "**The question.** You have a slow, *blocking* function (it downloads a URL, queries a database, reads a file, or calls an API) and you need to run it over many inputs. Run sequentially, the waits stack up; you want them to overlap.\n",
    "\n",
    "The answer: this is **I/O-bound** work, so reach for `concurrent.futures.ThreadPoolExecutor`. Submit one call per input, cap concurrency with `max_workers`, and collect the results: in input order with `map`, or as they finish with `as_completed`. Below are both forms, with per-task error handling in the second.\n",
    "\n",
    "> Spawns real threads — run locally to see the timings."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The simplest form: `map` for ordered results\n",
    "\n",
    "When every input maps to one output and you want them back in order, `executor.map` is the cleanest. It's the built-in `map`, run concurrently."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "import time\n",
    "\n",
    "def download(url):\n",
    "    time.sleep(0.5)                 # stand-in for a blocking network call\n",
    "    return f'{url}: 200 OK ({len(url)} bytes)'\n",
    "\n",
    "urls = [f'https://example.com/page/{i}' for i in range(10)]\n",
    "\n",
    "start = time.perf_counter()\n",
    "with ThreadPoolExecutor(max_workers=5) as pool:\n",
    "    results = list(pool.map(download, urls))   # ordered; raises on first error\n",
    "\n",
    "print(results[0])\n",
    "print(f'10 downloads in {time.perf_counter() - start:.2f}s')   # ~1s, not 5s"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`map` is concise but has one drawback: if any call raises, iteration re-raises that exception when it reaches the failed item, and the results after it are never yielded. When some failures are expected, use `as_completed` instead."
   ]
  },
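  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see that drawback concretely, here is a minimal sketch. The `flaky` function and its inputs are hypothetical stand-ins, not part of the download example: one input fails, iteration over `pool.map` stops there, and the later results never reach the caller."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "def flaky(n):\n",
    "    # hypothetical stand-in for a blocking call that fails on one input\n",
    "    if n == 2:\n",
    "        raise ValueError(f'bad input {n}')\n",
    "    return n * 10\n",
    "\n",
    "collected = []\n",
    "with ThreadPoolExecutor(max_workers=3) as pool:\n",
    "    try:\n",
    "        for value in pool.map(flaky, range(5)):   # yields in input order\n",
    "            collected.append(value)\n",
    "    except ValueError as exc:\n",
    "        print('stopped at:', exc)                # raised when iteration reaches input 2\n",
    "\n",
    "print(collected)   # [0, 10]: results for inputs 3 and 4 are never yielded"
   ]
  },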
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `as_completed`: results as they finish, with per-task error handling\n",
    "\n",
    "Submit each call, keep a dict mapping the future back to its input, and process each result the moment it's ready. Wrap `.result()` in `try`/`except` so one failure doesn't sink the batch — collect successes and failures separately."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor, as_completed\n",
    "import random\n",
    "import time                              # repeated so this cell needs only `urls` from the first cell\n",
    "\n",
    "def download(url):\n",
    "    time.sleep(random.uniform(0.1, 0.6))\n",
    "    if url.endswith('7'):\n",
    "        raise ConnectionError('timed out')   # simulate a failure\n",
    "    return f'{url}: ok'\n",
    "\n",
    "results, errors = {}, {}\n",
    "with ThreadPoolExecutor(max_workers=5) as pool:\n",
    "    future_to_url = {pool.submit(download, u): u for u in urls}\n",
    "    for future in as_completed(future_to_url):\n",
    "        url = future_to_url[future]\n",
    "        try:\n",
    "            results[url] = future.result()\n",
    "        except Exception as exc:             # catch per task, keep going\n",
    "            errors[url] = repr(exc)\n",
    "\n",
    "print(f'{len(results)} ok, {len(errors)} failed')\n",
    "print('failures:', errors)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is the pattern to default to for real network work: every task lands in either `results` or `errors`, nothing is lost, and one bad URL doesn't abort the other nine."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Choosing `max_workers`\n",
    "\n",
    "For I/O-bound work the pool can be much larger than your core count, because the threads are mostly waiting, not computing. Sensible starting points:\n",
    "\n",
    "- **Network/API calls:** tens to low hundreds, but respect the service's rate limits and connection caps. More threads past the point of saturation just add overhead.\n",
    "- **Local disk:** small (4–8); concurrent readers contend for the same device, and on spinning disks too many of them thrash.\n",
    "\n",
    "Leaving `max_workers` unset defaults to `min(32, os.cpu_count() + 4)` on Python 3.8 and later (3.13 counts only the CPUs available to the process), a fine default for light I/O. When in doubt, measure a couple of values rather than guessing."
   ]
  },
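  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One way to measure is to time the same batch at a few pool sizes. The sketch below is illustrative only: `fake_io` and `time_pool` are hypothetical helpers, and the 50 ms sleep stands in for your real blocking call, so substitute your own function and a realistic number of tasks before trusting the numbers."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {
    "tags": [
     "no-run"
    ]
   },
   "execution_count": null,
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "import time\n",
    "\n",
    "def fake_io(_):\n",
    "    time.sleep(0.05)                # stand-in for a blocking call\n",
    "    return True\n",
    "\n",
    "def time_pool(workers, n_tasks=20):\n",
    "    # run n_tasks calls through a pool of the given size; return elapsed seconds\n",
    "    start = time.perf_counter()\n",
    "    with ThreadPoolExecutor(max_workers=workers) as pool:\n",
    "        list(pool.map(fake_io, range(n_tasks)))\n",
    "    return time.perf_counter() - start\n",
    "\n",
    "timings = {w: time_pool(w) for w in (1, 5, 20)}\n",
    "for w, secs in timings.items():\n",
    "    print(f'max_workers={w:>2}: {secs:.2f}s')"
   ]
  },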
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## When this is the wrong tool\n",
    "\n",
    "If `download` were **CPU-bound** instead (pure-Python parsing or number crunching, for example), threads would give you little or no speedup, because the GIL lets only one thread execute Python bytecode at a time. C-backed calls that release the GIL, such as `hashlib` or `zlib` on large inputs, are an exception, so measure before switching. Otherwise, swap `ThreadPoolExecutor` for `ProcessPoolExecutor` ([that recipe](https://agilearn.co.uk/guides/concurrency/recipes/parallelise-cpu-work-across-processes)). If you're making *thousands* of concurrent network calls and your libraries are async-capable, [`asyncio`](https://agilearn.co.uk/guides/concurrency/recipes/run-async-tasks-concurrently) scales further on a single thread."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}