Asyncio gather vs wait: detection and resolution

Unexpected concurrency slowdown in a data pipeline that loads pandas DataFrames usually appears in production ETL jobs where many I/O‑bound coroutines fetch CSV files. Choosing asyncio.gather versus asyncio.wait determines how tasks are scheduled and how exceptions propagate, which directly impacts overall latency and data integrity.

# Example showing the issue
import asyncio
import pandas as pd

async def load_csv(i):
    # Simulate I/O
    await asyncio.sleep(0.1 * i)
    return pd.DataFrame({"id": [i], "value": [i * 10]})

async def main_wrong():
    # asyncio.wait requires Task objects; passing bare coroutines
    # raises TypeError on Python 3.11+
    tasks = [asyncio.create_task(load_csv(i)) for i in range(3)]
    # WRONG: asyncio.wait returns two unordered sets; submission order is lost
    done, _ = await asyncio.wait(tasks)
    results = [t.result() for t in done]
    print(f"Number of DataFrames: {len(results)}")
    print(results)  # order is nondeterministic, may cause mismatched joins

asyncio.run(main_wrong())

asyncio.wait returns two unordered sets (done and pending), so the resulting list of DataFrames loses the original submission order. When those frames are later aligned positionally, e.g. zipped against partition metadata or concatenated in an assumed order, rows end up attached to the wrong partitions, inflating or shrinking row counts in subsequent joins. This matches the official Python documentation for asyncio.wait, which makes no guarantee about the iteration order of the done set. Related factors:

  • No guarantee of result ordering
  • Exceptions are raised per‑task, not aggregated
  • Additional bookkeeping needed to map results back to inputs

To diagnose this in your code:

Running the snippet above typically prints something like:

Number of DataFrames: 3
[   id  value
0   2     20,
   id  value
0   0      0,
   id  value
0   1     10]

Notice the shuffled order: the DataFrames no longer line up with their original indices, which quickly surfaces as duplicate or missing rows in downstream pandas joins.
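To see concretely how a shuffled result list corrupts downstream alignment, here is a minimal sketch; the meta and ordered frames are invented for illustration, and the shuffle stands in for the arbitrary order asyncio.wait may return:

```python
import pandas as pd

# Hypothetical partition metadata, keyed by partition number
meta = pd.DataFrame({"partition": [0, 1, 2], "source": ["a.csv", "b.csv", "c.csv"]})

# Results in submission order vs. the shuffled order asyncio.wait may return
ordered = pd.DataFrame({"partition": [0, 1, 2], "rows": [10, 20, 30]})
shuffled = ordered.iloc[[2, 0, 1]].reset_index(drop=True)

# Positional assignment silently mislabels partitions...
mislabeled = shuffled.assign(source=meta["source"])
# ...while a key-based merge stays correct regardless of order
safe = shuffled.merge(meta, on="partition")
print(mislabeled)
print(safe)
```

The takeaway: if any step downstream relies on position rather than an explicit key, an unordered result set will corrupt it silently.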

Fixing the Issue

When you need ordered results and unified error handling

Use asyncio.gather; it returns results in the same order the coroutines were passed and propagates the first exception raised (unless return_exceptions=True, which returns exceptions as ordinary values in the result list).

async def main_gather():
    tasks = [load_csv(i) for i in range(3)]
    results = await asyncio.gather(*tasks, return_exceptions=False)
    print(f"Ordered DataFrames: {len(results)}")
    print(results)
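A self-contained check (with a hypothetical fetch helper in place of the CSV loader) confirms the ordering guarantee even when later tasks finish first:

```python
import asyncio

async def fetch(i):
    # Later tasks finish first: completion order is 2, 1, 0
    await asyncio.sleep(0.05 * (3 - i))
    return i

async def main():
    # gather still returns results in submission order
    return await asyncio.gather(*(fetch(i) for i in range(3)))

results = asyncio.run(main())
print(results)  # [0, 1, 2]
```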

When you need fine‑grained control (e.g., cancel remaining tasks after the first failure)

Use asyncio.wait with an explicit return_when policy and re‑associate results to their inputs.

async def main_wait():
    tasks = {i: asyncio.create_task(load_csv(i)) for i in range(3)}
    done, pending = await asyncio.wait(tasks.values(), return_when=asyncio.FIRST_EXCEPTION)
    # Cancel pending tasks AND await them so cancellation actually completes
    for p in pending:
        p.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Re-map completed tasks back to their original index; skip tasks that
    # finished with an exception, since .result() would re-raise it here
    results = {i: t.result() for i, t in tasks.items()
               if t in done and t.exception() is None}
    print(f"Completed: {list(results.keys())}")

Why this works: gather preserves order and propagates failures in one place (or collects them as values with return_exceptions=True), ideal for batch ETL where each CSV corresponds to a known partition. wait shines when you must react to the first failure or exercise fine-grained cancellation, but you must manually maintain the mapping to keep pandas joins correct.

The gotcha that bit us during a nightly pipeline refactor: we switched from gather to wait to add a timeout, only to discover later that the shuffled DataFrames caused duplicate rows in a downstream merge, breaking our KPI calculations.

What Doesn’t Work

❌ Calling await asyncio.wait(tasks, timeout=...) (or with return_when=asyncio.FIRST_COMPLETED) and then iterating over all tasks as if they were finished: some may still be pending, and calling .result() on them raises InvalidStateError.

❌ Setting return_exceptions=True on gather and silently ignoring the returned Exception objects: errors disappear and downstream pandas joins get corrupted data.
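If you do use return_exceptions=True, separate failures from data explicitly instead of ignoring them. A minimal sketch, with a hypothetical maybe_fail coroutine standing in for a CSV loader:

```python
import asyncio

async def maybe_fail(i):
    # Hypothetical loader: partition 1 fails
    if i == 1:
        raise ValueError(f"partition {i} failed")
    return i

async def main():
    results = await asyncio.gather(*(maybe_fail(i) for i in range(3)),
                                   return_exceptions=True)
    # Partition the ordered results into values and errors
    errors = [str(r) for r in results if isinstance(r, Exception)]
    values = [r for r in results if not isinstance(r, Exception)]
    return values, errors

values, errors = asyncio.run(main())
print(values, errors)  # [0, 2] ['partition 1 failed']
```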

❌ Calling asyncio.wait(tasks, timeout=5) and discarding pending without cancellation: stray tasks keep running and may write partial CSVs, leading to duplicate rows.

  • Assuming asyncio.wait preserves the order of the input coroutines.
  • Swallowing exceptions from individual tasks because return_exceptions=True hides failures.
  • Cancelling pending tasks without awaiting their cancellation, leaving stray coroutines.
  • Mixing asyncio.wait with pandas.concat assuming results are aligned.
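Putting the timeout pitfalls together, a correct pattern cancels the stragglers and awaits them so no stray coroutine keeps writing. This is a sketch with an invented slow helper and arbitrary delays:

```python
import asyncio

DELAYS = [0.1, 0.2, 5.0]  # hypothetical per-task latencies; task 2 exceeds the timeout

async def slow(i):
    await asyncio.sleep(DELAYS[i])
    return i

async def main():
    tasks = [asyncio.create_task(slow(i)) for i in range(3)]
    done, pending = await asyncio.wait(tasks, timeout=1.0)
    # Cancel stragglers AND await them so cancellation actually completes
    for p in pending:
        p.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(t.result() for t in done)

completed = asyncio.run(main())
print(completed)  # [0, 1]
```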

When NOT to optimize

  • Tiny scripts: If you are processing fewer than five files interactively, the overhead of ordering logic is negligible.
  • One‑off ad‑hoc analysis: For Jupyter notebooks where speed is secondary to rapid prototyping, using wait for convenience is acceptable.
  • Legacy code with strict API contracts: If the surrounding code already expects unordered results and handles re‑ordering downstream, rewriting may introduce regressions.
  • CPU‑bound workloads: When the coroutines are CPU‑intensive and you are already using ThreadPoolExecutor, the choice between gather and wait has minimal impact.

Frequently Asked Questions

Q: Can asyncio.gather be used with a timeout?

Yes, wrap it in asyncio.wait_for to impose a global timeout.
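A short sketch of that wrapping (the fetch coroutine and the 2-second deadline are illustrative; on Python 3.11+ the asyncio.timeout context manager is an alternative):

```python
import asyncio

async def fetch(i):
    await asyncio.sleep(0.1)
    return i

async def main():
    # wait_for imposes a single deadline on the whole gather;
    # on expiry it cancels the gather and raises TimeoutError
    return await asyncio.wait_for(
        asyncio.gather(*(fetch(i) for i in range(3))), timeout=2
    )

results = asyncio.run(main())
print(results)  # [0, 1, 2]
```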

Q: Does asyncio.wait guarantee that the first completed task is returned first?

No, the order of the done set is undefined.


Understanding the subtle trade‑offs between asyncio.gather and asyncio.wait is essential when building reliable data pipelines that feed pandas DataFrames. Preserve ordering with gather for straightforward batch loads, and reserve wait for scenarios that demand granular cancellation or early failure detection. Applying the right primitive keeps your joins deterministic and your latency predictable.
