Open paulofaria opened 2 years ago
I'm not sure if it's worth creating a wrapper type for ReceivingChannel
, but one advantage would be symmetry.
Once the language can support some AsyncSequence<T>
then the ReceivingChannel
is just the opaque AsyncChannel
.
@paulofaria @phausler Would this be a reasonable implementation of ReceivingChannel
and ReceivingThrowingChannel
respectively do you think?
If so I could create a PR.
ReceivingChannel
//
// ReceivingChannel.swift
//
//
// Created by Alexander Cyon on 2022-09-12.
//
import Foundation
import AsyncAlgorithms
public final class ReceivingChannel<Element: Sendable>: Sendable, AsyncSequence {
public typealias Base = AsyncChannel<Element>
@frozen
public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
var iterator: Base.Iterator?
@usableFromInline
init(_ iterator: Base.Iterator) {
self.iterator = iterator
}
@inlinable
public mutating func next() async -> Base.Element? {
if !Task.isCancelled, let value = await iterator?.next() {
return value
} else {
iterator = nil
return nil
}
}
}
@usableFromInline
let base: Base
@usableFromInline
init(_ base: Base) {
self.base = base
}
@inlinable
public func makeAsyncIterator() -> Iterator {
Iterator(base.makeAsyncIterator())
}
/// Send a finish to all awaiting iterations.
/// All subsequent calls to `next(_:)` will resume immediately.
public func finish() {
self.base.finish()
}
}
public extension AsyncChannel {
func receivingOnly() -> ReceivingChannel<Element> {
ReceivingChannel(self)
}
}
ReceivingThrowingChannel
//
// ReceivingThrowingChannel.swift
//
//
// Created by Alexander Cyon on 2022-09-12.
//
import Foundation
import AsyncAlgorithms
public final class ReceivingThrowingChannel<Element: Sendable, Failure: Error>: Sendable, AsyncSequence {
public typealias Base = AsyncThrowingChannel<Element, Failure>
@frozen
public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
var iterator: Base.Iterator?
@usableFromInline
init(_ iterator: Base.Iterator) {
self.iterator = iterator
}
@inlinable
public mutating func next() async throws -> Base.Element? {
if !Task.isCancelled, let value = try await iterator?.next() {
return value
} else {
iterator = nil
return nil
}
}
}
@usableFromInline
let base: Base
@usableFromInline
init(_ base: Base) {
self.base = base
}
@inlinable
public func makeAsyncIterator() -> Iterator {
Iterator(base.makeAsyncIterator())
}
/// Send a finish to all awaiting iterations.
/// All subsequent calls to `next(_:)` will resume immediately.
public func finish() {
self.base.finish()
}
}
public extension AsyncThrowingChannel {
func receivingThrowingOnly() -> ReceivingThrowingChannel<Element, Failure> {
ReceivingThrowingChannel(self)
}
}
Tests which are copy-paste of TestChannel
but without tests requiring non public ManagedCriticalState
- modified to send
values on AsyncChannel
and AsyncThroingChannel
but which use makeAsyncIterator
on let receivingChannel = asyncChannel.receivingOnly()
.
//
// ReceivingChannelTests.swift
//
//
// Created by Alexander Cyon on 2022-09-12.
//
import Foundation
import XCTest
import AsyncAlgorithmExtensions
import AsyncAlgorithms
struct Failure: Error, Equatable { }
final class ReceivingChannelTests: XCTestCase {
func test_asyncChannel_delivers_values_when_two_producers_and_two_consumers() async {
let (sentFromProducer1, sentFromProducer2) = ("test1", "test2")
let expected = Set([sentFromProducer1, sentFromProducer2])
let asyncChannel = AsyncChannel<String>()
Task {
await asyncChannel.send(sentFromProducer1)
}
Task {
await asyncChannel.send(sentFromProducer2)
}
let receivingChannel = asyncChannel.receivingOnly()
let t: Task<String?, Never> = Task {
var iterator = receivingChannel.makeAsyncIterator()
let value = await iterator.next()
return value
}
var iterator = receivingChannel.makeAsyncIterator()
let (collectedFromConsumer1, collectedFromConsumer2) = (await t.value, await iterator.next())
let collected = Set([collectedFromConsumer1, collectedFromConsumer2])
XCTAssertEqual(collected, expected)
}
func test_asyncThrowingChannel_delivers_values_when_two_producers_and_two_consumers() async throws {
let (sentFromProducer1, sentFromProducer2) = ("test1", "test2")
let expected = Set([sentFromProducer1, sentFromProducer2])
let asyncChannel = AsyncThrowingChannel<String, Error>()
Task {
await asyncChannel.send("test1")
}
Task {
await asyncChannel.send("test2")
}
let receivingChannel = asyncChannel.receivingThrowingOnly()
let t: Task<String?, Error> = Task {
var iterator = receivingChannel.makeAsyncIterator()
let value = try await iterator.next()
return value
}
var iterator = receivingChannel.makeAsyncIterator()
let (collectedFromConsumer1, collectedFromConsumer2) = (try await t.value, try await iterator.next())
let collected = Set([collectedFromConsumer1, collectedFromConsumer2])
XCTAssertEqual(collected, expected)
}
func test_asyncThrowingChannel_throws_and_discards_additional_sent_values_when_fail_is_called() async {
let sendImmediatelyResumes = expectation(description: "Send immediately resumes after fail")
let asyncChannel = AsyncThrowingChannel<String, Error>()
asyncChannel.fail(Failure())
let receivingChannel = asyncChannel.receivingThrowingOnly()
var iterator = receivingChannel.makeAsyncIterator()
do {
let _ = try await iterator.next()
XCTFail("The AsyncThrowingChannel should have thrown")
} catch {
XCTAssertEqual(error as? Failure, Failure())
}
do {
let pastFailure = try await iterator.next()
XCTAssertNil(pastFailure)
} catch {
XCTFail("The AsyncThrowingChannel should not fail when failure has already been fired")
}
await asyncChannel.send("send")
sendImmediatelyResumes.fulfill()
wait(for: [sendImmediatelyResumes], timeout: 1.0)
}
func test_asyncChannel_ends_iterator_when_task_is_cancelled() async {
let asyncChannel = AsyncChannel<String>()
let receivingChannel = asyncChannel.receivingOnly()
let ready = expectation(description: "ready")
let task: Task<String?, Never> = Task {
var iterator = receivingChannel.makeAsyncIterator()
ready.fulfill()
return await iterator.next()
}
wait(for: [ready], timeout: 1.0)
task.cancel()
let value = await task.value
XCTAssertNil(value)
}
func test_asyncThrowingChannel_ends_iterator_when_task_is_cancelled() async throws {
let asyncChannel = AsyncThrowingChannel<String, Error>()
let receivingChannel = asyncChannel.receivingThrowingOnly()
let ready = expectation(description: "ready")
let task: Task<String?, Error> = Task {
var iterator = receivingChannel.makeAsyncIterator()
ready.fulfill()
return try await iterator.next()
}
wait(for: [ready], timeout: 1.0)
task.cancel()
let value = try await task.value
XCTAssertNil(value)
}
}
100% agree with the previous comment from @phausler. We could perhaps have a SendingChannel
protocol that AsyncChannel conforms to.
protocol SendingChannel<Element> {
associatedtype Element: Sendable
func send(_:) async
func finish()
}
It could make sense if there are several types of Channels in the future.
Yes, but if we do not want to wait for Async types to get Primary Associated Type (PRAT) support, might be nice to get something in the mean time?
It sound as if AsyncSequences getting PRAT support might take some time since it is effectively blocked by Precise Error Typing (just asked for status update) - from Swift Forums thread:
(1) AsyncSequence and AsyncIteratorProtocol logically ought to have Element as their primary associated type. However, we have ongoing evolution discussions 14 about adding a precise error type to these. If those discussions bear fruit, then the new Error associated type would need to also be marked primary. To prevent source compatibility complications, adding primary associated types to these two protocols is deferred to a future proposal.
I think what AsyncChannel
could benefit from is having a nested type called Source
which is the type you can write to. We have been doing this in our swift-nio async sequence as well.
The benefit of doing this is that it not only allows you to vend a specific interface to only the write side of things, it also allows you to implement logic inside AsyncChannel
to finish the sequence if nobody is holding onto the Source
anymore.
For the places where you want to both write and consume the channel you can just pass both types. This way you also don't need a Receiving/
or SendingChannel
but just an AsyncChannel
and an AsyncChannel.Source
.
An important ability when passing around channels is to limit sending/receiving by passing a send-only or a receive-only channel. Right now, it is possible to erase receiving, since
AsyncChannel
is just anAsyncSequence
. However, there's currently no mechanism to create a send-only version of a channel. The simplest solution I think is to create a wrapper type calledSendingChannel
or something. I think this is a good reason to renameAsyncChannel
to justChannel
(#47). Otherwise we would haveSendingAsyncChannel
, which is not too bad, but I really think theAsync
part doesn't help us much.