apple / swift-async-algorithms

Async Algorithms for Swift
Apache License 2.0
2.86k stars 139 forks source link

Subject equivalent? #176

Open ursusursus opened 2 years ago

ursusursus commented 2 years ago

We need a way to imperatively pipe events into a AsyncSequence same asi Subjects from Combine did, or Kotlin's MutableState/SharedFlow

AsynchChannel feels like a low level primitive to be used for state tracking the way CurrentValueSubject was

bioche commented 2 years ago

This would be a nice addition as it's a tool vastly used in Rx as well (BehaviorSubjects, PublishSubjects, BehaviorRelays, PublishRelays etc...) I guess for the time being we could build one using continuations with something in this flavour.

class BehaviorRelay<V: Sendable> {

    init(_ initialValue: V) {
        self.currentValue = initialValue
        self.continuations = []
    }

    var currentValue: V
    var continuations: [AsyncStream<V>.Continuation]

    let lock = NSLock()

    func accept(newValue: V) {
        lock.lock()
        defer { lock.unlock() }
        currentValue = newValue
        continuations.forEach { continuation in
            continuation.yield(newValue)
        }
    }

    func makeStream() -> AsyncStream<V> {
        AsyncStream { continuation in
            lock.lock()
            defer { lock.unlock() }
            continuation.yield(currentValue)
            continuations.append(continuation)
        }
    }
}
ursusursus commented 2 years ago

If the Relay itself would be a asyncsequence, would be great

bioche commented 1 year ago

Any news on integrating a subject equivalent in the lib at this time ? @phausler maybe ^^

phausler commented 1 year ago

AsyncChannel serves this purpose to some extent (it is not a current value but a pass through subject style behavior with back pressure)

FranzBusch commented 1 year ago

The thing we are still missing is a multicast or share algorithm. Right now the focus, as @phausler pointed out in some other issues, is to the get the set of algos that are in here in a stable state and through evolution.

rustle commented 1 year ago

I put up a draft PR https://github.com/apple/swift-async-algorithms/pull/208 and am hoping to fill out all the placeholders with a more polished version of the approach I'm using here where I am wrapping an AsyncStream and hosting it in a Task to ensure sequencing (relative to caller) https://github.com/rustle/TaskHostedAsyncSequence/tree/main/Sources/TaskHostedAsyncSequence. Would love to collaborate with any of y'all on this.

rustle commented 1 year ago

Pushed up a working (so far) AsyncSubject. I'll work on AsyncThrowingSubject next. Then I'll flesh out tests for both.

twittemb commented 1 year ago

Hi @rustle

I allow my self to post a comment here since I’ve made the same kind of implementation in a pitch on the forum -> https://forums.swift.org/t/pitch-async-buffered-channel/59854

The outcome seemed to be that it was too close to AsyncStream and that having a factory function like https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Creators/AsyncStream%2BPipe.swift should be enough (and could be pitched to the standard library).

rustle commented 1 year ago

Hi @rustle

I allow my self to post a comment here since I’ve made the same kind of implementation in a pitch on the forum -> https://forums.swift.org/t/pitch-async-buffered-channel/59854

The outcome seemed to be that it was too close to AsyncStream and that having a factory function like https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Creators/AsyncStream%2BPipe.swift should be enough (and could be pitched to the standard library).

Cool. I've seen a few versions floating around and will definitely check out yours.

DagAgren commented 1 year ago

I would warmly recommend considering adopting the semantics of ReactiveSwift's Property/MutableProperty here, which I found vastly superior to anything I've seen in Rx or Combine.

Basically, a Property has a value, which in the case of MutableProperty can also be explicitly set. However, Property itself offers all the stream operators like map and combineLatest and so on, and those return a new Property.

This means that you can do things like:

let a = MutableProperty(0)
let b = MutableProperty(0)
let c: Property<Int> = Property.combineLatest(a, b).map { $0 + $1 }

print("\(c.value)") // Prints 0.

a.value = 1
b.value = 2

print("\(c.value)") // Prints 3.

While also allowing you to stream the values of all of a, b and c as they update. This allows you to mix and match reactive and imperative programming as you prefer, which is massively convenient in lots of situations.

malhal commented 1 year ago

I've been experimenting with this AsyncSubject from ConcurrencyPlus, which essentially is a convenience wrapper for this annoyance:

