{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run async tasks concurrently\n",
    "\n",
    "**The question.** You have many awaitable operations — async HTTP requests, database queries through an async driver — and you want them to run concurrently rather than one at a time, while capping how many run at once and not letting a slow one stall the rest.\n",
    "\n",
    "The answer: gather them with `asyncio.gather` (or a `TaskGroup` on 3.11+), bound the fan-out with a `Semaphore`, and wrap each in a timeout. Below is the progression from \"run them all\" to a production-shaped \"run them all, but at most N, each with a deadline, collecting failures.\"\n",
    "\n",
    "> These run in the browser. Cells use top-level `await main()` because the notebook kernel already runs an event loop; in a `.py` script, use `asyncio.run(main())` instead."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run them all at once\n",
    "\n",
    "`asyncio.gather` wraps each coroutine in a task, runs them concurrently on the event loop, and returns their results as a list in the order you passed them in, not the order they finish."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import asyncio, time\n",
    "\n",
    "async def fetch(item):\n",
    "    await asyncio.sleep(0.5)            # stand-in for an async I/O call\n",
    "    return f'{item} -> ok'\n",
    "\n",
    "async def main():\n",
    "    items = [f'item-{i}' for i in range(10)]\n",
    "    start = time.perf_counter()\n",
    "    results = await asyncio.gather(*(fetch(i) for i in items))\n",
    "    print(results[0], '...')\n",
    "    print(f'10 in {time.perf_counter() - start:.2f}s')   # ~0.5s\n",
    "\n",
    "await main()                           # script: asyncio.run(main())"
   ]
  },
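  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The ordering guarantee is easy to check. This sketch uses a hypothetical `fetch_after` helper whose delays run in reverse, so the first item finishes last. `gather` still returns the results in argument order."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "async def fetch_after(item, delay, finished):\n",
    "    # hypothetical helper: sleep for `delay`, then record the completion\n",
    "    await asyncio.sleep(delay)\n",
    "    finished.append(item)\n",
    "    return item\n",
    "\n",
    "async def run_reversed():\n",
    "    finished = []                      # completion order\n",
    "    results = await asyncio.gather(    # 'a' sleeps longest, so it finishes last\n",
    "        fetch_after('a', 0.3, finished),\n",
    "        fetch_after('b', 0.2, finished),\n",
    "        fetch_after('c', 0.1, finished),\n",
    "    )\n",
    "    return finished, results\n",
    "\n",
    "async def main():\n",
    "    finished, results = await run_reversed()\n",
    "    print('finished:', finished)       # ['c', 'b', 'a']\n",
    "    print('results :', results)        # ['a', 'b', 'c']\n",
    "\n",
    "await main()                           # script: asyncio.run(main())"
   ]
  },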
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Keep going when some fail\n",
    "\n",
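    "By default, the first exception raised by any task propagates straight out of `await gather(...)`, and you lose the other results. The remaining tasks are *not* cancelled; they keep running in the background. Pass `return_exceptions=True` to get every result back, with exceptions returned *as values* instead of raised. Then sort the successes from the failures yourself."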
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "async def flaky(item):\n",
    "    await asyncio.sleep(0.2)\n",
    "    if item.endswith('3'):\n",
    "        raise ConnectionError(f'{item} failed')\n",
    "    return f'{item} ok'\n",
    "\n",
    "async def main():\n",
    "    items = [f'item-{i}' for i in range(6)]\n",
    "    outcomes = await asyncio.gather(*(flaky(i) for i in items), return_exceptions=True)\n",
    "    # exceptions come back as values; BaseException also covers CancelledError\n",
    "    ok = [o for o in outcomes if not isinstance(o, BaseException)]\n",
    "    bad = [repr(o) for o in outcomes if isinstance(o, BaseException)]\n",
    "    print('ok :', ok)\n",
    "    print('bad:', bad)\n",
    "\n",
    "await main()"
   ]
  },
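  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This sketch shows the default behaviour. A hypothetical `record_step` helper makes one task fail after 0.1s while another is still sleeping. The `ConnectionError` reaches the caller immediately. The slow task is not cancelled, though, and it finishes in the background a moment later."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "async def record_step(item, delay, done):\n",
    "    # hypothetical helper: 'bad' raises, anything else records its completion\n",
    "    await asyncio.sleep(delay)\n",
    "    if item == 'bad':\n",
    "        raise ConnectionError(f'{item} failed')\n",
    "    done.append(item)\n",
    "    return item\n",
    "\n",
    "async def run_default():\n",
    "    done, caught, done_at_raise = [], None, None\n",
    "    try:\n",
    "        await asyncio.gather(record_step('bad', 0.1, done), record_step('slow', 0.3, done))\n",
    "    except ConnectionError as exc:\n",
    "        caught = repr(exc)\n",
    "        done_at_raise = list(done)     # snapshot: 'slow' is still running\n",
    "    await asyncio.sleep(0.4)           # the uncancelled task finishes meanwhile\n",
    "    return caught, done_at_raise, done\n",
    "\n",
    "async def main():\n",
    "    caught, at_raise, later = await run_default()\n",
    "    print('caught       :', caught)   # ConnectionError('bad failed')\n",
    "    print('done at raise:', at_raise) # []\n",
    "    print('done later   :', later)    # ['slow']\n",
    "\n",
    "await main()"
   ]
  },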
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Bound the fan-out with a `Semaphore`\n",
    "\n",
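    "Launching 10,000 requests all at once can exhaust sockets, overload the server, or trip rate limits. A `Semaphore(n)` admits only `n` tasks into the guarded block at a time, and the rest wait their turn. Acquire it with `async with` *inside* each task. Every task is still created up front, but only `n` of them are doing the work at any moment."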
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "async def fetch_limited(item, sem):\n",
    "    async with sem:                    # at most N concurrently\n",
    "        await asyncio.sleep(0.5)\n",
    "        return f'{item} ok'\n",
    "\n",
    "async def main():\n",
    "    sem = asyncio.Semaphore(5)         # cap concurrency at 5\n",
    "    items = [f'item-{i}' for i in range(20)]\n",
    "    start = time.perf_counter()\n",
    "    results = await asyncio.gather(*(fetch_limited(i, sem) for i in items))\n",
    "    print(f'{len(results)} done in {time.perf_counter() - start:.2f}s')  # 20/5 * 0.5 = ~2s\n",
    "\n",
    "await main()"
   ]
  },
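  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One way to confirm the cap holds is to count how many tasks are inside the guarded block at the same time. The `state` counters in this sketch are illustrative instrumentation, not part of the pattern. The measured peak should equal the semaphore's limit."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "async def tracked(item, sem, state):\n",
    "    async with sem:                    # only `limit` tasks get past this line at once\n",
    "        state['active'] += 1\n",
    "        state['peak'] = max(state['peak'], state['active'])\n",
    "        await asyncio.sleep(0.1)       # stand-in for the real I/O\n",
    "        state['active'] -= 1\n",
    "        return item\n",
    "\n",
    "async def peak_concurrency(n_items, limit):\n",
    "    sem = asyncio.Semaphore(limit)\n",
    "    state = {'active': 0, 'peak': 0}   # illustrative counters\n",
    "    await asyncio.gather(*(tracked(i, sem, state) for i in range(n_items)))\n",
    "    return state['peak']\n",
    "\n",
    "async def main():\n",
    "    print('peak with Semaphore(5) :', await peak_concurrency(20, 5))    # 5\n",
    "    print('peak with Semaphore(20):', await peak_concurrency(20, 20))   # 20\n",
    "\n",
    "await main()"
   ]
  },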
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Add a per-task timeout\n",
    "\n",
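    "Wrap the awaited work in `asyncio.timeout` (3.11+; `asyncio.wait_for` on older versions) so that a single slow operation is cancelled at its deadline instead of holding up the batch. In the cell below, each task catches its own `TimeoutError` and reports it as a result. Alternatively, let the error propagate and pass `return_exceptions=True` to `gather` to record timeouts alongside other failures. Because the timeout sits inside `async with sem`, it limits only the work itself, not the time spent waiting for a slot."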
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "async def maybe_slow(item, sem):\n",
    "    async with sem:\n",
    "        try:\n",
    "            async with asyncio.timeout(1.0):       # 3.11+; else asyncio.wait_for\n",
    "                await asyncio.sleep(2 if item.endswith('4') else 0.3)\n",
    "                return f'{item} ok'\n",
    "        except TimeoutError:\n",
    "            return f'{item} TIMED OUT'\n",
    "\n",
    "async def main():\n",
    "    sem = asyncio.Semaphore(5)\n",
    "    items = [f'item-{i}' for i in range(6)]\n",
    "    for r in await asyncio.gather(*(maybe_slow(i, sem) for i in items)):\n",
    "        print(r)\n",
    "\n",
    "await main()"
   ]
  },
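  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This sketch shows the `return_exceptions` variant. It uses `asyncio.wait_for`, so it also runs before 3.11. The timeout propagates out of each task, and `gather` collects it as a value. The check uses `asyncio.TimeoutError` because on 3.11+ it is an alias of the built-in `TimeoutError`, and on older versions it is the class that `wait_for` raises. The helper names are illustrative."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "async def slow_or_fast(item):\n",
    "    # illustrative: items ending in '4' take 2s, the rest 0.3s\n",
    "    await asyncio.sleep(2 if item.endswith('4') else 0.3)\n",
    "    return f'{item} ok'\n",
    "\n",
    "async def limited_with_deadline(item, sem, seconds):\n",
    "    async with sem:\n",
    "        # wait_for cancels the work on expiry and raises asyncio.TimeoutError\n",
    "        return await asyncio.wait_for(slow_or_fast(item), timeout=seconds)\n",
    "\n",
    "async def run_batch():\n",
    "    sem = asyncio.Semaphore(5)\n",
    "    items = [f'item-{i}' for i in range(6)]\n",
    "    outcomes = await asyncio.gather(\n",
    "        *(limited_with_deadline(i, sem, 1.0) for i in items),\n",
    "        return_exceptions=True,        # timeouts come back as values\n",
    "    )\n",
    "    ok = [o for o in outcomes if not isinstance(o, BaseException)]\n",
    "    timed_out = [item for item, o in zip(items, outcomes)\n",
    "                 if isinstance(o, asyncio.TimeoutError)]\n",
    "    return ok, timed_out\n",
    "\n",
    "async def main():\n",
    "    ok, timed_out = await run_batch()\n",
    "    print('ok       :', ok)\n",
    "    print('timed out:', timed_out)     # ['item-4']\n",
    "\n",
    "await main()"
   ]
  },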
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The production shape: `TaskGroup` (3.11+)\n",
    "\n",
    "For new code, `asyncio.TaskGroup` is the structured way to run a batch: tasks are created inside an `async with` block that won't exit until every one of them has finished. If one raises, the others are cancelled and the failure propagates as an `ExceptionGroup` (catch it with `except*`), so no tasks are leaked. Collect results from the task objects once the block exits."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "async def main():\n",
    "    sem = asyncio.Semaphore(5)\n",
    "    items = [f'item-{i}' for i in range(8)]\n",
    "    tasks = []\n",
    "    async with asyncio.TaskGroup() as tg:          # Python 3.11+\n",
    "        for i in items:\n",
    "            tasks.append(tg.create_task(fetch_limited(i, sem)))\n",
    "    results = [t.result() for t in tasks]          # all done once block exits\n",
    "    print(f'{len(results)} done')\n",
    "\n",
    "await main()"
   ]
  },
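  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This sketch shows what happens on failure. When a task inside a `TaskGroup` raises, the group cancels its siblings and raises an `ExceptionGroup`, which `except*` unpacks (both require 3.11+). The hypothetical `step_or_fail` helper makes `item-2` fail quickly while the other tasks are still sleeping."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "async def step_or_fail(item, fail):\n",
    "    # hypothetical helper: the failing item errors at 0.1s, the rest take 1s\n",
    "    if fail:\n",
    "        await asyncio.sleep(0.1)\n",
    "        raise ConnectionError(f'{item} failed')\n",
    "    await asyncio.sleep(1.0)\n",
    "    return f'{item} ok'\n",
    "\n",
    "async def run_group():\n",
    "    tasks, errors = [], []\n",
    "    try:\n",
    "        async with asyncio.TaskGroup() as tg:          # Python 3.11+\n",
    "            for i in range(4):\n",
    "                tasks.append(tg.create_task(step_or_fail(f'item-{i}', fail=(i == 2))))\n",
    "    except* ConnectionError as eg:                     # eg is an ExceptionGroup\n",
    "        errors = [repr(e) for e in eg.exceptions]\n",
    "    cancelled = [t.cancelled() for t in tasks]         # siblings were cancelled\n",
    "    return errors, cancelled\n",
    "\n",
    "async def main():\n",
    "    errors, cancelled = await run_group()\n",
    "    print('errors   :', errors)\n",
    "    print('cancelled:', cancelled)     # [True, True, False, True]\n",
    "\n",
    "await main()"
   ]
  },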
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The cardinal rule still applies\n",
    "\n",
    "All of this depends on every task yielding at `await`. If a coroutine calls something blocking, such as `time.sleep`, `requests.get`, a synchronous DB driver, or a heavy CPU loop, it freezes the whole loop and the concurrency evaporates. Push blocking I/O onto a thread with `await asyncio.to_thread(fn, *args)` (3.9+). Push CPU-bound work onto a process pool with `await loop.run_in_executor(pool, fn, *args)`, where `loop = asyncio.get_running_loop()` and `pool` is a `concurrent.futures.ProcessPoolExecutor`. See the [async/await notebook](https://agilearn.co.uk/guides/concurrency/learn/04-async-await) and the [common mistakes catalogue](https://agilearn.co.uk/guides/concurrency/recipes/avoid-common-concurrency-mistakes)."
   ]
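  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of offloading a blocking call with `asyncio.to_thread`, assuming a standard CPython kernel. Threads may not be available in the in-browser kernel, so this cell may fail there. `blocking_io` is a hypothetical stand-in for `requests.get` or a synchronous driver call. The five half-second calls overlap in the default thread pool, so the batch takes about half a second rather than two and a half."
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import time\n",
    "\n",
    "def blocking_io(item):\n",
    "    # hypothetical synchronous call (stand-in for requests.get or a sync DB driver)\n",
    "    time.sleep(0.5)\n",
    "    return f'{item} ok'\n",
    "\n",
    "async def run_offloaded(n):\n",
    "    start = time.perf_counter()\n",
    "    # each blocking call runs in the default thread pool, so the loop stays free\n",
    "    results = await asyncio.gather(\n",
    "        *(asyncio.to_thread(blocking_io, f'item-{i}') for i in range(n))\n",
    "    )\n",
    "    return results, time.perf_counter() - start\n",
    "\n",
    "async def main():\n",
    "    results, elapsed = await run_offloaded(5)\n",
    "    print(f'{len(results)} blocking calls in {elapsed:.2f}s')   # about 0.5s, not 2.5s\n",
    "\n",
    "await main()"
   ]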
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}