1188. Design Bounded Blocking Queue

MediumConcurrency
Leetcode Link

Problem Description

The problem is about creating a data structure which is a bounded blocking queue with thread-safety in mind. A bounded blocking queue is a queue with a fixed maximum size, and it has the capability to block or wait when operations like enqueue (adding to the queue) and dequeue (removing from the queue) cannot be performed because the queue is full or empty, respectively. The queue supports three main operations:

  • enqueue(int element): This method adds an element to the end of the queue. If the queue has reached its capacity, the method should block the calling thread until there is space available to add the new element.
  • dequeue(): This removes and returns the element at the front of the queue. If the queue is empty, this operation should block until there is an element available to dequeue.
  • size(): This returns the current number of elements in the queue.

It is especially noted that the implementation will be tested in a multithreaded environment, where multiple threads could be calling these methods simultaneously. Therefore, it is crucial that the implementation ensures that all operations on the bounded blocking queue are thread-safe (i.e., function correctly when accessed from multiple threads).

Lastly, the use of any built-in bounded blocking queue implementations is prohibited as the goal is to understand how to create such a data structure from scratch, potentially in a job interview.

Intuition

The solution to the problem involves coordinating access to the queue to ensure that only one thread can perform an enqueue or dequeue operation at a time to maintain thread safety. This means protecting the internal state of the queue from race conditions that could lead to incorrect behavior or data corruption.

To achieve this, we employ two synchronization primitives called Semaphores. A semaphore maintains a set of permits, and a thread that wants to perform an operation must first acquire a permit. If no permit is available, the thread blocks until a permit is released by another thread. This behavior fits perfectly for our problem's requirements.

We use two semaphores in the solution:

  1. s1, which starts with a number of permits equal to the capacity of the queue. This semaphore controls access to enqueue operation. A thread can enqueue if it can acquire a permit from s1, which signifies that there is room in the queue. After the enqueue operation, the thread releases a permit to s2, signaling that there is an item available to be dequeued.

  2. s2, which starts with zero permits as the queue is initially empty. This semaphore controls access to the dequeue operation. For a thread to dequeue, it must acquire a permit from s2, which signifies that there is at least one item in the queue to be dequeued. After the dequeue operation, the thread releases a permit to s1, indicating that there is now additional space available in the queue.

The queue itself (q) is represented by a deque (double-ended queue) from Python's collections module, which allows for fast appending and popping from both ends. Note that accessing len(q) to get the size of the queue does not need to be serialized by semaphores, as it is not modifying the queue.

This approach enables us to limit the number of elements in the queue to the defined capacity, and to ensure that enqueue and dequeue operations wait for the queue to be not full or not empty, respectively, before proceeding, fulfilling the conditions for a bounded blocking queue in a thread-safe manner.

Not Sure What to Study? Take the 2-min Quiz to Find Your Missing Piece:

What's the output of running the following function using input 56?

1KEYBOARD = {
2    '2': 'abc',
3    '3': 'def',
4    '4': 'ghi',
5    '5': 'jkl',
6    '6': 'mno',
7    '7': 'pqrs',
8    '8': 'tuv',
9    '9': 'wxyz',
10}
11
12def letter_combinations_of_phone_number(digits):
13    def dfs(path, res):
14        if len(path) == len(digits):
15            res.append(''.join(path))
16            return
17
18        next_number = digits[len(path)]
19        for letter in KEYBOARD[next_number]:
20            path.append(letter)
21            dfs(path, res)
22            path.pop()
23
24    res = []
25    dfs([], res)
26    return res
27
1private static final Map<Character, char[]> KEYBOARD = Map.of(
2    '2', "abc".toCharArray(),
3    '3', "def".toCharArray(),
4    '4', "ghi".toCharArray(),
5    '5', "jkl".toCharArray(),
6    '6', "mno".toCharArray(),
7    '7', "pqrs".toCharArray(),
8    '8', "tuv".toCharArray(),
9    '9', "wxyz".toCharArray()
10);
11
12public static List<String> letterCombinationsOfPhoneNumber(String digits) {
13    List<String> res = new ArrayList<>();
14    dfs(new StringBuilder(), res, digits.toCharArray());
15    return res;
16}
17
18private static void dfs(StringBuilder path, List<String> res, char[] digits) {
19    if (path.length() == digits.length) {
20        res.add(path.toString());
21        return;
22    }
23    char next_digit = digits[path.length()];
24    for (char letter : KEYBOARD.get(next_digit)) {
25        path.append(letter);
26        dfs(path, res, digits);
27        path.deleteCharAt(path.length() - 1);
28    }
29}
30
1const KEYBOARD = {
2    '2': 'abc',
3    '3': 'def',
4    '4': 'ghi',
5    '5': 'jkl',
6    '6': 'mno',
7    '7': 'pqrs',
8    '8': 'tuv',
9    '9': 'wxyz',
10}
11
12function letter_combinations_of_phone_number(digits) {
13    let res = [];
14    dfs(digits, [], res);
15    return res;
16}
17
18function dfs(digits, path, res) {
19    if (path.length === digits.length) {
20        res.push(path.join(''));
21        return;
22    }
23    let next_number = digits.charAt(path.length);
24    for (let letter of KEYBOARD[next_number]) {
25        path.push(letter);
26        dfs(digits, path, res);
27        path.pop();
28    }
29}
30

