bitrich-info / xchange-stream

XChange-stream is a Java library providing a simple and consistent streaming API for interacting with Bitcoin and other crypto currency exchanges via WebSocket protocol. It is build on top of of XChange library providing new interfaces for streaming API. User can subscribe for live updates via reactive streams of RxJava library.
Apache License 2.0
414 stars 219 forks source link

[Bitmex] There is a bug on orderChanges endpoint #497

Open makarid opened 4 years ago

makarid commented 4 years ago

Hello, the implementation of orderChanges in BitmexStreamingTradeService has a bug when an order has been filled. When Bitmex sends a FILLED order message, it doesn't include many required fields in order to create a new LimitOrder and send it to the stream. These data are null from the response: side,price,ordType and orderQty have null values and because of that a NullPointException occures. Here is the output which i have encounter:

2020-01-09 16:54:37 INFO BitmexStreamingMarketDataService:37 - Bitmex connection succeeded. Clearing orderbooks. 2020-01-09 16:54:37 INFO BitmexStreamingService:312 - Subscribing to channel order BitmexOrder{orderID='9f2f8774-0573-9d3c-a45c-216ed7eabadd', account=4644, side='Buy', price=7907.5, avgPx=null, ordType='Limit', ordStatus=NEW, clOrdID='', orderQty=1, cumQty=0, workingIndicator=false, timestamp='2020-01-09T14:55:10.125Z', symbol='XBTUSD'} LimitOrder [limitPrice=null, Order [type=BID, originalAmount=1, cumulativeAmount=0, averagePrice=null, fee=null, currencyPair=XBT/USD, id=9f2f8774-0573-9d3c-a45c-216ed7eabadd, timestamp=null, status=NEW, flags=[], userReference=null]] BitmexOrder{orderID='9f2f8774-0573-9d3c-a45c-216ed7eabadd', account=4644, side='null', price=null, avgPx=7905, ordType='null', ordStatus=FILLED, clOrdID='', orderQty=null, cumQty=1, workingIndicator=false, timestamp='2020-01-09T14:55:10.125Z', symbol='XBTUSD'} null io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.NullPointerException at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77) at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117) at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onError(ObservableFlattenIterable.java:125) at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:81) at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207) at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177) at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62) at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207) at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177) at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66) at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:395) at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:358) at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:129) at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:33) at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:52) at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:96) at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:80) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224) at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.NullPointerException at info.bitrich.xchangestream.bitmex.dto.BitmexOrder.toOrder(BitmexOrder.java:85) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:442) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at info.bitrich.xchangestream.bitmex.BitmexStreamingTradeService.lambda$getOrderChanges$2(BitmexStreamingTradeService.java:39) at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:77) ... 45 more Exception in thread "nioEventLoopGroup-2-1" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.NullPointerException at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77) at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117) at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onError(ObservableFlattenIterable.java:125) at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:81) at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207) at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177) at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62) at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207) at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177) at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66) at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:395) at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:358) at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:129) at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:33) at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:52) at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:96) at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:80) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224) at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.NullPointerException at info.bitrich.xchangestream.bitmex.dto.BitmexOrder.toOrder(BitmexOrder.java:85) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:442) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at info.bitrich.xchangestream.bitmex.BitmexStreamingTradeService.lambda$getOrderChanges$2(BitmexStreamingTradeService.java:39) at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:77) ... 45 more

makarid commented 4 years ago

How can we send a limitOrder/marketOrder to the stream in which we don't have the side field?

mdvx commented 4 years ago

You may notice that Kraken too will send orderChanges with out a side or currencyPair, this is because the updates are deltas, and the fields did not change. Maybe this is a larger discussing, because 'fixing' this requires an order cache. @badgerwithagun

makarid commented 4 years ago

Indeed, we need to discuss about it. I am waiting for @badgerwithagun input.

makarid commented 4 years ago

I suggest to make an openOrderList cache and everytime an update occurs to send the updated list of openOrders. So instead of having Observable.Order we should have Observable.OpenOrders. What do you thing? Because openOrders updates depend to one another and they are not like userTrades which they are one time only events.

mdvx commented 4 years ago

