2650. Design Cancellable Function


Problem Description

In this problem, we are tasked with creating a cancellable asynchronous process. We need to write a function called cancellable that accepts a generator object. The generator is expected to yield promises, representing asynchronous operations. The function cancellable should return an array of two values: a cancel function and a promise that corresponds to the asynchronous operations of the generator.

Here's the behavior we need to implement:

  1. Handling Promises: The function will consume the yielded promises from the generator, passing the values resolved by the promises back to the generator. If a promise rejects, the function must throw the error back to the generator.

  2. Cancellation: The cancel function, when called, should stop the generator from running further by throwing the string "Cancelled" (not an Error object) back into the generator. If the generator catches this error, the outer promise (returned by cancellable) should resolve with the next value the generator yields or returns. If it doesn't catch the error, the promise should reject with the thrown error.

  3. Completion: If the generator completes its process successfully without being cancelled, the promise should resolve with the final returned value from the generator. On the flip side, if the generator throws an error, the returned promise should reject with that error.

As an example, when using the provided tasks generator function, if the cancel function is called before the generator completes, it results in the promise rejecting with "Cancelled". Otherwise, if cancel is not called or called after the generator completes its process, the promise should resolve with the calculated value.

Intuition

The solution to the problem necessitates an understanding of JavaScript ES6 Generators, Promises, and the async/await syntax for handling asynchronous code execution.

The intuition is to create a control mechanism (the cancel function) that competes with the regular execution of the generator. When activated, the cancel function should reject an internal promise (cancelPromise), which interrupts the generator's regular flow.

  1. Wrapping the Generator with Async Function: To manage the asynchronous flow, we wrap the generator logic inside an async function, which allows us to use the await keyword to wait for asynchronous operations (Promises) to resolve.

  2. Controlling the Flow: The cancel function rejects the cancelPromise, and we use Promise.race to listen to both the current promise from the generator and the cancelPromise. Whichever promise settles first determines the next course of action:

    • If the promise from the generator settles first, we use its value to continue the generator.
    • If the cancelPromise settles first due to the cancel function being called, the generator is interrupted by throwing the "Cancelled" error into it.
  3. Error Handling: By employing try-catch blocks, we can handle errors from promises and the generator throwing an error (due to throw(e)). This helps us manage promise rejections and generator errors consistently.

  4. Return Value and Promise: Finally, we construct and return an array containing the cancel function and a promise that ultimately resolves to the generator's outcome or rejects if an error is thrown or cancellation occurs.
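
The race between a pending operation and the cancel signal can be tried in isolation before any generator is involved. The following is a minimal sketch, not the full solution: makeCancelSignal, delay, and demo are hypothetical names introduced only for illustration, and the "Cancelled" string follows the problem description.

```typescript
// Hypothetical helper: builds a promise that never fulfills and rejects
// with "Cancelled" once cancel() is called.
function makeCancelSignal(): [() => void, Promise<never>] {
  let cancel: () => void = () => {};
  const cancelPromise = new Promise<never>((_, reject) => {
    // The executor runs synchronously, so cancel is assigned before we return.
    cancel = () => reject("Cancelled");
  });
  // Attach a no-op handler so that cancelling while nobody is racing
  // does not surface as an unhandled rejection.
  cancelPromise.catch(() => {});
  return [cancel, cancelPromise];
}

// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms: number, value: number) =>
  new Promise<number>((resolve) => setTimeout(() => resolve(value), ms));

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const [cancel, cancelPromise] = makeCancelSignal();

  // No cancellation yet: the task settles first and wins the race.
  const first = await Promise.race([delay(10, 1), cancelPromise]);
  log.push(`first: ${first}`);

  // Cancel while the second task is still pending: the signal wins.
  setTimeout(cancel, 10);
  try {
    await Promise.race([delay(100, 2), cancelPromise]);
  } catch (e) {
    log.push(`second: ${String(e)}`);
  }
  return log;
}

demo().then((log) => console.log(log.join(", ")));
// prints: first: 1, second: Cancelled
```

Note that the losing promise is not stopped by the race; its timer still fires, and its result is simply ignored.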

Solution Approach

The solution implementation details are as follows:

  1. Creating Control Mechanisms:

    • A cancel function is declared and later defined within a new Promise called cancelPromise, which only gets rejected when the cancel function is called.
    • The assignment cancel = () => reject('Cancelled') effectively links the calling of the cancel function to the rejection of cancelPromise.
    • A catch is attached to the cancelPromise (cancelPromise.catch(() => {});) to prevent unhandled promise rejections in case it gets rejected before being passed to Promise.race.
  2. Handling Generator Execution Flow:

    • The main promise, promise, is created using an immediately invoked async function to handle the asynchronously yielded promises from the generator.
    • A while loop (while (!next.done)) is used to continuously pull values from the generator using next = generator.next(await Promise.race([next.value, cancelPromise]));.
    • Within the loop, Promise.race is used to wait for either the current yielded promise (next.value) or the cancelPromise to settle.
    • If cancelPromise wins the race (when the cancel function is called), the error "Cancelled" is thrown into the generator using generator.throw(e).
  3. Error Handling:

    • The try-catch block handles errors that may arise from promises. If a promise is rejected, await will throw an error, which is then caught and thrown back into the generator using generator.throw(e).
    • If the generator catches this error, generator.throw(e) returns the generator's next result and the loop continues (or ends, if the generator returned). If the generator doesn't catch the error, generator.throw(e) rethrows it, the async function exits with that error, and promise is rejected with it.
    • After the generator is done (next.done is true), the value it returned (next.value) is returned as the fulfillment value of the promise.
  4. Returning the Output:

    • The function cancellable finally returns an array with the cancel function and the promise.
    • The consumer can use the cancel function to cancel the ongoing generator execution at any point, and the promise to await the result of the generator execution (either resulting from normal completion, cancellation, or an error).
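
The error-handling step depends on how generator.throw behaves, and that can be checked synchronously without any promises. The sketch below is illustrative only; catching and notCatching are hypothetical generator names, not part of the solution.

```typescript
// A generator that catches the injected error and returns a fallback value.
function* catching(): Generator<number, string, unknown> {
  try {
    yield 1;
    return "finished normally";
  } catch (e) {
    return `recovered from ${String(e)}`;
  }
}

// A generator with no try/catch around its yield.
function* notCatching(): Generator<number, string, unknown> {
  yield 1;
  return "finished normally";
}

// Case 1: the error is caught inside the generator, so throw() returns the
// generator's next result (here the generator returns, so done is true).
const g1 = catching();
g1.next(); // run to the first yield
const caught = g1.throw("Cancelled");
console.log(caught); // { value: 'recovered from Cancelled', done: true }

// Case 2: nothing catches the error, so throw() rethrows it to the caller.
const g2 = notCatching();
g2.next(); // run to the first yield
let escaped: unknown = undefined;
try {
  g2.throw("Cancelled");
} catch (e) {
  escaped = e;
}
console.log(escaped); // Cancelled
```

Inside the driving loop, case 1 lets the loop carry on with the returned result, while case 2 makes the error leave the async function and reject the outer promise.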

This solution cleverly uses JavaScript's concurrency tools, such as Promises and async/await syntax, alongside generator functions, to create a cancellable asynchronous pattern. The use of Promise.race is particularly noteworthy, as it provides a way to handle the cancellation in a non-blocking fashion alongside normal promise resolution.

Example Walkthrough

Let's take a closer look at how the cancellable function works using a simple example. Suppose we have an asynchronous task that waits for a specified delay and then resolves with a number. We'll define a generator function that performs a sequence of these tasks.

```javascript
// Asynchronous task that resolves after a delay
function delayedTask(value, delay) {
  return new Promise(resolve => setTimeout(() => resolve(value), delay));
}

// Generator function that will yield promises from the delayedTask
function* tasks() {
  let result = 0;

  // Wait for 500ms, then add 10
  result += yield delayedTask(10, 500);
  // Wait for another 500ms, then add 20
  result += yield delayedTask(20, 500);
  // Wait for another 500ms, then add 30
  result += yield delayedTask(30, 500);

  // Return the accumulated result
  return result;
}
```

Now we will use the cancellable function with this generator. We'll demonstrate what happens when the process completes without cancellation and what occurs when we do cancel the process partway through.

```javascript
// Executing the cancellable function with our generator
const [cancel, promise] = cancellable(tasks());

promise.then(result => {
  console.log(`Completed with the result: ${result}`);
}).catch(error => {
  console.log(`The process was cancelled or rejected with an error: ${error}`);
});
```

If we let the promise run to completion without invoking cancel, the console output will be:

```
Completed with the result: 60
```

This output means the generator successfully went through all async tasks, yielding the promises and adding up 10 + 20 + 30 to give us the final result of 60.

Now, let's see what happens if we call cancel() while the first task is still pending:

```javascript
// Start the process
const [cancel, promise] = cancellable(tasks());

// Schedule a cancellation at 250 ms, before the first 500 ms task finishes
setTimeout(() => {
  cancel();
}, 250);

// Listen to the outcome
promise.then(result => {
  console.log(`Completed with the result: ${result}`);
}).catch(error => {
  console.log(`The process was cancelled or rejected with an error: ${error}`);
});
```

With this setup, cancel is called 250 ms in, while the generator is still suspended at its first yield. The cancelPromise wins the race, "Cancelled" is thrown into the generator, the generator does not catch it, and the result is the following console output:

```
The process was cancelled or rejected with an error: Cancelled
```

This demonstrates the cancellation logic, stopping the generator in its tracks and bypassing the completion of further asynchronous tasks. The above examples should help illustrate the behavior of the cancellable function in handling asynchronous generator processes and providing a mechanism for cancellation.
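
The walkthrough covers a generator that does not catch the cancellation. The other branch of the cancellation rule, where the generator catches "Cancelled" and the outer promise resolves instead of rejecting, can be sketched as follows. This assumes a compact driver written along the lines of the Solution Approach; sleep and tasksWithCleanup are hypothetical names used only for this example.

```typescript
// Compact driver following the Solution Approach (a sketch, not the only way to write it).
function cancellable<T>(
  generator: Generator<Promise<any>, T, unknown>
): [() => void, Promise<T>] {
  let cancel: () => void = () => {};
  const cancelPromise = new Promise<never>((_, reject) => {
    cancel = () => reject("Cancelled");
  });
  cancelPromise.catch(() => {}); // avoid an unhandled rejection

  const promise = (async (): Promise<T> => {
    let next = generator.next();
    while (!next.done) {
      try {
        // Resume the generator with whatever the yielded promise resolves to.
        next = generator.next(await Promise.race([next.value, cancelPromise]));
      } catch (e) {
        // Rejection or cancellation: hand the error to the generator.
        next = generator.throw(e);
      }
    }
    return next.value;
  })();
  return [cancel, promise];
}

// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical generator that catches the cancellation and returns a fallback.
function* tasksWithCleanup(): Generator<Promise<void>, string, unknown> {
  try {
    yield sleep(100);
    return "completed";
  } catch (e) {
    return `stopped early: ${String(e)}`;
  }
}

const [cancel, promise] = cancellable(tasksWithCleanup());
setTimeout(cancel, 20); // cancel while sleep(100) is still pending
promise.then((result) => console.log(result)); // prints: stopped early: Cancelled
```

Because the generator handles the error itself, the outer promise resolves with the value returned from the catch block rather than rejecting.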

Solution Implementation

```python
import asyncio


# Takes a plain generator that yields awaitables and returns a tuple of
# (cancel function, asyncio Task). Must be called while an event loop is running.
def cancellable(generator):
    # Event used to signal that cancellation was requested.
    cancel_event = asyncio.Event()

    # The cancellation function simply sets the event.
    def cancel():
        cancel_event.set()

    # Coroutine that drives the generator until it finishes or is cancelled.
    async def wrapped_coro():
        # A single long-lived task that completes once cancel() has been called.
        cancel_waiter = asyncio.ensure_future(cancel_event.wait())
        try:
            # Run the generator up to its first yield.
            awaitable = next(generator)
            while True:
                task = asyncio.ensure_future(awaitable)
                # Wait for either the yielded awaitable or the cancellation signal.
                await asyncio.wait({task, cancel_waiter},
                                   return_when=asyncio.FIRST_COMPLETED)
                if not task.done():
                    # Cancellation won: stop the pending work and throw into the generator.
                    task.cancel()
                    awaitable = generator.throw(asyncio.CancelledError("Cancelled"))
                elif task.exception() is not None:
                    # The awaitable failed: throw its error into the generator.
                    awaitable = generator.throw(task.exception())
                else:
                    # Send the resolved value back into the generator.
                    awaitable = generator.send(task.result())
        except StopIteration as stop:
            # The generator returned; its return value is the final result.
            return stop.value
        finally:
            cancel_waiter.cancel()

    # Wrap the coroutine in a task so it starts running and can be awaited.
    coro_promise = asyncio.ensure_future(wrapped_coro())

    # Return the cancellation function and the task.
    return cancel, coro_promise


# Usage example with a generator function `tasks` that yields awaitables
# (asyncio.sleep is used here as an example).
def tasks():
    val = yield asyncio.sleep(1, result=2 + 2)
    yield asyncio.sleep(2)  # Represents a delay or long-running task.
    return val + 1


async def main():
    # Create a cancellable task from the tasks generator and set up a cancellation.
    cancel, wrapped_promise = cancellable(tasks())

    # Cancel the operation after 0.05 seconds.
    asyncio.get_running_loop().call_later(0.05, cancel)

    try:
        # Await the task, and potentially catch the cancellation.
        result = await wrapped_promise
        print(result)
    except asyncio.CancelledError:
        # The generator did not catch the cancellation, so it propagates here.
        print("Cancelled")


# Run the example
asyncio.run(main())
```

```java
import java.util.concurrent.*;
import java.util.function.Supplier;

// This class encapsulates the cancellation logic and the handling of the asynchronous tasks.
// Java has no generators, so the "generator" is modelled as a Supplier that returns the next
// Future on each call and null when there are no more tasks. Results are not passed back in.
public class Cancellable<T> {
    // Completes with the last task's result, or exceptionally on error or cancellation.
    private final CompletableFuture<T> future = new CompletableFuture<>();
    // A single worker thread that runs the tasks in sequence.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Handle to the worker job, used to interrupt it on cancellation.
    private final Future<?> worker;

    // Constructor takes a Supplier<Future<T>> which acts as the generator of asynchronous tasks.
    public Cancellable(Supplier<Future<T>> generator) {
        worker = executor.submit(() -> handleGenerator(generator));
        // No further jobs will be submitted; the worker thread exits when the job ends.
        executor.shutdown();
    }

    // Pulls tasks from the generator one at a time and waits for each to finish.
    private void handleGenerator(Supplier<Future<T>> generator) {
        try {
            T result = null;
            Future<T> task = generator.get();
            // Loop until there are no more tasks or the outcome is already decided.
            while (task != null && !future.isDone()) {
                result = task.get();      // Wait for the task to complete.
                task = generator.get();   // Get the next task from the generator.
            }
            future.complete(result);      // No effect if already cancelled.
        } catch (ExecutionException e) {
            // A task failed: report its underlying cause.
            future.completeExceptionally(e.getCause());
        } catch (InterruptedException e) {
            // The worker was interrupted by cancel().
            Thread.currentThread().interrupt();
            future.completeExceptionally(new CancellationException("Cancelled"));
        } catch (RuntimeException e) {
            future.completeExceptionally(e);
        }
    }

    // Cancels the execution of the generator.
    public void cancel() {
        // Settle the result first, then interrupt the worker if it is blocked on a task.
        future.completeExceptionally(new CancellationException("Cancelled"));
        worker.cancel(true);
    }

    // Returns the future that completes with the final result or with the cancellation.
    public CompletableFuture<T> getFuture() {
        return future;
    }
}

/**
 * Usage example with a Supplier<Future<Integer>> `tasksGenerator`
 * that simulates asynchronous operations.
 */
/*
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

Supplier<Future<Integer>> tasksGenerator = new Supplier<Future<Integer>>() {
    private int state = 0;

    @Override
    public Future<Integer> get() {
        if (state == 0) {
            state++;
            // First task: already completed with 2 + 2.
            return CompletableFuture.completedFuture(2 + 2);
        } else if (state == 1) {
            state++;
            // Second task: completes after 100 milliseconds.
            CompletableFuture<Integer> future = new CompletableFuture<>();
            scheduler.schedule(() -> future.complete(null), 100, TimeUnit.MILLISECONDS);
            return future;
        } else {
            return null; // No more tasks.
        }
    }
};

// Create the Cancellable instance from the tasks generator.
Cancellable<Integer> cancellable = new Cancellable<>(tasksGenerator);

// Schedule cancellation of the operation after 50 milliseconds.
scheduler.schedule(cancellable::cancel, 50, TimeUnit.MILLISECONDS);

// Add a listener for the completion or cancellation of the wrapped future.
cancellable.getFuture().whenComplete((result, throwable) -> {
    // This will either print the result or the exception message ("Cancelled" here).
    if (throwable != null) {
        System.out.println(throwable.getMessage());
    } else {
        System.out.println(result);
    }
    scheduler.shutdown();
});
*/
```

```cpp
#include <atomic>
#include <chrono>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <thread>
#include <utility>

// Custom exception to indicate a cancelled operation
struct CancelledException : public std::exception {
    const char* what() const noexcept override {
        return "Cancelled";
    }
};

// State shared by the caller and the worker thread. It is held through a
// shared_ptr so that it outlives the call to cancellable().
template<typename T>
struct SharedState {
    std::promise<T> resultPromise;
    std::atomic<bool> settled{false};  // true once the promise has been settled
};

// Runs the tasks on a separate thread and returns a cancel function plus a future for the result.
// Cancelling settles the future with CancelledException; the worker thread itself cannot be
// interrupted, so whatever it produces afterwards is discarded.
template<typename T>
std::pair<std::function<void()>, std::future<T>> cancellable(std::function<T()> tasks) {
    auto state = std::make_shared<SharedState<T>>();
    std::future<T> resultFuture = state->resultPromise.get_future();

    // Start the tasks in a separate thread; everything is captured by value
    // so nothing dangles after this function returns.
    std::thread([state, tasks = std::move(tasks)]() {
        try {
            T result = tasks();
            // Only the first party to flip the flag may settle the promise.
            if (!state->settled.exchange(true)) {
                state->resultPromise.set_value(std::move(result));
            }
        } catch (...) {
            if (!state->settled.exchange(true)) {
                state->resultPromise.set_exception(std::current_exception());
            }
        }
    }).detach();

    // Cancel function: settles the result with CancelledException unless already settled.
    std::function<void()> cancel = [state]() {
        if (!state->settled.exchange(true)) {
            state->resultPromise.set_exception(std::make_exception_ptr(CancelledException()));
        }
    };

    // Return the cancel function and the future that will hold the result.
    return {std::move(cancel), std::move(resultFuture)};
}

// Usage example
/*
int main() {
    // Example task which waits then performs a computation
    auto tasks = []() -> int {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        return 2 + 2 + 1;
    };

    // Create a cancellable task sequence
    auto [cancel, resultFuture] = cancellable<int>(tasks);

    // Cancel the operation after 1 second
    std::this_thread::sleep_for(std::chrono::seconds(1));
    cancel();

    // Wait for the cancellation or the task completion
    try {
        int result = resultFuture.get(); // This will throw an exception if the task was cancelled
        std::cout << "Result: " << result << std::endl;
    } catch(const CancelledException& e) {
        std::cout << e.what() << std::endl;
    } catch(const std::exception& e) {
        std::cout << "Exception occurred: " << e.what() << std::endl;
    }

    return 0;
}
*/
```

```typescript
// This function takes in a generator which yields promises and returns a cancellation function and a promise.
// When the cancellation function is called, it stops the generator from continuing.
function cancellable<T>(generator: Generator<Promise<any>, T, unknown>): [() => void, Promise<T>] {
    // Declare a cancel function that does nothing by default.
    let cancel: () => void = () => {};

    // Create a cancel promise that never fulfills and is rejected with the
    // string 'Cancelled' (not an Error object) when the cancel function is called.
    const cancelPromise = new Promise<never>((_, reject) => {
        cancel = () => reject('Cancelled');
    });

    // Catch any cancellation to prevent unhandled promise rejections.
    cancelPromise.catch(() => {});

    // This async function controls the execution of the generator, allowing for cancellation.
    const wrappedPromise = (async () => {
        let iterationResult = generator.next();

        // Loop until the generator is finished, handling each promise yielded.
        while (!iterationResult.done) {
            try {
                // Wait for either the promise to resolve or the cancel promise to reject.
                const resolvedValue = await Promise.race([iterationResult.value, cancelPromise]);
                // Pass the resolved value back into the generator and get the next result.
                iterationResult = generator.next(resolvedValue);
            } catch (error) {
                // If an error occurs, throw it back into the generator to be handled there.
                // If the generator does not catch it, the error rejects wrappedPromise.
                iterationResult = generator.throw(error);
            }
        }
        // Once done, return the final value from the generator.
        return iterationResult.value;
    })();

    // Return the cancel function and the wrapped promise.
    return [cancel, wrappedPromise];
}

/**
 * Usage example with a generator function `tasks` that performs asynchronous operations.
 */
/*
function* tasks(): Generator<Promise<any>, number, any> {
    const val = yield new Promise(resolve => resolve(2 + 2));
    yield new Promise(resolve => setTimeout(resolve, 100));
    return val + 1;
}

// Create a cancellable promise from the tasks generator and set up a cancellation.
const [cancel, wrappedPromise] = cancellable(tasks());

// Cancel the operation after 50 milliseconds.
setTimeout(cancel, 50);

// Listen for the cancellation or the promise to be fulfilled.
wrappedPromise.catch(console.log); // This will log "Cancelled" to the console at t=50ms due to the cancellation.
*/
```

Time and Space Complexity

Time Complexity

The time complexity of the cancellable function primarily depends on the number of times the generator yields, the time complexity of each promise resolved within the generator, and the overhead of the operations within the async function that drives the generator. The generator is advanced sequentially, yielding promises that are awaited one after the other.

Assuming the generator yields n promises, and the time complexity to resolve each promise is O(p), then the time complexity of running through the generator is O(n * p). Since we do not control the promises the generator yields, we cannot calculate p for arbitrary promises.

The Promise.race call has a very small, O(1) overhead per iteration: it simply settles as soon as the first of its two input promises resolves or rejects.

Lastly, the try-catch block introduces a constant factor overhead but does not change the asymptotic time complexity.

Thus the total time complexity of the cancellable function is O(n * p).

Space Complexity

The space complexity is determined by the number of promises that can be in memory simultaneously, as well as the memory consumed by the generator's state space.

  1. For the generator itself, the space complexity will be O(g), where g captures the local state that is stored across yield points.

  2. Each Promise.race call creates one new promise, but it is awaited immediately and only one exists at a time, so this is an O(1) overhead. (Each call also registers a reaction on cancelPromise; these are retained while cancelPromise stays pending, so this bookkeeping can grow with the number of yields n.)

  3. The cancelPromise and promise are individual promises and thus represent a constant space overhead O(1).

  4. The try-catch block does not increase space complexity.

As a result, the space complexity is dominated by O(g), where g reflects the space needed to store the state of the generator between yields, plus the small per-yield bookkeeping on cancelPromise noted in point 2.