var continuation: AsyncStream<Int>.Continuation!
let stream = AsyncStream<Int> {
    continuation = $0
}

When using it I've noticed a difference in behaviour in combineLatest compared to Combine's and currently debugging where the problem lies. The problem is if combining 2 streams and multiple values are sent to the first stream, when the second stream receives its first value the for await loops for all the previous first stream values instead of just once for the latest pair.

Edit: its because the linked AsyncSubject uses AsyncStream that buffers, even setting .bufferingNewest(0) or .bufferingOldest(0) didn't fix it. AsyncChannel also buffers. I think I need something equivalent to Combine's PassthroughSubject. Edit: I was wrong, I don't, using AsyncStream requires designing pipelines backwards compared to Combine.

justin-foreflight commented 1 year ago

Here is my attempt at an AsyncCurrentValueSubject loosely based on code found in this repository:

public final class AsyncCurrentValueSubject<Element: Sendable>: AsyncSequence, Sendable {
    // MARK: - AsyncSequence

    public struct Iterator: AsyncIteratorProtocol, @unchecked Sendable {
        private let id: UInt64
        private let subject: AsyncCurrentValueSubject<Element>
        private var finished: Bool = false

        fileprivate init(id: UInt64, subject: AsyncCurrentValueSubject<Element>) {
            self.id = id
            self.subject = subject
        }

        public mutating func next() async -> Element? {
            if finished {
                return nil
            }

            guard let element = await subject.next(id: id) else {
                finished = true
                return nil
            }

            return element
        }
    }

    public func makeAsyncIterator() -> Iterator {
        return Iterator(id: generateId(), subject: self)
    }

    // MARK: - Public interface

    public init(_ element: Element) {
        self.state = .init(.iterating(element: element, updated: [], suspended: [:], cancelled: []))
    }

    public func send(_ element: Element) {
        for continuation in stateMachineSend(element: element) {
            continuation.resume(returning: element)
        }
    }

    public func finish() {
        for continuation in stateMachineFinish() {
            continuation.resume(returning: nil)
        }
    }

    // MARK: - Implementation details

    private let ids = ManagedCriticalState<UInt64>(0)

    private func generateId() -> UInt64 {
        return ids.withCriticalRegion { nextId in
            defer { nextId &+= 1 }
            return nextId
        }
    }

    fileprivate enum State {
        case finished
        case iterating(element: Element, updated: Set<UInt64>, suspended: [UInt64 : UnsafeContinuation<Element?, Never>], cancelled: Set<UInt64>)
    }

    private let state: ManagedCriticalState<State>

    private func next(id: UInt64) async -> Element? {
        let (shouldReturn, element) = stateMachineNextImmediate(id: id)

        if shouldReturn {
            return element
        }

        return await withTaskCancellationHandler {
            await withUnsafeContinuation { continuation in
                let (continuation, element) = stateMachineNextSuspended(id: id, continuation: continuation)
                continuation?.resume(returning: element)
            }
        } onCancel: {
            cancel(id: id)
        }
    }

    private func cancel(id: UInt64) {
        let continuation = stateMachineCancel(id: id)
        continuation?.resume(returning: nil)
    }

    private func stateMachineSend(element: Element) -> [UnsafeContinuation<Element?, Never>] {
        return state.withCriticalRegion { state -> [UnsafeContinuation<Element?, Never>] in
            switch state {
            case .finished:
                return []

            case .iterating(_, _, var suspended, let cancelled):
                let suspendedIds = Set(suspended.keys)
                let suspendedContinuations = Array(suspended.values)
                suspended.removeAll()

                state = .iterating(element: element, updated: suspendedIds, suspended: suspended, cancelled: cancelled)

                return suspendedContinuations
            }
        }
    }

    private func stateMachineFinish() -> [UnsafeContinuation<Element?, Never>] {
        return state.withCriticalRegion { state -> [UnsafeContinuation<Element?, Never>] in
            switch state {
            case .finished:
                return []

            case .iterating(_, _, let suspended, _):
                let suspendedContinuations = Array(suspended.values)

                state = .finished

                return suspendedContinuations
            }
        }
    }

