tcldr / Entwine

Testing tools and utilities for Apple's Combine framework.
MIT License
445 stars 26 forks source link

Feature request: proper switchToLatest and switchMap operators workaround #16

Closed sherlock1982 closed 4 years ago

sherlock1982 commented 4 years ago

Hi!

Consider the following code:

// Reproducer: emulate a `switchMap` by mapping the single upstream value to an
// inner publisher and flattening the result with `switchToLatest()`.
cancellable = Just(2).map { x in
    // Inner publisher: emits `x * x`, delayed by 2.0 on `RunLoop.main`.
    Just(x * x).delay(for: 2.0, scheduler: RunLoop.main)
}
.switchToLatest()
.sink(receiveCompletion: {_ in
    // Reported in this issue as never being called.
    print("completed")
}, receiveValue: {result in
    // Reported in this issue as never being called.
    print(result)
})

In this example I tried to mimic a switchMap operator with map + switchToLatest, and surprisingly neither of the sink callbacks is ever called. I believe this is a bug in Combine.

Looks like switchToLatest cancels immediately when upstream completes and doesn't wait for inner subscription to complete.

Surprisingly the same example works with a flatMap.

I suggest writing a proper switchToLatest operator, and a switchMap operator as well. Here's some code I wrote today for you to consider. Maybe there's an easier solution?

extension Publisher where Self.Output: Publisher, Self.Output.Failure == Failure {
    /// Wraps this publisher-of-publishers in `Publishers.SwitchToLatestWaitable`.
    ///
    /// Intended as a variant of `switchToLatest()` that does not finish until the
    /// inner publisher has finished as well.
    ///
    /// Declared `public` so that the public `Publishers.SwitchToLatestWaitable` type
    /// can be created from outside the defining module (its initializer is internal).
    ///
    /// - Returns: A `Publishers.SwitchToLatestWaitable` whose upstream is `self`.
    public func switchToLatestWaitable() -> Publishers.SwitchToLatestWaitable<Self> {
        Publishers.SwitchToLatestWaitable(upstream: self)
    }
}

extension Publishers {

    /// A publisher that flattens a publisher of publishers by republishing the values of
    /// the most recently received inner publisher only.
    ///
    /// The stream finishes once the upstream has finished **and** no inner publisher is
    /// still running. A failure from the upstream or from the current inner publisher is
    /// forwarded immediately.
    ///
    /// - Note: Downstream demand is forwarded to the upstream subscription only. Inner
    ///   publishers are consumed through `sink`, so no backpressure is applied to the
    ///   values they emit.
    public struct SwitchToLatestWaitable<Upstream: Publisher>: Publisher where Upstream.Output: Publisher, Upstream.Output.Failure == Upstream.Failure {
        public typealias Output = Upstream.Output.Output
        public typealias Failure = Upstream.Failure

        private let upstream: Upstream

        init(upstream: Upstream) {
            self.upstream = upstream
        }

        /// Subscribes a bridging subscriber to the upstream on behalf of `subscriber`.
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            upstream.receive(subscriber: SwitchToLatestWaitableSubscriber(subscriber))
        }

        /// Bridges the upstream (a publisher of publishers) to the downstream subscriber.
        ///
        /// It also acts as the `Subscription` handed to downstream, so that a downstream
        /// `cancel()` tears down both the upstream subscription and the current inner
        /// subscription.
        ///
        /// All mutable state is accessed inside `lock(obj: self)`.
        /// NOTE(review): `lock(obj:)` is defined outside this block. This class assumes it is
        /// re-entrant on the same thread (as an `objc_sync_enter`-based helper would be),
        /// because downstream may call `cancel()` / `request(_:)` synchronously while a
        /// value is being delivered under the lock — confirm.
        final class SwitchToLatestWaitableSubscriber<Downstream: Subscriber>: Subscriber, Subscription where Downstream.Input == Upstream.Output.Output, Downstream.Failure == Upstream.Failure {
            public typealias Input = Upstream.Output
            public typealias Failure = Upstream.Failure

            private let downstream: Downstream
            /// Upstream subscription; `nil` before subscribing and after termination.
            private var upstreamSubscription: Subscription?
            /// `true` once the upstream has sent `.finished`.
            private var upstreamCompleted = false
            /// `true` once downstream has received a completion or has cancelled.
            private var terminated = false
            /// `true` while the current inner publisher has not yet completed.
            private var innerActive = false
            /// Incremented for every inner publisher; used to ignore events from replaced inners.
            private var innerGeneration = 0
            private var innerCancellable: AnyCancellable? = nil

