ReactiveX / IxJS

The Interactive Extensions for JavaScript
https://reactivex.io/IxJS/
MIT License

from with abort signal #352

Closed eunjungoh35 closed 8 months ago

eunjungoh35 commented 1 year ago

IxJS version: 5.0.0

Code to reproduce:

import * as ixa from 'ix/asynciterable';
import * as ia from 'ix/asynciterable/operators';

// Yields an increasing counter every 3 seconds until the signal (if any) is aborted.
async function* yieldFunction(i: number, signal?: AbortSignal) {
   while (true) {
      if (signal?.aborted) {
         break;
      }
      await new Promise(f => setTimeout(f, 3000));
      yield i++;
   }
}

// Deferred source: logs the signal that defer hands to its factory function.
const source = function () {
   return ixa.defer((signal) => {
      const i = 0;
      console.log('defer function', {signal});
      return ixa.from(yieldFunction(i, signal)).pipe(
         ia.finalize(() => console.log('finalize called'))
      );
   });
};

const abort = new AbortController();
// The signal attached by withAbort should reach the defer factory above.
const result = ixa.from(source()).pipe(ia.withAbort(abort.signal));
ixa.from(result).forEach((value) => console.log(`Next: ${value}`));

Expected behavior: the function passed to defer receives the signal from the abort controller.

Actual behavior: the signal received inside the defer function is undefined.

Additional information: patch.zip

The from operator doesn't pass the signal through when its source is an async iterable. Please find the fix attached.
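To illustrate the kind of problem described here, below is a minimal sketch that does not use IxJS at all. It assumes an iterable whose `[Symbol.asyncIterator]` accepts an optional `AbortSignal`, which is how the signal reaches `defer` in the reproduction above. The names `SignalAwareIterable`, `deferLike`, `fromDropping`, `fromForwarding`, and `observedSignal` are hypothetical and are not part of the library or of the attached patch; the actual fix may look different.

```typescript
// Hypothetical stand-in for an iterable whose iterator factory accepts a signal.
interface SignalAwareIterable<T> {
  [Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator<T>;
}

// Hypothetical defer-like source: calls the factory with whatever signal it is given.
function deferLike<T>(
  factory: (signal?: AbortSignal) => AsyncIterable<T>
): SignalAwareIterable<T> {
  return {
    [Symbol.asyncIterator](signal?: AbortSignal) {
      return factory(signal)[Symbol.asyncIterator]();
    },
  };
}

// Wrapper that loses the signal: for await calls source[Symbol.asyncIterator]()
// with no arguments, so the source sees undefined.
function fromDropping<T>(source: SignalAwareIterable<T>): SignalAwareIterable<T> {
  return {
    async *[Symbol.asyncIterator]() {
      for await (const item of source as AsyncIterable<T>) {
        yield item;
      }
    },
  };
}

// Wrapper that forwards the signal by calling the source's iterator factory directly.
function fromForwarding<T>(source: SignalAwareIterable<T>): SignalAwareIterable<T> {
  return {
    async *[Symbol.asyncIterator](signal?: AbortSignal) {
      const it = source[Symbol.asyncIterator](signal);
      try {
        while (true) {
          const r = await it.next();
          if (r.done) return;
          yield r.value;
        }
      } finally {
        // Give the source a chance to clean up if the consumer stops early.
        await it.return?.();
      }
    },
  };
}

// Reports which signal the defer-like factory observed through a given wrapper.
async function observedSignal(
  wrap: <T>(s: SignalAwareIterable<T>) => SignalAwareIterable<T>
): Promise<AbortSignal | undefined> {
  const seen: (AbortSignal | undefined)[] = [];
  const source = deferLike((signal) => {
    seen.push(signal);
    return (async function* () {
      yield 1;
    })();
  });
  const controller = new AbortController();
  const it = wrap(source)[Symbol.asyncIterator](controller.signal);
  await it.next();
  await it.return?.();
  return seen[0];
}
```

In this sketch, `observedSignal(fromDropping)` resolves to `undefined`, matching the actual behavior reported above, while `observedSignal(fromForwarding)` resolves to the controller's signal.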

trxcllnt commented 1 year ago

@eunjungoh35 Thanks for the suggestion. Could you make a PR with your fix?