Solution Approach

In the provided solution, the implementation of the BoundedBlockingQueue class is done with two semaphores and a deque. Here's a walkthrough of the pattern and algorithms used:

  • Semaphores: Two instances of the Semaphore class are used, s1 and s2, each serving a distinct purpose. s1 governs the ability to insert an item into the queue, and begins with permits equal to the capacity. s2 reflects the number of items in the queue available to be dequeued and starts with no permits. This is a classic application of the "producer-consumer" problem's solution, where one semaphore is used to signal "empty slots" and another semaphore is used to signal "available items".

  • Deque: A deque (double-ended queue) from the collections module is used to represent the queue's data structure. This provides efficient FIFO (first-in-first-out) operations needed for enqueueing (append) and dequeueing (popleft).

When enqueue(element: int) is called, the following steps are performed:

  1. self.s1.acquire(): Attempt to acquire a permit from s1, which represents a free slot in the queue. If there are no free slots, this call will block until another thread calls dequeue and releases a permit on s1.
  2. self.q.append(element): Once a permit has been acquired (meaning there is space in the queue), the element is safely enqueued into the queue.
  3. self.s2.release(): Releasing a permit on s2 to signal that an element has been enqueued and is now available for dequeuing.

For dequeue(), the steps are the mirror image:

  1. self.s2.acquire(): This acquires a permit from s2, which signifies that there is at least one element in the queue to be dequeued. If the queue is empty, this call will block until a thread calls enqueue and releases a permit on s2.
  2. ans = self.q.popleft(): Removes the oldest (front) element from the queue safely because it's been ensured that the queue is not empty.
  3. self.s1.release(): Releasing a permit on s1 to signal that an element has been dequeued and there is now a free slot in the queue.

size() is a straightforward operation as it simply returns the current number of items in the queue, len(self.q). It does not modify the queue, so it does not require interaction with semaphores.

The solution effectively serializes access to the mutable shared state (self.q), preventing race conditions by using semaphores to coordinate enqueue and dequeue actions. This guarantees that the queue never exceeds its capacity and avoids dequeue operations being called on an empty queue, thus satisfying thread safety and other requirements of the problem.

Discover Your Strengths and Weaknesses: Take Our 2-Minute Quiz to Tailor Your Study Plan:

A heap is a ...?

Example Walkthrough

Let's consider a small example to illustrate the solution approach for a BoundedBlockingQueue with a capacity of 2.

  • Initially, we create the queue with a capacity of 2, initializing semaphore s1 with 2 permits and semaphore s2 with 0 permits.

  • Imagine thread A calls enqueue(1):

    1. It acquires a permit from s1, which now has 1 permit left.
    2. It then appends 1 to the deque, and the queue state becomes [1].
    3. Finally, it releases a permit to s2, indicating that there is one item available for dequeuing.
  • Now, thread B calls enqueue(2):

    1. It acquires the remaining permit from s1, and s1 now has 0 permits.
    2. It appends 2 to the deque, so the queue state becomes [1,2].
    3. It releases another permit to s2, now s2 has 2 permits indicating there are two items available to be dequeued.
  • At this state, the queue is full. If another thread, say thread C, tries to enqueue(3), it will be blocked as s1 has no permits left, signifying the queue is at full capacity.

  • Meanwhile, if thread D calls dequeue():

    1. It acquires a permit from s2 (which has 2 permits at this point), leaving 1 permit left in s2.
    2. It dequeues an element from the deque which is 1 (FIFO order), leaving the queue state as [2].
    3. It releases a permit to s1, increasing the number of permits back to 1, signaling that there is now space for one more item in the queue.
  • If thread C is still waiting to enqueue(3), it can now proceed as a permit became available in s1.

    1. It acquires the permit from s1, and again s1 has 0 permits.
    2. It appends 3 to the deque, so the queue state becomes [2,3].
    3. It releases a permit to s2, which now has 2 permits, reflecting the two items in the queue.
  • At any time, calling size() returns the number of items currently in the queue, which can be accessed by any thread without needing to acquire a permit.

