Reinstate immediate synchronous dispose in take and takeWhile rather than relying solely on end() to dispose the whole Sink chain. Use SettableDisposable to handle cases where the source stream may end synchronously before source.run returns (and thus SliceSink's this.disposable is not yet assigned).
Summary
Fix #466
Reinstate immediate synchronous dispose in `take` and `takeWhile` rather than relying solely on `end()` to dispose the whole Sink chain. Use `SettableDisposable` to handle cases where the source stream may end synchronously before `source.run` returns (and thus `SliceSink`'s `this.disposable` is not yet assigned).
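
For reviewers unfamiliar with the pattern, here is a minimal sketch of why a settable disposable helps when a source emits synchronously inside `run`. Everything in it is illustrative: `SettableDisposableSketch`, `syncSource`, and `runTake` are hypothetical names, and this is not the library's actual `SettableDisposable` or `SliceSink` code.

```javascript
// Hypothetical, simplified stand-in for a settable disposable.
// It can be disposed before the real disposable has been attached.
class SettableDisposableSketch {
  constructor () {
    this.disposable = undefined
    this.disposed = false
  }

  // Attach the real disposable once source.run has returned it.
  // If dispose() was already requested, dispose the real one right away.
  setDisposable (disposable) {
    if (this.disposable !== undefined) {
      throw new Error('setDisposable called more than once')
    }
    this.disposable = disposable
    if (this.disposed) {
      disposable.dispose()
    }
  }

  // Record the request; forward it only if the real disposable exists.
  dispose () {
    if (this.disposed) return
    this.disposed = true
    if (this.disposable !== undefined) {
      this.disposable.dispose()
    }
  }
}

const log = []

// Hypothetical source that emits and ends synchronously inside run(),
// i.e. before the caller has received the returned disposable.
const syncSource = {
  run (sink) {
    sink.event(1)
    sink.event(2)
    sink.end()
    return { dispose () { log.push('source disposed') } }
  }
}

// Hypothetical take-like runner: disposes as soon as n events are seen.
function runTake (n, source) {
  const settable = new SettableDisposableSketch()
  let remaining = n
  let active = true

  const finish = () => {
    active = false
    settable.dispose() // safe even though source.run may not have returned
    log.push('end')
  }

  const sink = {
    event (x) {
      if (!active) return
      log.push('event ' + x)
      remaining -= 1
      if (remaining === 0) finish()
    },
    end () {
      if (active) finish()
    }
  }

  settable.setDisposable(source.run(sink))
  return settable
}

runTake(1, syncSource)
```

In this sketch the dispose request made during the synchronous emission is remembered and applied as soon as `setDisposable` receives the real disposable, so the source is still disposed exactly once.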