Async generator backpressure: detection and resolution
Unexpected memory growth in async pipelines that process pandas DataFrames from CSV exports or API feeds is usually caused by a missing backpressure mechanism. When the producer outruns the consumer, an unbounded internal queue swells until the process runs out of memory, silently breaking downstream processing.
# Example showing the issue
import asyncio, pandas as pd

# Simulate a 1-million-row DataFrame
df = pd.DataFrame({"id": range(1_000_000)})

async def producer(q: asyncio.Queue):
    for _, row in df.iterrows():
        await q.put(row)
    await q.put(None)  # sentinel

async def consumer(q: asyncio.Queue):
    count = 0
    while True:
        item = await q.get()
        if item is None:
            break
        await asyncio.sleep(0.0005)  # simulate slow processing
        count += 1
        if count % 200_000 == 0:
            print(f"processed {count} rows, queue size={q.qsize()}")

async def main():
    q = asyncio.Queue()  # unbounded!
    await asyncio.gather(producer(q), consumer(q))

asyncio.run(main())

# Expected output: queue size climbs into the hundreds of thousands, eventually OOM.
The producer pushes items onto an unbounded asyncio.Queue faster than the consumer can await them. asyncio.Queue without a maxsize behaves like an infinite buffer, so memory usage grows until the process is killed. Contrast this with the async iterator contract in PEP 525: an async generator only produces the next item when the consumer awaits it, so backpressure is built in (see the sketch after this list). Related factors:
- No maxsize on the queue
- Heavy CPU‑bound work inside the consumer
- Lack of explicit flow‑control signals between producer and consumer
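To see the PEP 525 contract in action, the same pipeline can be written as an async generator: the generator body suspends at each yield until the consumer asks for the next row, so buffering stays O(1). A minimal sketch (the row count is reduced from the article's 1 million so the demo finishes quickly):

import asyncio
import pandas as pd

df = pd.DataFrame({"id": range(100_000)})

async def rows():
    # The generator suspends at each yield until the consumer
    # awaits the next item - backpressure is implicit.
    for _, row in df.iterrows():
        yield row

async def main():
    count = 0
    async for row in rows():
        await asyncio.sleep(0.0005)  # simulate slow processing
        count += 1
    print(f"processed {count} rows with O(1) buffering")

asyncio.run(main())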
To diagnose this in your code:
import logging, asyncio

logging.basicConfig(level=logging.INFO)

async def monitor(q: asyncio.Queue, interval: float = 1.0):
    while True:
        size = q.qsize()
        if size > 100_000:
            logging.warning(f"Queue size={size} – backpressure building up")
        await asyncio.sleep(interval)

# In main(), start monitor(q) alongside producer/consumer to surface the issue in CI logs.
Fixing the Issue
Quick Fix (1‑Liner Solution)
q = asyncio.Queue(maxsize=10_000) # cap the buffer size
When to use: Development, debugging, quick tests.
Trade‑off: the producer will pause once the queue fills, revealing the bottleneck early.
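A quick way to verify the cap works: await q.put() suspends as soon as the queue is full. A minimal sketch that uses asyncio.wait_for to make the stall observable instead of hanging:

import asyncio

async def demo():
    q = asyncio.Queue(maxsize=1)
    await q.put("first")  # queue is now full
    try:
        # The second put suspends until a consumer calls get();
        # wait_for turns the stall into a visible timeout.
        await asyncio.wait_for(q.put("second"), timeout=0.5)
    except asyncio.TimeoutError:
        print("producer blocked - backpressure engaged")

asyncio.run(demo())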
Best Practice Solution (Production‑Ready)
import asyncio, logging
import pandas as pd

async def process_row(item):
    # Placeholder for the real work (e.g., write to DB, transform row)
    await asyncio.sleep(0)

async def producer(q: asyncio.Queue, df: pd.DataFrame):
    for _, row in df.iterrows():
        await q.put(row)  # back-pressure applied automatically when q is full
    await q.put(None)

async def consumer(q: asyncio.Queue):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()  # account for the sentinel so q.join() can return
            break
        await process_row(item)  # real work here
        q.task_done()

async def main(df: pd.DataFrame):
    q = asyncio.Queue(maxsize=5_000)  # tuned to memory budget
    monitor = asyncio.create_task(monitor_queue(q))
    prod = asyncio.create_task(producer(q, df))
    cons = asyncio.create_task(consumer(q))
    await asyncio.gather(prod, cons)
    await q.join()  # ensure all items are processed
    monitor.cancel()

async def monitor_queue(q: asyncio.Queue, interval: float = 1.0):
    while True:
        size = q.qsize()
        if size > q.maxsize * 0.8:
            logging.warning(f"Queue at {size}/{q.maxsize} – consider scaling consumers")
        await asyncio.sleep(interval)

# Entry point, e.g.: asyncio.run(main(pd.DataFrame({"id": range(1_000_000)})))
When to use: Production code, data pipelines, team projects.
Why better: the bounded queue enforces natural back‑pressure, prevents OOM, provides observable metrics, and lets you scale consumers based on queue health (see the sketch below).
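Scaling consumers can be as simple as starting several consumer tasks on the same bounded queue, with one sentinel per consumer for a clean shutdown. A hedged sketch of that variation, reusing consumer() and the imports from the block above (main_scaled and n_consumers are illustrative names):

async def main_scaled(df: pd.DataFrame, n_consumers: int = 4):
    q = asyncio.Queue(maxsize=5_000)
    consumers = [asyncio.create_task(consumer(q)) for _ in range(n_consumers)]
    for _, row in df.iterrows():
        await q.put(row)  # still back-pressured by maxsize
    for _ in range(n_consumers):
        await q.put(None)  # one sentinel per consumer
    await asyncio.gather(*consumers)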
The gotcha we hit in production was that a downstream CPU‑bound transformation suddenly slowed after a schema change; the unbounded queue kept growing until the pod restarted. Adding a maxsize exposed the slowdown instantly.
What Doesn’t Work
❌ await asyncio.sleep(0) in the producer to “slow it down”: this only yields control, it doesn’t limit memory growth.
❌ Replacing the queue with a plain list (list.append plus a while items: polling loop): this loses the await‑based back‑pressure and busy‑waits, blocking the event loop.
❌ Catching QueueFull and discarding items: you lose data silently, leading to inconsistent downstream results.
❌ Setting maxsize too low, causing the producer to stall and deadlock because no consumer is running yet.
❌ Using asyncio.sleep in the producer to simulate back‑pressure – it masks the real bottleneck and wastes CPU cycles.
❌ Calling queue.put_nowait instead of await queue.put, which raises QueueFull and crashes the task (see the sketch below).
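The last pitfall is easy to reproduce: put_nowait on a full queue raises asyncio.QueueFull immediately instead of waiting. A minimal sketch:

import asyncio

async def demo():
    q = asyncio.Queue(maxsize=1)
    q.put_nowait("first")  # fills the queue
    try:
        q.put_nowait("second")  # raises instead of waiting
    except asyncio.QueueFull:
        print("QueueFull raised - use await q.put(...) to wait instead")

asyncio.run(demo())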
When NOT to optimize
- Tiny scripts: processing a few dozen rows where memory impact is negligible.
- One‑off data migrations: run once, monitor manually, and accept the temporary memory spike.
- Pure in‑memory transformations: if the whole dataset fits comfortably in RAM and the pipeline runs in a controlled environment.
- Benchmarking code: you may deliberately disable back‑pressure to measure raw throughput.
Frequently Asked Questions
Q: Does the asyncio.Queue maxsize behave the same in Python 3.13?
A: Yes, the semantics of a bounded queue are unchanged from 3.10 through 3.13.
Q: Can I use asyncio.Semaphore instead of a bounded queue for flow control?
A: A semaphore can limit concurrency but won't store pending items; a bounded queue both buffers and applies back‑pressure (see the sketch below).
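For comparison, a minimal sketch of semaphore-based flow control (process_with_limit and max_in_flight are illustrative names): the semaphore caps how many items are processed concurrently, but every pending task still exists up front, so nothing bounds memory the way a full queue does:

import asyncio

async def process_with_limit(items, max_in_flight: int = 100):
    sem = asyncio.Semaphore(max_in_flight)

    async def worker(item):
        async with sem:
            await asyncio.sleep(0.0005)  # stand-in for real work

    # The semaphore caps concurrency, but gather creates all
    # tasks immediately - there is no buffering limit.
    await asyncio.gather(*(worker(i) for i in items))

asyncio.run(process_with_limit(range(1_000)))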
The key insight is that async generators only apply back‑pressure when the consumer awaits each item. By bounding the queue and monitoring its size, you turn an invisible memory leak into a manageable signal. Once we added the bounded queue and a watchdog logger, our nightly ETL runs stopped crashing on large pandas exports.
Related Issues
→ Fix Python async concurrency issues
→ Why asyncio gather vs wait tradeoffs matter in Python
→ Fix subprocess communicate deadlock in Python