CombineCommunity / CombineExt

CombineExt provides a collection of operators, publishers and utilities for Combine, that are not provided by Apple themselves, but are common in other Reactive Frameworks and standards.
https://combine.community
MIT License

withLatestFrom doesn't seem to be thread safe #163

Open fbarbat opened 10 months ago

fbarbat commented 10 months ago

withLatestFrom doesn't seem to be thread-safe when its two publishers emit elements on different threads. The code below is part of the current implementation of withLatestFrom; see the comments in uppercase.

private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) {
      var gotInitialValue = false

      let subscriber = AnySubscriber<Other.Output, Other.Failure>(
        receiveSubscription: { [weak self] subscription in
            self?.otherSubscription = subscription
            subscription.request(.unlimited)
        },
        receiveValue: { [weak self] value in
            guard let self = self else { return .none }

            // THIS IS CALLED FROM THREAD B
            self.latestValue = value

            if !gotInitialValue {
                // When getting initial value, start pulling values
                // from upstream in the main sink
                self.sink = Sink(upstream: self.upstream,
                                 downstream: self.downstream,
                                 transformOutput: { [weak self] value in
                                    guard let self = self,
                                          // THIS IS CALLED FROM THREAD A, ACCESSING THE SAME VAR BUT IT IS NOT SYNCHRONIZED
                                          let other = self.latestValue else { return nil }

                                    return self.resultSelector(value, other)
                                 },
                                 transformFailure: { $0 })

                // Signal initial value to start fulfilling downstream demand
                gotInitialValue = true
                onInitialValue()
            }

            return .unlimited
        },
        receiveCompletion: nil)

      self.second.subscribe(subscriber)
    }

Should a lock around latestValue be added to ensure thread safety? Are there reasons not to do it (for example, performance)? If not, should CombineExt at least update the documentation for withLatestFrom to say that it is not supposed to be used with publishers that emit from multiple threads?
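
To make the lock suggestion concrete, here is a minimal sketch of what I have in mind. `LockedBox` and `runDemo` are hypothetical names used only for illustration and are not part of CombineExt; the sketch assumes a plain `NSLock` from Foundation is acceptable and shows only the synchronized read/write of the latest value, not a full patch to withLatestFrom.

```swift
import Foundation

// Hypothetical helper, not part of CombineExt: a value guarded by an NSLock.
// Marked @unchecked Sendable because the lock provides the synchronization.
final class LockedBox<Value>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value?

    // Would be called from the second publisher's thread (thread B above).
    func set(_ newValue: Value) {
        lock.lock()
        defer { lock.unlock() }
        value = newValue
    }

    // Would be called from the upstream's thread (thread A above).
    func get() -> Value? {
        lock.lock()
        defer { lock.unlock() }
        return value
    }
}

// Hypothetical demo: one queue writes while another reads concurrently.
func runDemo() -> Int? {
    let latest = LockedBox<Int>()
    let group = DispatchGroup()

    DispatchQueue(label: "thread-b").async(group: group) {
        for i in 1...10_000 { latest.set(i) }
    }
    DispatchQueue(label: "thread-a").async(group: group) {
        for _ in 1...10_000 { _ = latest.get() }
    }

    group.wait()
    return latest.get()
}
```

In withLatestFrom, the assignment to latestValue and the read inside transformOutput would go through `set` and `get` respectively. I have not measured the cost of the extra locking.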

Thank you.