tc39 / proposal-async-iteration

Asynchronous iteration for JavaScript
https://tc39.github.io/proposal-async-iteration/
MIT License
859 stars 44 forks source link

No means for secure stopping async generators #126

Open awto opened 6 years ago

awto commented 6 years ago

Let's assume a very typical reactive programming use case, say, I need to combine a few streams into a single one, yielding value each time any of original streams yields new value.

For example, I use it for UI and convert DOM events into an async iterable with a function subject (I don't include its sources here, it is not significant for the problem - it may be checked in this gist). The function returns an object with AsyncIterable interface and additional method send to pass the event into the stream.

And there is a function combine simply merging all async iterables in arguments into a single one.

async function* combine(...args) {
  const threads = args.map(i => i[Symbol.asyncIterator]())
  const sparks = new Set(threads.map(i => ({thread:i,step:i.next()})))
  try {
    while(sparks.size) {
      const v = await Promise.race([...sparks]
                                   .map(i => i.step.then(({done,value}) => ({done,value,spark:i}))))
      sparks.delete(v.spark)
      if (!v.done) {
        sparks.add({...v.spark,step:v.spark.thread.next()})
        yield v.value
      }
    }
  } finally {
    await Promise.all([...threads].map((i) => i.return()))
  }
}

And now I iterate two such sources combined and exit the loop on some condition.

const subj1 = subject()
const subj2 = subject()

async function test() {
  let cnt = 0
  for await(const i of combine(subj1,subj2)) {
    console.log("value:",i)
    if (cnt++)
      break
  }
}
//.....
subj1.send(1)
subj2.send(2)

Everything is pretty fine, I added console.log in exit function in subject all loops to handle subjects exit. But now if I wrap one of the sources with any simple async generator, like with (or just yield*)

async function* copy(input) {
  for await(const i of input)
    yield i
}

async function test() {
  let cnt = 0
  for await(const i of combine(subj1,copy(subj2))) {
    console.log("value:",i)
    if (cnt++)
      break
  }
}

Now test never exits (never calls next then), it waits in await in the combine's finally block. The reason is pretty simple and obvious, the copy generator is blocked on next and generators use the same queue for return.

I searched issues in this repository and found these tickets: #5 and #55. There are no clean reasons for the single queue decision, except probably:

writer.next(bufferA);
writer.next(bufferB);
await writer.return(); // Only wait for the file to close

This is maybe better to be done this way:

await Promise.all([writer.next(bufferA), writer.next(bufferB)]);
await writer.return(); // Only wait for the file to close

And how would we convert async generator into observable? For example, the solution from #20 won't work.

function toObservable(asyncIterable) {
    return new Observable(observer => {
        let stop = false;
        async function drain() {
            for await (let x of asyncIterable) {
                if (stop) break;
                observer.next(x);
            }
        }
        drain().then(x => observer.complete(x), err => observer.error(err));
        return _=> { stop = true; }
    });
}

The cleanup function won't stop the source generator (even if we add return call for object returned by drain) if there are no more values sent by original iterable, it is forever locked in next.

Current status

There is nothing to do in this proposal to fix this. This will be fixed automatically after some form of async cancelation operation is added as part of some another proposal. This issue is left open as a warning about the problem.

Workarounds

Using Transducers (lifted generators)

Transducers are functions taking a stream as input and producing another stream. Generators can read its input passed as an argument and it is there we can send some specific stop signal if needed.

See more detailed description here:

There is fork function which merges streams and stops well, but it merges lifted streams - transducers

Transpiler

@effetful/es transpiler adds lightweight cancelation support to async function.

Resulting Promises of async functions transpiled by this are amended with M.cancelSymbol which is a function which tries to cancel execution of this function. It executes finally blocks, and propagates cancelation to current await expression. It doesn't do anything more (e.g. no propagation to children).

benjamingr commented 6 years ago

I'm not even sure what combineLatest would mean in a pull-based model, would you expect it to attempt to pull from all observables when it is being pulled from?

benjamingr commented 6 years ago

Let's assume a very typical reactive programming use case, say, I need to combine a few streams into a single one, yielding value each time any of original streams yields new value.

That is, that is not the directionality of async iterators - if next is called on the "combined" iterator - whom does it ask fro the value? Does it consume values eagerly from the parents?

awto commented 6 years ago

@benjamingr

I'm not even sure what combineLatest would mean in a pull-based model, would you expect it to attempt to pull from all observables when it is being pulled from?

regardless it is combineLatest or some other combination of more than one source streams, sooner or later we'll need some of it, yes, I will need to pull from all of them if the caller is pulled

That is, that is not the directionality of async iterators - if next is called on the "combined" iterator - whom does it ask fro the value? Does it consume values eagerly from the parents?

Not eagerly, when first next called it awaits on Promise.race of all of the sources next. After the race is resolved its value is the result of the caller's next. The other promises (not winning the race) are stored somewhere and on next caller's next they will participate in the next race again, along with the next next of the previously signaled item.

And so, we cannot exit the loop from the caller, and the only problem here is the queue for next/throw/return execution. Say, we have something like this:


const src = {...}

async function* h1() {
   for await(const i of src) {
      yield some(i)
   }
}

async function* caller() {
    for await (const i of merge(h1(), somethingElse))  {
          if (someCondition)
             break
     }
}

and the simplified transpilation of for-await:


const src = {...}

async function* h1() {
   const loop = src[Symbol.asyncIterator]()
   try {
      for(let item;!(item = await loop.next()).done;) {
         yield some(item.value)
      }
  } finally {
     await src.return()
  } 
}

async function* caller() {
   const loop = merge(h1(), somethingElse)[Symbol.asyncIterator]()
   try {
       for(let item; !(item = await loop.next()).done;)  {
            if (someCondition)
               break
        } 
   } finally {
      await loop.return() 
   } 
}

Say when the control encounters break there it is because somethingElse.next was resolved in Promise.race, and the h1().next still sits in next (maybe it will never be resolved).

If we wouldn't have the queue the break in caller would immediately call return in merge. In merge it would call all the return of its arguments. That would invoke return continuation in h1 (no needs to cancel the currently pending next Promise, only mark its continuation as canceled). And the h1 finally block will call return of src. It is not a generator and it may know how to handle it properly. Say, it will resolve the promise to some dummy value, and since the continuation is marked as canceled everyone is happy.

With the queue the return call of h1 may stay in the queue forever, leaking a few objects.

mika-fischer commented 6 years ago

I have the same issue! I want to write an iterator adapter that makes an async iterator cancelable.

async function* iterateStream(stream) {
    let ended = false, error, wake = () => {};
    stream.on("readable", wake)
          .on("end",   ()    => { ended = true; wake(); })
          .on("error", (err) => { error = err;  wake(); });
    for (;;) {
        for (let chunk = stream.read(); chunk; chunk = stream.read()) {
            yield chunk;
        }
        if (error) { throw error; }
        if (ended) { return; }
        await new Promise(resolve => wake = resolve);
    }
}

async function* withTimeout(timeoutMs, iter) {
    let timer;
    try {
        const timeout = new Promise((_, reject) => {
            timer = setTimeout(reject, timeoutMs, new Error("Timeout!"));
        });
        for (;;) {
            const result = await Promise.race([timeout, iter.next()]);
            if (result.done) { break; }
            yield result.value;
        }
    }
    finally {
        clearTimeout(timer);
        if (iter.return) { await iter.return(); }
    }
}

async function main() {
    const socket = net.connect(80, "httpbin.org");
    try {
        for await (const val of withTimeout(1000, iterateStream(socket))) {
            console.log("[main] got value %s", val);
        }
    }
    finally {
        socket.end();
    }
}

This also gets stuck in await iter.return() and can only exit the loop when the socket closes. And because the socket is closed outside the loop, this results in a deadlock, which is only resolved when the socket times out...

It seems to me that 1) An important use-case for async iteration involves iterators that can block for a significant amount of time, maybe even infinitely. Think reading from a socket, waiting for user input, etc. 2) Async generators can not be used for implementing async iterators in such cases because if the generator is stuck in a long await there's no way anymore to break from the loop, raise an exception, etc. Control flow will have to wait until this await is finished...

