2636. Promise Pool 🔒

Problem Description

This problem asks you to implement a promise pool with a concurrency limit. You're given an array of asynchronous functions and a pool limit n, and you need to create a function promisePool that executes these functions with controlled concurrency.

The key requirements are:

  1. Pool Limit: At most n promises can be executing (pending) at any given time
  2. Sequential Order: Functions must be started in order - functions[0], then functions[1], then functions[2], and so on
  3. Dynamic Execution: When one promise completes, immediately start the next available function (if any remain)
  4. Completion: Return a promise that resolves only after ALL input functions have completed

For example:

  • If n = 1: Only one function executes at a time. When it completes, the next one starts. This is serial execution.
  • If n = 2: Two functions start immediately. When either completes, the third function starts (if available). There are always at most 2 functions running simultaneously.
  • If n = 5 and you have 3 functions: All 3 start immediately since the pool limit exceeds the number of functions.
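
The three cases above can be checked with a small experiment. The following is a minimal sketch in Python's asyncio, not the reference solution: the names promise_pool and peak_concurrency are made up for this illustration, and asyncio.sleep stands in for real asynchronous work.

```python
import asyncio
from collections import deque


async def promise_pool(functions, n):
    """Run zero-argument coroutine functions with at most n in flight."""
    waiting = deque()

    def wrap(fn):
        async def wrapper():
            await fn()
            if waiting:
                # A slot just freed up: start the next waiting function
                await waiting.popleft()()
        return wrapper

    wrappers = [wrap(fn) for fn in functions]
    waiting.extend(wrappers[n:])
    await asyncio.gather(*(w() for w in wrappers[:n]))


async def peak_concurrency(task_count, n):
    """Return the highest number of tasks that were running at once."""
    active = 0
    peak = 0

    async def task():
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for real asynchronous work
        active -= 1

    await promise_pool([task] * task_count, n)
    return peak


if __name__ == "__main__":
    for limit in (1, 2, 5):
        print(limit, asyncio.run(peak_concurrency(3, limit)))
```

With 3 tasks, the measured peak should be 1 for n = 1, 2 for n = 2, and 3 for n = 5, since the pool can never run more tasks than exist.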

The solution uses a clever wrapper approach:

  • Each function is wrapped to automatically trigger the next waiting function upon completion
  • The first n functions start immediately as "running" functions
  • The remaining functions wait in a queue
  • When any running function completes, it pulls and executes the next function from the waiting queue
  • Promise.all ensures we wait for all initially running functions (and their chain reactions) to complete

Intuition

The core challenge is keeping up to n operations in flight, and never more, while starting the functions in order. Think of it like a conveyor belt with limited processing stations.

The key insight is that we need a self-sustaining chain reaction. When a function completes, it should automatically trigger the next waiting function without external coordination. This eliminates the need for complex state management or recursive scheduling.

Consider what happens when a promise completes in a pool:

  1. A slot becomes available
  2. The next function in line should immediately take that slot
  3. This process continues until no functions remain

The elegant solution is to wrap each function so it carries the responsibility of starting the next one. Each wrapper:

  • Executes its own function
  • Upon completion, checks if there are waiting functions
  • If yes, pulls and executes the next one

This creates a chain reaction where each completing function automatically maintains the pool size. We split the wrapped functions into two groups:

  • Running: The first n functions that start immediately
  • Waiting: The remaining functions queued for execution

By using Promise.all on just the initially running functions, we capture the entire execution chain. Each running function, through its wrapper, will eventually process all waiting functions in its "lane". When all initial runners complete (including their chain reactions), all functions have been executed.

The beauty of this approach is its simplicity - no counters, no complex state machines, just functions that know how to pass the baton to the next one in line. The shift() operation ensures functions are dequeued in order, maintaining the sequential requirement.

Solution Approach

The implementation uses a wrapper pattern with queue management to maintain the concurrency limit:

Step 1: Create Wrapper Functions

const wrappers = functions.map(fn => async () => {
    await fn();
    const nxt = waiting.shift();
    nxt && (await nxt());
});

Each original function is wrapped in a new async function that:

  • First executes the original function with await fn()
  • Then attempts to pull the next function from the waiting queue using shift()
  • If a next function exists, it executes it with await nxt()

This creates a self-perpetuating chain where each function completion triggers the next queued function.

Step 2: Split Functions into Running and Waiting

const running = wrappers.slice(0, n);
const waiting = wrappers.slice(n);

  • The first n wrapped functions are placed in the running array - these will start immediately
  • The remaining functions go into the waiting array - these will be pulled and executed as slots become available
  • Using slice() ensures we handle cases where there are fewer functions than the pool limit

Step 3: Start Execution and Return Promise

return Promise.all(running.map(fn => fn()));

  • Execute all functions in the running array immediately by calling them: fn()
  • Promise.all waits for all these initial functions to complete
  • Due to the wrapper logic, each completing function will pull from waiting and continue the chain
  • The promise resolves only when all chains have completed (all functions have been executed)

Key Data Structures:

  • Array as Queue: The waiting array acts as a FIFO queue with shift() operations
  • Closure: The wrappers maintain access to the waiting array through closure, allowing dynamic queue management

Why This Works:

  • If we have 5 functions and n=2, functions 0 and 1 start immediately
  • When function 0 completes, its wrapper pulls function 2 from waiting and executes it
  • When function 1 completes, its wrapper pulls function 3 from waiting and executes it
  • This continues until waiting is empty
  • Promise.all resolves when both initial chains (started by functions 0 and 1) complete
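
To make the two chains visible, here is an instrumented sketch in Python's asyncio. It is an illustration only: trace_lanes and the lane argument are hypothetical additions, and every task does identical dummy work (a single yield to the event loop), so completions arrive in start order.

```python
import asyncio
from collections import deque


async def trace_lanes(task_count, n):
    """Run task_count dummy tasks through the pool and record who ran what.

    Returns (start_order, lanes), where lanes[i] lists the task indices
    executed by the chain that began with initial runner i.
    """
    waiting = deque()
    start_order = []
    lanes = [[] for _ in range(min(n, task_count))]

    def wrap(index):
        async def wrapper(lane):
            start_order.append(index)
            lanes[lane].append(index)
            await asyncio.sleep(0)  # dummy work: yield to the event loop once
            if waiting:
                # Pass the baton: the next task inherits this lane
                await waiting.popleft()(lane)
        return wrapper

    wrappers = [wrap(i) for i in range(task_count)]
    waiting.extend(wrappers[n:])
    await asyncio.gather(*(w(lane) for lane, w in enumerate(wrappers[:n])))
    return start_order, lanes


if __name__ == "__main__":
    order, lanes = asyncio.run(trace_lanes(5, 2))
    print("start order:", order)
    print("lanes:", lanes)
```

For 5 tasks and n = 2, the chain begun by function 0 runs functions 0, 2, and 4, while the chain begun by function 1 runs functions 1 and 3. With uneven durations the lane assignment changes, but the start order does not.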

Example Walkthrough

Let's walk through a concrete example with 4 functions and a pool limit of 2:

Setup:

  • Functions: [f0, f1, f2, f3] where each takes different times to complete
  • Pool limit: n = 2
  • f0 takes 300ms, f1 takes 100ms, f2 takes 200ms, f3 takes 150ms

Step 1: Create Wrappers

Each function gets wrapped:

wrapper0 = async () => {
    await f0();  // Execute f0
    const next = waiting.shift();  // Pull from waiting queue
    if (next) await next();  // Execute next if exists
}
// Similarly for wrapper1, wrapper2, wrapper3

Step 2: Split into Running and Waiting

running = [wrapper0, wrapper1]  // First 2 wrappers
waiting = [wrapper2, wrapper3]  // Remaining wrappers

Step 3: Execution Timeline

Time 0ms:

  • wrapper0() starts → f0 begins (will take 300ms)
  • wrapper1() starts → f1 begins (will take 100ms)
  • Pool: 2/2 slots occupied

Time 100ms:

  • f1 completes
  • wrapper1 pulls wrapper2 from waiting queue
  • wrapper2() starts → f2 begins (will take 200ms)
  • Pool: 2/2 slots occupied
  • Waiting queue: [wrapper3]

Time 300ms:

  • f0 completes
  • wrapper0 pulls wrapper3 from waiting queue
  • wrapper3() starts → f3 begins (will take 150ms)
  • f2 is due to finish at this same instant (started at 100ms, needs 200ms total); this walkthrough assumes f0's completion is handled first
  • Pool: 2/2 slots occupied
  • Waiting queue: [] (empty)

Time 300ms (continued):

  • f2 completes (100ms + 200ms = 300ms)
  • wrapper2 tries to pull from waiting, but queue is empty
  • wrapper2 chain completes
  • Pool: 1/2 slots occupied

Time 450ms:

  • f3 completes (300ms + 150ms = 450ms)
  • wrapper3 tries to pull from waiting, but queue is empty
  • wrapper3 chain completes
  • Pool: 0/2 slots occupied

Result:

  • Promise.all([wrapper0(), wrapper1()]) resolves at 450ms
  • All functions were started in order: f0→f1→f2→f3
  • Maximum 2 functions ran concurrently at any time

The key insight is that wrapper1 finishing early (at 100ms) immediately triggered wrapper2, maintaining the pool size at 2. Similarly, when wrapper0 finished at 300ms, it triggered wrapper3. The Promise.all waits for both initial chains to complete - the chain starting with wrapper0 (which executed f0→f3) and the chain starting with wrapper1 (which executed f1→f2).
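
The timeline can also be reproduced without real timers. The sketch below is a small discrete-event simulation, not part of the solution itself: simulate_pool is a hypothetical helper, and it breaks ties in favor of the lower task index, which matches the walkthrough's choice of letting f0 pull wrapper3 at 300ms.

```python
import heapq


def simulate_pool(durations, n):
    """Compute (start, finish) times in ms for each task under pool limit n.

    Ties between tasks that finish at the same time are resolved in favor
    of the lower task index (an assumption of this sketch).
    """
    starts = [None] * len(durations)
    finishes = [None] * len(durations)
    running = []  # min-heap of (finish_time, task_index)
    next_task = 0

    def start(task, now):
        starts[task] = now
        finishes[task] = now + durations[task]
        heapq.heappush(running, (finishes[task], task))

    # Fill the pool with the first n tasks at time 0
    while next_task < min(n, len(durations)):
        start(next_task, 0)
        next_task += 1

    # Each completion frees a slot, which the next waiting task takes at once
    while running:
        now, _ = heapq.heappop(running)
        if next_task < len(durations):
            start(next_task, now)
            next_task += 1

    return list(zip(starts, finishes))


if __name__ == "__main__":
    for name, (begin, end) in zip("f0 f1 f2 f3".split(),
                                  simulate_pool([300, 100, 200, 150], 2)):
        print(f"{name}: starts at {begin}ms, finishes at {end}ms")
```

For the durations in this example, the simulation gives start times of 0, 0, 100, and 300ms and a final completion at 450ms, in line with the steps above.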

Solution Implementation

Python:

import asyncio
from typing import List, Callable, Awaitable, Any
from collections import deque

async def promisePool(functions: List[Callable[[], Awaitable[Any]]], n: int) -> None:
    """
    Executes an array of async functions with a concurrency limit.

    Args:
        functions: List of async functions to execute
        n: Maximum number of functions to run concurrently

    Returns:
        None when all functions complete
    """
    # Queue to hold functions that are waiting to be executed
    waiting_queue = deque()

    # Create wrapped versions of each function
    wrapped_functions = []

    for func in functions:
        async def create_wrapped(original_func=func):
            """
            Wrapper that executes the function and then starts the next waiting function.
            """
            # Execute the current function
            await original_func()

            # After completion, check if there are waiting functions
            if waiting_queue:
                # Get and execute the next waiting function
                next_function = waiting_queue.popleft()
                await next_function()

        # Store the wrapper; the default argument binds the current func
        wrapped_functions.append(create_wrapped)

    # Split wrapped functions into currently running and waiting groups
    currently_running = wrapped_functions[:n]
    waiting_queue.extend(wrapped_functions[n:])

    # Start execution of the initial batch and wait for all to complete
    await asyncio.gather(*[wrapped_func() for wrapped_func in currently_running])

Java:

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class PromisePoolExecutor {

    /**
     * Executes an array of async functions with a concurrency limit.
     * @param functions - List of async functions (Suppliers that return CompletableFuture) to execute
     * @param n - Maximum number of functions to run concurrently
     * @return CompletableFuture that completes when all functions finish execution
     */
    public static CompletableFuture<Void> promisePool(List<Supplier<CompletableFuture<?>>> functions, int n) {
        // Queue to hold functions that are waiting to be executed.
        // Futures may complete on different threads, so the queue must be thread-safe.
        Queue<Supplier<CompletableFuture<Void>>> waitingQueue = new ConcurrentLinkedQueue<>();

        // List to store wrapped functions with automatic chaining behavior
        List<Supplier<CompletableFuture<Void>>> wrappedFunctions = new ArrayList<>();

        // Wrap each function to automatically start the next waiting function upon completion
        for (Supplier<CompletableFuture<?>> func : functions) {
            Supplier<CompletableFuture<Void>> wrappedFunc = () -> {
                // Execute the current function
                return func.get().thenCompose(result -> {
                    // After completion, check if there are waiting functions in the queue
                    Supplier<CompletableFuture<Void>> nextFunction = waitingQueue.poll();
                    if (nextFunction != null) {
                        // Execute the next waiting function, continuing the chain
                        return nextFunction.get();
                    }
                    // No more waiting functions, return completed future
                    return CompletableFuture.completedFuture(null);
                });
            };
            wrappedFunctions.add(wrappedFunc);
        }

        // Split wrapped functions into currently running and waiting groups
        // Take first n functions to run immediately
        List<Supplier<CompletableFuture<Void>>> currentlyRunning = wrappedFunctions.subList(0, Math.min(n, wrappedFunctions.size()));

        // Add remaining functions to the waiting queue
        if (wrappedFunctions.size() > n) {
            waitingQueue.addAll(wrappedFunctions.subList(n, wrappedFunctions.size()));
        }

        // Start execution of the initial batch of functions
        CompletableFuture<?>[] runningFutures = currentlyRunning.stream()
            .map(Supplier::get)
            .toArray(CompletableFuture[]::new);

        // Return a CompletableFuture that completes when all functions finish
        return CompletableFuture.allOf(runningFutures);
    }
}

C++:

#include <algorithm>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

/**
 * Executes an array of async functions with a concurrency limit.
 * @param functions - Vector of async functions to execute
 * @param n - Maximum number of functions to run concurrently
 * @return Future that resolves when all functions complete
 */
std::future<void> promisePool(
    const std::vector<std::function<std::future<void>()>>& functions,
    int n) {

    // Shared copy of the functions, so worker threads never use a dangling reference
    auto funcs = std::make_shared<const std::vector<std::function<std::future<void>()>>>(functions);

    // Shared queue to hold indices of functions waiting to be executed
    auto waiting_queue = std::make_shared<std::queue<size_t>>();

    // Shared mutex for thread-safe queue operations
    auto queue_mutex = std::make_shared<std::mutex>();

    // Runs one function, then keeps pulling waiting functions until the queue is empty
    auto process_chain = [funcs, waiting_queue, queue_mutex](size_t index) {
        while (true) {
            // Execute the current function and wait for it to finish
            (*funcs)[index]().wait();

            // Hold the lock only while dequeuing, never while a function runs
            std::lock_guard<std::mutex> lock(*queue_mutex);
            if (waiting_queue->empty()) {
                return;
            }
            index = waiting_queue->front();
            waiting_queue->pop();
        }
    };

    // Number of functions that start immediately
    const size_t limit = std::min(static_cast<size_t>(std::max(n, 0)), funcs->size());

    // Add indices of functions that will wait to the queue
    for (size_t i = limit; i < funcs->size(); ++i) {
        waiting_queue->push(i);
    }

    // Start execution of the initial batch (first n functions)
    auto tasks = std::make_shared<std::vector<std::future<void>>>();
    for (size_t i = 0; i < limit; ++i) {
        tasks->push_back(std::async(std::launch::async, process_chain, i));
    }

    // Return a future that completes when all chains are done
    return std::async(std::launch::async, [tasks]() {
        for (auto& task : *tasks) {
            task.wait();
        }
    });
}

TypeScript:

/**
 * Type definition for an async function that returns a Promise
 */
type F = () => Promise<any>;

/**
 * Executes an array of async functions with a concurrency limit.
 * @param functions - Array of async functions to execute
 * @param n - Maximum number of functions to run concurrently
 * @returns Promise that resolves when all functions complete
 */
function promisePool(functions: F[], n: number): Promise<any> {
    // Array to hold functions that are waiting to be executed
    const waitingQueue: (() => Promise<void>)[] = [];

    // Wrap each function to automatically start the next waiting function upon completion
    const wrappedFunctions = functions.map((func: F) => {
        return async (): Promise<void> => {
            // Execute the current function
            await func();

            // After completion, check if there are waiting functions
            const nextFunction = waitingQueue.shift();
            if (nextFunction) {
                // Execute the next waiting function
                await nextFunction();
            }
        };
    });

    // Split wrapped functions into currently running and waiting groups
    const currentlyRunning = wrappedFunctions.slice(0, n);
    waitingQueue.push(...wrappedFunctions.slice(n));

    // Start execution of the initial batch and return the promise
    return Promise.all(currentlyRunning.map((wrappedFunc) => wrappedFunc()));
}

Time and Space Complexity

Time Complexity: O(m) scheduling work, where m is the total number of functions in the array.

Each function is wrapped once, started once, and dequeued at most once, so the pool's own bookkeeping is linear in m (assuming each dequeue is O(1), as with Python's deque). This figure does not describe the wall-clock time, which depends on how long the individual functions take and on the pool limit n: a larger n lets more of that waiting overlap.

Space Complexity: O(m) where m is the total number of functions in the array.

The space complexity breaks down as follows:

  • wrappers array: O(m) - stores wrapped versions of all input functions
  • running array: O(min(n, m)) - stores up to n wrapper references
  • waiting array: O(m - n) when m > n, otherwise O(0)
  • Promise.all creates internal tracking for O(n) concurrent promises

Since the wrappers array contains all m wrapped functions and dominates the space usage, the overall space complexity is O(m). The running and waiting arrays contain references to the same wrapper functions rather than copies, so they don't add to the asymptotic complexity.

Common Pitfalls

1. Closure Variable Binding Issue

A common pitfall when writing the Python version is late binding of loop variables. A function defined inside a loop looks up func when it is called, not when it is defined, so every wrapper ends up calling whatever func held on the last iteration. The Python implementation above avoids this by binding func through a default parameter (original_func=func), which is evaluated at definition time.

Problem Code:

for func in functions:
    async def wrapped():
        await func()  # Looked up at call time: always the last function
        # ...
    wrapped_functions.append(wrapped)

What happens: All wrapped functions will execute the last function in the array multiple times instead of their respective functions.

Solution: Use a function factory, or bind the current value explicitly:

# Option 1: Function factory
def create_wrapper(func, waiting_queue):
    async def wrapped():
        await func()
        if waiting_queue:
            next_function = waiting_queue.popleft()
            await next_function()
    return wrapped

for func in functions:
    wrapped_functions.append(create_wrapper(func, waiting_queue))

# Option 2: Bind the arguments up front with functools.partial
from functools import partial

async def execute_and_continue(func, waiting_queue):
    await func()
    if waiting_queue:
        next_function = waiting_queue.popleft()
        await next_function()

for func in functions:
    wrapped_functions.append(partial(execute_and_continue, func, waiting_queue))

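2. Shared Mutable State Without Synchronization

Whether the shared queue needs synchronization depends on the runtime. In JavaScript and in Python's asyncio, coroutines run on a single thread and only switch at an await, so checking the queue and calling shift() or popleft() with no await in between cannot be interleaved with another wrapper. In the Java and C++ versions, completions can arrive on different threads, so the queue there must be thread-safe.

Problem: If a lock is added anyway, holding it while awaiting the next function causes a deadlock: that function's wrapper will try to acquire the same lock when it finishes, and asyncio.Lock is not reentrant.

Solution: If a lock is needed, hold it only while dequeuing and run the next function after releasing it:

import asyncio
from collections import deque

async def promisePool(functions, n):
    waiting_queue = deque()
    queue_lock = asyncio.Lock()  # Add synchronization

    async def create_wrapped(original_func):
        await original_func()

        next_function = None
        async with queue_lock:  # Hold the lock only while dequeuing
            if waiting_queue:
                next_function = waiting_queue.popleft()

        if next_function:
            await next_function()  # Execute outside the lock to avoid deadlock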

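3. Empty Functions Array or n <= 0

An empty functions array is already handled: both slices are empty and the returned promise resolves immediately. A pool limit of zero or less is not. With n = 0 no function ever starts, yet the returned promise still resolves, and a negative n makes the slices count from the end of the array, which splits the functions incorrectly.

Solution: Add validation if n can fall outside the expected range: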

async def promisePool(functions, n):
    if not functions or n <= 0:
        return
  
    n = min(n, len(functions))  # Ensure n doesn't exceed array length
    # ... rest of implementation

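4. Exception Handling

If a function raises, its wrapper stops before pulling the next function, so that chain ends early. The pool effectively loses a slot, and the returned promise rejects while other chains may still be running. (The original problem states that the functions never reject, so this only matters when reusing the pattern elsewhere.)

Solution: Wrap the call in try/except so the chain continues: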

async def create_wrapped(original_func):
    try:
        await original_func()
    except Exception as e:
        # Log or handle error as needed
        pass  # Continue to next function even if current fails
  
    if waiting_queue:
        next_function = waiting_queue.popleft()
        await next_function()
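
Swallowing every exception with pass keeps the pool moving but hides failures from the caller. One alternative, sketched below under the assumption that the caller wants a list of failures back, is to record each exception and return them once the pool drains. The name promise_pool_collecting_errors is hypothetical.

```python
import asyncio
from collections import deque


async def promise_pool_collecting_errors(functions, n):
    """Run every function with at most n in flight; return the exceptions raised."""
    waiting = deque()
    errors = []

    def wrap(fn):
        async def wrapper():
            try:
                await fn()
            except Exception as exc:
                # Record the failure instead of hiding it
                errors.append(exc)
            if waiting:
                # Keep the chain alive even after a failure
                await waiting.popleft()()
        return wrapper

    wrappers = [wrap(fn) for fn in functions]
    waiting.extend(wrappers[n:])
    await asyncio.gather(*(w() for w in wrappers[:n]))
    return errors
```

The caller can then decide whether an empty list means success or whether any recorded exception should be re-raised.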

5. Memory Overhead with Large Function Arrays

Creating all wrapped functions upfront costs O(m) extra memory, and each chain nests one more await for every function it runs, which can get deep for long inputs.

Solution: Let a fixed number of workers pull the next function by index in a loop:

import asyncio

async def promisePool(functions, n):
    index = 0  # Next function to start, shared by all workers

    async def worker():
        nonlocal index
        # No await between the check and the increment, so workers cannot collide
        while index < len(functions):
            current_index = index
            index += 1
            await functions[current_index]()

    # Start one worker per pool slot
    tasks = [worker() for _ in range(min(n, len(functions)))]
    await asyncio.gather(*tasks)