mattmassicotte / ConcurrencyRecipes

Practical solutions to problems with Swift Concurrency
BSD 3-Clause "New" or "Revised" License
1.07k stars 33 forks source link

Waiting for data from an independent task #1

Closed samalone closed 7 months ago

samalone commented 7 months ago

Here's a puzzle that I fought with for some time before coming up with a solution. I was working with Swift NIO and distributed actors, and needed to use an actor to allow one task to wait for a value set by a different task.

actor Registry {
    var value: String? = nil

    func setValue(_ s: String) async {
        value = s
    }

    // Wait until a task has set the value, and return that value.
    func requireValue() async -> String {
    }
}

The solution was to return the value immediately if it was available, and wait for a continuation if it was not.

actor Registry {
    private var value: String?
    private var waiters: [CheckedContinuation<String, Never>] = []

    func setValue(_ s: String) async {
        value = s
        for waiter in waiters {
            waiter.resume(returning: s)
        }
        waiters.removeAll()
    }

    func requireValue() async -> String {
        if let value {
            return value
        }
        else {
            return await withCheckedContinuation { continuation in
                waiters.append(continuation)
            }
        }
    }
}
mattmassicotte commented 7 months ago

This is a really, really great one!

This is kind of like an actor cache, which is a notoriously difficult to think to do. You are very much on the right track with your collection-of-continuations. Handling cancellation adds yet another difficult dimension. But I think this is really worth adding!

mattmassicotte commented 7 months ago

How about something like this:

// assumptions: cancellation support is not required, value is set externally

actor ValueStore<T: Sendable> {
    private var value: T?
    private var waiters: [CheckedContinuation<T, Never>] = []

    func setValue(_ newValue: T) async {
        value = newValue
        for waiter in waiters {
            waiter.resume(returning: newValue)
        }
        waiters.removeAll()
    }

    func getValue() async -> T {
        if let value {
            return value
        }

        return await withCheckedContinuation { continuation in
            waiters.append(continuation)
        }
    }
}
mgrider commented 7 months ago

I like this too.

One thing I think it's missing is a usage example. (I think probably every "recipe" should also include an example of usage as well as the recipe itself?)

mattmassicotte commented 7 months ago

This is a great point, and I do think that's valuable. I have used this only in the context of a cache. I think this could be easily adapted to fit that model, and I have to imagine comes up a lot.

You want to cache a value, and the value can only be computed asynchronously.

It's good too, because we can do solution 1 with a cached Task, which is a very common solution, but has major drawbacks. Solution 2 with tracking continuations like is done here. And solution 3 that both tracks continuations and handles per-request cancellation.

samalone commented 7 months ago

I think the generic version illustrates the technique more clearly, since it makes plain that there is nothing special about the String type in my example, and that the value type must be Sendable.

Creating a good example is tricky because with a concise example like...

let store = ValueStore<String>()

Task.detached {
   try await Task.sleep(for: .seconds(1))
   try await store->setValue("Alice")
}

print(await store->getValue())

...any sane coder would restructure the code to eliminate the race condition in the first place, rather than use an actor to act as a semaphore. The reason I needed the actor was that I needed to coordinate Swift NIO, which has its own event loop system, with Swift distributed actors, which can be called from an arbitrary task.

mattmassicotte commented 7 months ago

I've added a section that I think is coming closer to addressing this kind of problem: 34eb96d02dce3df1d97cc17969a47acdaca42bab

But I want to leave this open to add solutions 2 (a continuation collection) and 3 (a continuation collection that supports cancelation)

mattmassicotte commented 7 months ago

And now the tracking solution is posted: 6159f14481261677dc495ac039ef0c62c826e152

mattmassicotte commented 7 months ago

And finally, a full async cache implementation, with support for per-request cancellation. I realize this isn't exactly the same as what was originally proposed, but I think it still illustrates the core ideas and is a use-case I think will be familiar to a lot of people.