Closed b-strauss closed 8 years ago
Streams are too heavy for implementing async iterators because of all the other baggage that they have. However it would make sense for streams to implement the async iteration protocol so they could be consumed in for-of loops.
Observables seem more likely to be useful for implementing async iterators, but they're a bit different in that you subscribe to the source and have data pushed to you, whereas iterators are about pulling the data.
Also, extending the existing concepts for regular iterators to be async seems to make sense.
Also, the main difference between Streams and Observables, by the looks of it, is that streams are more focused on binary data processing and have back pressure, whereas Observables are more for reacting to sequences of events over time.
Great question, thanks!
So the first thing to consider is that async iterators aren't simply push-based. The result of each iteration is delivered asynchronously through a promise, but each iteration is requested by the consumer calling next with (maybe) a value. This setup has some nice properties:
Also, async iterators can easily be adapted to observables:
function toObservable(asyncIterable) {
  return new Observable(observer => {
    let stop = false;
    async function drain() {
      for await (let x of asyncIterable) {
        if (stop) break;
        observer.next(x);
      }
    }
    drain().then(x => observer.complete(x), err => observer.error(err));
    return _ => { stop = true; };
  });
}
The big picture is that support for async iterators is built into the language syntax via async generator functions and for-await. Observables and Streams are library abstractions which sit on top of the language and can expose async iterators for consumption by for-await, similar to how collection APIs expose iterators for consumption by for-of.
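As a sketch of that relationship, a toy collection class (TickSource is a made-up name, not part of any proposal) can expose Symbol.asyncIterator and be consumed by for-await, just as collections expose Symbol.iterator for for-of:

```javascript
// Hypothetical minimal "collection" exposing the async iteration
// protocol, the way a library abstraction (a Stream or Observable
// implementation) could expose one alongside its own API.
class TickSource {
  constructor(values) { this.values = values; }
  async *[Symbol.asyncIterator]() {
    for (const v of this.values) {
      // Each value is delivered asynchronously through a promise
      yield await Promise.resolve(v);
    }
  }
}

async function collect(source) {
  const out = [];
  // for-await consumes anything exposing Symbol.asyncIterator
  for await (const v of source) out.push(v);
  return out;
}

collect(new TickSource([1, 2, 3])).then(result => {
  console.log(result); // [1, 2, 3]
});
```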
Hope that helps!
This all sounds logical to me and I think I understand the differences between these APIs. What I have a problem understanding, and that might be a bit naive from a simple API consumer's point of view, is that there is already a system that I have used that does all these things (events and I/O) with a unified interface.
Dart: Asynchronous Programming: Streams; Dart's Stream class; Single-Subscription vs. Broadcast Streams
And there seems to be at least some kind of recognition of this, because the same question, whether there is some kind of unifiable pattern, was raised in 2014 at JSConf.
As I said, my view might be a bit naive, without knowing the specific implementation details, but for someone with a Dart background it just seems a bit odd that I now have to use 3 separate APIs to solve problems that are solvable with a single interface in Dart.
I think it's due to people having different focuses. Streams in JS are more focused on processing data, like binary data. Observables aim to be as purely functional and low level as possible, which isn't as useful for certain applications. Async iterators seem to be just taking the existing concepts of generators and mixing them with async functions as a logical next step.
Personally, out of all these new specs, I'm most excited about async iterators.
I'm also looking forward to all of these, it just seems a bit like a lost chance to me to not model these around some unified interface. It feels a bit like Array and NodeList. :/
Feel free to consider async iterators as the unified protocol for async sequences : )
Yeah, maybe someone will write some kind of abstractions around all of these. :)
Anyway, I will close this issue. Thanks for the explanations.
but each iteration is requested by the consumer calling next with (maybe) a value.
That's right. But, why? Is it useful? You can use a parameter with another iterator instead of returning values through next. In fact, the parameter of next is counterintuitive:
function* c(a) {
  a = yield a + 10;
  a = yield a + 20;
  a = yield a + 30;
  a = yield a + 40;
}
it = c(1);
console.log(it.next(2).value); // Result: 11
console.log(it.next(3).value); // Result: 23 (!?)
console.log(it.next(4).value); // Result: 34
I am kind of a noob with JavaScript generators, so any hint about it would be appreciated (although I have been programming JavaScript for almost 20 years, and I have even written parallel C++ compilers). In fact, except to emulate async functions, I have never seen any other use.
Async functions are great, their adoption has been very fast, and people really prefer them to other solutions. They rely on Promises, which are well known and have proved very useful, and async functions map cleanly onto them.
Observables are also great. Although they have a huge cognitive load (so many functions, considerations, ...), they have proved to be very useful.
So, because of all this, maybe mapping async iterators onto Observables should be considered. It is very likely (I should write some example) that the cognitive load of Observables would be reduced drastically.
@drpicox Observables aren't directly comparable to async generators; async generators do nothing unless .next is called (or unless they impose extra work upon themselves), e.g.:
async function* animationFrames() {
  while (true) {
    yield new Promise(requestAnimationFrame)
  }
}

// vs

function animationFrames() {
  return new Observable(observer => {
    let frame
    function step() {
      frame = requestAnimationFrame(time => {
        observer.next(time)
        step()
      })
    }
    step()
    return _ => cancelAnimationFrame(frame)
  })
}
If you use the observable then every single animation frame will cause the callback to be invoked, which may cause jank or backpressure if the consumer of the frames can't keep up, e.g.:
async function main() {
  for await (const time of animationFrames()) {
    // At worst the user sees a few frames skipped sometimes but
    // this will always be roughly in sync with the frames
    longOperationSometimesSpanningMultipleFrames(time)
  }
}
function main() {
  animationFrames().subscribe({
    // If long operations cause a slowdown of the frame rate then
    // it'll appear like they are happening slower, e.g. if the operation
    // sometimes takes 5 animation frames then those 5 frames
    // will still definitely happen, just late
    next: longOperationSometimesSpanningMultipleFrames
  })
}
This happens because async iterators are lazy by value but observables are lazy by subscription. That's not to say async generators are better; it's harder (but not impossible) to represent events where every event must be responded to as async generators.
By all means use what works for your project. Observables are probably going to be async iterable in some way (and if not you can always create operators to do it), and Observable.from will almost certainly convert async iterables, so they should be fairly interoperable.
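As a sketch of representing every-event-matters sources as async iterables (the harder direction mentioned above), pushed events can be buffered in a queue so none are lost even if the consumer falls behind. All names here (eventQueue, buffered, waiting) are illustrative, not part of any proposal:

```javascript
// A push source adapted to the pull-based async iteration protocol.
// Pushed values are buffered until the consumer asks for them; if the
// consumer is already waiting, the pending pull is resolved directly.
function eventQueue() {
  const buffered = []; // values pushed before anyone asked
  const waiting = [];  // resolvers for pulls that arrived first
  return {
    push(value) {
      if (waiting.length) waiting.shift()({ value, done: false });
      else buffered.push(value);
    },
    [Symbol.asyncIterator]() {
      return {
        next() {
          if (buffered.length) {
            return Promise.resolve({ value: buffered.shift(), done: false });
          }
          return new Promise(resolve => waiting.push(resolve));
        },
        [Symbol.asyncIterator]() { return this; }
      };
    }
  };
}

// Usage: every push is eventually seen by the consumer, in order,
// even when the producer runs ahead of the consumer.
const queue = eventQueue();
queue.push('a');
queue.push('b');
queue[Symbol.asyncIterator]().next().then(r => {
  console.log(r.value); // 'a'
});
```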
The value being sent to the async iterable is so that it's the same as generators in this regard, it can be used to implement coroutines that can also await.
Basically coroutines allow two way communication, e.g. you yield values out and you can send responses back in, they don't really have the same use cases as iterators even if they have the same syntax.
When sending a value to .next you're just replacing the location of yield with the value you send back in:
function* myGen() {
  const val = yield 1
  const val2 = yield 2 + val
}
const gen = myGen()
// There's no point that's stopped in the generator for the value 9999
const request1 = gen.next(9999).value
// But after calling gen.next(9999) the generator is stopped at
// the first yield so it can get a response back for that yield
const request2 = gen.next(request1 + 17).value
console.log(request2) // 20, which is 18 (the value sent to the first yield) plus 2
Coroutines can be useful if you want things to be decoupled from how they're interpreted e.g.:
function* storeInDatabase(data) {
  // The executor need not return any particular implementation
  // of connection
  let connection
  try {
    // Suppose the database did not exist; the executor can even use
    // .throw to cause an error at this point
    connection = yield ['open', 'the-database-uri.tld/database']
  } catch (err) {
    return false
  }
  // The database might not even need to do transactions if it's synchronous
  // so an executor could choose to ignore this
  yield ['beginTransaction', connection]
  for (const { user, value } of data) {
    // The coroutine engine can aggregate these
    // or do any optimizations it might want to
    yield ['setData', user, value]
  }
  yield ['endTransaction', connection]
  // An executor could even choose to do things like revoking the
  // database connection here once the function has finished executing
  return true
}
Async generators do nothing more to extend this than to allow awaiting promises within the generator, which is probably useful if you need to await things that are the same across platforms.
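To illustrate that last point, here is a minimal sketch of an async generator doing two-way communication while also awaiting internally. The names fetchNames and demo are mine, and Promise.resolve stands in for real async I/O:

```javascript
// Two-way communication plus await: the coroutine awaits an async
// result, yields it out, and receives the next id back in via .next.
async function* fetchNames(first) {
  let id = first;
  while (id !== undefined) {
    // Stand-in for real I/O, e.g. a network lookup
    const name = await Promise.resolve(`user-${id}`);
    id = yield name;
  }
}

async function demo() {
  const co = fetchNames(1);
  const a = await co.next();     // runs to the first yield
  const b = await co.next(2);    // sends 2 back in as the new id
  console.log(a.value, b.value); // user-1 user-2
}
demo();
```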
Uhmmmm, you made your point with the backpressure.
But the example of animationFrames that you have shown may be wrong, because the first frame is generated before anyone requests it, so the first call will probably get an obsolete result.
And I do not like the example storeInDatabase, you can achieve the same with an async function and using the database as parameter:
async function storeInDatabase(db, data) {
  // The db need not be any particular implementation
  // of connection
  let connection
  try {
    // Suppose the database did not exist; the db can even
    // throw to cause an error at this point
    connection = await db.open('the-database-uri.tld/database');
  } catch (err) {
    return false
  }
  // The database might not even need to do transactions if it's synchronous
  // so db could choose to ignore this
  await db.beginTransaction(connection);
  for (const { user, value } of data) {
    // The db engine can aggregate these
    // or do any optimizations it might want to
    await db.setData(user, value)
  }
  await db.endTransaction(connection);
  // A db could even choose to do things like revoking the
  // database connection here once the function has finished executing
  return true
}
I will investigate a little bit more about Observables and backpressure (there is some existing work: https://github.com/ReactiveX/RxJava/wiki/Backpressure). Backpressure exists when Observables are synchronous; maybe now is a good time to fix https://github.com/tc39/proposal-observable .
Actually the first animation frame won't be obsolete; generators don't execute their first section until the first .next call is done, e.g.:
function* gen(x) {
  console.log("Hello!")
  yield x
  console.log(x)
}
const g = gen(10) // Nothing logged yet
g.next() // logs "Hello!"
g.next() // logs 10
// Same thing with async generators, the first part of the generator
// code is not executed until the first call to next
async function* animationFrames() {
  for (let i = 0 ; ; i++) {
    console.log(i)
    yield new Promise(requestAnimationFrame)
  }
}
async function main() {
  const frames = animationFrames()
  // Nothing is printed yet
  await frames.next() // Only here is 0 printed
  await frames.next() // 1
}
main()
I'm not really sure of any great examples of coroutines as I haven't used them very much. Perhaps someone who knows more about coroutines would be able to offer better explanations. This blog might offer some insights into the design of generators having two way communication though.
I did not know it. Thanks!
Btw: I love coroutines and I have used them a lot in C and C++ development. The typical case of usage is lexer/parser coroutines (flex/bison), but the best application that I found was to emulate system calls in a multicore processor simulator that I built.
You can learn more here about C & C++ coroutines:
#define crBegin static int state=0; switch(state) { case 0:
#define crReturn(i,x) do { state=i; return x; case i:; } while (0)
#define crFinish }
int function(void) {
  static int i;
  crBegin;
  for (i = 0; i < 10; i++)
    crReturn(1, i);
  crFinish;
  return -1; /* signal that the coroutine has finished */
}
Maybe my missing concepts come from how this implementation worked, and I have seen no usage of function* except to emulate async/await.
Maybe function* would be useful to compute lists like head:[...tail] as functional languages do: they only compute things when they are requested, so they can deal with infinite lists.
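That intuition holds; as a sketch, a generator can represent an infinite sequence precisely because values are only computed when the consumer pulls them. The helper names (naturals, take) are illustrative:

```javascript
// An "infinite list": nothing runs until a consumer calls .next,
// and only as many values are computed as are actually pulled.
function* naturals() {
  for (let n = 0; ; n++) yield n; // never terminates on its own
}

// Pull at most `count` values, then stop; the rest is never computed.
function take(iterable, count) {
  const out = [];
  for (const x of iterable) {
    if (out.length === count) break;
    out.push(x);
  }
  return out;
}

console.log(take(naturals(), 5)); // [0, 1, 2, 3, 4]
```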
Is there any reason not to use Observables or Streams for this? As far as I understand these are push-based APIs. Why do we need 2 (in the future potentially 3, with Streams) APIs for a single type of effect?
For all other types of effects I know exactly what to use (T, Promise<T>, Iterator<T>). What's the fourth one? Observable<T>? Stream<T>? AsyncIterator<T>? Maybe I don't see the big picture, but I don't understand why we can't have a unified interface for this.