How do we handle delta orderbook updates? should we try and use the same pattern. I am not sold on the idea of caching order at the exchange level, because it requires synchronisation (however if we could do something where the Exchange holds a cache instance, then different exchanges could implement different caching strategues.

synchronisation: When we have updates with 2+ fields (without sync, the order state is temporarily invalid)

makarid commented 4 years ago

What do you mean temporarily invalid? If we save a local copy of a List of openOrders, every update that will come from the stream will update this list. So even if there are 2 updates for the same order, they will be 2 separate messages and we will handle with FIFO. Can you please give an example? Thanks

mdvx commented 4 years ago

lets say you have an order, that get an update to executedQuantity, averagePrice and state: you can: A) copy the existing order in the cache, update the copy, and replace the orignal order with the copy.
1) This can leave hanging (previous) order objects B) sync(lock) the exiting order, upate all the fields, release lock 1) lots of locking, but probably ok C) Other means? but basically, my thought is cache should operate in a maner that best matches the exchanges update cycle (Still one cache per exchange, but maybe cache for Kraken is different to BitMex)

makarid commented 4 years ago

Maybe i cannot understand because i don't have your experience,but as i see it we don't need any locking because there isn't any concurrency or parallelism. The stream will sends us, let's say 10 message in one sec. We will have a for loop that will make checks on every message one by one and we will add/update the locally stored OpenOrders object for every message. When there are no more messages left we will broadcast the OpenOrders object.

2) In order to have similar behaviour for every exchange we will store an OpenOrders object, so the code will work for every exchange and there will not be any BitmexOrders or KrakenOrders object.

On Mon, 13 Jan 2020, 05:03 Marc Deveaux, notifications@github.com wrote:

lets say you have an order, that get an update to executedQuantity, averagePrice and state: you can: A) copy the existing order in the cache, update the copy, and replace the orignal order with the copy.

  1. This can leave hanging (previous) order objects B) sync(lock) the exiting order, upate all the fields, release lock
  2. lots of locking, but probably ok C) Other means? but basically, my thought is cache should operate in a maner that best matches the exchanges update cycle (Still one cache per exchange, but maybe cache for Kraken is different to BitMex)

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/bitrich-info/xchange-stream/issues/497?email_source=notifications&email_token=AHIWQ73ZAE5WHBBH2CQLQPTQ5PKYBA5CNFSM4KEZ2GR2YY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEIXMZFI#issuecomment-573492373, or unsubscribe https://github.com/notifications/unsubscribe-auth/AHIWQ73CETJGBQ4X6YIGH4DQ5PKYBANCNFSM4KEZ2GRQ .

mdvx commented 4 years ago

When we broadcast an OpenOrder object, we broadcast a reference to an OpenOrder object.

When another thread gets this broadcast it recieves the reference to the OpenOrder object, which might now have processed next updates, or be in the middle of processing them, or not. The OpenOrder is in an indeterminate state.

mdvx commented 4 years ago

Whereas the are always going to some states where and order is expected to be in situe (PENDING,WORKING) there are other states are considered to final and non-changeable (FILLED, DONE, CANCELED). This allows the algos to make decisions abount sending the next set of orders.

makarid commented 4 years ago

Thanks for the explanation. Now i understand it. So do you believe that we can solve this issue with a lock? Will this lock have any meaningful speed reduction on the broadcast? Thanks

On Mon, 13 Jan 2020, 12:41 Marc Deveaux, notifications@github.com wrote:

When we broadcast an OpenOrder object, we broadcast a reference to an OpenOrder object.

When another thread gets this broadcast it recieves the reference to the OpenOrder object, which might now have processed next updates, or be in the middle of processing them, or not. The OpenOrder is in an indeterminate state.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/bitrich-info/xchange-stream/issues/497?email_source=notifications&email_token=AHIWQ75QRFMN75CLRD2WZY3Q5RANJA5CNFSM4KEZ2GR2YY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEIYHO7I#issuecomment-573601661, or unsubscribe https://github.com/notifications/unsubscribe-auth/AHIWQ75BRMZ5BI5L5ULNM2TQ5RANJANCNFSM4KEZ2GRQ .