apple / swift-migration-guide

Apache License 2.0
271 stars 18 forks source link

Cancellation #25

Open mattmassicotte opened 1 month ago

mattmassicotte commented 1 month ago

I was starting to think a bit about cancellation and withTaskCancellationHandler. I'm not certain exactly which section this belongs in, but I think it is an important topic.

GangWoon commented 1 month ago

Hello!

It seems like many people miss how cancellation works in structured concurrency. Therefore, I believe it's essential to know how to use cancellation properly. I noticed in Apple's example that cancellation wasn't correctly handled in the context of structured concurrency, which caused tasks not to be canceled properly.

Is there a way I can contribute to addressing this issue? If you can guide me, I would be happy to prepare and contribute. :)


If you check the last code in [Section2 - Step 8], even if the task running the quakeLocation asynchronous method is canceled, the task created inside the method is not canceled. This is because it is not using structured concurrency. To ensure cancellation as mentioned above, you need to wrap it with withTaskCancellationHandler. Tutorial Link

mattmassicotte commented 1 month ago

Thanks you for pointing this out! I was just looking at the tutorial example. Cancellation of caching behaviors is tricky! I'm including the code listing here just for reference:

func quakeLocation(from url: URL) async throws -> QuakeLocation {
    if let cached = quakeCache[url] {
        switch cached {
        case .ready(let location):
            return location
        case .inProgress(let task):
            return try await task.value
        }
    }
    let task = Task<QuakeLocation, Error> {
        let data = try await downloader.httpData(from: url)
        let location = try decoder.decode(QuakeLocation.self, from: data)
        return location
    }
    quakeCache[url] = .inProgress(task)
    do {
        let location = try await task.value
        quakeCache[url] = .ready(location)
        return location
    } catch {
        quakeCache[url] = nil
        throw error
    }
}

Consider this:

A) client 1 calls quakeLocation(from url: URL), creating the underlying Task B) client 2 calls quakeLocation(from url: URL) C) client 3 calls quakeLocation(from url: URL) D) client 1 cancels

At this point, if QuakeClient cancelled the underlying Task, I believe client 2 and 3 would also end up being cancelled. I have been referring to this as the "special first request problem". In my opinion the existing solution may actually be optimal, behaviorally. I'm not sure you want the first request to be special.

What do you think @hborla, @ktoso?

FranzBusch commented 1 month ago

So I think we need to separate documentation around cancellation into two sections:

  1. How does it work in structured concurrency using task group and async lets
  2. How does it work with unstructured concurrency e.g. Task.init and Task.detached? In particular how do you cancel a Task and calling out that this is a big reason to avoid using unstructured concurrency

The above Quake example and your first request problem is IMO a usage pattern for caching which deserves its own article and has nothing to do with cancellation per-se. The bigger question here is rather how to handle cancellation in when trying to cache. IMO the best solution here is again to avoid spawning any unstructured task but to invert control.

actor Cache<Request, Response> {
  enum RequestState {
    case response(Response)
    case loading([CheckedContinuation<Response>])
  }
  private var cache: [UUID: RequestState]
  private let streamContinuation: AsyncStream<(Request, CheckedContinuation<Response>), Never>.Continuation
  private let stream: AsyncStream<(Request, CheckedContinuation<Response>), Never>

  func run() async {
    await withDiscardingTaskGroup { group in
      for await (request, continuation) in self.stream {
        self.cache[request.id] = .loading([continuation])
        group.addTask {
          let response = await request.execute()
          self.receivedResponse(for: request.id, response)
        }
      }
    }
  }

  func receivedResponse(for id: UUID, response: Response {
    // Resume all continuations from the cache
  }

  func executeRequest(_ request: Request) async -> Response {
    switch self.cache[id] {
    case .response(let response):
      return response
    case .loading(var continuations):
      await withTaskCancellationHandler {
        await withCheckedContinuation { continuation in
          continuations.append(continuation)
          self.cache[id] = .loading(continuations)
       }
      } onCancel: {
        self.removeContinuation() // We need to give each continuation an ID by having a counter in the actor
      }
    case .none:
      // Create continuation and yield to stream
    }
  }

The above is just some pseudocode but it dodges the first request problem that you describe by just applying structured concurrency and inverting control. I think the inverting control part is often an important design pattern that helps keep the structure in the code.

mattmassicotte commented 1 month ago

@FranzBusch, this is great!

Do you have any thoughts about how to discuss the complexity/functionality trade-offs here? Do you think the QuakeClient should be changed?

GangWoon commented 1 month ago

@FranzBusch Thank you so much for the pseudocode. It's an approach I hadn't thought of, but it seems really useful. I have an additional question about implementing the code in practice.

When calling self.removeContinuation in the onCancel parameter, I encounter an "Call to actor-isolated instance method 'removeContinuation()' in a synchronous nonisolated context" error. Wrapping it with Task.init avoids the compile error, but that doesn't seem like the correct approach. Could you give me some more hints about this code?

  func executeRequest(_ request: Request) async -> Response {
    switch self.cache[id] {
    case .loading(var continuations):
      await withTaskCancellationHandler {
       ...
      } onCancel: {
        self.removeContinuation() <- ‼️ Call to actor-isolated instance method 'removeContinuation()' in a synchronous nonisolated context
      }
FranzBusch commented 1 month ago

Do you have any thoughts about how to discuss the complexity/functionality trade-offs here? Do you think the QuakeClient should be changed?

I think we need to explain the underlying concepts first i.e. structuredness and cancellation. With good explanation around why structuredness is better. Then we can use this cache example as a more advanced setup of how to use structuredness.

When calling self.removeContinuation in the onCancel parameter, I encounter an "Call to actor-isolated instance method 'removeContinuation()' in a synchronous nonisolated context" error. Wrapping it with Task.init avoids the compile error, but that doesn't seem like the correct approach. Could you give me some more hints about this code?

Yes that's a correct error. The one way to work around this with Cache being an actor is to again spawn an unstructured task. This shows that we probably hit the limit of what we can do with an actor here and it might be time to switch to a class and use Mutex instead to protect our state. This way we don't need an async call to remove the continuation from onCancel.