If I understand all this correctly, this seems very unfortunate, since async generators otherwise would make implementing async iterators very easy!

zenparsing commented 6 years ago

There are a couple of things going on here.

First, I notice that in all the provided examples we are awaiting a never-resolving promise inside of a finally block. Consider what happens in the case of "regular" async functions:

async function f() {
  try {
    throw new Error('oops');
  } finally {
    console.log('finally f');
    // Never-resolving
    await new Promise(() => {});
  }
}

async function g() {
  try {
    await f();
  } finally {
    // This is never executed
    console.log('finally g');
  }
}

g();

The lesson here is: be careful about putting an await in a finally block, regardless of whether we're dealing with an async function or async generator function.

The other thing I see is that we are assuming that return (with finally) is sufficient for implementing async cancellation. Although it would be nice if that were the case, we need to think of return as cleanup, not cancellation.

As we see in these examples, the cancellation signal needs to propagate through async generator functions (and async functions), into their async subroutines. Because async/await linearizes async computations, we can't use return as a secondary, parallel message channel. We need another channel to propagate that cancellation signal.

The DOM has invented AbortController for this purpose. Although AbortController is a little too DOM-centric to standardize at the language level, I'm hopeful that we can move forward with a standard "cancel token" protocol that AbortController could implement.

benjamingr commented 6 years ago

