sideeffect-io / AsyncExtensions

AsyncExtensions aims to mimic Swift Combine operators for async sequences.
MIT License
324 stars 26 forks source link

Youre missing async sequence builder like kotlin's #19

Open ursusursus opened 2 years ago

ursusursus commented 2 years ago

I'm a kotlin developer familiar with kotlin coroutines and Flow, which is direct mapping to async/await + AsyncSequence.

The most powerful thing that kotlin flow has is the builder, which allows you to emit values arbitrarily and use all the async stuff since the closure is async, like so (I'll write it in async/await syntax so you understand better)

flow { emitter in
   await emitter.emit(1)
   await Task.sleep(1_000)
   await emitter.emit(10)
   await Task.sleep(1_000)
   for 0...10 {
      await emitter.emit(100)
   }
}

when the closure returns, the flow terminates, if you throw inside the closure, then error is propagated as usual

It allow you to then create arbitrary custom operators, or simply way to pipe async function into AsyncSequence like

asyncSequence {
   await someFunction()
}
.flatMapLatest { ... }
.collect { ... }

this is very needed

hoc081098 commented 2 years ago

Swift has built-in function

AsyncThrowingStream { continuation in        

    Task {
       continuation.yield(data) }  
       await ...
       continuation.finish(throwing: nil)
       continuation.finish(throwing: error)
 }
ursusursus commented 2 years ago

Hm, but is that not leaky? The task will continue after callsite task gets canceled, no?

ursusursus commented 2 years ago

How does this look?

extension AsyncThrowingStream {
    public struct Emitter {
        let continuation: Continuation

        public func emit(_ value: Element) {
            continuation.yield(value)
        }
    }

    public init(body: @escaping (Emitter) async throws -> ()) where Failure == Error {
        self.init { continuation in
            let task = Task {
                do {
                    let emitter = Emitter(continuation: continuation)
                    try await body(emitter)
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
            continuation.onTermination = { @Sendable termination in
                switch termination {
                case .cancelled:
                    task.cancel()
                default:
                    break
                }
            }
        }
    }
}
let source = AsyncThrowingStream<String, Error> { emitter in
    try await Task.sleep(nanoseconds: 1_000_000_000)
    emitter.emit("A")

    try await Task.sleep(nanoseconds: 1_000_000_000)
    emitter.emit("B")
}
natario1 commented 2 years ago

I'd like something similar too. The implementation above looks good to me.