msemys / esjc

EventStore Java Client
MIT License

Subscription appears to die silently after OOM #13

Closed SoftMemes closed 7 years ago

SoftMemes commented 7 years ago

We have some fairly large events in EventStore and subscribe with a catch-up subscription. It appears that under some circumstances this causes the subscription to die silently due to an OOM: we no longer receive messages on the subscription, but we are also not informed in any way that the subscription has died. The connection to EventStore is still live.

Stack traces from our logs:

j.l.OutOfMemoryError: Java heap space
        at c.g.p.ByteString.toByteArray(ByteString.java:532)
        at c.g.m.e.RecordedEvent.<init>(RecordedEvent.java:71)
        at c.g.m.e.ResolvedEvent.<init>(ResolvedEvent.java:44)
        at c.g.m.e.StreamEventsSlice$$Lambda$99/1681539066.apply(Unknown Source)
        at j.u.s.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at j.u.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
        at j.u.s.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at j.u.s.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at j.u.s.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at j.u.s.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at j.u.s.ReferencePipeline.collect(ReferencePipeline.java:499)
        at c.g.m.e.StreamEventsSlice.<init>(StreamEventsSlice.java:73)
        at c.g.m.e.o.ReadStreamEventsForwardOperation.transformResponseMessage(ReadStreamEventsForwardOperation.java:103)
        at c.g.m.e.o.ReadStreamEventsForwardOperation.transformResponseMessage(ReadStreamEventsForwardOperation.java:17)
        at c.g.m.e.o.AbstractOperation.succeed(AbstractOperation.java:168)
        at c.g.m.e.o.ReadStreamEventsForwardOperation.inspectResponseMessage(ReadStreamEventsForwardOperation.java:60)
        at c.g.m.e.o.ReadStreamEventsForwardOperation.inspectResponseMessage(ReadStreamEventsForwardOperation.java:17)
        at c.g.m.e.o.AbstractOperation.inspect(AbstractOperation.java:72)
        at c.g.m.e.t.h.OperationHandler.channelRead0(OperationHandler.java:47)
        at c.g.m.e.t.h.OperationHandler.channelRead0(OperationHandler.java:19)
        at i.n.c.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
        at i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
        at c.g.m.e.t.h.HeartbeatHandler.channelRead0(HeartbeatHandler.java:40)
        at c.g.m.e.t.h.HeartbeatHandler.channelRead0(H...
j.l.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3520)
        at c.g.m.e.t.TcpPackage.of(TcpPackage.java:101)
        at c.g.m.e.t.TcpPackageDecoder.decode(TcpPackageDecoder.java:23)
        at c.g.m.e.t.TcpPackageDecoder.decode(TcpPackageDecoder.java:9)
        at i.n.h.c.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
        at i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
        at i.n.h.c.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
        at i.n.h.c.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
        at i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
        at i.n.c.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
        at i.n.c.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        at i.n.c.n.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at i.n.c.n.NioEventLoop.processSelectedKey(NioEventLoop.java:572)
        at i.n.c.n.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:513)
        at i.n.c.n.NioEventLoop.processSelectedKeys(NioEventLoop.java:427)
        at i.n.c.n.NioEventLoop.run(NioEventLoop.java:399)
        at i.n.u.c.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        at i.n.u.c.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)
j.l.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3520)
        at c.g.m.e.t.TcpPackage.of(TcpPackage.java:101)
        at c.g.m.e.t.TcpPackageDecoder.decode(TcpPackageDecoder.java:23)
        at c.g.m.e.t.TcpPackageDecoder.decode(TcpPackageDecoder.java:9)
        at i.n.h.c.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
        at i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
        at i.n.h.c.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
        at i.n.h.c.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
        at i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:345)
        at i.n.c.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:366)
        at i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:352)
        at i.n.c.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        at i.n.c.n.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at i.n.c.n.NioEventLoop.processSelectedKey(NioEventLoop.java:572)
        at i.n.c.n.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:513)
        at i.n.c.n.NioEventLoop.processSelectedKeys(NioEventLoop.java:427)
        at i.n.c.n.NioEventLoop.run(NioEventLoop.java:399)
        at i.n.u.c.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        at i.n.u.c.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)

SoftMemes commented 7 years ago

The error comes from Netty:

[esio-1-1] - i.n.c.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.

msemys commented 7 years ago

If your JVM heap size is too small, try increasing the heap or reducing the subscription buffer size.

If you want to be informed when the subscription is closed, you should implement the onClose(...) listener method, e.g.:

eventstore.subscribeToAllFrom(Position.START, new CatchUpSubscriptionListener() {
    @Override
    public void onEvent(CatchUpSubscription subscription, ResolvedEvent event) {
        System.out.println("onEvent: " + event);
    }

    @Override
    public void onClose(CatchUpSubscription subscription, SubscriptionDropReason reason, Exception exception) {
        System.out.println("onClose: reason=" + reason + "; exception=" + exception);
    }
});
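Since the thread talks about automatic restarts hanging off onClose, here is a minimal sketch of that pattern. It deliberately does not use the esjc types: `Listener`, `Source`, `Resubscriber` and `FlakySource` are hypothetical stand-ins so the example runs without the library. The idea is to remember the last processed position and resubscribe from there when onClose fires, with a cap on restarts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for a client API; these are NOT the esjc interfaces.
interface Listener {
    void onEvent(long position, String event);
    void onClose(String reason, Exception exception);
}

interface Source {
    // Delivers events located after the given position; -1 means "from the start".
    void subscribeFrom(long position, Listener listener);
}

// Remembers the last processed position and resubscribes when the subscription closes.
final class Resubscriber implements Listener {
    private final Source source;
    private final int maxRestarts;
    final List<String> handled = new ArrayList<>();
    long lastPosition = -1;
    int restarts = 0;

    Resubscriber(Source source, int maxRestarts) {
        this.source = source;
        this.maxRestarts = maxRestarts;
    }

    void start() {
        source.subscribeFrom(lastPosition, this);
    }

    @Override
    public void onEvent(long position, String event) {
        handled.add(event);
        lastPosition = position; // checkpoint only after the event was handled
    }

    @Override
    public void onClose(String reason, Exception exception) {
        // Restart from the checkpoint, but give up after maxRestarts attempts.
        if (restarts < maxRestarts) {
            restarts++;
            start();
        }
    }
}

// Simulated source that drops the subscription once, before delivering event index 2.
final class FlakySource implements Source {
    private final String[] events = {"a", "b", "c", "d"};
    private boolean dropped = false;

    @Override
    public void subscribeFrom(long position, Listener listener) {
        for (int i = (int) position + 1; i < events.length; i++) {
            if (i == 2 && !dropped) {
                dropped = true;
                listener.onClose("ConnectionClosed", new RuntimeException("simulated drop"));
                return;
            }
            listener.onEvent(i, events[i]);
        }
    }
}

final class ResubscribeDemo {
    public static void main(String[] args) {
        Resubscriber r = new Resubscriber(new FlakySource(), 3);
        r.start();
        System.out.println("handled=" + r.handled + " restarts=" + r.restarts);
    }
}
```

In a real client you would probably hand the restart off to another thread or a scheduler rather than resubscribing synchronously inside the callback, and skip the restart when the close was requested by the application itself. Of course, this only helps if onClose is actually invoked, which is the problem discussed in this issue.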

...anyway, there is a small issue here with the client's behavior when an error occurs before TCP package handling. In such cases, the .NET client closes the TCP connection: https://github.com/EventStore/EventStore/blob/release-v4.0.0/src/EventStore.ClientAPI/Transport.Tcp/TcpPackageConnection.cs#L123

SoftMemes commented 7 years ago

Thanks for the reply. We've addressed the root cause with the methods you describe (buffer size and heap), but the problem was that the client did not call the onClose handler, which meant our automatic restarts, monitoring, etc. did not kick in. Will the fix you referenced address this, or does it relate to something else?

msemys commented 7 years ago

Yes. If an error occurs while processing a TCP package, the client will close all subscriptions (as a result, the subscription listener's onClose() will be called) and disconnect.
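
The behavior described above can be modeled roughly as follows. This is only a sketch under assumptions, not the esjc implementation: `ModelConnection` and `CloseListener` are hypothetical names, and the real client works on Netty channels rather than a plain method call. It shows the essential point, which is that a failure anywhere in package processing notifies every subscription and then marks the connection as closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a client connection; not esjc's actual classes.
final class ModelConnection {
    interface CloseListener {
        void onClose(String reason, Throwable cause);
    }

    private final List<CloseListener> subscriptions = new ArrayList<>();
    private boolean connected = true;

    void subscribe(CloseListener listener) {
        subscriptions.add(listener);
    }

    boolean isConnected() {
        return connected;
    }

    int subscriptionCount() {
        return subscriptions.size();
    }

    // Processes one raw package. Any failure, including an Error such as
    // OutOfMemoryError, closes all subscriptions and disconnects.
    void receive(byte[] raw, Consumer<byte[]> handler) {
        if (!connected) {
            return;
        }
        try {
            handler.accept(raw);
        } catch (Throwable t) {
            List<CloseListener> toNotify = new ArrayList<>(subscriptions);
            subscriptions.clear();
            connected = false;
            for (CloseListener listener : toNotify) {
                listener.onClose("package processing failed", t);
            }
        }
    }
}

final class ModelConnectionDemo {
    public static void main(String[] args) {
        ModelConnection connection = new ModelConnection();
        connection.subscribe((reason, cause) ->
                System.out.println("onClose: reason=" + reason + "; cause=" + cause));
        // Simulate the failure from the stack traces with a thrown (not real) OOM.
        connection.receive(new byte[0], raw -> {
            throw new OutOfMemoryError("simulated");
        });
        System.out.println("connected=" + connection.isConnected());
    }
}
```

Catching Throwable is normally discouraged, but here it is the point of the sketch: the listener has to hear about the failure even when it is an Error rather than an Exception.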