2636. Promise Pool
Problem Description
This problem asks you to implement a promise pool with a concurrency limit. You're given an array of asynchronous functions and a pool limit n, and you need to create a function promisePool that executes these functions with controlled concurrency.
The key requirements are:
- Pool Limit: At most n promises can be executing (pending) at any given time
- Sequential Order: Functions must be started in order - functions[0], then functions[1], then functions[2], and so on
- Dynamic Execution: When one promise completes, immediately start the next available function (if any remain)
- Completion: Return a promise that resolves only after ALL input functions have completed
For example:
- If n = 1: Only one function executes at a time. When it completes, the next one starts. This is serial execution.
- If n = 2: Two functions start immediately. When either completes, the third function starts (if available). There are always at most 2 functions running simultaneously.
- If n = 5 and you have 3 functions: All 3 start immediately since the pool limit exceeds the number of functions.
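To make the contract concrete, here is a minimal usage sketch in TypeScript; the sleep helper and the millisecond timings are illustrative assumptions, not part of the problem statement:

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

const fns = [() => sleep(300), () => sleep(400), () => sleep(200)];

// With n = 2: the 300ms and 400ms functions start at t=0. The 200ms function
// starts at t=300, when the first slot frees up, and finishes at t=500, so
// the returned promise resolves at roughly t=500.
promisePool(fns, 2).then(() => console.log('all done'));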
The solution uses a clever wrapper approach:
- Each function is wrapped to automatically trigger the next waiting function upon completion
- The first n functions start immediately as "running" functions
- The remaining functions wait in a queue
- When any running function completes, it pulls and executes the next function from the waiting queue
- Promise.all ensures we wait for all initially running functions (and their chain reactions) to complete
Intuition
The core challenge is maintaining up to n concurrent operations while ensuring functions start in order. Think of it like a conveyor belt with a limited number of processing stations.
The key insight is that we need a self-sustaining chain reaction. When a function completes, it should automatically trigger the next waiting function without external coordination. This eliminates the need for complex state management or recursive scheduling.
Consider what happens when a promise completes in a pool:
- A slot becomes available
- The next function in line should immediately take that slot
- This process continues until no functions remain
The elegant solution is to wrap each function so it carries the responsibility of starting the next one. Each wrapper:
- Executes its own function
- Upon completion, checks if there are waiting functions
- If yes, pulls and executes the next one
This creates a chain reaction where each completing function automatically maintains the pool size. We split the wrapped functions into two groups:
- Running: The first n functions that start immediately
- Waiting: The remaining functions queued for execution
By using Promise.all on just the initially running functions, we capture the entire execution chain. Each running function, through its wrapper, will eventually process all waiting functions in its "lane". When all initial runners complete (including their chain reactions), all functions have been executed.
The beauty of this approach is its simplicity: no counters, no complex state machines, just functions that know how to pass the baton to the next one in line. The shift() operation ensures functions are dequeued in order, maintaining the sequential requirement.
Solution Approach
The implementation uses a wrapper pattern with queue management to maintain the concurrency limit:
Step 1: Create Wrapper Functions
const wrappers = functions.map(fn => async () => {
    await fn();
    const nxt = waiting.shift();
    nxt && (await nxt());
});
Each original function is wrapped in a new async function that:
- First executes the original function with await fn()
- Then attempts to pull the next function from the waiting queue using shift()
- If a next function exists, it executes it with await nxt()
This creates a self-perpetuating chain where each function completion triggers the next queued function.
Step 2: Split Functions into Running and Waiting
const running = wrappers.slice(0, n);
const waiting = wrappers.slice(n);
- The first n wrapped functions are placed in the running array - these will start immediately
- The remaining functions go into the waiting array - these will be pulled and executed as slots become available
- Using slice() ensures we handle cases where there are fewer functions than the pool limit
Step 3: Start Execution and Return Promise
return Promise.all(running.map(fn => fn()));
- Execute all functions in the running array immediately by calling them: fn()
- Promise.all waits for all these initial functions to complete
- Due to the wrapper logic, each completing function will pull from waiting and continue the chain
- The promise resolves only when all chains have completed (all functions have been executed)
Key Data Structures:
- Array as Queue: The waiting array acts as a FIFO queue with shift() operations
- Closure: The wrappers maintain access to the waiting array through closure, allowing dynamic queue management
Why This Works:
- If we have 5 functions and n = 2, functions 0 and 1 start immediately
- When function 0 completes (assuming it finishes first), its wrapper pulls function 2 from waiting and executes it
- When function 1 completes, its wrapper pulls function 3 from waiting and executes it
- This continues until waiting is empty
- Promise.all resolves when both initial chains (started by functions 0 and 1) complete
Example Walkthrough
Let's walk through a concrete example with 4 functions and a pool limit of 2:
Setup:
- Functions: [f0, f1, f2, f3], where each takes a different amount of time to complete
- Pool limit: n = 2
- Durations: f0 takes 300ms, f1 takes 100ms, f2 takes 200ms, f3 takes 150ms
Step 1: Create Wrappers
Each function gets wrapped:
wrapper0 = async () => {
    await f0();                    // Execute f0
    const next = waiting.shift();  // Pull from waiting queue
    if (next) await next();        // Execute next if exists
}
// Similarly for wrapper1, wrapper2, wrapper3
Step 2: Split into Running and Waiting
running = [wrapper0, wrapper1]  // First 2 wrappers
waiting = [wrapper2, wrapper3]  // Remaining wrappers
Step 3: Execution Timeline
Time 0ms:
- wrapper0() starts → f0 begins (will take 300ms)
- wrapper1() starts → f1 begins (will take 100ms)
- Pool: 2/2 slots occupied
Time 100ms:
- f1 completes
- wrapper1 pulls wrapper2 from waiting queue
- wrapper2() starts → f2 begins (will take 200ms)
- Pool: 2/2 slots occupied
- Waiting queue: [wrapper3]
Time 300ms:
- f0 completes
- wrapper0 pulls wrapper3 from waiting queue
- wrapper3() starts → f3 begins (will take 150ms)
- f2 is still running (started at 100ms, needs 200ms total)
- Pool: 2/2 slots occupied
- Waiting queue: [] (empty)
Time 300ms (continued):
- f2 completes (100ms + 200ms = 300ms)
- wrapper2 tries to pull from waiting, but queue is empty
- wrapper2 chain completes
- Pool: 1/2 slots occupied
Time 450ms:
- f3 completes (300ms + 150ms = 450ms)
- wrapper3 tries to pull from waiting, but queue is empty
- wrapper3 chain completes
- Pool: 0/2 slots occupied
Result:
- Promise.all([wrapper0(), wrapper1()]) resolves at 450ms
- All functions started in order: f0 → f1 → f2 → f3 (completion order differs: f1, then f0 and f2, then f3)
- At most 2 functions ran concurrently at any time
The key insight is that wrapper1 finishing early (at 100ms) immediately triggered wrapper2, maintaining the pool size at 2. Similarly, when wrapper0 finished at 300ms, it triggered wrapper3. The Promise.all waits for both initial chains to complete: the chain starting with wrapper0 (which executed f0 → f3) and the chain starting with wrapper1 (which executed f1 → f2).
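For readers who want to verify the timeline, a small self-contained TypeScript harness along these lines reproduces it; the sleep helper, timings, and logging are illustrative additions, not part of the solution:

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const start = Date.now();
const stamp = (label: string) => console.log(`${label} done at ~${Date.now() - start}ms`);

const fns = [
    () => sleep(300).then(() => stamp('f0')),  // completes ~300ms
    () => sleep(100).then(() => stamp('f1')),  // completes ~100ms
    () => sleep(200).then(() => stamp('f2')),  // starts ~100ms, completes ~300ms
    () => sleep(150).then(() => stamp('f3')),  // starts ~300ms, completes ~450ms
];

promisePool(fns, 2).then(() => stamp('all'));  // resolves ~450ms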
Solution Implementation
Python:

import asyncio
from typing import List, Callable, Awaitable, Any
from collections import deque


async def promisePool(functions: List[Callable[[], Awaitable[Any]]], n: int) -> None:
    """
    Executes an array of async functions with a concurrency limit.

    Args:
        functions: List of async functions to execute
        n: Maximum number of functions to run concurrently

    Returns:
        None when all functions complete
    """
    # Queue to hold functions that are waiting to be executed
    waiting_queue = deque()

    # Create wrapped versions of each function
    wrapped_functions = []

    for func in functions:
        async def create_wrapped(original_func=func):
            """
            Wrapper that executes the function and then starts the next waiting function.
            The default argument binds func eagerly, avoiding Python's late-binding pitfall.
            """
            # Execute the current function
            await original_func()

            # After completion, check if there are waiting functions
            if waiting_queue:
                # Get and execute the next waiting function
                next_function = waiting_queue.popleft()
                await next_function()

        # Store the wrapped function with proper closure
        wrapped_functions.append(create_wrapped)

    # Split wrapped functions into currently running and waiting groups
    currently_running = wrapped_functions[:n]
    waiting_queue.extend(wrapped_functions[n:])

    # Start execution of the initial batch and wait for all to complete
    await asyncio.gather(*[wrapped_func() for wrapped_func in currently_running])
Java:

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class PromisePoolExecutor {

    /**
     * Executes an array of async functions with a concurrency limit.
     * @param functions - List of async functions (Suppliers that return CompletableFuture) to execute
     * @param n - Maximum number of functions to run concurrently
     * @return CompletableFuture that completes when all functions finish execution
     */
    public static CompletableFuture<Void> promisePool(List<Supplier<CompletableFuture<?>>> functions, int n) {
        // Queue to hold functions that are waiting to be executed
        Queue<Supplier<CompletableFuture<Void>>> waitingQueue = new LinkedList<>();

        // List to store wrapped functions with automatic chaining behavior
        List<Supplier<CompletableFuture<Void>>> wrappedFunctions = new ArrayList<>();

        // Wrap each function to automatically start the next waiting function upon completion
        for (Supplier<CompletableFuture<?>> func : functions) {
            Supplier<CompletableFuture<Void>> wrappedFunc = () -> {
                // Execute the current function
                return func.get().thenCompose(result -> {
                    // After completion, check if there are waiting functions in the queue
                    Supplier<CompletableFuture<Void>> nextFunction = waitingQueue.poll();
                    if (nextFunction != null) {
                        // Execute the next waiting function recursively
                        return nextFunction.get();
                    }
                    // No more waiting functions, return completed future
                    return CompletableFuture.completedFuture(null);
                });
            };
            wrappedFunctions.add(wrappedFunc);
        }

        // Split wrapped functions into currently running and waiting groups:
        // take the first n functions to run immediately
        List<Supplier<CompletableFuture<Void>>> currentlyRunning =
                wrappedFunctions.subList(0, Math.min(n, wrappedFunctions.size()));

        // Add remaining functions to the waiting queue
        if (wrappedFunctions.size() > n) {
            waitingQueue.addAll(wrappedFunctions.subList(n, wrappedFunctions.size()));
        }

        // Start execution of the initial batch of functions
        CompletableFuture<?>[] runningFutures = currentlyRunning.stream()
                .map(Supplier::get)
                .toArray(CompletableFuture[]::new);

        // Return a CompletableFuture that completes when all functions finish
        return CompletableFuture.allOf(runningFutures);
    }
}
C++:

#include <vector>
#include <queue>
#include <future>
#include <functional>
#include <memory>
#include <mutex>
#include <algorithm>

/**
 * Executes an array of async functions with a concurrency limit.
 * @param functions - Vector of async functions to execute
 * @param n - Maximum number of functions to run concurrently
 * @return Future that resolves when all functions complete
 */
std::future<void> promisePool(
    const std::vector<std::function<std::future<void>()>>& functions,
    int n) {

    // Shared queue holding indices of functions waiting to be executed
    auto waiting_queue = std::make_shared<std::queue<size_t>>();

    // Shared mutex for thread-safe queue operations
    auto queue_mutex = std::make_shared<std::mutex>();

    // Add the indices of functions beyond the initial batch to the queue
    for (size_t i = static_cast<size_t>(n); i < functions.size(); ++i) {
        waiting_queue->push(i);
    }

    // Worker: execute one function, then keep pulling the next waiting index.
    // A loop is used instead of recursion so chains of any length are handled,
    // and the mutex is never held while a function runs (holding it across the
    // nested std::async, as a naive recursive version does, can deadlock).
    auto process_function = [functions, waiting_queue, queue_mutex](size_t index) {
        while (true) {
            // Execute the current function and wait for it to complete
            functions[index]().wait();

            // After completion, atomically check for a waiting function
            std::lock_guard<std::mutex> lock(*queue_mutex);
            if (waiting_queue->empty()) {
                break;  // No more work in this lane
            }
            index = waiting_queue->front();
            waiting_queue->pop();
            // The lock is released at the end of this iteration, before the
            // next function executes
        }
    };

    // Shared vector to hold the initially launched tasks
    auto tasks = std::make_shared<std::vector<std::future<void>>>();

    // Start execution of the initial batch (first n functions)
    int initial = std::min(n, static_cast<int>(functions.size()));
    for (int i = 0; i < initial; ++i) {
        tasks->push_back(std::async(std::launch::async, process_function, i));
    }

    // Return a future that completes when all tasks are done
    return std::async(std::launch::async, [tasks]() {
        for (auto& task : *tasks) {
            task.wait();
        }
    });
}
TypeScript:

/**
 * Type definition for an async function that returns a Promise
 */
type F = () => Promise<any>;

/**
 * Executes an array of async functions with a concurrency limit.
 * @param functions - Array of async functions to execute
 * @param n - Maximum number of functions to run concurrently
 * @returns Promise that resolves when all functions complete
 */
function promisePool(functions: F[], n: number): Promise<any> {
    // Array to hold functions that are waiting to be executed
    const waitingQueue: (() => Promise<void>)[] = [];

    // Wrap each function to automatically start the next waiting function upon completion
    const wrappedFunctions = functions.map((func: F) => {
        return async (): Promise<void> => {
            // Execute the current function
            await func();

            // After completion, check if there are waiting functions
            const nextFunction = waitingQueue.shift();
            if (nextFunction) {
                // Execute the next waiting function
                await nextFunction();
            }
        };
    });

    // Split wrapped functions into currently running and waiting groups
    const currentlyRunning = wrappedFunctions.slice(0, n);
    waitingQueue.push(...wrappedFunctions.slice(n));

    // Start execution of the initial batch and return the promise
    return Promise.all(currentlyRunning.map((wrappedFunc) => wrappedFunc()));
}
Time and Space Complexity
Time Complexity: O(m), where m is the total number of functions in the array.
The algorithm processes each function exactly once. Although functions are executed with concurrency limit n, each function must eventually be executed. The wrapper functions ensure that as one promise completes, the next waiting function (if any) is triggered. The time complexity is determined by the need to process all m functions, regardless of the pooling mechanism. (One caveat: Array.prototype.shift() is linear in the array's length, so for very large queues the dequeue operations add overhead; a true FIFO structure, like Python's deque, keeps each dequeue O(1).)
Space Complexity: O(m), where m is the total number of functions in the array.
The space complexity breaks down as follows:
- wrappers array: O(m) - stores wrapped versions of all input functions
- running array: O(min(n, m)) - stores up to n wrapper references
- waiting array: O(m - n) when m > n, otherwise O(1)
- Promise.all creates internal tracking for O(n) concurrent promises
Since the wrappers array contains all m wrapped functions and dominates the space usage, the overall space complexity is O(m). The running and waiting arrays contain references to the same wrapper functions rather than copies, so they don't add to the asymptotic complexity.
Common Pitfalls
1. Closure Variable Binding Issue
The most critical pitfall in the Python implementation is incorrect closure binding in the loop. When a wrapper created inside a loop refers to the loop variable directly, Python's late binding means every wrapper sees the same func variable, which holds the last function in the array by the time the wrappers actually run.
Problem Code:
for func in functions:
    async def create_wrapped():  # func is captured by reference, not by value
        await func()             # every wrapper ends up calling the last function
        # ...
    wrapped_functions.append(create_wrapped)
What happens: All wrapped functions will execute the last function in the array multiple times instead of their respective functions.
Solution: Bind the current function eagerly, either with a default parameter (as the reference implementation's original_func=func does) or with a function factory:
# Option 1: Function factory
def create_wrapper(func, waiting_queue):
    async def wrapped():
        await func()
        if waiting_queue:
            next_function = waiting_queue.popleft()
            await next_function()
    return wrapped

for func in functions:
    wrapped_functions.append(create_wrapper(func, waiting_queue))

# Option 2: functools.partial or a lambda with immediate binding
for func in functions:
    wrapped = lambda f=func: execute_and_continue(f, waiting_queue)
    wrapped_functions.append(wrapped)
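JavaScript and TypeScript have an analogous trap. The map-based solution above is immune because map creates a fresh binding per element, but a hand-rolled loop with var is not. A hypothetical sketch, reusing the F type and functions array from the TypeScript solution:

const wrappers: (() => Promise<void>)[] = [];
for (var i = 0; i < functions.length; i++) {  // BUG: var is function-scoped
    // Every wrapper closes over the same i, which equals functions.length by
    // the time the wrappers run, so functions[i] is undefined
    wrappers.push(async () => { await functions[i](); });
}
// Fix: declare the index with let (one binding per iteration) or use functions.map(...)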
2. Shared Mutable State Without Synchronization
Using a shared deque without synchronization is actually safe in a single-threaded asyncio event loop, because the synchronous popleft() call cannot be preempted between awaits. The risk appears if the pool is ever combined with thread-based executors, where workers really can interleave on the queue.
Problem: The check-then-pop sequence (if waiting_queue: ... popleft()) is not atomic across threads. Another worker could drain the queue between the emptiness check and the popleft() call, raising IndexError.
Solution: Use thread-safe structures or asyncio synchronization primitives:
import asyncio

async def promisePool(functions, n):
    waiting_queue = deque()
    queue_lock = asyncio.Lock()  # Add synchronization

    async def create_wrapped(original_func):
        await original_func()
        next_function = None
        async with queue_lock:  # Synchronized check-then-pop
            if waiting_queue:
                next_function = waiting_queue.popleft()
        if next_function:
            await next_function()  # Execute outside the lock so it is held only briefly
3. Empty Functions Array or n <= 0
The code doesn't handle edge cases where the functions array is empty or the pool limit is zero or negative.
Solution: Add validation:
async def promisePool(functions, n):
    if not functions or n <= 0:
        return
    n = min(n, len(functions))  # Ensure n doesn't exceed array length
    # ... rest of implementation
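The TypeScript version can guard the same way; a minimal sketch of the added check:

function promisePool(functions: F[], n: number): Promise<any> {
    if (functions.length === 0 || n <= 0) {
        return Promise.resolve([]);  // Nothing to run
    }
    // ... rest of implementation (slice already tolerates n > functions.length)
}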
4. Exception Handling
If any function rejects (raises an exception), the promise returned by the pool rejects, and the failing wrapper never pulls the next function, so the remaining functions in that lane are never started.
Solution: Add a try/except so the pool keeps draining the queue even when a function fails:
async def create_wrapped(original_func):
    try:
        await original_func()
    except Exception as e:
        # Log or handle error as needed
        pass  # Continue to next function even if current fails
    if waiting_queue:
        next_function = waiting_queue.popleft()
        await next_function()
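The corresponding change in the TypeScript wrapper might look like the following; whether errors are swallowed, logged, or collected is a design choice:

const wrappedFunctions = functions.map((func: F) => {
    return async (): Promise<void> => {
        try {
            await func();
        } catch (err) {
            // Record or ignore the error so this lane keeps draining the queue
        }
        const nextFunction = waitingQueue.shift();
        if (nextFunction) {
            await nextFunction();
        }
    };
});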
5. Memory Overhead with Large Function Arrays
Creating all wrapped functions upfront can consume significant memory for large arrays.
Solution: Create wrappers on-demand:
async def promisePool(functions, n):
    index = 0

    async def execute_next():
        nonlocal index
        if index < len(functions):
            current_index = index
            index += 1
            await functions[current_index]()
            await execute_next()  # Start the next function if any remain

    # Start the initial batch
    tasks = [execute_next() for _ in range(min(n, len(functions)))]
    await asyncio.gather(*tasks)
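An equivalent on-demand pattern in TypeScript uses a shared cursor instead of materializing wrappers. This sketch (the names are illustrative) needs no locks because JavaScript is single-threaded and index++ runs atomically between awaits:

function promisePoolOnDemand(functions: F[], n: number): Promise<void[]> {
    let index = 0;

    async function worker(): Promise<void> {
        while (index < functions.length) {
            const current = index++;  // Claim the next function
            await functions[current]();
        }
    }

    // Launch at most n workers; each drains the shared cursor in a loop,
    // which also avoids the recursion depth concern of the Python version
    const workers = Array.from({ length: Math.min(n, functions.length) }, () => worker());
    return Promise.all(workers);
}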