bitrich-info / xchange-stream

XChange-stream is a Java library providing a simple and consistent streaming API for interacting with Bitcoin and other cryptocurrency exchanges via the WebSocket protocol. It is built on top of the XChange library and provides new interfaces for the streaming API. Users can subscribe to live updates via the reactive streams of the RxJava library.
Apache License 2.0
413 stars, 219 forks

(Re)subscribe to a channel with correct emitter #549

Open mladenmarkov opened 4 years ago

mladenmarkov commented 4 years ago

When (re)subscribing to a channel and there's already a Subscription object created, it's being reused. However, it contains the old emitter, from a previous subscription attempt.

This pull request proposes always creating a new Subscription object, so that future notifications are sent to the new emitter. Otherwise, the new subscription will not receive any events at all.

This change allows subscriptions to be retried automatically using the retryWhen() operator.

mdvx commented 4 years ago

please check out https://github.com/bitrich-info/xchange-stream/pull/542 it deals with overlapping issues

mdvx commented 4 years ago

would the doOnDispose() at line 378 (which removes the subscription from the map) not achieve the same result? How would retryWhen() be used?

mladenmarkov commented 4 years ago

> please check out https://github.com/bitrich-info/xchange-stream/pull/542 it deals with overlapping issues

Not really overlapping issues. That pull request is for auto-reconnect, which is great. However, it does not deal with the fact that (re-)subscriptions can fail for any reason (broken connection, rate limit, etc.), in which case the subscriber would not be notified.

Imagine subscribing to order books for 100 pairs on a rate-limited exchange (yes, Kraken, but also others). The first 20 or so subscriptions would succeed, but the rest would not. The connection is OK, so no auto-reconnect happens and the resubscribeChannels() method is never called. The consumers get the failures in their respective onError() handlers and retry, but without disposing the Disposable. It cannot be assumed that the disposable will be disposed before retrying, and retryWhen doesn't seem to do that either.

Now, since the channels map already contains a Subscription object, the second call to the subscribeChannel() method (the retry) will not update the map and subsequent events will not be sent to the new consumer. That's why I think the map should be updated on each call of the subscribeChannel() method.
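The behaviour described above can be sketched in plain Java. This is a simplified, hypothetical model rather than the library's actual classes: `StaleEmitterSketch`, its nested `Subscription`, the `alwaysReplace` flag and `eventsSeenByRetry` are names invented for illustration, and a `Consumer<String>` stands in for the RxJava emitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, simplified model of the channels map; not the library's code.
class StaleEmitterSketch {

  // Stand-in for a subscription that remembers which emitter to notify.
  static final class Subscription {
    final Consumer<String> emitter;

    Subscription(Consumer<String> emitter) {
      this.emitter = emitter;
    }
  }

  private final Map<String, Subscription> channels = new ConcurrentHashMap<>();
  private final boolean alwaysReplace;

  StaleEmitterSketch(boolean alwaysReplace) {
    this.alwaysReplace = alwaysReplace;
  }

  void subscribeChannel(String channel, Consumer<String> emitter) {
    Subscription subscription = new Subscription(emitter);
    if (alwaysReplace) {
      channels.put(channel, subscription); // proposed: the newest emitter wins
    } else {
      channels.putIfAbsent(channel, subscription); // reuse: a retry keeps the old emitter
    }
  }

  void handleMessage(String channel, String message) {
    Subscription subscription = channels.get(channel);
    if (subscription != null) {
      subscription.emitter.accept(message);
    }
  }

  // Subscribes twice to the same channel (first attempt, then a retry) and
  // returns the events that reached the retry's emitter.
  static List<String> eventsSeenByRetry(boolean alwaysReplace) {
    StaleEmitterSketch service = new StaleEmitterSketch(alwaysReplace);
    List<String> firstAttempt = new ArrayList<>();
    List<String> retry = new ArrayList<>();
    service.subscribeChannel("book-BTC/USD", firstAttempt::add);
    service.subscribeChannel("book-BTC/USD", retry::add);
    service.handleMessage("book-BTC/USD", "update");
    return retry;
  }
}
```

With `alwaysReplace` set to `false` the retry's list stays empty, because the message is still routed to the first emitter. With `true` the retry receives the update.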

For how retryWhen() would be used, you can see #550. I've included a test which automatically retries subscriptions with an exponential backoff strategy.
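To illustrate the idea of retrying with exponential backoff, here is a minimal plain-Java sketch. It is not the test from #550 and does not use RxJava's retryWhen() operator. It is a blocking stand-in that only shows the delay schedule and the retry loop, and `BackoffRetrySketch`, `delayMillis` and `retry` are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical blocking stand-in for a retry-with-backoff policy.
class BackoffRetrySketch {

  // Delay before retry number `attempt` (1-based): the base delay doubled
  // once per previous attempt, capped at maxMillis.
  static long delayMillis(int attempt, long baseMillis, long maxMillis) {
    long delay = baseMillis;
    for (int i = 1; i < attempt && delay < maxMillis; i++) {
      delay *= 2;
    }
    return Math.min(delay, maxMillis);
  }

  // Runs the action, retrying up to maxRetries times after a failure and
  // sleeping for the backoff delay between attempts.
  static <T> T retry(Supplier<T> action, int maxRetries, long baseMillis, long maxMillis) {
    for (int attempt = 1; ; attempt++) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        if (attempt > maxRetries) {
          throw e; // retries exhausted: surface the last failure
        }
        try {
          Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while backing off", ie);
        }
      }
    }
  }
}
```

In a reactive pipeline the same schedule would be expressed as delays emitted from the retryWhen() handler instead of calls to `Thread.sleep`.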

makarid commented 4 years ago

This is a must feature in my opinion. Thank you @mladenmarkov for this PR.

mdvx commented 4 years ago

I was referring to this (which is also in PR #542):


```java
  /**
   * Some exchanges rate limit messages sent to the socket (Kraken), by default this method does not rateLimit the
   * messages sent out.
   * Override this method to provide a rateLimiter, and call acquire on the rate limiter,
   * to slow down out going messages.
   */
  protected void sendMessageRateLimiterAcquire() {} // no-op by default
```
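As a rough sketch of how such an override might look, the example below throttles outgoing messages through the hook. Everything except the method name `sendMessageRateLimiterAcquire` is an assumption: `Service` is a hypothetical stand-in for the streaming service base class, and the hand-rolled `IntervalLimiter` stands in for whatever rate limiter one would really use.

```java
import java.util.ArrayList;
import java.util.List;

class RateLimitSketch {

  // Hypothetical stand-in for the streaming service base class.
  static class Service {
    final List<String> sent = new ArrayList<>();

    // No-op by default, mirroring the hook quoted above.
    protected void sendMessageRateLimiterAcquire() {}

    void sendMessage(String message) {
      sendMessageRateLimiterAcquire(); // may block until sending is allowed
      sent.add(message);
    }
  }

  // Minimal limiter: hands out at most one permit per interval.
  static final class IntervalLimiter {
    private final long intervalNanos;
    private long nextFreeNanos = System.nanoTime();

    IntervalLimiter(long intervalMillis) {
      this.intervalNanos = intervalMillis * 1_000_000L;
    }

    synchronized void acquire() {
      long now = System.nanoTime();
      long waitNanos = nextFreeNanos - now;
      // Reserve the next slot, counted from whichever is later: now or the
      // previously reserved slot.
      nextFreeNanos = (waitNanos > 0 ? nextFreeNanos : now) + intervalNanos;
      if (waitNanos > 0) {
        try {
          Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  // Subclass that plugs the limiter into the hook.
  static class RateLimitedService extends Service {
    private final IntervalLimiter limiter;

    RateLimitedService(long intervalMillis) {
      this.limiter = new IntervalLimiter(intervalMillis);
    }

    @Override
    protected void sendMessageRateLimiterAcquire() {
      limiter.acquire();
    }
  }
}
```

Throttling outgoing messages this way reduces the chance of hitting the exchange's limit, but it does not by itself recover a subscription that has already failed.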

I saw the #550 backoff strategy, impressive.

badgerwithagun commented 4 years ago

#566 also covers a related discussion.

badgerwithagun commented 4 years ago

This project is in the process of being merged into the XChange project and no further PRs will be merged here. Once the projects have been merged, there may be a short stabilization period where there will be large-scale renaming of classes and packages, which may cause conflicts. You are advised to wait at least a week from now and then resubmit your PR on the XChange project. Thank you for your support!

badgerwithagun commented 4 years ago

You can now resubmit your PR on XChange. This project will shortly be marked as archived.