poelstra / ts-stream

Type-safe object streams with seamless support for backpressure, ending, and error handling
MIT License
65 stars 13 forks source link

Return a promise from Readable.forEach() #2

Closed rogierschouten closed 9 years ago

rogierschouten commented 9 years ago

Please return a promise from forEach() which resolves with the end() result. This allows to write:

    /**
     * Convert all unconverted tickets
     */
    protected _run(runTime: tc.DateTime): Promise<void> {
        // find last exlent ID
        return this._outputDb.getMaxExlentId()
            .then((maxId: number): Promise<tsStream.Readable<rawData.Snapshot>> => {
                var fromId = maxId;
                if (typeof this._lastId === "number" && (fromId === null || this._lastId > fromId)) {
                    fromId = this._lastId;
                }
                return this._rawDb.getSnapshotStream(rawData.SnapshotType.Archive, fromId);
            })
            .then((snapshots: tsStream.Stream<rawData.Snapshot>): Promise<void> => {
                var converterStream = snapshots.map((snapshot: rawData.Snapshot): Promise<outputData.AmidSqlTicket> => this._convertTicket(snapshot));
                return converterStream.forEach((ticket: outputData.AmidSqlTicket): Promise<void> => {
                    return this._outputDb.addTicket(ticket);
                        .then((): void => {
                            this.count++;
                        });
                });
            });
    }
poelstra commented 9 years ago

There are 2 reasons why forEach() doesn't return a promise:

  1. forEach() is the internal building block of all other operations, like .map(). However, because most of these already return another Stream, a promise returned by the (internal) .forEach() somehow needs to be 'ignored' (considering Possibly Unhandled Rejection detection).
  2. it may not actually be what you want, as it would indicate when forEach()'s end handler itself is finished, not necessarily when the end call at the very start of a whole streaming chain is finished.

Note that a Stream already has an ended() method that you can use for this purpose.

In your example, you would:

var converterStream = snapshots.map(/* ... */);
converterStream.forEach(/* ... */);
return converterStream.ended(); // although snapshots.ended() might be more appropriate

What do you think?

rogierschouten commented 9 years ago

I agree :)