            init(_ downstream: Downstream) {
                self.downstream = downstream
            }

            // MARK: Subscriber

            func receive(subscription: Subscription) {
                lock(obj: self) {
                    upstreamSubscription = subscription
                }
                // Hand out `self` rather than the raw upstream subscription so that a
                // downstream cancel also reaches the inner subscription.
                downstream.receive(subscription: self)
            }

            /// Switches to a new inner publisher: cancels the previous inner subscription,
            /// then subscribes to `input` and forwards its values downstream.
            ///
            /// - Returns: `.unlimited` while the stream is live, `.none` once terminated.
            func receive(_ input: Input) -> Subscribers.Demand {
                var generation = 0
                var previous: AnyCancellable?
                var accepted = false
                lock(obj: self) {
                    if !terminated {
                        innerGeneration += 1
                        generation = innerGeneration
                        innerActive = true
                        previous = innerCancellable
                        innerCancellable = nil
                        accepted = true
                    }
                }
                // Cancel the old inner before subscribing to the new one so that the two
                // can never interleave.
                previous?.cancel()
                if !accepted {
                    return .none
                }

                let token = generation
                let inner = input.sink(receiveCompletion: { completion in
                    lock(obj: self) {
                        self.innerDidComplete(completion, generation: token)
                    }
                }, receiveValue: { value in
                    lock(obj: self) {
                        // Drop values from a replaced inner, or after termination.
                        if !self.terminated && token == self.innerGeneration {
                            _ = self.downstream.receive(value)
                        }
                    }
                })

                var keep = false
                lock(obj: self) {
                    // The inner may have completed synchronously inside `sink`, or the
                    // stream may have been terminated or switched meanwhile; only retain
                    // the subscription if it is still the live one.
                    if !terminated && innerActive && token == innerGeneration {
                        innerCancellable = inner
                        keep = true
                    }
                }
                if !keep {
                    inner.cancel()
                }

                return .unlimited
            }

            func receive(completion: Subscribers.Completion<Failure>) {
                lock(obj: self) {
                    // The upstream is done; there is nothing left to cancel or request from.
                    upstreamSubscription = nil
                    switch completion {
                    case .finished:
                        upstreamCompleted = true
                        // Without a running inner publisher nothing else will ever
                        // arrive, so finish right away; otherwise wait for the inner.
                        if !innerActive {
                            complete(completion)
                        }
                    case .failure:
                        // Immediately pass failure
                        complete(completion)
                    }
                }
            }

            // MARK: Subscription

            /// Forwards downstream demand to the upstream subscription.
            func request(_ demand: Subscribers.Demand) {
                var target: Subscription?
                lock(obj: self) {
                    target = upstreamSubscription
                }
                target?.request(demand)
            }

            /// Cancels both the upstream and the current inner subscription.
            func cancel() {
                var inner: AnyCancellable?
                var outer: Subscription?
                lock(obj: self) {
                    terminated = true
                    innerActive = false
                    inner = innerCancellable
                    innerCancellable = nil
                    outer = upstreamSubscription
                    upstreamSubscription = nil
                }
                inner?.cancel()
                outer?.cancel()
            }

            // MARK: Private helpers (must be called with the lock held)

            /// Handles completion of the inner publisher identified by `generation`.
            private func innerDidComplete(_ completion: Subscribers.Completion<Failure>, generation: Int) {
                // Ignore completions from an inner publisher that has been replaced.
                guard generation == innerGeneration else { return }
                switch completion {
                case .finished:
                    innerActive = false
                    // Releasing the cancellable breaks the self -> sink -> closure -> self cycle.
                    innerCancellable = nil
                    if upstreamCompleted {
                        complete(completion)
                    }
                case .failure:
                    complete(completion)
                }
            }

            /// Sends `completion` downstream at most once and tears down all subscriptions.
            private func complete(_ completion: Subscribers.Completion<Failure>) {
                guard !terminated else { return }
                // Set the flag first so that re-entrant calls are ignored.
                terminated = true
                innerActive = false

                let inner = innerCancellable
                innerCancellable = nil
                let outer = upstreamSubscription
                upstreamSubscription = nil

                inner?.cancel()
                // Still non-nil only when an inner failure ends the stream: stop the upstream too.
                outer?.cancel()

                downstream.receive(completion: completion)
            }

        }
    }
}