1188. Design Bounded Blocking Queue
Problem Description
The problem is about creating a data structure which is a bounded blocking queue with thread-safety in mind. A bounded blocking queue is a queue with a fixed maximum size, and it has the capability to block or wait when operations like enqueue (adding to the queue) and dequeue (removing from the queue) cannot be performed because the queue is full or empty, respectively. The queue supports three main operations:
enqueue(int element)
: This method adds an element to the end of the queue. If the queue has reached its capacity, the method should block the calling thread until there is space available to add the new element.dequeue()
: This removes and returns the element at the front of the queue. If the queue is empty, this operation should block until there is an element available to dequeue.size()
: This returns the current number of elements in the queue.
It is especially noted that the implementation will be tested in a multithreaded environment, where multiple threads could be calling these methods simultaneously. Therefore, it is crucial that the implementation ensures that all operations on the bounded blocking queue are thread-safe (i.e., function correctly when accessed from multiple threads).
Lastly, the use of any built-in bounded blocking queue implementations is prohibited as the goal is to understand how to create such a data structure from scratch, potentially in a job interview.
Intuition
The solution to the problem involves coordinating access to the queue to ensure that only one thread can perform an enqueue or dequeue operation at a time to maintain thread safety. This means protecting the internal state of the queue from race conditions that could lead to incorrect behavior or data corruption.
To achieve this, we employ two synchronization primitives called Semaphores
. A semaphore maintains a set of permits, and a thread that wants to perform an operation must first acquire a permit. If no permit is available, the thread blocks until a permit is released by another thread. This behavior fits perfectly for our problem's requirements.
We use two semaphores in the solution:
-
s1
, which starts with a number of permits equal to the capacity of the queue. This semaphore controls access to enqueue operation. A thread can enqueue if it can acquire a permit froms1
, which signifies that there is room in the queue. After the enqueue operation, the thread releases a permit tos2
, signaling that there is an item available to be dequeued. -
s2
, which starts with zero permits as the queue is initially empty. This semaphore controls access to the dequeue operation. For a thread to dequeue, it must acquire a permit froms2
, which signifies that there is at least one item in the queue to be dequeued. After the dequeue operation, the thread releases a permit tos1
, indicating that there is now additional space available in the queue.
The queue itself (q
) is represented by a deque
(double-ended queue) from Python's collections module, which allows for fast appending and popping from both ends. Note that accessing len(q)
to get the size of the queue does not need to be serialized by semaphores, as it is not modifying the queue.
This approach enables us to limit the number of elements in the queue to the defined capacity, and to ensure that enqueue
and dequeue
operations wait for the queue to be not full or not empty, respectively, before proceeding, fulfilling the conditions for a bounded blocking queue in a thread-safe manner.
Solution Approach
In the provided solution, the implementation of the BoundedBlockingQueue
class is done with two semaphores and a deque. Here's a walkthrough of the pattern and algorithms used:
-
Semaphores: Two instances of the Semaphore class are used,
s1
ands2
, each serving a distinct purpose.s1
governs the ability to insert an item into the queue, and begins with permits equal to the capacity.s2
reflects the number of items in the queue available to be dequeued and starts with no permits. This is a classic application of the "producer-consumer" problem's solution, where one semaphore is used to signal "empty slots" and another semaphore is used to signal "available items". -
Deque: A deque (double-ended queue) from the collections module is used to represent the queue's data structure. This provides efficient FIFO (first-in-first-out) operations needed for enqueueing (
append
) and dequeueing (popleft
).
When enqueue(element: int)
is called, the following steps are performed:
self.s1.acquire()
: Attempt to acquire a permit froms1
, which represents a free slot in the queue. If there are no free slots, this call will block until another thread callsdequeue
and releases a permit ons1
.self.q.append(element)
: Once a permit has been acquired (meaning there is space in the queue), the element is safely enqueued into the queue.self.s2.release()
: Releasing a permit ons2
to signal that an element has been enqueued and is now available for dequeuing.
For dequeue()
, the steps are the mirror image:
self.s2.acquire()
: This acquires a permit froms2
, which signifies that there is at least one element in the queue to be dequeued. If the queue is empty, this call will block until a thread callsenqueue
and releases a permit ons2
.ans = self.q.popleft()
: Removes the oldest (front) element from the queue safely because it's been ensured that the queue is not empty.self.s1.release()
: Releasing a permit ons1
to signal that an element has been dequeued and there is now a free slot in the queue.
size()
is a straightforward operation as it simply returns the current number of items in the queue, len(self.q)
. It does not modify the queue, so it does not require interaction with semaphores.
The solution effectively serializes access to the mutable shared state (self.q
), preventing race conditions by using semaphores to coordinate enqueue and dequeue actions. This guarantees that the queue never exceeds its capacity and avoids dequeue operations being called on an empty queue, thus satisfying thread safety and other requirements of the problem.
Ready to land your dream job?
Unlock your dream job with a 2-minute evaluator for a personalized learning plan!
Start EvaluatorExample Walkthrough
Let's consider a small example to illustrate the solution approach for a BoundedBlockingQueue
with a capacity of 2.
-
Initially, we create the queue with a capacity of 2, initializing semaphore
s1
with 2 permits and semaphores2
with 0 permits. -
Imagine thread A calls
enqueue(1)
:- It acquires a permit from
s1
, which now has 1 permit left. - It then appends
1
to thedeque
, and the queue state becomes[1]
. - Finally, it releases a permit to
s2
, indicating that there is one item available for dequeuing.
- It acquires a permit from
-
Now, thread B calls
enqueue(2)
:- It acquires the remaining permit from
s1
, ands1
now has 0 permits. - It appends
2
to thedeque
, so the queue state becomes[1,2]
. - It releases another permit to
s2
, nows2
has 2 permits indicating there are two items available to be dequeued.
- It acquires the remaining permit from
-
At this state, the queue is full. If another thread, say thread C, tries to
enqueue(3)
, it will be blocked ass1
has no permits left, signifying the queue is at full capacity. -
Meanwhile, if thread D calls
dequeue()
:- It acquires a permit from
s2
(which has 2 permits at this point), leaving 1 permit left ins2
. - It dequeues an element from the
deque
which is1
(FIFO order), leaving the queue state as[2]
. - It releases a permit to
s1
, increasing the number of permits back to 1, signaling that there is now space for one more item in the queue.
- It acquires a permit from
-
If thread C is still waiting to
enqueue(3)
, it can now proceed as a permit became available ins1
.- It acquires the permit from
s1
, and agains1
has 0 permits. - It appends
3
to thedeque
, so the queue state becomes[2,3]
. - It releases a permit to
s2
, which now has 2 permits, reflecting the two items in the queue.
- It acquires the permit from
-
At any time, calling
size()
returns the number of items currently in the queue, which can be accessed by any thread without needing to acquire a permit.
This example demonstrates how the BoundedBlockingQueue
enforces its bounds and provides thread-safe enqueueing and dequeuing operations using semaphores to manage its capacity and state.
Solution Implementation
1from threading import Semaphore
2from collections import deque # Ensure deque is imported
3
4class BoundedBlockingQueue:
5 def __init__(self, capacity: int):
6 # Initialize the queue with given capacity.
7 self.semaphore_empty_slots = Semaphore(capacity) # Semaphore to track empty slots
8 self.semaphore_filled_slots = Semaphore(0) # Semaphore to track filled slots
9 self.queue = deque() # Use deque for queue operations
10
11 def enqueue(self, element: int) -> None:
12 # Add an element to the end of the queue.
13 self.semaphore_empty_slots.acquire() # Decrease the counter of empty slots, wait if no empty slots
14 self.queue.append(element) # Add the element to the queue
15 self.semaphore_filled_slots.release() # Increase the counter of filled slots, signaling dequeue if slots are filled
16
17 def dequeue(self) -> int:
18 # Remove and return an element from the front of the queue.
19 self.semaphore_filled_slots.acquire() # Decrease the counter of filled slots, wait if no filled slots
20 element = self.queue.popleft() # Remove the element from the queue
21 self.semaphore_empty_slots.release() # Increase the counter of empty slots, signaling enqueue if slots are available
22 return element # Return the dequeued element
23
24 def size(self) -> int:
25 # Get the current number of elements in the queue.
26 return len(self.queue) # Return the size of the queue
27
1import java.util.ArrayDeque;
2import java.util.Deque;
3import java.util.concurrent.Semaphore;
4
5// This class represents a thread-safe bounded blocking queue with a fixed capacity.
6public class BoundedBlockingQueue {
7 // Semaphore to control the number of elements that can be added (based on capacity).
8 private final Semaphore enqueueSemaphore;
9 // Semaphore to control the number of elements that can be removed (starts at 0).
10 private final Semaphore dequeueSemaphore;
11 // The queue to store elements.
12 private final Deque<Integer> queue;
13
14 // Constructor initializes the semaphores and queue with specified capacity.
15 public BoundedBlockingQueue(int capacity) {
16 enqueueSemaphore = new Semaphore(capacity);
17 dequeueSemaphore = new Semaphore(0);
18 queue = new ArrayDeque<>();
19 }
20
21 // Enqueues an element into the queue if there's available capacity.
22 public void enqueue(int element) throws InterruptedException {
23 // Acquire a permit from enqueueSemaphore discarding it, if available capacity is 0 waits.
24 enqueueSemaphore.acquire();
25 synchronized (this) {
26 // Adds the element to the end of the queue.
27 queue.offer(element);
28 }
29 // Release a permit to dequeueSemaphore, increasing the number of available elements to dequeue.
30 dequeueSemaphore.release();
31 }
32
33 // Dequeues an element from the front of the queue.
34 public int dequeue() throws InterruptedException {
35 // Acquire a permit from dequeueSemaphore, waiting if necessary until an element is available.
36 dequeueSemaphore.acquire();
37 int element;
38 synchronized (this) {
39 // Remove and return the front element of the queue.
40 element = queue.poll();
41 }
42 // Release a permit to enqueueSemaphore, increasing the available capacity.
43 enqueueSemaphore.release();
44 return element;
45 }
46
47 // Returns the current number of elements in the queue.
48 public int size() {
49 synchronized (this) {
50 // The size of the queue is returned.
51 return queue.size();
52 }
53 }
54}
55
1#include <queue>
2#include <mutex>
3#include <condition_variable>
4
5class BoundedBlockingQueue {
6public:
7 // Constructor initializes the queue with a capacity limit.
8 BoundedBlockingQueue(int capacity) : capacity_(capacity), count_(0) {
9 // No need to initialize semaphores since we will use condition_variable and mutex
10 }
11
12 // Enqueue adds an element to the queue. If the queue is full, blocks until space is available.
13 void enqueue(int element) {
14 std::unique_lock<std::mutex> lock(mutex_);
15 // Wait until there is space in the queue
16 not_full_condition_.wait(lock, [this] { return count_ < capacity_; });
17 queue_.push(element);
18 ++count_;
19 // Notify one waiting thread (if any) that an item was dequeued
20 not_empty_condition_.notify_one();
21 }
22
23 // Dequeue removes and returns an element from the queue. If the queue is empty, blocks until an element is available.
24 int dequeue() {
25 std::unique_lock<std::mutex> lock(mutex_);
26 // Wait until there is an item to dequeue
27 not_empty_condition_.wait(lock, [this] { return count_ > 0; });
28 int value = queue_.front();
29 queue_.pop();
30 --count_;
31 // Notify one waiting thread (if any) that space is now available
32 not_full_condition_.notify_one();
33 return value;
34 }
35
36 // Get the current size of the queue.
37 int size() {
38 std::lock_guard<std::mutex> lock(mutex_);
39 return count_;
40 }
41
42private:
43 std::queue<int> queue_; // The queue that holds the elements
44 std::mutex mutex_; // Mutex to protect access to the queue
45 std::condition_variable not_full_condition_; // Condition variable to block enqueue when queue is full
46 std::condition_variable not_empty_condition_; // Condition variable to block dequeue when queue is empty
47 int capacity_; // Maximum number of items in the queue
48 int count_; // Current number of items in the queue
49};
50
1const queue: number[] = []; // The queue that holds the elements
2let capacity: number; // Maximum number of items in the queue
3let count: number = 0; // Current number of items in the queue
4
5// Utilizing these for synchronization could be complex without encapsulation
6const mutex = new Mutex(); // A pretend Mutex since JavaScript/TypeScript doesn't have one
7const notFullCondition = new ConditionVariable(); // A pretend condition variable for when the queue is not full
8const notEmptyCondition = new ConditionVariable(); // A pretend condition variable for when the queue is not empty
9
10// Initialization of the queue with a capacity limit.
11function initializeQueue(initialCapacity: number): void {
12 capacity = initialCapacity;
13}
14
15// Adds an element to the queue. If the queue is full, it is supposed to block until space is available.
16async function enqueue(element: number): Promise<void> {
17 await mutex.lock();
18 try {
19 // Wait until there is space in the queue
20 while (count >= capacity) {
21 await notFullCondition.wait(mutex);
22 }
23 queue.push(element);
24 count++;
25 // Notify one waiting thread (if any) that an item was enqueued
26 notEmptyCondition.notifyOne();
27 } finally {
28 mutex.unlock();
29 }
30}
31
32// Removes and returns an element from the queue. If the queue is empty, it is supposed to block until an element is available.
33async function dequeue(): Promise<number> {
34 await mutex.lock();
35 try {
36 // Wait until there is an item to dequeue
37 while (count === 0) {
38 await notEmptyCondition.wait(mutex);
39 }
40 const value = queue.shift();
41 count--;
42 // Notify one waiting thread (if any) that space is now available
43 notFullCondition.notifyOne();
44 return value;
45 } finally {
46 mutex.unlock();
47 }
48}
49
50// Returns the current size of the queue.
51function size(): number {
52 return count; // No need for synchronization in a single-threaded environment
53}
54
55// Note that Mutex and ConditionVariable are not native JS/TS classes.
56// These would need to be implemented or a library would have to be used to simulate them.
57
Time and Space Complexity
For this BoundedBlockingQueue
implementation using semaphores, we will analyze the time and space complexities of its operations.
Time Complexity
-
__init__
: Initializing the queue involves setting up two semaphores and the underlying deque. This operation is constant time,O(1)
, as it involves only a fixed number of operations, regardless of the capacity of the queue. -
enqueue
: The enqueue operation involves two semaphore operations (acquire and release) and an append operation on a deque. The semaphore operations are generallyO(1)
assuming they don't block; if they do block, the time complexity is dependent on external factors such as contention from other threads. Appending to the deque is anO(1)
operation. Therefore, the combined time complexity isO(1)
per call in the absence of blocking. -
dequeue
: Like enqueue, dequeue also has two semaphore operations and a popleft operation on the deque. Since deque's popleft is designed to beO(1)
and semaphore operations areO(1)
without blocking, the overall time complexity for dequeue is againO(1)
per call in the absence of blocking. -
size
: This simply returns the number of items in the deque, which is maintained internally and is thus anO(1)
operation.
Space Complexity
- The space complexity revolves around the deque that stores elements. Since the capacity of the queue is fixed, the maximum space it will use is
O(capacity)
, corresponding to the maximum number of elements that can be enqueued at any given time.
In summary, except for the potential blocking on semaphores (which can't be quantified in standard complexity analysis), all fundamental operations (__init__
, enqueue
, dequeue
, size
) on the BoundedBlockingQueue
class have a time complexity of O(1)
. The space complexity is O(capacity)
based on the fixed maximum size of the underlying deque.
Learn more about how to find time and space complexity quickly using problem constraints.
What's the output of running the following function using the following tree as input?
1def serialize(root):
2 res = []
3 def dfs(root):
4 if not root:
5 res.append('x')
6 return
7 res.append(root.val)
8 dfs(root.left)
9 dfs(root.right)
10 dfs(root)
11 return ' '.join(res)
12
1import java.util.StringJoiner;
2
3public static String serialize(Node root) {
4 StringJoiner res = new StringJoiner(" ");
5 serializeDFS(root, res);
6 return res.toString();
7}
8
9private static void serializeDFS(Node root, StringJoiner result) {
10 if (root == null) {
11 result.add("x");
12 return;
13 }
14 result.add(Integer.toString(root.val));
15 serializeDFS(root.left, result);
16 serializeDFS(root.right, result);
17}
18
1function serialize(root) {
2 let res = [];
3 serialize_dfs(root, res);
4 return res.join(" ");
5}
6
7function serialize_dfs(root, res) {
8 if (!root) {
9 res.push("x");
10 return;
11 }
12 res.push(root.val);
13 serialize_dfs(root.left, res);
14 serialize_dfs(root.right, res);
15}
16
Recommended Readings
LeetCode Patterns Your Personal Dijkstra's Algorithm to Landing Your Dream Job The goal of AlgoMonster is to help you get a job in the shortest amount of time possible in a data driven way We compiled datasets of tech interview problems and broke them down by patterns This way we
Recursion Recursion is one of the most important concepts in computer science Simply speaking recursion is the process of a function calling itself Using a real life analogy imagine a scenario where you invite your friends to lunch https algomonster s3 us east 2 amazonaws com recursion jpg You first
Runtime Overview When learning about algorithms and data structures you'll frequently encounter the term time complexity This concept is fundamental in computer science and offers insights into how long an algorithm takes to complete given a certain input size What is Time Complexity Time complexity represents the amount of time
Want a Structured Path to Master System Design Too? Don’t Miss This!