Facebook Pixel

Coordination: Producer/Consumer

The producer/consumer pattern decouples two components that operate at different rates. One or more producers put work onto a shared buffer; one or more consumers take work off. The pattern handles the two boundary conditions — the buffer is full (producer must wait) and the buffer is empty (consumer must wait) — without busy-waiting or data corruption.

A web server that hands orders to a background worker illustrates the need. Calling the worker directly from the request handler causes the request thread to block while the worker runs. Under load, the connection pool drains and the server stops accepting new requests. Decoupling with a queue allows the web thread to enqueue the order and return immediately; the worker dequeues and processes at its own pace.

The diagram below shows the producer/consumer topology: one or more producers hand work to a bounded buffer, and one or more consumers drain it independently.

The naive approach and its failures

A shared Python list or Java ArrayList is not thread-safe: concurrent append and pop operations on an unsynchronized list can corrupt its internal structure or lose items silently. Even absent corruption, busy-waiting — polling in a tight loop until the list is non-empty — consumes an entire CPU core while producing no useful work.

1import threading
2import time
3
4# Bad: unsynchronized list + busy-wait
5# 1. list.append / list.pop are not atomic under free-threaded Python.
6# 2. The consumer spins the CPU at 100% while the queue is empty.
7class NaiveOrderQueue:
8    def __init__(self) -> None:
9        self._orders: list[dict] = []   # shared, unsynchronized
10
11    def enqueue(self, order: dict) -> None:
12        self._orders.append(order)       # race: concurrent appends can corrupt
13
14    def process_orders(self) -> None:
15        while True:
16            if self._orders:             # busy-wait: checks millions of times/sec
17                order = self._orders.pop(0)
18                self._process(order)
19            else:
20                time.sleep(0.001)        # token delay, not a real fix
21
22    def _process(self, order: dict) -> None:
23        print(f"processing order {order['id']}")

Blocking queues and condition variables

The correct abstraction is a blocking queue with a fixed capacity. When the queue is full, enqueue blocks the calling thread until a consumer removes an item and makes room. When the queue is empty, dequeue blocks until a producer adds an item. No busy-waiting; no corruption.

The mechanism underneath a blocking queue is a condition variable — a synchronization primitive that lets a thread sleep until a specific condition becomes true. The thread releases its lock, enters a wait state, and is woken by another thread that signals the condition. The entire read-check-block sequence is atomic with respect to the condition: no signal can slip between the check and the wait. That atomicity prevents a lost wakeup — if a producer's signal could fire in the gap after the consumer checks "empty" but before it actually sleeps, the consumer would miss the signal and sleep forever while an item sits waiting. Releasing the lock and waiting in one indivisible step closes that gap.

1import queue
2import threading
3
4# Good: queue.Queue is a thread-safe bounded blocking queue.
5# Producers block when the queue is full; consumers block when empty.
6# No locks to manage manually; no busy-waiting.
7
8ORDER_QUEUE: queue.Queue[dict] = queue.Queue(maxsize=1000)
9
10def web_handler(order: dict) -> None:
11    """Called by a web-server thread for each incoming HTTP request."""
12    ORDER_QUEUE.put(order)        # blocks if queue is full (backpressure)
13    # returns immediately; the HTTP response is sent without waiting for processing
14
15def order_worker() -> None:
16    """Runs in a dedicated background thread."""
17    while True:
18        order = ORDER_QUEUE.get()  # blocks until an order is available
19        try:
20            process_order(order)
21        finally:
22            ORDER_QUEUE.task_done()
23
24def process_order(order: dict) -> None:
25    print(f"processed order {order['id']}")
26
27# Start worker threads
28for _ in range(4):
29    t = threading.Thread(target=order_worker, daemon=True)
30    t.start()
Invest in Yourself
Your new job is waiting. 83% of people that complete the program get a job offer. Unlock unlimited access to all content and features.
Go Pro