1188. Design Bounded Blocking Queue 🔒
Problem Description
This problem asks you to implement a bounded blocking queue that is thread-safe and can handle concurrent operations from multiple threads.
The queue has a fixed maximum capacity and must support the following operations:
- BoundedBlockingQueue(int capacity): Initialize the queue with a maximum capacity limit.
- enqueue(int element): Add an element to the queue. The key requirement is that if the queue is already at full capacity, the calling thread must block (wait) until space becomes available in the queue.
- dequeue(): Remove and return an element from the queue. If the queue is empty, the calling thread must block (wait) until an element becomes available.
- size(): Return the current number of elements in the queue.
The "blocking" behavior is crucial here - when a producer thread tries to enqueue to a full queue, it should pause execution until a consumer thread dequeues an element. Similarly, when a consumer thread tries to dequeue from an empty queue, it should pause until a producer thread enqueues an element.
The solution uses two semaphores to coordinate this blocking behavior:
- s1 (initialized to capacity): Tracks available space in the queue. Producer threads acquire this semaphore before enqueuing.
- s2 (initialized to 0): Tracks available elements in the queue. Consumer threads acquire this semaphore before dequeuing.
The implementation uses a deque as the underlying data structure, with append() for enqueue operations and popleft() for dequeue operations, maintaining FIFO order. The semaphores ensure thread safety and proper blocking when the queue is full or empty.
Intuition
The key insight is recognizing that we need to coordinate two types of threads with opposite concerns: producers want to know if there's space to add elements, while consumers want to know if there are elements to remove.
Think of it like a parking garage with a fixed number of spaces. Cars can only enter (the producer side) if there are empty spaces, and cars can only leave (the consumer side) if there are cars parked. We need two counters: one for empty spaces and one for occupied spaces.
This naturally leads us to use semaphores, which are perfect for managing limited resources across threads. A semaphore is essentially a thread-safe counter that can block threads when the count reaches zero.
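If semaphores are new to you, here is a minimal, standalone sketch (not part of the solution itself) showing how Python's threading.Semaphore blocks a thread on acquire() until another thread calls release():

from threading import Semaphore, Thread
import time

sem = Semaphore(0)  # the count starts at 0, so acquire() will block

def waiter():
    print("waiter: blocking on acquire()...")
    sem.acquire()        # blocks because the count is 0
    print("waiter: unblocked")

t = Thread(target=waiter)
t.start()
time.sleep(1)            # let the waiter block first
print("main: releasing")
sem.release()            # count 0 -> 1, wakes the waiter
t.join()

Running this prints the "blocking" message, pauses for about a second, then prints "unblocked" once the main thread releases a permit.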
We need two semaphores because we're tracking two different resources:
- Available capacity: starts at capacity and decreases when we enqueue
- Available elements: starts at 0 and increases when we enqueue
The brilliant part is how these semaphores create the blocking behavior automatically:
- When enqueue() is called, we first try to acquire from the "capacity" semaphore (s1). If the queue is full, s1 is at 0, so the thread blocks until someone dequeues and releases a capacity permit.
- When dequeue() is called, we first try to acquire from the "elements" semaphore (s2). If the queue is empty, s2 is at 0, so the thread blocks until someone enqueues and releases an element permit.
After successfully acquiring the appropriate semaphore, we perform the actual queue operation, then release the opposite semaphore to signal the change to waiting threads. This acquire-operate-release pattern ensures that the queue never exceeds capacity and dequeue never operates on an empty queue, all while maintaining thread safety through the semaphores' built-in synchronization.
Solution Approach
The implementation uses two semaphores and a deque to create a thread-safe bounded blocking queue.
Data Structures Used:
- deque: A double-ended queue for efficient O(1) operations at both ends
- Two Semaphore objects for synchronization
Implementation Details:
1. Constructor (__init__):

self.s1 = Semaphore(capacity)  # Tracks available space
self.s2 = Semaphore(0)         # Tracks available elements
self.q = deque()               # The actual queue

- s1 starts with capacity permits, representing empty slots in the queue
- s2 starts with 0 permits, representing no elements initially
- q is the underlying queue structure
2. Enqueue Operation:

def enqueue(self, element: int) -> None:
    self.s1.acquire()       # Wait for available space
    self.q.append(element)  # Add to queue
    self.s2.release()       # Signal element available

- s1.acquire() decrements the available-space counter. If it is 0 (queue full), the thread blocks
- Once space is available, add the element to the queue
- s2.release() increments the element counter, potentially unblocking a waiting consumer
3. Dequeue Operation:

def dequeue(self) -> int:
    self.s2.acquire()       # Wait for available element
    ans = self.q.popleft()  # Remove from queue
    self.s1.release()       # Signal space available
    return ans

- s2.acquire() decrements the element counter. If it is 0 (queue empty), the thread blocks
- Once an element is available, remove it from the front of the queue
- s1.release() increments the space counter, potentially unblocking a waiting producer
4. Size Operation:

def size(self) -> int:
    return len(self.q)

- Simply returns the current queue length
- Note: This doesn't need extra synchronization, as calling len() on a deque is thread-safe for reading (in CPython it is effectively an atomic operation)
Why This Works:
- The semaphores act as gatekeepers: producers can't proceed when s1 = 0 (full), consumers can't proceed when s2 = 0 (empty)
- The acquire-modify-release pattern guarantees the queue is never over-filled and never popped while empty
- Whenever no operation is in flight, the sum s1.value + s2.value equals capacity, maintaining the invariant
- No explicit locks are needed in the Python version because the semaphores provide the ordering and CPython's deque append/popleft are atomic; in languages whose containers lack this guarantee, the underlying queue needs its own lock
Example Walkthrough
Let's walk through a concrete example with a bounded blocking queue of capacity 2.
Initial State:
- Queue capacity: 2
- s1 (space semaphore): 2 permits (2 empty slots available)
- s2 (element semaphore): 0 permits (0 elements in queue)
- Queue: []
Step 1: Thread A calls enqueue(5)
- s1.acquire() → s1 decrements from 2 to 1 (claiming one empty slot)
- Add 5 to queue: [5]
- s2.release() → s2 increments from 0 to 1 (signaling one element available)
- State: s1=1, s2=1, Queue=[5]
Step 2: Thread B calls enqueue(10)
- s1.acquire() → s1 decrements from 1 to 0 (claiming the last empty slot)
- Add 10 to queue: [5, 10]
- s2.release() → s2 increments from 1 to 2 (signaling two elements available)
- State: s1=0, s2=2, Queue=[5, 10] (FULL)
Step 3: Thread C calls enqueue(15) (Queue is FULL)
- s1.acquire() → s1 is 0, so Thread C BLOCKS waiting for space
- Thread C is suspended...
Step 4: Thread D calls dequeue()
- s2.acquire() → s2 decrements from 2 to 1 (claiming one element)
- Remove and get 5 from queue: [10]
- s1.release() → s1 increments from 0 to 1 (signaling one space available)
- This unblocks Thread C!
- Returns: 5
- State: s1=1, s2=1, Queue=[10]
Step 5: Thread C resumes (was blocked in Step 3)
- Now s1.acquire() succeeds → s1 decrements from 1 to 0
- Add 15 to queue: [10, 15]
- s2.release() → s2 increments from 1 to 2
- State: s1=0, s2=2, Queue=[10, 15]
Step 6: Thread E calls dequeue() twice
- First dequeue():
  - s2.acquire() → s2: 2→1
  - Remove 10: [15]
  - s1.release() → s1: 0→1
  - Returns: 10
- Second dequeue():
  - s2.acquire() → s2: 1→0
  - Remove 15: []
  - s1.release() → s1: 1→2
  - Returns: 15
- State: s1=2, s2=0, Queue=[] (EMPTY)
Step 7: Thread F calls dequeue() (Queue is EMPTY)
- s2.acquire() → s2 is 0, so Thread F BLOCKS waiting for an element
- Thread F will remain blocked until another thread enqueues an element
Key Observations:
- Between operations, the sum s1 + s2 equals capacity (2 in this example)
- Producers block when s1=0 (no space), consumers block when s2=0 (no elements)
- The semaphores automatically handle the blocking and unblocking of threads
- FIFO order is maintained: elements are dequeued in the order they were enqueued
Solution Implementation
from threading import Semaphore
from collections import deque


class BoundedBlockingQueue:
    """
    A thread-safe bounded blocking queue implementation using semaphores.

    This queue blocks enqueue operations when full and dequeue operations when empty.
    """

    def __init__(self, capacity: int) -> None:
        """
        Initialize the bounded blocking queue with a given capacity.

        Args:
            capacity: Maximum number of elements the queue can hold
        """
        # Semaphore to track available space in queue (initially equals capacity)
        self.empty_slots = Semaphore(capacity)

        # Semaphore to track number of items in queue (initially 0)
        self.filled_slots = Semaphore(0)

        # The underlying queue data structure
        self.queue = deque()

    def enqueue(self, element: int) -> None:
        """
        Add an element to the queue. Blocks if the queue is full.

        Args:
            element: The integer element to add to the queue
        """
        # Wait for an empty slot (blocks if queue is full)
        self.empty_slots.acquire()

        # Add element to the queue
        self.queue.append(element)

        # Signal that a new item is available
        self.filled_slots.release()

    def dequeue(self) -> int:
        """
        Remove and return an element from the queue. Blocks if the queue is empty.

        Returns:
            The integer element removed from the front of the queue
        """
        # Wait for an item to be available (blocks if queue is empty)
        self.filled_slots.acquire()

        # Remove and store the front element
        element = self.queue.popleft()

        # Signal that a slot is now empty
        self.empty_slots.release()

        return element

    def size(self) -> int:
        """
        Get the current number of elements in the queue.

        Returns:
            The current size of the queue
        """
        return len(self.queue)
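As a quick sanity check outside the LeetCode harness, here is a hedged usage sketch of the Python class above. It mirrors the earlier walkthrough: a producer blocks on a full capacity-2 queue until the main thread dequeues (the sleep is only there to make the interleaving predictable):

import time
from threading import Thread

def demo():
    q = BoundedBlockingQueue(2)

    def producer():
        for value in (5, 10, 15):
            print(f"producer: enqueue({value})")
            q.enqueue(value)  # the third call blocks until the consumer frees a slot
            print(f"producer: enqueued {value}, size={q.size()}")

    t = Thread(target=producer)
    t.start()

    time.sleep(1)  # give the producer time to fill the queue and block on enqueue(15)
    print("consumer: dequeued", q.dequeue())  # frees a slot, unblocking the producer
    t.join()
    print("final size:", q.size())

if __name__ == "__main__":
    demo()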
import java.util.concurrent.Semaphore;
import java.util.Deque;
import java.util.ArrayDeque;

/**
 * Thread-safe bounded blocking queue implementation using semaphores.
 * Supports concurrent enqueue and dequeue operations with a fixed capacity.
 */
class BoundedBlockingQueue {
    // Semaphore to track available slots for enqueue operations
    private final Semaphore availableSlots;

    // Semaphore to track available items for dequeue operations
    private final Semaphore availableItems;

    // Internal queue to store elements (not thread-safe by itself,
    // so access is guarded with synchronized blocks below)
    private final Deque<Integer> queue;

    /**
     * Initializes a bounded blocking queue with the specified capacity.
     * @param capacity Maximum number of elements the queue can hold
     */
    public BoundedBlockingQueue(int capacity) {
        // Initialize with capacity permits (all slots initially available)
        availableSlots = new Semaphore(capacity);

        // Initialize with 0 permits (no items initially available)
        availableItems = new Semaphore(0);

        // Create the underlying queue
        queue = new ArrayDeque<>();
    }

    /**
     * Adds an element to the queue, blocking if the queue is full.
     * @param element The element to add to the queue
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void enqueue(int element) throws InterruptedException {
        // Wait for an available slot (blocks if queue is full)
        availableSlots.acquire();

        // Add element to the queue; ArrayDeque is not thread-safe,
        // so concurrent producers must not touch it at the same time
        synchronized (queue) {
            queue.offer(element);
        }

        // Signal that an item is now available for dequeue
        availableItems.release();
    }

    /**
     * Removes and returns an element from the queue, blocking if the queue is empty.
     * @return The element removed from the queue
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public int dequeue() throws InterruptedException {
        // Wait for an available item (blocks if queue is empty)
        availableItems.acquire();

        // Remove and retrieve element from the queue, guarded for the same reason
        int element;
        synchronized (queue) {
            element = queue.poll();
        }

        // Signal that a slot is now available for enqueue
        availableSlots.release();

        return element;
    }

    /**
     * Returns the current number of elements in the queue.
     * @return The size of the queue
     */
    public int size() {
        synchronized (queue) {
            return queue.size();
        }
    }
}
#include <queue>
#include <mutex>
#include <semaphore.h>

class BoundedBlockingQueue {
public:
    /**
     * Constructor initializes a bounded blocking queue with given capacity.
     * @param capacity Maximum number of elements the queue can hold
     */
    BoundedBlockingQueue(int capacity) {
        // Initialize semaphore for available slots (producer semaphore)
        // Initial value = capacity (all slots are initially available)
        sem_init(&empty_slots_, 0, capacity);

        // Initialize semaphore for filled slots (consumer semaphore)
        // Initial value = 0 (no items to consume initially)
        sem_init(&filled_slots_, 0, 0);
    }

    ~BoundedBlockingQueue() {
        // Release the POSIX semaphores
        sem_destroy(&empty_slots_);
        sem_destroy(&filled_slots_);
    }

    /**
     * Adds an element to the queue. Blocks if queue is full.
     * @param element The element to be added to the queue
     */
    void enqueue(int element) {
        // Wait for an empty slot (decrements empty_slots_)
        sem_wait(&empty_slots_);

        // Critical section: std::queue is not thread-safe, so guard the push
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(element);
        }

        // Signal that a filled slot is available (increments filled_slots_)
        sem_post(&filled_slots_);
    }

    /**
     * Removes and returns an element from the queue. Blocks if queue is empty.
     * @return The element removed from the front of the queue
     */
    int dequeue() {
        // Wait for a filled slot (decrements filled_slots_)
        sem_wait(&filled_slots_);

        // Critical section: guard the pop for the same reason
        int result;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            result = queue_.front();
            queue_.pop();
        }

        // Signal that an empty slot is available (increments empty_slots_)
        sem_post(&empty_slots_);

        return result;
    }

    /**
     * Returns the current number of elements in the queue.
     * @return Current size of the queue
     */
    int size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

private:
    std::queue<int> queue_;  // Internal queue to store elements
    std::mutex mutex_;       // Guards the underlying (non-thread-safe) queue
    sem_t empty_slots_;      // Semaphore tracking available slots for producers
    sem_t filled_slots_;     // Semaphore tracking filled slots for consumers
};
import { Semaphore } from 'async-mutex';

// Internal queue to store elements
const queue: number[] = [];

// Semaphore tracking available slots for producers
let emptySlots: Semaphore;

// Semaphore tracking filled slots for consumers
let filledSlots: Semaphore;

/**
 * Constructor initializes a bounded blocking queue with given capacity.
 * @param capacity - Maximum number of elements the queue can hold
 */
function BoundedBlockingQueue(capacity: number): void {
    // Initialize semaphore for available slots (producer semaphore)
    // Initial value = capacity (all slots are initially available)
    emptySlots = new Semaphore(capacity);

    // Initialize semaphore for filled slots (consumer semaphore)
    // Initial value = 0 (no items to consume initially)
    filledSlots = new Semaphore(0);
}

/**
 * Adds an element to the queue. Blocks if queue is full.
 * @param element - The element to be added to the queue
 */
async function enqueue(element: number): Promise<void> {
    // Wait for an empty slot (decrements emptySlots)
    await emptySlots.acquire();

    // Critical section: add element to queue
    queue.push(element);

    // Signal that a filled slot is available (increments filledSlots)
    filledSlots.release();
}

/**
 * Removes and returns an element from the queue. Blocks if queue is empty.
 * @returns The element removed from the front of the queue
 */
async function dequeue(): Promise<number> {
    // Wait for a filled slot (decrements filledSlots)
    await filledSlots.acquire();

    // Critical section: remove element from queue
    const result = queue.shift()!;

    // Signal that an empty slot is available (increments emptySlots)
    emptySlots.release();

    return result;
}

/**
 * Returns the current number of elements in the queue.
 * @returns Current size of the queue
 */
function size(): number {
    return queue.length;
}
Time and Space Complexity
Time Complexity:
- __init__(capacity): O(1) - Initializing two semaphores and an empty deque takes constant time.
- enqueue(element): O(1) per operation, not counting time spent blocked waiting for space - acquiring a semaphore, appending to the deque, and releasing a semaphore are all constant-time operations; append() on a deque is O(1).
- dequeue(): O(1) per operation, not counting time spent blocked waiting for an element - acquiring a semaphore, removing from the left of the deque using popleft(), and releasing a semaphore are all constant-time operations; popleft() on a deque is O(1).
- size(): O(1) - Getting the length of a deque is a constant-time operation, as Python's deque maintains its size internally.
Space Complexity:
- Overall space complexity: O(n), where n is the capacity of the queue.
- The space is used by:
  - The deque self.q, which can hold up to capacity elements: O(capacity)
  - Two semaphores self.s1 and self.s2: O(1) each
- Since the maximum number of elements that can be stored is bounded by the capacity parameter, the space complexity is O(capacity).
Common Pitfalls
1. Incorrect Semaphore Order Leading to Deadlock
A critical mistake is reordering the semaphore operations; depending on which operation is misplaced, this either breaks the capacity guarantee or deadlocks the queue:
Incorrect Implementation:
def enqueue(self, element: int) -> None:
    self.queue.append(element)    # Modifying queue BEFORE acquiring semaphore
    self.empty_slots.acquire()    # Wrong order!
    self.filled_slots.release()
Why it fails: Threads can append to the queue before any permit is acquired, so the queue can grow past its capacity and the blocking guarantee is lost.
Correct approach: Always follow the pattern: acquire → modify → release
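To see where an ordering mistake really does deadlock, consider this hypothetical variant in which enqueue acquires the elements semaphore instead of the space semaphore:

def enqueue(self, element: int) -> None:
    self.filled_slots.acquire()   # WRONG semaphore: waits for an element to exist
    self.queue.append(element)
    self.empty_slots.release()

With an initially empty queue, the very first enqueue() blocks forever because filled_slots starts at 0, and every dequeue() blocks waiting for an element that can never arrive - producers and consumers end up waiting on each other indefinitely.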
2. Using a Regular List Instead of Deque
Problematic Implementation:
def __init__(self, capacity: int):
    self.queue = []  # Using list instead of deque

def dequeue(self) -> int:
    self.filled_slots.acquire()
    element = self.queue.pop(0)  # O(n) operation!
    self.empty_slots.release()
    return element
Why it fails: list.pop(0) is O(n) because it shifts all remaining elements. Under high concurrency, this becomes a bottleneck.
Solution: Use collections.deque with popleft() for O(1) operations.
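A rough, machine-dependent way to observe the difference is a micro-benchmark like the following sketch (timings are illustrative only):

import timeit
from collections import deque

n = 100_000

def drain_list():
    q = list(range(n))
    while q:
        q.pop(0)       # O(n): shifts every remaining element

def drain_deque():
    q = deque(range(n))
    while q:
        q.popleft()    # O(1)

print("list.pop(0):  ", timeit.timeit(drain_list, number=1))
print("deque.popleft:", timeit.timeit(drain_deque, number=1))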
3. Attempting to Make size() "More Accurate" with Synchronization
Over-engineered Implementation:
def size(self) -> int:
    # Unnecessary synchronization attempt
    self.empty_slots.acquire()
    self.filled_slots.acquire()
    size = len(self.queue)
    self.filled_slots.release()
    self.empty_slots.release()
    return size
Why it fails:
- This can cause deadlock if the queue is empty (filled_slots = 0)
- The size is inherently a "snapshot" in concurrent systems - it may change immediately after reading
- Python's len() on a deque is already thread-safe for reading
Solution: Keep it simple - just return len(self.queue)
4. Using Lock Instead of Semaphores
Alternative but Less Elegant Implementation:
from threading import Lock, Condition
from collections import deque

def __init__(self, capacity: int):
    self.lock = Lock()
    self.not_full = Condition(self.lock)
    self.not_empty = Condition(self.lock)
    self.queue = deque()
    self.capacity = capacity

def enqueue(self, element: int):
    with self.lock:
        while len(self.queue) >= self.capacity:
            self.not_full.wait()  # More complex logic
        self.queue.append(element)
        self.not_empty.notify()
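For comparison, the matching dequeue needs the same wait-loop machinery; the following is a sketch of what it might look like under this Condition-based design:

def dequeue(self) -> int:
    with self.lock:
        while not self.queue:       # re-check the predicate after every wakeup
            self.not_empty.wait()
        element = self.queue.popleft()
        self.not_full.notify()      # wake one producer waiting for space
        return element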
Why semaphores are better here:
- Semaphores naturally track counts (available space/elements)
- Cleaner code without explicit while loops and condition checks
- Semaphores handle the "permit" counting automatically
5. Not Handling Timeouts or Interruption
In production code, you might want to support timeouts and clean up if something goes wrong mid-operation:
def enqueue(self, element: int, timeout: float = None) -> bool:
    if not self.empty_slots.acquire(timeout=timeout):
        return False  # Timed out
    try:
        self.queue.append(element)
        self.filled_slots.release()
        return True
    except BaseException:
        self.empty_slots.release()  # Restore semaphore state on error
        raise
Best Practice: Consider adding timeout parameters and exception handling for production use, though the basic implementation is correct for the problem requirements.
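Along the same lines, a dequeue with a timeout might look like this sketch (returning None on timeout is an illustrative choice, not part of the problem statement):

def dequeue(self, timeout: float = None):
    # Returns None if no element becomes available within `timeout` seconds.
    if not self.filled_slots.acquire(timeout=timeout):
        return None  # Timed out
    try:
        element = self.queue.popleft()
        self.empty_slots.release()
        return element
    except BaseException:
        self.filled_slots.release()  # Restore semaphore state on error
        raise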