msemys / esjc

EventStore Java Client
MIT License

Persistent subscription - Connection Closed #71

Open jselamy opened 3 years ago

jselamy commented 3 years ago

We are using this library in production.

We have an event handler that dispatches events to downstream apps subscribed to a stream. One stream in particular receives a huge volume of incoming events (IoT).

After running for a long period of time, something goes wrong and it stops responding. Any idea why?

We have been facing this issue for quite some time now.

jselamy commented 3 years ago

Any news?

This is the error we get after some time on an active persistent subscription that is constantly streaming events:

```
com.github.msemys.esjc.ConnectionClosedException: Connection was closed.
    at com.github.msemys.esjc.subscription.AbstractSubscriptionOperation.connectionClosed(AbstractSubscriptionOperation.java:219) ~[esjc-2.3.0.jar:2.3.0]
    at com.github.msemys.esjc.subscription.manager.SubscriptionManager.lambda$purgeSubscribedAndDropped$2(SubscriptionManager.java:55) ~[esjc-2.3.0.jar:2.3.0]
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[na:na]
    at java.base/java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3612) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[na:na]
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[na:na]
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) ~[na:na]
    at com.github.msemys.esjc.subscription.manager.SubscriptionManager.purgeSubscribedAndDropped(SubscriptionManager.java:54) ~[esjc-2.3.0.jar:2.3.0]
    at com.github.msemys.esjc.EventStoreTcp.onTcpConnectionClosed(EventStoreTcp.java:818) ~[esjc-2.3.0.jar:2.3.0]
    at com.github.msemys.esjc.EventStoreTcp.lambda$null$12(EventStoreTcp.java:870) ~[esjc-2.3.0.jar:2.3.0]
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1152) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:768) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:744) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1350) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.ChannelOutboundHandlerAdapter.close(ChannelOutboundHandlerAdapter.java:71) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.ChannelOutboundHandlerAdapter.close(ChannelOutboundHandlerAdapter.java:71) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.ChannelDuplexHandler.close(ChannelDuplexHandler.java:73) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:624) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:608) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:465) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at com.github.msemys.esjc.tcp.handler.HeartbeatHandler.lambda$userEventTriggered$0(HeartbeatHandler.java:61) ~[esjc-2.3.0.jar:2.3.0]
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495) ~[netty-transport-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.33.Final.jar:4.1.33.Final]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
```

msemys commented 3 years ago

Hi, it looks like the TCP connection was dropped because the ES server did not respond to the heartbeat (ping) sent by the esjc client. If the ES server is under heavy load, its responses may lag behind.

Try increasing the esjc heartbeat interval/timeout, e.g.:

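```java
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.EventStoreBuilder;
import java.time.Duration;

EventStore eventStore = EventStoreBuilder.newBuilder()
    .heartbeatInterval(Duration.ofSeconds(2)) // idle time before the client sends a heartbeat request
    .heartbeatTimeout(Duration.ofSeconds(5))  // how long to wait for the response before closing
    .build();
```
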
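To see why a larger timeout helps under load, here is a simplified, hypothetical model (not esjc's actual `HeartbeatHandler`). It assumes, consistent with the `HeartbeatHandler` frame in the trace above, that the client closes the connection as soon as one heartbeat response takes longer than the configured timeout. The lag values are made up for illustration.

```java
import java.time.Duration;
import java.util.List;

/**
 * Hypothetical model of a client-side heartbeat check (illustration only, not esjc code):
 * the client sends a heartbeat request and closes the connection if the response
 * takes longer than heartbeatTimeout.
 */
public class HeartbeatModel {

    /**
     * Returns the index of the first heartbeat whose response lag exceeds the timeout
     * (where the client would close the connection), or -1 if every response is in time.
     */
    static int firstClosingHeartbeat(List<Duration> responseLags, Duration heartbeatTimeout) {
        for (int i = 0; i < responseLags.size(); i++) {
            if (responseLags.get(i).compareTo(heartbeatTimeout) > 0) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Assumed server response lags as load increases (illustrative values only).
        List<Duration> lags = List.of(
                Duration.ofMillis(200), Duration.ofMillis(900),
                Duration.ofMillis(2500), Duration.ofMillis(4000));

        // A tight 1.5 s timeout trips on the third heartbeat (index 2).
        System.out.println(firstClosingHeartbeat(lags, Duration.ofMillis(1500))); // 2
        // The 5 s timeout suggested above tolerates all four responses.
        System.out.println(firstClosingHeartbeat(lags, Duration.ofSeconds(5)));   // -1
    }
}
```

The trade-off is detection speed: a longer timeout tolerates a slow server, but a genuinely dead connection is also noticed later.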
jselamy commented 3 years ago

Ok thanks,

Since automatic reconnection is enabled with the reconnection limit set to -1, the client does reconnect to ES.
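A minimal sketch of what a reconnection limit of -1 typically means, assuming (as the issue implies) that a negative limit is treated as "retry forever". `ReconnectPolicy` is a hypothetical class, not esjc's implementation. Note that a policy like this only restores the TCP connection; as described in this issue, the persistent subscriptions were not re-established by the reconnection alone.

```java
/**
 * Hypothetical reconnection policy (illustration only, not esjc code),
 * assuming a negative maxReconnections such as -1 means "unlimited".
 */
public class ReconnectPolicy {
    private final int maxReconnections;

    ReconnectPolicy(int maxReconnections) {
        this.maxReconnections = maxReconnections;
    }

    /** attempt is 1-based: the first reconnection attempt is attempt 1. */
    boolean shouldReconnect(int attempt) {
        return maxReconnections < 0 || attempt <= maxReconnections;
    }

    public static void main(String[] args) {
        ReconnectPolicy unlimited = new ReconnectPolicy(-1);
        ReconnectPolicy limited = new ReconnectPolicy(3);
        System.out.println(unlimited.shouldReconnect(1_000_000)); // true
        System.out.println(limited.shouldReconnect(3));           // true
        System.out.println(limited.shouldReconnect(4));           // false
    }
}
```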

However, the same app also holds persistent subscriptions, and it does not re-subscribe to them after reconnecting. We've added a small mechanism for this by registering a listener on the EventStore instance:

```java
// Publish a Spring application event whenever the TCP client (re)connects.
es.addListener(event -> {
    if (event instanceof ClientConnected) {
        log.debug("Received EventStore Event: {}", event.getClass().getSimpleName());
        log.info("ClientConnected event received from TCP client, sending an ApplicationEvent to notify");

        publisher.publishEvent(new ESClientConnectedEvent(this));
    }
});

// Re-subscribe every PersistentSubscriber bean once the connection is back.
@EventListener
public void onApplicationEvent(ESClientConnectedEvent event) {
    if (event.isInitialized()) {
        final Map<String, PersistentSubscriber> subscribers = ctx.getBeansOfType(PersistentSubscriber.class);

        subscribers.values().forEach(ps -> {
            log.info("Performing a subscribe for stream {} with subscription_name {}", ps.getStream(), ps.getGroupName());
            ps.subscribe();
        });
    }
}
```

Is there a better way to do this?
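The re-subscribe-on-connect mechanism above can be modeled without Spring as follows. This is a hedged sketch, not an answer from the maintainer: `ResubscribeOnConnect` and `Subscriber` are hypothetical names standing in for the issue's listener and `PersistentSubscriber` beans, and `markInitialized()` plays the role of the `event.isInitialized()` check, whose exact meaning the issue does not show.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/** Framework-free sketch of re-subscribing all subscribers on every "client connected" event. */
public class ResubscribeOnConnect {

    /** Hypothetical stand-in for the issue's PersistentSubscriber bean. */
    interface Subscriber {
        void subscribe();
    }

    private final List<Subscriber> subscribers = new CopyOnWriteArrayList<>();
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    void register(Subscriber s) {
        subscribers.add(s);
    }

    /** Marks the application as ready to (re)subscribe. */
    void markInitialized() {
        initialized.set(true);
    }

    /** Call from the connection listener on each "client connected" event; returns how many were re-subscribed. */
    int onClientConnected() {
        if (!initialized.get()) {
            return 0; // app not ready yet: skip, like the isInitialized() guard
        }
        subscribers.forEach(Subscriber::subscribe);
        return subscribers.size();
    }

    public static void main(String[] args) {
        ResubscribeOnConnect registry = new ResubscribeOnConnect();
        int[] calls = {0};
        registry.register(() -> calls[0]++);

        registry.onClientConnected(); // before initialization: ignored
        registry.markInitialized();
        registry.onClientConnected(); // first connect after initialization
        registry.onClientConnected(); // reconnect after a heartbeat timeout
        System.out.println(calls[0]); // 2
    }
}
```

Because the connected event fires on every reconnect, each `subscribe()` in a design like this should be idempotent, for example by stopping any previous subscription before opening a new one, so that repeated reconnects do not stack duplicate subscriptions.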