2721. Execute Asynchronous Functions in Parallel


Problem Description

In this problem, we are given an array functions which contains a list of asynchronous functions. Each of these functions, when called, returns a promise. The task is to execute all these functions in parallel without using the built-in Promise.all function and return a new promise, referred to below as promise.

The new promise should:

  • Resolve when all of the promises returned from the functions in functions array have been resolved. The resolved value should be an array containing the resolved values from each promise, maintaining the same order as the original functions array.
  • Reject if any one of the promises returned from the functions array is rejected. The reason for rejection should be the same as the first rejection that occurs among the promises.

The challenge is to manage these asynchronous operations manually to ensure correct synchronization and error handling without relying on Promise.all, which simplifies the coordination of asynchronous promises.
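For reference, the two requirements above can be observed on the built-in Promise.all itself. The sketch below is only an illustration of the target behaviour, not a solution (the problem forbids Promise.all); the helpers resolveAfter, rejectAfter and referenceAll are hypothetical names introduced here, and the millisecond delays are arbitrary.

```typescript
// Hypothetical helper: returns a function that resolves with `value` after `ms` milliseconds.
function resolveAfter<T>(value: T, ms: number): () => Promise<T> {
    return () => new Promise<T>(resolve => setTimeout(() => resolve(value), ms));
}

// Hypothetical helper: returns a function that rejects with `reason` after `ms` milliseconds.
function rejectAfter(reason: string, ms: number): () => Promise<never> {
    return () => new Promise<never>((_resolve, reject) => setTimeout(() => reject(reason), ms));
}

// Reference behaviour only: the actual solution must not call Promise.all.
function referenceAll<T>(functions: (() => Promise<T>)[]): Promise<T[]> {
    return Promise.all(functions.map(fn => fn()));
}

// Results follow the input order, not the completion order.
referenceAll([resolveAfter("slow", 30), resolveAfter("fast", 10)])
    .then(values => console.log(values)); // ["slow", "fast"]

// The earliest rejection wins, even though it comes later in the array.
referenceAll<string>([rejectAfter("late error", 30), rejectAfter("early error", 10)])
    .catch(reason => console.log(reason)); // "early error"
```

Any hand-written replacement should produce the same two outcomes for these inputs.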

Intuition

To arrive at a solution, let's consider the manual steps we would need to track and synchronize the various promises:

  1. We need to keep a count of how many promises have resolved so that we know when all of them have been fulfilled. This can be done using a counter variable.
  2. We must store the results of each promise in an array, maintaining the order of how they appeared in the input array. When a promise resolves, we will save its result in the corresponding position in this array.
  3. If any promise rejects, we should immediately reject the outer promise without waiting for the other promises to finish. This requires a rejection handler on each promise.
  4. As each promise resolves, we increment the counter. When the counter matches the number of functions, we know all promises have resolved, and we can resolve our promise with the array of results.
  5. If a promise rejects, we call the reject function with its rejection reason. Because a promise can settle only once, only the first rejection takes effect; any later call to resolve or reject is ignored.

Implementing this logic, we can construct a promiseAll function, which creates a new promise (return new Promise<T[]>) and performs the above-mentioned steps to resolve or reject based on the results of the promise executions.
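The "only once" requirement in step 5 comes for free from how promises work. As a minimal sketch (settleTwice is a hypothetical name used only for this illustration), the first call to reject settles the promise and every later call to reject or resolve is silently ignored:

```typescript
// A promise settles at most once: after the first reject (or resolve),
// later calls to either function have no effect.
function settleTwice(): Promise<string> {
    return new Promise<string>((resolve, reject) => {
        reject(new Error("first failure"));  // settles the promise
        reject(new Error("second failure")); // ignored
        resolve("too late");                 // ignored
    });
}

settleTwice().catch((err: Error) => console.log(err.message)); // "first failure"
```

This is why each promise can simply forward its rejection to the same reject function without any extra "already rejected" flag.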


Solution Approach

The implementation of the promiseAll function follows these steps:

  1. We initialize a Promise object using new Promise<T[]>((resolve, reject) => {...}). Within this Promise, we manage the execution and coordination of all promises returned by the functions.

  2. We declare a counter variable cnt initialized to zero, which we'll use to keep track of how many promises have been fulfilled.

  3. We create an array ans that will hold the resolved values of the promises. Its length is the same as the functions array, and each result is written to the index of the function that produced it, so the order of the results matches the order of the functions regardless of which promise resolves first.

  4. We loop over the array of functions using for (let i = 0; i < functions.length; ++i) and call each function, which returns a promise.

  5. For each returned promise:

    • We attach a .then(res => {}) handler that activates when the promise resolves. In this handler, we store the result as ans[i] = res and increment the cnt counter. We then check if cnt is equal to functions.length to determine if all promises have been resolved, in which case we call resolve(ans) to resolve the promiseAll function's promise with the array of results.
    • We also attach a .catch(err => {}) handler to handle any rejections. If a promise rejects, we call reject(err) immediately, forwarding the first error we encounter to the promiseAll function's promise, thus rejecting it.

This implementation makes use of closures to keep track of the index and result of each asynchronous operation, arrays to store the results, and promises' .then() and .catch() methods for asynchronous flow control.
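The role of the closure over the index can be sketched in isolation. The two functions below are hypothetical names for this illustration; they create callbacks in a loop and call them only after the loop has finished, which is the same situation a .then() handler is in. With let, each iteration gets its own binding of i; with var, all callbacks share one variable.

```typescript
// Each callback is created inside the loop but called only after the loop has finished,
// just like a .then() handler that runs later.
function makeCallbacksWithLet(n: number): (() => number)[] {
    const callbacks: (() => number)[] = [];
    for (let i = 0; i < n; ++i) {
        callbacks.push(() => i); // `let` gives every iteration its own i
    }
    return callbacks;
}

function makeCallbacksWithVar(n: number): (() => number)[] {
    const callbacks: (() => number)[] = [];
    for (var i = 0; i < n; ++i) {
        callbacks.push(() => i); // every closure shares the single function-scoped i
    }
    return callbacks;
}

console.log(makeCallbacksWithLet(3).map(cb => cb())); // [0, 1, 2]
console.log(makeCallbacksWithVar(3).map(cb => cb())); // [3, 3, 3]
```

Declaring the loop variable with let is therefore what makes ans[i] = res write to the correct slot.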

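Because .catch(reject) is chained after .then(...), it handles both a rejection of the original promise and any exception thrown inside the .then() handler itself. The resolve function does not throw in standard usage, so no additional error handling is needed around resolve(ans). Calling reject after the outer promise has already settled has no effect.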

Below is an excerpt of the essential part of the implementation:

const ans = new Array(functions.length);
for (let i = 0; i < functions.length; ++i) {
    functions[i]()
        .then(res => {
            ans[i] = res;
            cnt++;
            if (cnt === functions.length) {
                resolve(ans);
            }
        })
        .catch(reject);
}

This approach effectively simulates the behavior of Promise.all without directly relying on it, demonstrating a fundamental understanding of concurrency and error handling with promises in JavaScript.


Example Walkthrough

Suppose we have an array of three asynchronous functions that we want to execute in parallel. These functions are defined as follows:

  • function1() simulates an asynchronous operation that resolves after 1 second with the string "Result 1".
  • function2() simulates an asynchronous operation that resolves after 2 seconds with the string "Result 2".
  • function3() simulates an asynchronous operation that resolves after 3 seconds with the string "Result 3".

Using the given solution approach, we can execute these functions and manage their promises manually. Here's how the process would look step-by-step:

  1. Initialization:

    • The promiseAll function creates a new Promise object that will manage the execution of all functions.
    • Inside this new Promise, we initialize a counter cnt to zero and an array ans of the same length as the functions array to store the resolved values.
  2. Execution and Management:

    • We begin a for loop to execute each asynchronous function in the functions array.
    • For each function, we invoke it and receive a promise back.
  3. Resolution Handling:

    • We attach a .then(res => {}) handler to each promise. When function1() resolves after 1 second, the .then() handler for that function will execute, storing the result in ans[0] and incrementing cnt.
    • The same happens for function2() and function3(); each one's resolution is handled by the respective .then() handler, storing the result at the corresponding index in the ans array and incrementing cnt.
  4. Completion Check:

    • After each resolution, inside its .then() handler, we check whether cnt has reached the length of the functions array. When the last function (function3()) resolves and cnt becomes 3, we call resolve(ans) to resolve the promiseAll function's promise.
    • At this point, the promiseAll promise will successfully resolve with the array ["Result 1", "Result 2", "Result 3"].
  5. Error Handling:

    • Additionally, each promise has a .catch(err => {}) handler attached. If any one of them had rejected, the reject(err) call within the .catch() handler would execute, rejecting the promiseAll function's promise with the reason for the first rejection that occurred. The outer promise would settle at that moment, without waiting for the remaining promises; those promises would still run to completion, but their results would be ignored.

This example demonstrates how the provided solution approach coordinates multiple asynchronous operations and processes their results or errors as they occur, achieving the same behavior as Promise.all without using it directly.
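The walkthrough can be reproduced with a short runnable sketch. The delayed helper and the failing function are hypothetical additions for this illustration, and the delays are scaled down from seconds to tens of milliseconds (an assumption made so the example finishes quickly); promiseAll follows the counter-and-array technique described above.

```typescript
// Hypothetical helper for the walkthrough: resolves with `value` after `ms` milliseconds.
function delayed(value: string, ms: number): () => Promise<string> {
    return () => new Promise<string>(resolve => setTimeout(() => resolve(value), ms));
}

// Counter-and-array technique from the Solution Approach section.
function promiseAll<T>(functions: (() => Promise<T>)[]): Promise<T[]> {
    return new Promise<T[]>((resolve, reject) => {
        const ans = new Array<T>(functions.length);
        let cnt = 0;
        for (let i = 0; i < functions.length; ++i) {
            functions[i]()
                .then(res => {
                    ans[i] = res; // store at the function's own index
                    if (++cnt === functions.length) {
                        resolve(ans); // last promise fulfilled
                    }
                })
                .catch(reject); // first rejection settles the outer promise
        }
    });
}

// Delays scaled down: 10/20/30 ms stand in for 1/2/3 seconds.
const function1 = delayed("Result 1", 10);
const function2 = delayed("Result 2", 20);
const function3 = delayed("Result 3", 30);

promiseAll([function1, function2, function3])
    .then(results => console.log(results)); // ["Result 1", "Result 2", "Result 3"]

// Hypothetical failing variant of function2, used to illustrate the error path.
const failing = (): Promise<string> =>
    new Promise<string>((_resolve, reject) => setTimeout(() => reject("Error 2"), 20));

promiseAll([function1, failing, function3])
    .catch(reason => console.log(reason)); // "Error 2"
```

In the failing case the outer promise rejects after about 20 ms; function3 still finishes 10 ms later, but its result is discarded.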

Solution Implementation

Python

import asyncio
from typing import Awaitable, Callable, List, TypeVar

# Type variable representing the result type of each awaitable
T = TypeVar('T')

async def promise_all(promise_fns: List[Callable[[], Awaitable[T]]]) -> List[T]:
    """
    Calls each function in promise_fns, each of which returns an awaitable
    (a coroutine or a future), and returns once all of them have completed successfully.
    If any awaitable raises, the first exception raised propagates to the caller.

    :param promise_fns: A list of functions that each return an awaitable.
    :return: A list of the results, in the same order as promise_fns.
    """
    awaitables = []

    for promise_fn in promise_fns:
        # Call every function first so that all awaitables can be awaited concurrently
        awaitables.append(promise_fn())

    # asyncio.gather runs the awaitables concurrently and preserves the input order
    return await asyncio.gather(*awaitables)

# Example usage:
# async def answer() -> int:
#     return 42
#
# async def main():
#     promises = [answer]  # A list of functions that each return a coroutine
#     try:
#         results = await promise_all(promises)
#         print(results)  # Expected output: [42]
#     except Exception as e:
#         print(f"An error occurred: {e}")
#
# asyncio.run(main())

Java

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class PromiseAll {

    /**
     * Executes a list of suppliers that return CompletableFutures, and completes when all have successfully completed.
     * If any of the CompletableFutures completes exceptionally, the returned future completes exceptionally
     * with the exception of the first future observed to fail.
     *
     * @param promiseFns A list of suppliers that each returns a CompletableFuture.
     * @param <T>        The result type of the supplied CompletableFutures.
     * @return A CompletableFuture that, when completed normally, contains a list of the values obtained by
     *         each of the supplied CompletableFutures, in the same order as the suppliers.
     */
    public static <T> CompletableFuture<List<T>> promiseAll(List<Supplier<CompletableFuture<T>>> promiseFns) {
        // Call every supplier up front so that all futures are in flight at the same time
        List<CompletableFuture<T>> futuresList = promiseFns.stream()
                                                           .map(Supplier::get)
                                                           .collect(Collectors.toList());

        // allOf completes only after every future has completed (normally or exceptionally)
        CompletableFuture<Void> allDoneFuture =
                CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));

        // When thenApply runs, every future is already done, so join() does not block
        CompletableFuture<List<T>> result = allDoneFuture.thenApply(v ->
                futuresList.stream()
                           .map(CompletableFuture::join)
                           .collect(Collectors.toList())
        );

        // allOf waits for every future, so forward the first failure to the result right away
        futuresList.forEach(future -> future.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error); // no effect if result is already completed
            }
        }));

        return result;
    }

    public static void main(String[] args) {
        // Example usage:
        List<Supplier<CompletableFuture<Integer>>> suppliers = List.of(
                () -> CompletableFuture.completedFuture(42)
        );

        CompletableFuture<List<Integer>> allFutures = promiseAll(suppliers);

        try {
            List<Integer> resultList = allFutures.get(); // This blocks until the future completes
            System.out.println(resultList); // Expected output: [42]
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace(); // Handle the exception if any of the futures completed exceptionally
        }
    }
}

C++

#include <exception>
#include <functional>
#include <future>
#include <stdexcept>
#include <vector>

// Custom exception type to report that an asynchronous operation failed
struct AsyncOperationFailed : public std::runtime_error {
    explicit AsyncOperationFailed(const char* message) : std::runtime_error(message) {}
};

/**
 * Executes a vector of functions that return futures, and gets the results when all have successfully completed.
 * This function blocks and waits on the futures in index order, so if several futures fail, the failure
 * reported is that of the failing future with the lowest index, not necessarily the earliest in time.
 *
 * @tparam T The type of the results that the futures are expected to return.
 * @param futureFns A vector of functions that each returns a future of type T.
 * @return A vector of the resolved values from the futures, in the same order as futureFns.
 */
template<typename T>
std::vector<T> promiseAll(std::vector<std::function<std::future<T>()>> futureFns) {
    // Vector to store the futures returned by the input functions
    std::vector<std::future<T>> futures;

    // Reserve the space needed in the vector to store the futures
    futures.reserve(futureFns.size());

    // Call every function first so that all operations are started before we wait on any of them
    for (auto& fn : futureFns) {
        futures.push_back(fn());
    }

    // Vector to accumulate the results from the futures
    std::vector<T> results;
    results.reserve(futureFns.size());

    // Iterate through the futures and get their results
    for (auto& fut : futures) {
        try {
            // Get the result from the future and add it to the results vector
            results.push_back(fut.get());
        } catch (const std::exception& e) {
            // Wrap the failure, keeping the original reason in the message
            throw AsyncOperationFailed(e.what());
        }
    }

    // Return the full vector of results when all futures have been processed
    return results;
}

// Example usage (within a function that allows exceptions):
/*
try {
    auto results = promiseAll<int>({
        []() { return std::async(std::launch::async, []() { return 42; }); }
    });
    // Process results here
} catch (const AsyncOperationFailed& e) {
    // Handle the failure case here
}
*/

TypeScript

/**
 * Executes an array of functions that return Promises, and resolves when all have successfully completed.
 * If any of the Promises rejects, this function rejects with the reason from the first promise that rejected.
 *
 * @param promiseFns - An array of functions that each returns a promise.
 * @returns A promise that resolves with an array of the resolved values from the input promises or rejects.
 */
async function promiseAll<T>(promiseFns: (() => Promise<T>)[]): Promise<T[]> {
    return new Promise<T[]>((resolve, reject) => {
        // With no functions there is nothing to wait for; resolve immediately, as Promise.all does
        if (promiseFns.length === 0) {
            resolve([]);
            return;
        }

        // Counter for promises that have been resolved
        let resolvedCount = 0;
        // Array to accumulate the resolved results. Its length is pre-set to match the array of promises
        const resultsArray = new Array<T>(promiseFns.length);

        // Iterate over the promise functions
        promiseFns.forEach((promiseFn, index) => {
            // Execute each promise function
            promiseFn()
                .then(value => {
                    // Save the resolved value at the index of the function that produced it
                    resultsArray[index] = value;
                    resolvedCount++;
                    // If all promises have resolved, resolve the outer promise with the results array
                    if (resolvedCount === promiseFns.length) {
                        resolve(resultsArray);
                    }
                })
                .catch(error => {
                    // If any of the promises reject, reject the outer promise with the error
                    reject(error);
                });
        });
    });
}

// Example usage:
// const promise = promiseAll([() => Promise.resolve(42)]);
// promise.then(console.log).catch(console.error); // Expected output: [42]


Time and Space Complexity

The given promiseAll TypeScript code aims to mimic the behavior of Promise.all, running multiple promises in parallel and gathering their results. Now let's analyze its complexities:

Time Complexity

The promiseAll function starts all the promises in a single loop and then handles their results as they resolve. The loop calls each of the m functions once and attaches handlers to the returned promise, which is O(1) work per function and O(m) in total. After this setup, promiseAll only waits: because all promises are pending at the same time, the time until the outer promise settles is determined by the slowest promise (or by the first rejection), not by the sum of the individual durations. If we let n denote the time taken by the slowest promise, the setup cost does not change the dominant term. Therefore,

  • Time Complexity: O(n)

where n is the time taken by the slowest individual promise to resolve.
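The claim that the total wait follows the slowest promise rather than the sum can be checked with a rough timing sketch. The sleep and measure helpers are hypothetical names for this illustration, the delays are arbitrary, and timer precision varies between environments, so the measured value is only approximate.

```typescript
// Hypothetical helper: returns a function that resolves with `ms` after `ms` milliseconds.
function sleep(ms: number): () => Promise<number> {
    return () => new Promise<number>(resolve => setTimeout(() => resolve(ms), ms));
}

// Counter-and-array technique, as in the solution above.
function promiseAll<T>(functions: (() => Promise<T>)[]): Promise<T[]> {
    return new Promise<T[]>((resolve, reject) => {
        const ans = new Array<T>(functions.length);
        let cnt = 0;
        functions.forEach((fn, i) => {
            fn()
                .then(res => {
                    ans[i] = res;
                    if (++cnt === functions.length) {
                        resolve(ans);
                    }
                })
                .catch(reject);
        });
    });
}

// Measures the wall-clock time until promiseAll settles for the given delays.
async function measure(delays: number[]): Promise<number> {
    const start = Date.now();
    await promiseAll(delays.map(ms => sleep(ms)));
    return Date.now() - start;
}

// The delays sum to 300 ms, but all timers run concurrently.
measure([50, 100, 150]).then(elapsed => console.log(`elapsed: about ${elapsed} ms`));
```

On a typical machine the reported time should be close to 150 ms, the longest delay, and well below the 300 ms a sequential run would need.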

Space Complexity

Space complexity pertains to the extra space or storage needed by the algorithm as the input size grows. In this case, the promiseAll function creates an array of size equal to the number of promises to store their results (called ans in the excerpt above and resultsArray in the TypeScript implementation). Assuming each result takes O(1) space, the total space required is proportional to the number of promises. Hence,

  • Space Complexity: O(m)

where m is the number of promises. We do not consider the space taken by the input itself or the promises' internal space usage; we only consider the extra space that the promiseAll function allocates, which is the ans array and the cnt variable, which is a single counter.

Note that the actual time a promise takes to resolve is determined by the systems or processes it depends on (e.g., I/O operations or network requests), so O(n) is a simplification: n stands for the duration of the slowest promise rather than for an input size.