This example demonstrates how the BoundedBlockingQueue enforces its bounds and provides thread-safe enqueueing and dequeuing operations using semaphores to manage its capacity and state.

Solution Implementation

1from threading import Semaphore
2from collections import deque  # Ensure deque is imported
3
4class BoundedBlockingQueue:
5    def __init__(self, capacity: int):
6        # Initialize the queue with given capacity.
7        self.semaphore_empty_slots = Semaphore(capacity)  # Semaphore to track empty slots
8        self.semaphore_filled_slots = Semaphore(0)        # Semaphore to track filled slots
9        self.queue = deque()                               # Use deque for queue operations
10
11    def enqueue(self, element: int) -> None:
12        # Add an element to the end of the queue.
13        self.semaphore_empty_slots.acquire()  # Decrease the counter of empty slots, wait if no empty slots
14        self.queue.append(element)            # Add the element to the queue
15        self.semaphore_filled_slots.release() # Increase the counter of filled slots, signaling dequeue if slots are filled
16
17    def dequeue(self) -> int:
18        # Remove and return an element from the front of the queue.
19        self.semaphore_filled_slots.acquire()  # Decrease the counter of filled slots, wait if no filled slots
20        element = self.queue.popleft()         # Remove the element from the queue
21        self.semaphore_empty_slots.release()   # Increase the counter of empty slots, signaling enqueue if slots are available
22        return element                         # Return the dequeued element
23
24    def size(self) -> int:
25        # Get the current number of elements in the queue.
26        return len(self.queue)             # Return the size of the queue
27
1import java.util.ArrayDeque;
2import java.util.Deque;
3import java.util.concurrent.Semaphore;
4
5// This class represents a thread-safe bounded blocking queue with a fixed capacity.
6public class BoundedBlockingQueue {
7    // Semaphore to control the number of elements that can be added (based on capacity).
8    private final Semaphore enqueueSemaphore;
9    // Semaphore to control the number of elements that can be removed (starts at 0).
10    private final Semaphore dequeueSemaphore;
11    // The queue to store elements.
12    private final Deque<Integer> queue;
13
14    // Constructor initializes the semaphores and queue with specified capacity.
15    public BoundedBlockingQueue(int capacity) {
16        enqueueSemaphore = new Semaphore(capacity);
17        dequeueSemaphore = new Semaphore(0);
18        queue = new ArrayDeque<>();
19    }
20
21    // Enqueues an element into the queue if there's available capacity.
22    public void enqueue(int element) throws InterruptedException {
23        // Acquire a permit from enqueueSemaphore discarding it, if available capacity is 0 waits.
24        enqueueSemaphore.acquire();
25        synchronized (this) {
26            // Adds the element to the end of the queue.
27            queue.offer(element);
28        }
29        // Release a permit to dequeueSemaphore, increasing the number of available elements to dequeue.
30        dequeueSemaphore.release();
31    }
32
33    // Dequeues an element from the front of the queue.
34    public int dequeue() throws InterruptedException {
35        // Acquire a permit from dequeueSemaphore, waiting if necessary until an element is available.
36        dequeueSemaphore.acquire();
37        int element;
38        synchronized (this) {
39            // Remove and return the front element of the queue.
40            element = queue.poll();
41        }
42        // Release a permit to enqueueSemaphore, increasing the available capacity.
43        enqueueSemaphore.release();
44        return element;
45    }
46
47    // Returns the current number of elements in the queue.
48    public int size() {
49        synchronized (this) {
50            // The size of the queue is returned.
51            return queue.size();
52        }
53    }
54}
55
1#include <queue>
2#include <mutex>
3#include <condition_variable>
4
5class BoundedBlockingQueue {
6public:
7    // Constructor initializes the queue with a capacity limit.
8    BoundedBlockingQueue(int capacity) : capacity_(capacity), count_(0) {
9        // No need to initialize semaphores since we will use condition_variable and mutex
10    }
11
12    // Enqueue adds an element to the queue. If the queue is full, blocks until space is available.
13    void enqueue(int element) {
14        std::unique_lock<std::mutex> lock(mutex_);
15        // Wait until there is space in the queue
16        not_full_condition_.wait(lock, [this] { return count_ < capacity_; });
17        queue_.push(element);
18        ++count_;
19        // Notify one waiting thread (if any) that an item was dequeued
20        not_empty_condition_.notify_one();
21    }
22
23    // Dequeue removes and returns an element from the queue. If the queue is empty, blocks until an element is available.
24    int dequeue() {
25        std::unique_lock<std::mutex> lock(mutex_);
26        // Wait until there is an item to dequeue
27        not_empty_condition_.wait(lock, [this] { return count_ > 0; });
28        int value = queue_.front();
29        queue_.pop();
30        --count_;
31        // Notify one waiting thread (if any) that space is now available
32        not_full_condition_.notify_one();
33        return value;
34    }
35
36    // Get the current size of the queue.
37    int size() {
38        std::lock_guard<std::mutex> lock(mutex_);
39        return count_;
40    }
41
42private:
43    std::queue<int> queue_; // The queue that holds the elements
44    std::mutex mutex_; // Mutex to protect access to the queue
45    std::condition_variable not_full_condition_; // Condition variable to block enqueue when queue is full
46    std::condition_variable not_empty_condition_; // Condition variable to block dequeue when queue is empty
47    int capacity_; // Maximum number of items in the queue
48    int count_; // Current number of items in the queue
49};
50
1const queue: number[] = []; // The queue that holds the elements
2let capacity: number; // Maximum number of items in the queue
3let count: number = 0; // Current number of items in the queue
4
5// Utilizing these for synchronization could be complex without encapsulation
6const mutex = new Mutex(); // A pretend Mutex since JavaScript/TypeScript doesn't have one
7const notFullCondition = new ConditionVariable(); // A pretend condition variable for when the queue is not full
8const notEmptyCondition = new ConditionVariable(); // A pretend condition variable for when the queue is not empty
9
10// Initialization of the queue with a capacity limit.
11function initializeQueue(initialCapacity: number): void {
12    capacity = initialCapacity;
13}
14
15// Adds an element to the queue. If the queue is full, it is supposed to block until space is available.
16async function enqueue(element: number): Promise<void> {
17    await mutex.lock();
18    try {
19        // Wait until there is space in the queue
20        while (count >= capacity) {
21            await notFullCondition.wait(mutex);
22        }
23        queue.push(element);
24        count++;
25        // Notify one waiting thread (if any) that an item was enqueued
26        notEmptyCondition.notifyOne();
27    } finally {
28        mutex.unlock();
29    }
30}
31
32// Removes and returns an element from the queue. If the queue is empty, it is supposed to block until an element is available.
33async function dequeue(): Promise<number> {
34    await mutex.lock();
35    try {
36        // Wait until there is an item to dequeue
37        while (count === 0) {
38            await notEmptyCondition.wait(mutex);
39        }
40        const value = queue.shift();
41        count--;
42        // Notify one waiting thread (if any) that space is now available
43        notFullCondition.notifyOne();
44        return value;
45    } finally {
46        mutex.unlock();
47    }
48}
49
50// Returns the current size of the queue.
51function size(): number {
52    return count; // No need for synchronization in a single-threaded environment
53}
54
55// Note that Mutex and ConditionVariable are not native JS/TS classes.
56// These would need to be implemented or a library would have to be used to simulate them.
57
Not Sure What to Study? Take the 2-min Quiz:

