rsocket / rsocket-kotlin

RSocket Kotlin multi-platform implementation
http://rsocket.io
Apache License 2.0
552 stars 37 forks source link

Fix sending redundant RequestN frame #101

Closed whyoleg closed 4 years ago

whyoleg commented 4 years ago

fixes #97

whyoleg commented 4 years ago

The idea of the fix: StreamFlow inherits ChannelFlow to support flow fusion (so to support buffer(n) operator). Before, it was using channels under the hood. Because of it, additional requestN can be sent, because of asynchronous cancellation. Now it don't use channels, but manually switch context (needed to support flowOn operator). But because emiting to FlowCollector should be from the same dispatcher, we need to switch context back while emiting.