    private func stateMachineNextImmediate(id: UInt64) -> (shouldReturn: Bool, element: Element?) {
        return state.withCriticalRegion { state -> (Bool, Element?) in
            switch state {
            case .finished:
                return (true, nil)

            case .iterating(let element, var updated, let suspended, var cancelled):
                precondition(suspended[id] == nil)
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)

                if let _ = cancelled.remove(id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)

                    return (true, nil)
                }
                else if updated.contains(id) {
                    return (false, nil)
                }
                else {
                    updated.insert(id)

                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)

                    return (true, element)
                }
            }
        }
    }

    private func stateMachineNextSuspended(id: UInt64, continuation: UnsafeContinuation<Element?, Never>) -> (UnsafeContinuation<Element?, Never>?, Element?) {
        return state.withCriticalRegion { state -> (UnsafeContinuation<Element?, Never>?, Element?) in
            switch state {
            case .finished:
                return (continuation, nil)

            case .iterating(let element, var updated, var suspended, var cancelled):
                precondition(suspended[id] == nil)
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)

                if let _ = cancelled.remove(id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)

                    return (continuation, nil)
                }
                else if let _ = updated.remove(id) {
                    suspended[id] = continuation

                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)

                    return (nil, nil)
                }
                else {
                    updated.insert(id)

                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)

                    return (continuation, element)
                }
            }
        }
    }

    private func stateMachineCancel(id: UInt64) -> UnsafeContinuation<Element?, Never>? {
        return state.withCriticalRegion { state -> UnsafeContinuation<Element?, Never>? in
            switch state {
            case .finished:
                // finished before cancelled
                return nil

            case .iterating(let element, var updated, var suspended, var cancelled):
                precondition(!cancelled.contains(id))
                let suspendedIds = Set(suspended.keys)
                precondition(updated.intersection(suspendedIds).isEmpty)
                precondition(updated.intersection(cancelled).isEmpty)
                precondition(suspendedIds.intersection(cancelled).isEmpty)

                if let _ = updated.remove(id) {
                    cancelled.insert(id)

                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)

                    return nil
                }
                else if let continuation = suspended.removeValue(forKey: id) {
                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)

                    return continuation
                }
                else {
                    cancelled.insert(id)

                    state = .iterating(element: element, updated: updated, suspended: suspended, cancelled: cancelled)

                    return nil
                }
            }
        }
    }
}
ingun37 commented 7 months ago

I'm curious. Is async-algorithms is a replacement of Combine as reactive programming? Is that why Subject equivalent is needed while Combine already provides them? Or is it unnecessary as concurrency programming library because Subject is concept of reactive programming?

malhal commented 7 months ago

After using it for a while I've learned it's more like the opposite of Combine. Rather than receive publishers from different places and combine them, instead you just write the code procedurally like normal.

FranzBusch commented 7 months ago

AsyncSequence the primary building block of this package is using a pull based eventing approach whereas Combine uses a push based model. The other difference here is that AsyncSequence are a language level feature and have first class support via for await in. On the larger topic of Subject like types, there is definitely room for more algorithms in this library to make it possible to model multicast AsyncSequence. In fact, there have been a few different test in various PRs already though the pull based model makes this slightly more interesting to implement and there are a few open implementation questions.

phausler commented 7 months ago

Combine uses a push based model.

No; this is not true - Combine uses a demand based model; everything is pull based.

FranzBusch commented 7 months ago

Combine uses a push based model.

No; this is not true - Combine uses a demand based model; everything is pull based.

I was talking about the mechanics how elements are delivered. You are right that demand is pull based in the end.

realityworks commented 2 weeks ago

I'm curious. Is async-algorithms is a replacement of Combine as reactive programming? Is that why Subject equivalent is needed while Combine already provides them? Or is it unnecessary as concurrency programming library because Subject is concept of reactive programming?

Apple decided to break all of the nice combine features with their @Observable macro.

So now if you want to design a viewModel and pipe values, you need to use the old process of inheriting classes with @ObservableObject.

The problem is that, using the @Observable macro, you cannot do data bindings. Which makes life a complex hell when working with state to view bindings from external sources.

WORKS

class AClass: ObservableObject {
...
@Published var aVariable: Bool
...
aPublisher
  .assign(to: &$aVariable)
}

DOES NOT WORK

@Observable class AClass {
...
var aVariable: Bool
...
aPublisher
  .assign(to: &$aVariable)
}

Are there plans to get this Subject equivalent to work?