pointfreeco / swift-concurrency-extras

Useful, testable Swift concurrency.
MIT License

Lock introduced in eraseToStream can lead to missing sequence elements #19

Closed cwalo closed 11 months ago

cwalo commented 11 months ago

Description

I stumbled on this issue after observing some missing values when using eraseToStream(). It seems that a side effect of introducing the lock is that values can be dropped if the sequence produces elements while the initializer is holding the lock. Assuming I'm holding everything correctly in the example below, the current implementation will drop the first few values pretty consistently. This is not the case when using the AsyncPublisher produced by publisher.values or when manually calling continuation.yield().

I took a stab at the version below, which seems to behave as expected and keeps things within the concurrency domain. That said, I'm not certain of the implications compared to the current implementation. It leverages the initializer that takes a bufferingPolicy:

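init<S: AsyncSequence>(_ sequence: S) where S.Element == Element {
    self.init { continuation in
        // Forward every element of the wrapped sequence into the stream.
        let task = Task {
            do {
                for try await value in sequence {
                    continuation.yield(value)
                }
            } catch {
                // A non-throwing stream cannot surface the error; just finish below.
            }
            // Finish on normal completion as well as on failure.
            continuation.finish()
        }
        // Stop forwarding once the consumer terminates the stream.
        continuation.onTermination = { _ in task.cancel() }
    }
}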
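
To make the buffering point concrete, here is a minimal sketch using only the standard library's AsyncStream. The helper name `drain(policy:)` is made up for illustration and is not part of swift-concurrency-extras. It yields five values before any consumer exists, then reads the stream back, so the buffering policy alone decides what survives:

```swift
// Hypothetical helper: yields 0..<5 up front, finishes, then drains the stream.
func drain(policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream<Int>(bufferingPolicy: policy) { continuation in
        // The build closure runs synchronously, before anyone iterates.
        for i in 0..<5 {
            continuation.yield(i)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

With `.unbounded` (the default when no policy is passed) this returns `[0, 1, 2, 3, 4]`; with `.bufferingNewest(1)` it returns `[4]`. So values handed to `continuation.yield()` are not lost under the default policy even if the consumer starts late.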

Expected behavior

All sequence elements are received.

Actual behavior

Sequence elements are sometimes missing.

Steps to reproduce

import Combine
import ConcurrencyExtras

class Observer<T> {
    init(_ publisher: some Publisher<T, Never>, usesEraseStream: Bool) {
        let stream: any AsyncSequence = usesEraseStream ? publisher.values.eraseToStream() : publisher.values

        Task {
            print("Receive Task started")
            for try await value in stream {
                print("receive: \(value)")
            }
        }
    }
}

let subject = PassthroughSubject<Int, Never>()
let observer = Observer<Int>(subject, usesEraseStream: true)

Task {
    print("Send Task started")
    for i in 0..<100 {
        subject.send(i)
    }
}

// expected - receive: 0 ... receive: 99
// result - receive: 5 ... receive: 99

swift-concurrency-extras version information

1.1.0

Destination operating system

macOS 14.1.2

Xcode version information

15.0.1

Swift Compiler version information

swift-driver version: 1.87.1 Apple Swift version 5.9 (swiftlang-5.9.0.128.108 clang-1500.0.40.1)
Target: arm64-apple-macosx14.0

stephencelis commented 11 months ago

@cwalo I think you and Brandon already discussed this on Slack, and I think the conclusion was that this is not a bug related to the lock, but a general behavior of Swift's async streams and suspension point thread hops.

Since I don't think it's a bug in the library I'm going to convert this to a discussion, but if it does turn out that there are improvements we could make, we'd be happy to address them!
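
One way to picture the behavior described above, as a sketch rather than anything from the library: a source that does not buffer for late listeners (Combine's PassthroughSubject drops values when it has no subscribers) loses whatever it emits before the consuming side has attached, with no lock involved. `Broadcaster` and `lateSubscriptionDemo()` are hypothetical names, used here so the example runs without Combine:

```swift
// Hypothetical stand-in for a non-buffering subject:
// values sent while no handler is attached are discarded.
final class Broadcaster<Value> {
    private var handler: ((Value) -> Void)?
    func attach(_ handler: @escaping (Value) -> Void) { self.handler = handler }
    func send(_ value: Value) { handler?(value) }
}

func lateSubscriptionDemo() async -> [Int] {
    let broadcaster = Broadcaster<Int>()

    // Nothing is listening yet, so these two values are lost.
    broadcaster.send(0)
    broadcaster.send(1)

    // Grab the continuation; the build closure runs synchronously.
    var captured: AsyncStream<Int>.Continuation?
    let stream = AsyncStream<Int> { captured = $0 }
    guard let continuation = captured else { return [] }

    // From here on, every value is buffered by the stream.
    broadcaster.attach { value in _ = continuation.yield(value) }
    broadcaster.send(2)
    broadcaster.send(3)
    continuation.finish()

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

Calling `await lateSubscriptionDemo()` returns `[2, 3]`. In the reproduction the attach point is not fixed like this: it depends on when the receiving task first runs, which would explain why the number of missing leading values varies.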