Run async tasks concurrently¶
The question. You have many awaitable operations — async HTTP requests, database queries through an async driver — and you want them to run concurrently rather than one at a time, while capping how many run at once and not letting a slow one stall the rest.
The answer: gather them with asyncio.gather (or a TaskGroup on 3.11+), bound the fan-out with a Semaphore, and wrap each in a timeout. Below is the progression from "run them all" to a production-shaped "run them all, but at most N, each with a deadline, collecting failures."
These run in the browser. Cells use
await main(); in a.pyscript useasyncio.run(main()).
Run them all at once¶
asyncio.gather schedules every coroutine on the loop and returns their results in order.
import asyncio, time
async def fetch(item):
await asyncio.sleep(0.5) # stand-in for an async I/O call
return f'{item} -> ok'
async def main():
items = [f'item-{i}' for i in range(10)]
start = time.perf_counter()
results = await asyncio.gather(*(fetch(i) for i in items))
print(results[0], '...')
print(f'10 in {time.perf_counter() - start:.2f}s') # ~0.5s
await main() # script: asyncio.run(main())
Keep going when some fail¶
By default gather stops at the first exception and raises it. Pass return_exceptions=True to get every result back, with exceptions returned as values instead of raised — then sort successes from failures yourself.
async def flaky(item):
await asyncio.sleep(0.2)
if item.endswith('3'):
raise ConnectionError(f'{item} failed')
return f'{item} ok'
async def main():
items = [f'item-{i}' for i in range(6)]
outcomes = await asyncio.gather(*(flaky(i) for i in items), return_exceptions=True)
ok = [o for o in outcomes if not isinstance(o, Exception)]
bad = [repr(o) for o in outcomes if isinstance(o, Exception)]
print('ok :', ok)
print('bad:', bad)
await main()
Bound the fan-out with a Semaphore¶
Running 10,000 requests literally all at once will exhaust sockets or trip rate limits. A Semaphore(n) lets only n tasks into the work at a time; the rest wait their turn. Acquire it with async with inside each task.
async def fetch_limited(item, sem):
async with sem: # at most N concurrently
await asyncio.sleep(0.5)
return f'{item} ok'
async def main():
sem = asyncio.Semaphore(5) # cap concurrency at 5
items = [f'item-{i}' for i in range(20)]
start = time.perf_counter()
results = await asyncio.gather(*(fetch_limited(i, sem) for i in items))
print(f'{len(results)} done in {time.perf_counter() - start:.2f}s') # 20/5 * 0.5 = ~2s
await main()
Add a per-task timeout¶
Wrap the awaited work in asyncio.timeout (3.11+) so a single slow operation is cancelled rather than holding up everything. Combine it with return_exceptions to record timeouts alongside other failures.
async def maybe_slow(item, sem):
async with sem:
try:
async with asyncio.timeout(1.0): # 3.11+; else asyncio.wait_for
await asyncio.sleep(2 if item.endswith('4') else 0.3)
return f'{item} ok'
except TimeoutError:
return f'{item} TIMED OUT'
async def main():
sem = asyncio.Semaphore(5)
items = [f'item-{i}' for i in range(6)]
for r in await asyncio.gather(*(maybe_slow(i, sem) for i in items)):
print(r)
await main()
The production shape: TaskGroup (3.11+)¶
For new code, asyncio.TaskGroup is the structured way to run a batch: tasks are created inside an async with block that won't exit until they finish, and if one raises, the rest are cancelled and the error propagates cleanly — no leaked tasks. Collect results via the task objects.
async def main():
sem = asyncio.Semaphore(5)
items = [f'item-{i}' for i in range(8)]
tasks = []
async with asyncio.TaskGroup() as tg: # Python 3.11+
for i in items:
tasks.append(tg.create_task(fetch_limited(i, sem)))
results = [t.result() for t in tasks] # all done once block exits
print(f'{len(results)} done')
await main()
The cardinal rule still applies¶
All of this depends on every task yielding at await. If a coroutine calls something blocking — time.sleep, requests.get, a synchronous DB driver, a heavy CPU loop — it freezes the whole loop and the concurrency evaporates. Push blocking calls onto a thread with await asyncio.to_thread(fn, *args), and CPU-bound work onto a process pool via loop.run_in_executor. See the async/await notebook and the common mistakes catalogue.