Petersoj / alpaca-java

A Java API for Alpaca, the commission free, algo friendly, stock trading broker. https://alpaca.markets
https://petersoj.github.io/alpaca-java/
MIT License
197 stars 82 forks

Ability to track the socket state and events (isConnected, onConnected(), onDisconnected()) #95

Closed Caceresenzo closed 3 years ago

Caceresenzo commented 3 years ago

Hi

There is currently no way to know what state the socket is in, and this leads to unexpected errors.

I often get:

2021-05-05 18:22:02.230  INFO 28323 --- [t@1169015699-48] .j.a.w.c.AbstractWebsocketClientEndpoint : Reconnecting due to closure: CLOSED_ABNORMALLY
2021-05-05 18:22:02.234  INFO 28323 --- [t@1169015699-48] .j.a.w.c.AbstractWebsocketClientEndpoint : Connecting to wss://api.alpaca.markets/stream
2021-05-05 18:22:02.240 ERROR 28323 --- [t@1169015699-44] .j.a.w.c.AbstractWebsocketClientEndpoint : Websocket Error!

java.io.IOException: Broken pipe
        at org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.flush(SslConnection.java:1086) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.WriteFlusher.flush(WriteFlusher.java:422) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.WriteFlusher.write(WriteFlusher.java:277) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.AbstractEndPoint.write(AbstractEndPoint.java:381) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.websocket.common.io.FrameFlusher.flush(FrameFlusher.java:264) ~[websocket-common-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.websocket.common.io.FrameFlusher.process(FrameFlusher.java:193) ~[websocket-common-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:241) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.IteratingCallback.iterate(IteratingCallback.java:223) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.outgoingFrame(AbstractWebSocketConnection.java:581) ~[websocket-common-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.websocket.client.io.WebSocketClientConnection.outgoingFrame(WebSocketClientConnection.java:58) ~[websocket-client-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.close(AbstractWebSocketConnection.java:181) ~[websocket-common-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:510) ~[websocket-common-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:440) ~[websocket-common-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.onFillable(SslConnection.java:540) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:395) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:161) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:383) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036) ~[jetty-util-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2021-05-05 18:22:17.263 ERROR 28323 --- [d26-scheduler-1] .j.a.w.c.AbstractWebsocketClientEndpoint : Websocket Error!

java.net.SocketTimeoutException: Connect Timeout
        at org.eclipse.jetty.io.ManagedSelector$Connect.run(ManagedSelector.java:955) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2021-05-05 18:22:17.264 ERROR 28323 --- [t@1169015699-48] .j.a.w.c.AbstractWebsocketClientEndpoint : Could not reconnect!

java.net.SocketTimeoutException: Connect Timeout
        at org.eclipse.jetty.io.ManagedSelector$Connect.run(ManagedSelector.java:955) ~[jetty-io-9.4.38.v20210224.jar!/:9.4.38.v20210224]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

I can see that the method for checking the connection state is public, but the accessor for the client object is not.

https://github.com/Petersoj/alpaca-java/blob/d4e5ac830767a6b69259192c60645433272435a6/src/main/java/net/jacobpeterson/alpaca/websocket/broker/client/AlpacaWebsocketClient.java#L238

https://github.com/Petersoj/alpaca-java/blob/d4e5ac830767a6b69259192c60645433272435a6/src/main/java/net/jacobpeterson/alpaca/AlpacaAPI.java#L89

Would you mind implementing something simple like this:

@FunctionalInterface
public interface OnSocketConnected {

    // Called once the websocket connection is established.
    void onConnected();

}

import javax.websocket.CloseReason; // Not sure whether to expose it directly

@FunctionalInterface
public interface OnSocketDisconnected {

    // Called when the websocket closes, with the reason for the closure.
    void onDisconnected(CloseReason reason);

}

public class AlpacaWebsocketClient {

    /* ... */

    public boolean isConnected() {
        return alpacaWebSocketClient.isConnected();
    }

    /* ... */

}
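
For illustration, here is a minimal sketch of how the pieces requested above could fit together. It is not alpaca-java code: `SocketStateTracker`, `fireConnected()`, and `fireDisconnected()` are hypothetical names, and a plain `String` stands in for `javax.websocket.CloseReason` so the sketch has no dependencies.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Callback types named after the proposal above (simplified for this sketch).
@FunctionalInterface
interface OnSocketConnected {
    void onConnected();
}

@FunctionalInterface
interface OnSocketDisconnected {
    // Assumption: a String replaces javax.websocket.CloseReason here.
    void onDisconnected(String reason);
}

// Hypothetical helper that records the socket state and notifies listeners.
class SocketStateTracker {
    // volatile so a state change made on the websocket thread is visible to callers.
    private volatile boolean connected;
    private final List<OnSocketConnected> connectedListeners = new CopyOnWriteArrayList<>();
    private final List<OnSocketDisconnected> disconnectedListeners = new CopyOnWriteArrayList<>();

    boolean isConnected() {
        return connected;
    }

    void addConnectedListener(OnSocketConnected listener) {
        connectedListeners.add(listener);
    }

    void addDisconnectedListener(OnSocketDisconnected listener) {
        disconnectedListeners.add(listener);
    }

    // Would be invoked from the endpoint's "open" callback.
    void fireConnected() {
        connected = true;
        connectedListeners.forEach(OnSocketConnected::onConnected);
    }

    // Would be invoked from the endpoint's "close" callback.
    void fireDisconnected(String reason) {
        connected = false;
        disconnectedListeners.forEach(listener -> listener.onDisconnected(reason));
    }
}
```

The state flag is updated before the listeners run, so a listener that calls `isConnected()` sees the new state.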

Thanks in advance

Petersoj commented 3 years ago

Yes. I can expose a few methods/fields so that a user can check on the socket state and add listeners to state changes. I'll add that soon.

Petersoj commented 3 years ago

@Caceresenzo Release 7.2 adds WebsocketStateListener to manually listen to Websocket state changes (onConnected, onDisconnected, and onError). Set the WebsocketStateListener to the desired WebsocketClient via AlpacaAPI.setAlpacaStreamWebsocketStateListener() or AlpacaAPI.setMarketDataStreamWebsocketStateListener().
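
As a rough sketch of how such a listener might be used on the application side, the class below tracks the connection state and skips work while the socket is down. `StateListenerSketch` is a stand-in written for this example: the three callback names come from the comment above, but the parameter lists are assumptions and the real `WebsocketStateListener` may differ. `ConnectionMonitor` and `runIfConnected()` are hypothetical names as well.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the library's listener; the method signatures are assumed.
interface StateListenerSketch {
    void onConnected();
    void onDisconnected();
    void onError(Throwable cause);
}

// Hypothetical application-side monitor built on the callbacks.
class ConnectionMonitor implements StateListenerSketch {
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final AtomicReference<Throwable> lastError = new AtomicReference<>();

    @Override
    public void onConnected() {
        connected.set(true);
    }

    @Override
    public void onDisconnected() {
        connected.set(false);
    }

    @Override
    public void onError(Throwable cause) {
        // Keep the most recent error so callers can inspect or log it.
        lastError.set(cause);
    }

    boolean isConnected() {
        return connected.get();
    }

    Throwable lastError() {
        return lastError.get();
    }

    // Runs the action only if the socket is currently reported as connected.
    // Returns true if the action ran, false if it was skipped.
    boolean runIfConnected(Runnable action) {
        if (!connected.get()) {
            return false;
        }
        action.run();
        return true;
    }
}
```

An instance of a class like this would be registered through one of the setters mentioned above. The check in `runIfConnected()` is best effort: the socket can still drop between the check and the action, so the action should handle I/O failures itself.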