Closed kimfucious closed 3 years ago
I'm not going to migrate this library to async/await due to compatibility concerns. I want to support Swift 5.3+. If you want to use async/await, create a Publisher
extension that awaits output from the publisher.
In order to await multiple values, you'll probably want to create a method that returns an AsyncThrowingStream
type.
chaining sinks
I don't know what this means. Please provide an example.
Hi @Peter-Schorn thanks for the quick response.
I understand your not wanting to go the async/await route at this time.
I'm most likely mangling terms when saying, "chaining sinks", but I meant, simply performing several function calls that use .sink one after another each dependent on the result of the former.
Regardless, your code example, using withThrowingContinuation
, is very similar to other functions that I'm currently using in my app.
I've worked up the below, which is a function that peforms a search by ISRC for a Spotify track, using your lovely library. It's part of a larger process, invovling other async calls (using WithThrowingTaskGroup), but should serve to demonstrate my attempt to use SpotifyWebAPI to fetch a value and return it asynchronously.
Frankly, I'm not even sure, if I need sink or if I'm using it right here, as I'm a bit of a dummy with the Combine framework.
All I really need, I think is to return the value, after which I store elsewhere in an observable object with other @Published variables used by the app.
func getSpotifyTrackByIsrc(appleMusicTrack: MusicKit.Track, token: SpotifyClientAccessToken) async throws -> MatchedSpotifyTrack? {
var cancellables: Set<AnyCancellable> = []
return try await withCheckedThrowingContinuation {continuation in
let isrc: String = appleMusicTrack.isrc ?? ""
spotify.api.search(query: "isrc:\(isrc)", categories: [IDCategory.track], market: "from_token", limit: 1)
.receive(on: RunLoop.main)
.sink(
receiveCompletion: { result in
switch result {
case.finished:
print("finished")
case.failure(let error):
print("error", error)
continuation.resume(throwing: error)
}
},
receiveValue: { searchResults in
let trackItems = searchResults.tracks?.items
if let track = trackItems?.first {
let matchedTrack = MatchedSpotifyTrack(
track: track,
// other props here (ignore for now)
)
continuation.resume(returning: matchedTrack)
} else {
continuation.resume(returning: nil)
}
}
)
.store(in: &cancellables)
}
}
For the record, the above does work, I just don't know if there's a better way to implement it or not.
but I meant, simply performing several function calls that use .sink one after another each dependent on the result of the former.
If you want to chain together publishers so that the next publisher relies on the output from the previous publisher, then you should use the flatMap
operator. You should never make a call to another publisher in the sink
method. If you are, then you're using the Combine framework incorrectly.
Here are two methods that provide an async/await interface to Combine publishers:
extension Publisher {
/**
Awaits a single value from this publisher.
If the publisher finishes with an error, then this error will be thrown
from this method. If the publisher finishes normally without producing any
elements, then returns `nil`.
*/
func awaitSingleValue() async throws -> Output? {
return try await withCheckedThrowingContinuation { continuation in
var didSendValue = false
var cancellable: AnyCancellable? = nil
cancellable = self
.sink(
receiveCompletion: { completion in
if didSendValue {
return
}
switch completion {
case .finished:
continuation.resume(returning: nil)
case .failure(let error):
continuation.resume(throwing: error)
}
},
receiveValue: { value in
// prevent any more values from being received
cancellable?.cancel()
if !didSendValue {
didSendValue = true
continuation.resume(returning: value)
}
}
)
}
}
/**
Returns an AsyncThrowingStream for asynchronously iterating through
the values produced by this publisher.
*/
func awaitValues() -> AsyncThrowingStream<Output, Error> {
return AsyncThrowingStream { continuation in
let cancellable = self
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
continuation.finish()
case .failure(let error):
continuation.finish(throwing: error)
}
},
receiveValue: { value in
continuation.yield(value)
}
)
continuation.onTermination = { @Sendable termination in
cancellable.cancel()
}
}
}
}
For example:
let searchResults = try await spotify.api.search(
query: "isrc:\(isrc)",
categories: [.track],
market: "from_token",
limit: 1
)
.awaitSingleValue()
let trackItems = searchResults?.tracks?.items
Thanks for the awesome response, @Peter-Schorn.
You are absolutely right that I don't know what I'm doing with the Combine framework, so I appreciate your patience and valuable feedback.
I shall try to implement something based on the above, and report back with my findings.
@Peter-Schorn,
I just wanted to let you and others who may come across this know that the extension you've detailed above works most excellently!
Thanks for taking the time to provide this valuable feedback.
I appreciate all the work that's been put into this library.
I wonder if you've considered migrating this to use any of the newer async/await Swift features that will be released Q4 2021?
The reason I ask is that chaining sinks is rather tedious in comparison to async/await.
I would be great to be able to: