2636. Promise Pool
Problem Description
The problem entails creating a promisePool function that manages the execution of an array of asynchronous functions within a given limit on how many can run concurrently. The parameter functions is an array of these asynchronous functions, and n is the pool limit, that is, the maximum number of promises that can be pending at once.
The goal of promisePool is to ensure that at any given moment, no more than n functions from the functions array are executing. If n is set to 1, the functions are executed one after the other in series. For higher values of n, up to n functions are started simultaneously.
Whenever one promise resolves (a function completes its execution), another function is started if any are left, so that up to n functions are always running concurrently until there are no more functions left to start.
The function should return a promise that resolves once all of the asynchronous functions have resolved. All the functions are guaranteed never to reject, which simplifies the error handling in the implementation.
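To make the n = 1 case concrete, here is a minimal TypeScript sketch. The names runInSeries, makeTask and events are hypothetical and exist only for this illustration; they are not part of the problem statement. Under those assumptions, a pool of size 1 amounts to awaiting each function before calling the next.

```typescript
type AsyncFunction = () => Promise<unknown>;

// Hypothetical helper: what a pool with limit 1 boils down to.
async function runInSeries(functions: AsyncFunction[]): Promise<void> {
  for (const fn of functions) {
    await fn(); // the next function is not called until this one resolves
  }
}

// Illustrative tasks that record when they start and when they finish.
const events: string[] = [];
const makeTask = (name: string, delayMs: number): AsyncFunction => () =>
  new Promise<void>(resolve => {
    events.push(`${name} start`);
    setTimeout(() => {
      events.push(`${name} end`);
      resolve();
    }, delayMs);
  });

// Task "b" is shorter, but it cannot start before task "a" has finished.
const seriesDone: Promise<void> = runInSeries([makeTask("a", 20), makeTask("b", 5)]);
```

Once seriesDone resolves, events holds the entries for "a" before those for "b", even though "b" has the shorter delay.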
Intuition
The solution manages the execution of the asynchronous functions with a promise pool. The core idea is to organize the functions into two groups: running, which contains up to n functions that are currently being executed, and waiting, which contains the rest of the functions, queued to be executed as soon as there's a free slot in the running pool.
To achieve this, the solution maps the original functions array to a new array called wrappers. Each element in wrappers is an async function that, when called, will execute its corresponding function from the functions array.
Once a wrapper's own function has resolved, the wrapper checks the waiting array for queued wrappers. If one is waiting (waiting.shift()), the wrapper continues by awaiting it. This chaining ensures that as soon as a function completes, a new one begins execution if available, maintaining the pool limit.
The initial running group is obtained by slicing the wrappers list from the beginning up to the pool limit n, and calling those wrappers starts execution of the first n functions. The remaining wrappers are placed in the waiting list and are executed in order as the running functions complete.
Promise.all() is then used on the promises of the initial running functions. It resolves once all the initially started functions (and, as a result of the chaining, all the functions in waiting) have resolved. This way, when Promise.all() resolves, we know that all functions have been executed, respecting the pool limit at all times.
The promise from Promise.all() is returned by the promisePool function. It resolves when the last promise in the pool completes, signaling that all asynchronous functions have finished executing.
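The chaining described above can be sketched as follows, using the names running, waiting and wrappers from the text. The instrumentation (tracked, active, peak, completed) is a hypothetical addition for this illustration, used only to observe that the number of in-flight functions stays within the limit.

```typescript
type AsyncFunction = () => Promise<unknown>;

function promisePool(functions: AsyncFunction[], n: number): Promise<void[]> {
  const waiting: Array<() => Promise<void>> = [];

  // Each wrapper runs its own function, then pulls the next queued wrapper, if any.
  const wrappers = functions.map(fn => async (): Promise<void> => {
    await fn();
    const next = waiting.shift();
    if (next) {
      await next();
    }
  });

  const running = wrappers.slice(0, n); // started immediately
  waiting.push(...wrappers.slice(n));   // started as slots free up

  return Promise.all(running.map(wrapper => wrapper()));
}

// Illustrative instrumentation (an assumption of this sketch, not part of the solution).
let active = 0;
let peak = 0;
let completed = 0;
const tracked = (delayMs: number): AsyncFunction => async () => {
  active += 1;
  peak = Math.max(peak, active);
  await new Promise<void>(resolve => setTimeout(resolve, delayMs));
  active -= 1;
  completed += 1;
};

const poolDone: Promise<void[]> = promisePool([30, 10, 20, 5, 10].map(tracked), 2);
```

Because a wrapper only dequeues the next one after its own function has resolved, a slot is always freed before it is reused, which is why peak cannot exceed n here.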
Solution Approach
The implementation of promisePool can be divided into several steps, which use JavaScript closures, asynchronous functions, and the Promise.all() method to manage the concurrency. Here's a step-by-step walk-through:
- Map Functions to Async Wrappers: The original array of async functions (functions) is mapped to a new array called wrappers. Each element of wrappers is an async function that:
  - Awaits the completion of its corresponding function from the original array.
  - Looks at a shared waiting queue to see if there are other functions waiting to execute. If a function is waiting, it dequeues it (waiting.shift()) and awaits its completion. This is essentially a recursive step: the completion of one function potentially triggers the start of another.
- Initialize Running and Waiting Pools: Divide the wrappers into two groups: the running group, representing the n functions that can run simultaneously, and the waiting queue for the remaining functions, which will be executed once one of the running functions completes.
- Start Initial Execution: Begin executing functions from the running group using Promise.all(). The Promise.all() function takes an iterable of promises and returns a single Promise that resolves to an array of the results of the input promises when all of them resolve, and rejects as soon as any one of them rejects (the latter is not a concern here, as the functions are guaranteed never to reject).
- Chain Execution of Waiting Functions: As each wrapper's own function resolves, the wrapper triggers the next function in the waiting queue. This is done by calling waiting.shift() inside the async wrapper function after the awaited function execution. This chaining maintains the invariant that up to n functions are running concurrently until there are no more functions left to execute.
- Complete All Executions: The promise returned by Promise.all() will only resolve once all of the promises in the running array have resolved. However, due to the chaining implemented in the wrapper functions, this also implies that all functions in the waiting queue have finished running. Hence, when the Promise.all() promise resolves, we know that every function passed to promisePool has been executed, respecting the pool limit.
- Return Final Promise: Finally, the promise returned by Promise.all() is returned from the promisePool function. This promise therefore represents the completion of all async function executions while respecting the concurrency limit.
This solution fundamentally employs the concept of concurrency control with a finite number of workers (running functions) and a queue (waiting functions). It ensures that no more than n async operations are being processed at the same time, starting new operations as soon as older ones finish.
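The worker-and-queue view can also be written with explicit worker loops instead of chained wrappers. The following is a sketch of that variant, not the approach described in the steps above: promisePoolWithWorkers is a hypothetical name, a shared index stands in for the waiting queue, and collecting the results in input order is an extra that the problem does not require. It assumes n is at least 1.

```typescript
type AsyncFunction<T> = () => Promise<T>;

async function promisePoolWithWorkers<T>(
  functions: AsyncFunction<T>[],
  n: number,
): Promise<T[]> {
  const results: T[] = new Array<T>(functions.length);
  let nextIndex = 0; // shared cursor acting as the head of the queue

  // Each worker repeatedly claims the next unstarted function and awaits it.
  const worker = async (): Promise<void> => {
    while (nextIndex < functions.length) {
      const index = nextIndex++; // claimed synchronously, before any await
      results[index] = await functions[index]();
    }
  };

  // Never start more workers than there are functions.
  const workerCount = Math.max(0, Math.min(n, functions.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Illustrative usage: three timers that resolve to their own index.
const workerResults: Promise<number[]> = promisePoolWithWorkers(
  [30, 10, 20].map(
    (delayMs, i) => () =>
      new Promise<number>(resolve => setTimeout(() => resolve(i), delayMs)),
  ),
  2,
);
```

Since JavaScript runs each synchronous stretch of code to completion, reading and incrementing nextIndex needs no lock; two workers can never claim the same index.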
Example Walkthrough
Let's illustrate the solution approach with a smaller example. Suppose we have an array of asynchronous functions that simply resolve after a certain delay and log their index. Let's say we have 5 such functions and our pool limit n is 2.
Our asynchronous functions might look like this:
const function1 = () => new Promise(resolve => setTimeout(() => { console.log("Function 1 done"); resolve(); }, 2000));
const function2 = () => new Promise(resolve => setTimeout(() => { console.log("Function 2 done"); resolve(); }, 1000));
const function3 = () => new Promise(resolve => setTimeout(() => { console.log("Function 3 done"); resolve(); }, 1500));
const function4 = () => new Promise(resolve => setTimeout(() => { console.log("Function 4 done"); resolve(); }, 500));
const function5 = () => new Promise(resolve => setTimeout(() => { console.log("Function 5 done"); resolve(); }, 1000));

const functions = [function1, function2, function3, function4, function5];
Running promisePool on these functions with a pool limit of n = 2, we would expect the following behavior:
- Start with function1 and function2 (t = 0): Since our pool limit is 2, function1 and function2 start executing simultaneously.
- function2 finishes first (t ≈ 1000 ms): Although function1 started first, function2 finishes before it because of its shorter timeout. Upon function2's completion, function3 starts because it's next in the queue.
- function3 and function1 are running: Now, function3 is running alongside function1, which has not yet completed.
- function1 finishes (t ≈ 2000 ms); function4 starts: Once function1 completes, function4 is dequeued and starts executing.
- function3 and function4 finish at about the same time (t ≈ 2500 ms): function3 started at about 1000 ms with a 1500 ms delay, and function4 started at about 2000 ms with a 500 ms delay, so both complete around 2500 ms. Whichever settles first dequeues function5 and starts it; the other finds the queue empty and starts nothing.
- function5 finishes (t ≈ 3500 ms); end: function5 was the last function to be executed, so the promise returned by promisePool resolves now.
As we can see, the pool starts with two functions, and whenever a function finishes, it triggers the next function in line if there is one. At no point do more than two functions run concurrently. The promisePool function orchestrates the execution of the asynchronous functions, respecting the concurrency limit, and eventually resolves when all functions have completed.
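The timeline in this walkthrough can be checked without real timers. The sketch below uses a hypothetical helper, simulateSchedule, that assumes each function has a fixed duration and that a queued function starts the instant a slot frees up, ignoring scheduling overhead. It also assumes n is at least 1.

```typescript
interface Slot {
  start: number; // time at which the function starts, in ms
  end: number;   // time at which the function finishes, in ms
}

// Hypothetical helper: computes start and end times for a pool of size n.
function simulateSchedule(durations: number[], n: number): Slot[] {
  // freeAt[i] is the time at which pool slot i next becomes free.
  const freeAt: number[] = new Array<number>(Math.min(n, durations.length)).fill(0);

  return durations.map(duration => {
    // The next queued function takes the slot that frees up earliest.
    let slot = 0;
    for (let i = 1; i < freeAt.length; i++) {
      if (freeAt[i] < freeAt[slot]) slot = i;
    }
    const start = freeAt[slot];
    const end = start + duration;
    freeAt[slot] = end;
    return { start, end };
  });
}

// Delays of function1..function5 from the walkthrough, with n = 2.
const schedule = simulateSchedule([2000, 1000, 1500, 500, 1000], 2);
const totalMs = Math.max(...schedule.map(s => s.end));
// start times: 0, 0, 1000, 2000, 2500; end times: 2000, 1000, 2500, 2500, 3500
// totalMs: 3500
```

The simulation shows function3 and function4 both ending at 2500 ms. With real timers, which of the two actually settles first is not something this sketch can predict.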
Solution Implementation
import asyncio
from typing import Any, Callable, Coroutine, List

# Defines a type for an asynchronous function that returns any value.
AsyncFunction = Callable[[], Coroutine[Any, Any, Any]]


async def promise_pool(async_functions: List[AsyncFunction], limit: int) -> List[Any]:
    """
    Executes a pool of asynchronous functions concurrently, but with a limited number of async functions
    running at the same time.
    :param async_functions: A list of functions that are asynchronous and return promises (in Python, 'awaitables').
    :param limit: The maximum number of async functions that can be running simultaneously.
    :return: A list of results from the async function executions, when all functions have completed.
    """

    # A semaphore to control the number of async functions that can run at the same time.
    semaphore = asyncio.Semaphore(limit)

    async def run_with_semaphore(fn: AsyncFunction) -> Any:
        """
        Wraps an async function to acquire a semaphore before execution and release it afterward,
        ensuring that the number of concurrently running functions does not exceed the limit.
        """
        async with semaphore:
            return await fn()

    # Wrap each async function with the semaphore logic and schedule it as a task.
    tasks = [asyncio.create_task(run_with_semaphore(fn)) for fn in async_functions]

    # Use asyncio.gather to wait until all tasks are finished; results keep the input order.
    return await asyncio.gather(*tasks)

# Example usage of the function (assuming example async functions async_foo and async_bar):
# results = asyncio.run(promise_pool([async_foo, async_bar], limit=2))

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class PromisePool {

    /**
     * Executes a pool of asynchronous operations concurrently, but limits the number of operations
     * running at the same time.
     *
     * @param asyncFunctions A list of suppliers that provide CompletableFutures.
     * @param limit The maximum number of CompletableFutures that can be running concurrently.
     * @return A CompletableFuture that resolves when all the given suppliers have completed.
     */
    public static CompletableFuture<List<Object>> promisePool(
            List<Supplier<CompletableFuture<Object>>> asyncFunctions, int limit) {

        // A fixed thread pool caps the concurrency: each pool thread calls one supplier and
        // blocks in join() until that operation completes, so at most `limit` are in flight.
        ExecutorService executor = Executors.newFixedThreadPool(limit);

        // A list to hold the CompletableFutures created by running the async functions.
        List<CompletableFuture<Object>> futures = new ArrayList<>();

        // Submit each async function to the executor.
        for (Supplier<CompletableFuture<Object>> function : asyncFunctions) {
            futures.add(CompletableFuture.supplyAsync(() -> function.get().join(), executor));
        }

        // Combine all the CompletableFutures into a single CompletableFuture that contains a list of results.
        // This CompletableFuture will complete when all the individual CompletableFutures are complete.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                // Shut the pool down once everything has finished so the JVM can exit.
                .whenComplete((results, error) -> executor.shutdown());
    }

    // Example usage of the promisePool method:
    public static void main(String[] args) {

        // Create a list of asynchronous operations using CompletableFuture.supplyAsync
        List<Supplier<CompletableFuture<Object>>> asyncFunctions = new ArrayList<>();
        asyncFunctions.add(() -> CompletableFuture.supplyAsync(() -> {
            // Perform some asynchronous operation here
            return "Result of async task 1";
        }));
        asyncFunctions.add(() -> CompletableFuture.supplyAsync(() -> {
            // Perform some asynchronous operation here
            return "Result of async task 2";
        }));
        // Add more async tasks as needed

        // Execute the promisePool method, print the results, and wait for completion
        promisePool(asyncFunctions, 2)
                .thenAccept(results -> results.forEach(System.out::println))
                .join();
    }
}

#include <chrono>      // Required for std::chrono::milliseconds
#include <cstddef>     // Required for std::size_t
#include <functional>  // Required for std::function
#include <future>      // Required for std::future and std::async
#include <queue>       // Required for std::queue
#include <utility>     // Required for std::move
#include <vector>      // Required for std::vector

// Define a type for a function that returns a std::future<void>
using AsyncFunction = std::function<std::future<void>()>;

/**
 * Executes a pool of std::future objects concurrently, but with a limited number of std::future
 * objects running at the same time.
 * @param async_functions - A vector of functions that return std::future objects.
 * @param limit - The maximum number of std::future objects that can be running at the same time.
 * @returns A std::future that resolves when all the given functions have completed.
 */
std::future<void> promise_pool(std::vector<AsyncFunction> async_functions, int limit) {
    // Run the scheduler on its own thread. All state lives inside the lambda, so nothing
    // refers to locals of promise_pool after it has returned.
    return std::async(std::launch::async, [functions = std::move(async_functions), limit]() {
        // A queue to hold the functions that are currently waiting to be executed.
        std::queue<AsyncFunction> waiting_functions;
        for (const auto& fn : functions) {
            waiting_functions.push(fn);
        }

        // Vector to keep track of futures that are currently running
        std::vector<std::future<void>> running_futures;

        // Starts waiting functions while there are free slots.
        auto start_waiting_functions = [&running_futures, &waiting_functions, limit]() {
            while (!waiting_functions.empty() &&
                   running_futures.size() < static_cast<std::size_t>(limit)) {
                auto function_to_run = waiting_functions.front();
                waiting_functions.pop();
                running_futures.push_back(function_to_run());
            }
        };

        // Start functions until reaching the limit
        start_waiting_functions();

        // Poll the running futures, dropping finished ones and refilling the free slots.
        while (!running_futures.empty()) {
            for (auto it = running_futures.begin(); it != running_futures.end();) {
                if (it->wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) {
                    it = running_futures.erase(it);
                } else {
                    ++it;
                }
            }
            // Refill only after the scan: push_back could invalidate the iterator above.
            start_waiting_functions();
        }
    });
}

// Defines a type for a function that returns a Promise of any type.
type AsyncFunction = () => Promise<any>;

/**
 * Executes a pool of Promises concurrently, but with a limited number of Promises running at the same time.
 * @param asyncFunctions - An array of functions that return Promises.
 * @param limit - The maximum number of Promises that can be running at the same time.
 * @returns A Promise that resolves when all the given functions have completed.
 */
function promisePool(asyncFunctions: AsyncFunction[], limit: number): Promise<any[]> {
    // An array to hold the functions that are currently waiting to be executed.
    const waitingFunctions: AsyncFunction[] = [];

    // Wraps each async function to manage the execution of the next function in the queue.
    const wrappedFunctions = asyncFunctions.map(fn => async () => {
        await fn();
        const next = waitingFunctions.shift();
        if (next) {
            await next();
        }
    });

    // Initialize two arrays: 'currentlyRunning' for functions that should start immediately, and
    // 'waitingFunctions' for the rest that should wait for free slots.
    const currentlyRunning = wrappedFunctions.slice(0, limit);
    waitingFunctions.push(...wrappedFunctions.slice(limit));

    // Trigger the execution of the currently running functions and use Promise.all() to wait until all have finished.
    return Promise.all(currentlyRunning.map(fn => fn()));
}

Time and Space Complexity
The time complexity of the promisePool function is determined by the number of function executions (functions.length) and the concurrency level (n).
The problem does not specify what each function in the functions array does. If we assume that each function takes an equal amount of time to complete and that there is perfect parallelism with no additional overhead for managing the promises, the time complexity is O(functions.length / n). This is because n functions run in parallel until all functions have been executed.
In practice, the actual running time is affected by various factors, such as the nature of the I/O operations, the performance characteristics of the underlying system, and the overhead of promise scheduling and execution.
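As a worked instance of the estimate above, under the same idealized assumptions (every function takes the same time t and there is no scheduling overhead), the functions run in rounds of at most n. The symbols m, for functions.length, and t are shorthand introduced here for illustration.

```latex
T_{\text{total}} = \left\lceil \frac{m}{n} \right\rceil \cdot t,
\qquad \text{e.g. } m = 5,\ n = 2,\ t = 1\,\text{s}
\;\Rightarrow\; T_{\text{total}} = \lceil 2.5 \rceil \cdot 1\,\text{s} = 3\,\text{s}.
```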
The space complexity of this code is O(functions.length) because it creates a wrapper function for each function in functions and stores it in wrappers. The running and waiting arrays together hold references to those same functions.length wrappers, so they add at most another O(functions.length).
If we include auxiliary space for the execution contexts of the asynchronous calls, there is an additional O(n) term, because at most n functions are running at the same time, each needing its own execution context.
Since no more than functions.length functions can ever be running, that O(n) term is bounded by O(functions.length), and the overall space complexity is O(functions.length).