Which technique can we use to find the middle of a linked list?

Time and Space Complexity

For this BoundedBlockingQueue implementation using semaphores, we will analyze the time and space complexities of its operations.

Time Complexity

  • __init__: Initializing the queue involves setting up two semaphores and the underlying deque. This operation is constant time, O(1), as it involves only a fixed number of operations, regardless of the capacity of the queue.

  • enqueue: The enqueue operation involves two semaphore operations (acquire and release) and an append operation on a deque. The semaphore operations are generally O(1) assuming they don't block; if they do block, the time complexity is dependent on external factors such as contention from other threads. Appending to the deque is an O(1) operation. Therefore, the combined time complexity is O(1) per call in the absence of blocking.

  • dequeue: Like enqueue, dequeue also has two semaphore operations and a popleft operation on the deque. Since deque's popleft is designed to be O(1) and semaphore operations are O(1) without blocking, the overall time complexity for dequeue is again O(1) per call in the absence of blocking.

  • size: This simply returns the number of items in the deque, which is maintained internally and is thus an O(1) operation.

Space Complexity

  • The space complexity revolves around the deque that stores elements. Since the capacity of the queue is fixed, the maximum space it will use is O(capacity), corresponding to the maximum number of elements that can be enqueued at any given time.

In summary, except for the potential blocking on semaphores (which can't be quantified in standard complexity analysis), all fundamental operations (__init__, enqueue, dequeue, size) on the BoundedBlockingQueue class have a time complexity of O(1). The space complexity is O(capacity) based on the fixed maximum size of the underlying deque.

Learn more about how to find time and space complexity quickly using problem constraints.

Fast Track Your Learning with Our Quick Skills Quiz:

Which of the following array represent a max heap?


Recommended Readings


Got a question? Ask the Teaching Assistant anything you don't understand.

Still not clear? Ask in the Forum,  Discord or Submit the part you don't understand to our editors.


TA 👨‍🏫