Reminds me of https://github.com/tc39/ecmascript-asyncawait/issues/51 when I pondered if finally in await should even be allowed.

awto commented 6 years ago

@zenparsing

First, I notice that in all the provided examples we are awaiting a never-resolving promise inside of a finally block.

No, at least in my example, finally block is never executed and this is why it is a problem.

Consider what happens in the case of "regular" async functions: The lesson here is: be careful about putting an await in a finally block, regardless of whether we're dealing with an async function or async generator function.

I see absolutely no problem in this example, someone could put never resolving promise in await in a usual block without finally, and control would never reach anything after.

The other thing I see is that we are assuming that return (with finally) is sufficient for implementing async cancellation.

No, I highlighted it in the message, Promise cancelation is not required if you mean that by async calculation.

Although it would be nice if that were the case, we need to think of return as cleanup, not a cancellation.

Let's keep it practical, without resorting to philosophical terms interpretation. I might as well call it - clean generator object.

As we see in these examples, the cancellation signal needs to propagate through async generator functions (and async functions), into their async subroutines. Because async/await linearizes async computations, we can't use return as a secondary, parallel message channel. We need another channel to propagate that cancellation signal.

I don't get this. What is not linear? Everything is kept linear without the queue in the generator scope as well, outside the scope, everything is async as usual. I see no problem again. What is another channel? Another channel means no async generators and implementing async iterators protocol?

The DOM has invented AbortController for this purpose. Although AbortController is a little too DOM-centric to standardize at the language level, I'm hopeful that we can move forward with a standard "cancel token" protocol that AbortController could implement.

This won't work, there is no another channel, unless all async iterators may interpret the token themselves in their code. But that just means - no async generators. There is no way to handle any token if control is resting in await.

Also @zenparsing in the first message, I actually copied your implementation of converting async generator to observable - toObservable. It doesn't work because of the same reasons. Do you confirm now, you were wrong and it should not be possible to convert async generators into observable?

@benjamingr

Reminds me of tc39/ecmascript-asyncawait#51 when I pondered if finally in await should even be allowed.

In my message, it is some possible transpilation result, not the finally actual usage, await in finally. Though I don't see a problem even if it is the usage. The finally block there is a solution, not a problem. But it doesn't work only because there is the queue. And there are no obvious reasons for it.

Jamesernator commented 6 years ago

I think the queue as it currently exists should remain, but I still think it would be worth reviving some form of syntactic cancellation mechanism to make this sort've thing easier (what is one to do with for-await-of loops for example if there's no syntactic mechanism).

I don't really see how the cancellation proposal could solve this issue in it's current form, unless it was decided that .next should be sent the cancellation token (in which case to use async generators to write anything we essentialy require function.sent for the part before the first yield).

Even sending the cancellation token forward with .next it still means for-await-of is un-usable if you want to be able to cancel a request, e.g. you'd have to write it using iterators directly e.g.:

function map(asyncIterable, iteratee) {
    return {
        async* [Symbol.asyncIterator]() {
            const iterator = getIterator(asyncIterable)
            try {
                while (true) {
                    const cancelToken = function.sent;
                    const value = iterator.next(cancelToken);
                    yield iteratee(value);
                }
            } finally {
                await iterator.return()
            }
        }
    }
}

This seems quite bad compared to the naive implementation of map that just loops through the items and yields iteratee(value) in a for-await-of loop. But frankly I don't see any other way it could work with the current cancellation proposal.

awto commented 6 years ago

@Jamesernator

I think the queue as it currently exists should remain,

but why? I mean, I've, of course, read the motivation like to make something running in parallel with Promise.all or dangling promises, but as @zenparsing wrote Async Generators linearize their control flow. There is no way (and shouldn't be) to run different parts of a single async generator simultaneously.

While of course, something implementing Async Iterator protocol but not Generator could do this. But the thing may handle the order itself in whatever way it likes. It may be a queue, it may be a priority queue it may ignore something etc.

It is also easy to write a wrapper to add a queue to any other Async Iterator, e.g. queue(something) if someone really needs it. But if it is as it is now - it is an unnecessary complication for nothing. And you propose to complicate it even more with some cancelation protocol. While the cancelation isn't required here. We, of course, need to cancel a continuation after await where the generator is suspended. But this effect is visible only inside the same single generator. It is not related to some async operation cancelation.

benjamingr commented 6 years ago

There is no way (and shouldn't be) to run different parts of a single async generator simultaneously.

That's not true for arbitrary async iterators though

benjamingr commented 6 years ago

I would love to see an implementation of combineLatest with the semantics proposed by @awto though - I looked at all my implementations from when I gave a talk and they suffer from this problem.

I haven't had proper time to think about it - but I definitely think it's worth solving and the spec is worth amending if it can't be.

awto commented 6 years ago

@benjamingr

There is no way (and shouldn't be) to run different parts of a single async generator simultaneously.

That's not true for arbitrary async iterators though

indeed, but there is no any queue for arbitrary async iterators anyway, the iterator itself can decide how to handle the requests

Jamesernator commented 6 years ago

Yes but it's because other types of async iterators exist that generators need to queue. If a consumer consumes two values eagerly and then invokes .return an async iterator only knows that the requests are done not the existing nexts should be cancelled.

e.g. One might implement a preloading version of take that consumes values eagerly instead of as requested e.g.:

function eagerTakeN(asyncIterable, n) {
    async* [Symbol.asyncIterator]() {
        const iterator = getIterator(asyncIterable);
        // Preload values as fast as possible into buffers of a given size
        const buffer = []
        for (let i = 0 ; i < n ; i++) {
            buffer.push(iterator.next())
        }
        // Allow the iterator to stop queuing now and cleanup when *it*
        // wants to
        const done = iterator.return();
        const results = await Promise.all(buffer);
        for (const { done, value } of buffer) {
            if (done) {
                return value;
            }
            yield value;
        }
        await done;
    }
}

Note that if you know the iterables are generators it makes no difference to consume multiple concurrently, but if it's an arbitrary iterable it might be better to consume earlier. .return says that no more values will be requested but to fufill the requests that have already been requested. Because of the this the most sensible option for an async generator is just to queue as a consumer like eagerTakeN shouldn't need to know the type of the iterator.

If you want to cancel the .next Promises then there should be a mechanism to say I longer care about those specifically, in the case of an async generator I would definitely expect Promise cancellation to close an async generator though on cancelling a Promise returned from a .next.

awto commented 6 years ago

@Jamesernator The same may be solved using explicit queue wrapper. I would rather have merging working rather than the implicit queue. And as another advantage of the queue is explicit is we can use some smarter application-specific scheduling algorithms.

Jamesernator commented 6 years ago

The thing is cancellation allows you to have both which I why I want to see a better cancellation proposal than just a builtin CancelToken type or the like.

What you're proposing effectively requires that await is a potential interruption point which I think would be surprising given that it gives a producer no assurances that if they perform some work between yields that it will ever be executed.

e.g. Consider this that needs to free a lock immediately after producing a value:


async function* readFromDatabase() {
    const cursor = db.getCursor('someTable');
    while (true) {
        // This await could be interrupted
        const dbLock = await db.acquireLock();
        const value = cursor.nextValue();
        dbLock.unlock();
        yield value;
    }
}

But from what I can tell under your proposed change if it were to be cancelled via .return while stopped on db.acquireLock() then db.acquireLock() would proceed as per normal but the generator would be terminated and now the lock would never be freed as the generator was terminated in the middle of an operation.

awto commented 6 years ago

@Jamesernator

fair point, I don't really mind much about the cancelation, just wanted something simpler. Still, the solution to this problem is possible, it can be some db context where we can signal the lock isn't longer required in finally. Or just implement the read logic in separate not-generator function. Or, again an explicit wrapper - queue(async function* readFromDatabase(){....}). It is indeed complex and not intuitive, but possible. While merging generators isn't possible now at all.

awto commented 6 years ago

actually, the cancelation proposal will indeed solve the problem faster than something can be changed in this spec, so I'm closing the issue. Thanks, everyone, esp. @Jamesernator.

awto commented 6 years ago

in case if someone needs this urgently, you can use my transpiler - @effectful/es (sorry for advertisement). In abstract interface mode, you can extend the interface with any functionality you want including cancelation or no queue or anything else.

benjamingr commented 6 years ago

@awto what cancellation proposal? :D

awto commented 6 years ago

@benjamingr cancellation proposal - I suppose it is the thing to solve the problem somehow in some future, there are not many details now, and it will happen apparently not soon. For my task, I'm doing a workaround with my transpiler for now.

awto commented 6 years ago

there are libraries emerging with (not-safe) iterator's combinations, so I think it is rather worth keeping this issue open

benjamingr commented 6 years ago

Maybe talk to the people from https://github.com/jamiemccrindle/axax @jamiemccrindle

jamiemccrindle commented 6 years ago

Does anyone have a simple test case that demonstrates this issue?

awto commented 6 years ago

@jamiemccrindle I would at least add a warning on your page, axax is still usable if the sources are managed, but as a user, I would like to be aware of possible problems, e.g.

const s1 = new Subject()
const s2 = new Subject()

async function* something() {
  try {
    for await(const i of s2.iterator)
      yield i
  } finally {
    console.log("never called")
  }
}

async function test() {
  for await(const i of merge(s1.iterator,something())) {
    break
  }
}

test()
s1.onNext(1)

here is the demonstration of the leak with merge if s2 never touched, something instance will leak. It may be fixed if the program manages references but it is not required with, say, Observables.

jamiemccrindle commented 6 years ago

Thanks for the example @awto. I'll add a note to axax.

jamiemccrindle commented 6 years ago

Would you say that this is comparable to a Promise that never returns. Something like:

async function example() {
  await new Promise((resolve, reject) => {});
}

example.then(() => console.log('never gets here'));
awto commented 6 years ago

I don't think so, there is still a way to finish the chain, e.g. by calling s2.onCompleted(), but no way to stop it from another end because the generator there sits in next and there is a queue requirement, so return just needs to await next to exit. But the queue cannot be removed because it will introduce another resource management problems (covered in this thread).

jamiemccrindle commented 6 years ago

Thanks! I've added a note on the axax page.

zenparsing commented 6 years ago

Would you say that this is comparable to a Promise that never returns.

It is. A consumer is waiting on the resolution of a promise for "next" but that promise is never resolved. In this case, it is because the "producer" has not called "s2.onNext". Be mindful of this when attempting to port Observable-style programming patterns to async iteration.

zenparsing commented 6 years ago

Actually, I think I misunderstood something about the example. I'll defer to @awto, who has a pretty good handle on this.

benjamingr commented 6 years ago

@zenparsing an analogy of this as promises never resolving would be:

let resolve;
const p = new Promise((r) => resolve = r);
// never call `resolve`, but it is still a possibility
example.then(() => console.log('never gets here'));

@awto wouldn't s2.iterator.return() from the outside stop it and run the finally or making .return on a merge result call .return on all iterators it is merging?

awto commented 6 years ago

@benjamingr

wouldn't s2.iterator.return() from the outside stop it and run the finally or making .return on a merge result call .return on all iterators it is merging?

it would if the generator is suspended in yield, here it is in await of next. So this will execute finally block only on one merging iterator (s1 in my example).

@zenparsing I think your explanation is correct. In mine, I just wanted to highlight the problem is more due to the queue, the return method is still called, but its action is enqueued by generators implementation because the generator is stuck in never settling next. If something wasn't a generator but a function returning a manually constructed object with the 3 methods (next/throw/return) it would receive the return call here, and it could know how to handle it, e.g. it could store the next Promise's callback and resolving it with {done:true} or something like that.

benjamingr commented 6 years ago

@awto

it would if the generator is suspended in yield, here it is in await of next. So this will execute finally block only on one merging iterator (s1 in my example).

You are right, I was sure that await would be progressed (similarly to yield) when .return is called on the async generator. Thanks for bearing out with me - I didn't actually understand the issue very well before.

It sounds like it's a lot more fundamental than just merging generators. In the below example:

async function* stuck() {
  const db = await Promise.resolve("in real world usage - acquire DB");
  try {
    await new Promise(() => { /* represents some operation never ending */ });
    // yield result;
  } finally {
    console.log('releasing');
    /* db.release */
  }
}
let i = stuck();
i.next();
i.return(); // nothing logged

This is very unfortunate in my opinion since I have no idea how to solve it without broader async function cancellation. Given that async generators already have .return this might motivate TC39 to give .return on async function results (risky territory since that's promises).

However, at the current state I'd say the language is simply incomplete - since the way .return works. One thing you can do if the implementation opts in would be to do:

async function* stuck(cancel) {
  const db = await Promise.resolve("in real world usage - acquire DB");
  try {
    await Promise.race([new Promise(() => { /* never ending */ }), cancel]);
    // yield result;
  } finally {
    console.log('releasing');
    /* db.release */
  }
}
var reject, p = new Promise((_, r) => reject = r);
var i = stuck(p);
i.return = () => {
 reject(new Error('cancelled'));
 i.__proto__.return(); // return on the generator
};

But that would trigger the error handling code if the function had any in addition to the releasing code which is also unfortunate.

e.g. it could store the next Promise's callback and resolving it with {done:true} or something like that.

The problem with a less than complete solution is that .returning doesn't actually have to close the iterator - also the returned value might end up being used:

async function* getUserNames() {
  let db, cursor;
  try {
    const db = await getDb();
    const cursor = await getCursor(db, 'users');
    for await(const { name } in cursor) yield name; 
  } finally {
     if(db) db.release();
     if (cursor) cursor.release();
  }
}

If we .return when the cursor is being awaited with a value - .release will be called on it but since it doesn't have a .release method it will throw an error.

zenparsing commented 6 years ago

Let's see if we can pin down the problem that we're seeing, specifically for async-iterator combinator libraries.

This issue will present itself any time a combinator is pulling more than one result from its inputs in order to deliver one result to the output.

For merge, it will pull from all of its inputs and output the result that is delivered first.

This would also be a problem for a combinator that optimistically pre-pulls N > 1 values from a single input when "next"ed.

benjamingr commented 6 years ago

Let's see if we can pin down the problem that we're seeing, specifically for async-iterator combinator libraries.

The problem is pretty simple even without combinators - you cannot .return meaningfully on an async iterator while it is awaiting. This is manifested in the combinator issue where .next is forever waited on although it is certainly not the only issue. It is easy to reproduce as done https://github.com/tc39/proposal-async-iteration/issues/126#issuecomment-403454433 without combinators at all.

If the Promise ever resolves there is no issue - the problem is that there might be a dependency on a value from next that never arrives.

I think that unfortunately this issue is the same as async function cancellation in general. The 'generator' part of async generators has cancellation but the corresponding 'async' part does not. I don't see a way forward other than pushing for async cancellation semantics.

That said - if the promises are sure not to depend on each other and are otherwise ensured to resolve things are fine. That's not really much comfort since for await creates such a dependency trivially like @awto showed. Maybe the ad-hoc solution suggested is worth it.

taion commented 6 years ago

We're hitting a memory/resource leak in production due to exactly this issue.

The reference GraphQL subscription implementation uses async iterables. To work with this, we immediately coerce our event sources to async iterables, using a pattern like the example in https://github.com/tc39/proposal-async-iteration/issues/126#issuecomment-403454433.

Because of the await semantics, we find ourselves ending up with resources that do not get cleaned up on a timely basis, or occasionally do not ever get cleaned up.

This seems quite bad – the "obvious" syntax here in many cases will lead to fairly subtle resource leaks that can be quite difficult to debug.

pauldraper commented 6 years ago

This "issue" is already true of existing generators.

const s1 = [1]
const s2 = [2]

function* something() {
  try {
    for (const i of s2)
      yield i
  } finally {
    console.log("never called")
  }
}

function test() {
  const some = something()
  for (const i of s1) {
    return
  }
  for (const i of some) {
  }
}

test()

There is nothing different about async generators from synchronous ones.

taion commented 6 years ago

The difference in practice is that if you have code to allocate a resource inside your something as above, that resource will not actually get allocated per your example above.

taion commented 6 years ago

e.g. if you put a console.log('start') in something, that console.log will never get called either

pauldraper commented 6 years ago
const s1 = [1]
const s2 = [2]

function* something() {
  console.log("start")
  try {
    for (const i of s2)
      yield i
  } finally {
    console.log("never called")
  }
}

function* merge(g1, g2) {
  for (let r1, r2; r1 = g1.next(), r2 = g2.next(), !r1.done || !r2.done; ) {
    if (!r1.done)
      yield r1
    if (!r1.done)
      yield r2
  }
}

function test() {
  for (const i of merge(something(), s1[Symbol.iterator]())) {
    return
  }
}

test()
awto commented 6 years ago

@pauldraper here it is your merge function problem, if you don't iterate over iterables in for-of or yield* it is your responsibility to propagate return method calls, this is a correct merge version:

  try {
    for (let r1, r2; r1 = g1.next(), r2 = g2.next(), !r1.done || !r2.done; ) {
      if (!r1.done)
        yield r1
      if (!r1.done)
        yield r2
    }
  } finally {
    if (g1.return)
      g1.return();
    if (g2.return)
      g2.return();
  }

for async generators, such correct version doesn't exist

pauldraper commented 6 years ago

Perhaps I have misunderstood the spec? V8 implements that AsyncIterator.prototype.return():

async function* something() {
  console.log("start")
  try {
    yield await Promise.resolve(1)
    yield await Promise.resolve(2)
  } finally {
    console.log("called")
  }
}

async function* merge(g1, g2) {
  try {
    for (let r1, r2; r1 = await g1.next(), r2 = await g2.next(), !r1.done || !r2.done; ) {
      if (!r1.done)
        yield r1.value
      if (!r1.done)
        yield r2.value
    }
  } finally {
    g1.return();
    g2.return();
  }
}

async function test() {
  for await (const i of merge(something(), something())) {
    return;
  }
}

test()

This is an interleaving merge. I don't know what the original merge was supposed to do. For a greedy merge,

async function* something() {
  console.log("start")
  try {
    yield await Promise.resolve(1)
    yield await Promise.resolve(2)
  } finally {
    console.log("called")
  }
}

async function* merge(g1, g2) {
  const gs = [g1, g2];
  try {
    const ps = new Map(gs.map((g, i) => [i, g.next().then(r => ({r, i}))]))
    while (ps.size) {
      const {r, i} = await Promise.race(ps.values());
      if (r.done) {
        ps.delete(i)
      } else {
        ps.set(i, gs[i].next().then(r => ({r, i})))
        yield r.value
      }
    }
  } finally {
    for (const g of gs) {
      g.return();
    }
  }
}

async function test() {
  for await (const i of merge(something(), something())) {
    return;
  }
}

test()
pauldraper commented 6 years ago

As far as I can see, the only way to produce leak with AsyncGenerators that wouldn't happen with Generators is to have a Promise that never resolves. And non-resolving Promises already cause leaks in async functions.

awto commented 6 years ago

yes, interleaving merge does work (except return call may require await to avoid racing conditions), your interleaving merge isn't, in fact, merge, as you call next only once there.

But anyway, even if you make it working (or check some already available from the thread) it may look working for the input iterables - immediately returning values. The leak appears if one iterator suspends in yield and another in await for a long time. And something there doesn't suspend.

littledan commented 6 years ago

Given that this proposal is at Stage 4, shipped in multiple web browsers and Node.js, and part of ES2018, I'd suggest pursuing any changes from here in a follow-on proposal or PR, rather than in this repository.

benjamingr commented 6 years ago

Also, I would also recommend spending time on reducing the example to a point it's easier to understand. I'm using async iterators daily and have given talks about them and I'm having a hard time understanding the exact issue here!

A slideshow perhaps?

hax commented 6 years ago

Agree with @benjamingr . I used to think I understand this issue (which cost me several hours to read all comments), but I find I have forgotten it now and after 3 minitues it seems I can't recall it without reading the whole thread again. 😂

littledan commented 6 years ago

This sounds like a good direction. Does anyone want to start a new repository and continue the discussion there?

taion commented 6 years ago

It's a little tricky for reasons I'll lay out in a sec. I'll get a self-contained example set up in a bit and maybe adapt it to a Medium post later. I don't think anybody has a viable proposal for including things, so this is more of a "here there be dragons" kind of warning thing.

awto commented 6 years ago

There are no needs for another proposal, as the already available async cancelation proposal will fix the issue. It can be handled even now with Bluebird promises (they have cancelation) + my transpiler.

The issue is worth keeping open as a warning for combinators libraries implementors. If you think it is wrong, feel free to close the issue.

I can make a PR where I can try to explain the use case and a problem and put a warning probably in README, but not soon though.