cloudfoundry / cf-java-client

Java Client Library for Cloud Foundry
Apache License 2.0

Recent log: Some messages are not well read #551

Closed antechrestos closed 7 years ago

antechrestos commented 8 years ago

Working with the new version of getRecentLogs leads to some unhandled messages. These errors are not systematic and are not always the same (sometimes the decoder expects a VARINT, sometimes it complains that the length is wrong...).
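
For reference, the failing call pattern looks roughly like this (a minimal sketch only; `dopplerClient` is assumed to be an already-configured DopplerClient and `applicationId` an application GUID):

    // Request the recent logs of one application and print each decoded envelope.
    dopplerClient.recentLogs(RecentLogsRequest.builder()
        .applicationId(applicationId)
        .build())
        .doOnNext(envelope -> System.out.println(envelope))   // one dropsonde envelope per multipart part
        .blockLast();                                          // wait until the multipart response is fully read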

Here are some of these errors:

java.lang.IllegalStateException: Unexpected call to beginMessage()
    at com.squareup.wire.ProtoReader.beginMessage(ProtoReader.java:90) ~[wire-runtime-2.2.0.jar:na]
    at org.cloudfoundry.dropsonde.events.ValueMetric$ProtoAdapter_ValueMetric.decode(ValueMetric.java:189) ~[cloudfoundry-client-2.0.0.M10.jar:2.0.0.M10]
    at org.cloudfoundry.dropsonde.events.ValueMetric$ProtoAdapter_ValueMetric.decode(ValueMetric.java:165) ~[cloudfoundry-client-2.0.0.M10.jar:2.0.0.M10]
    at org.cloudfoundry.dropsonde.events.Envelope$ProtoAdapter_Envelope.decode(Envelope.java:573) ~[cloudfoundry-client-2.0.0.M10.jar:2.0.0.M10]
    at org.cloudfoundry.dropsonde.events.Envelope$ProtoAdapter_Envelope.decode(Envelope.java:499) ~[cloudfoundry-client-2.0.0.M10.jar:2.0.0.M10]
    at com.squareup.wire.ProtoAdapter.decode(ProtoAdapter.java:195) ~[wire-runtime-2.2.0.jar:na]
    at com.squareup.wire.ProtoAdapter.decode(ProtoAdapter.java:189) ~[wire-runtime-2.2.0.jar:na]
    at org.cloudfoundry.reactor.doppler.ReactorDopplerEndpoints.toEnvelope(ReactorDopplerEndpoints.java:66) ~[cloudfoundry-client-reactor-2.0.0.M10.jar:2.0.0.M10]
    at org.cloudfoundry.reactor.doppler.ReactorDopplerEndpoints$$Lambda$38/751608431.apply(Unknown Source) ~[na:na]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber$InnerSubscriber.onNext(MonoFlatMap.java:195) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:125) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:636) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:511) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:994) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onSubscribe(FluxUsing.java:314) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:120) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:360) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:263) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:761) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:176) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onNext(FluxUsing.java:320) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:115) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:390) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.MonoReduce$ReduceSubscriber.onComplete(MonoReduce.java:152) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:775) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:531) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:511) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:504) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.UnicastProcessor.checkTerminated(UnicastProcessor.java:252) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:148) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:228) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.UnicastProcessor.onComplete(UnicastProcessor.java:313) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.ipc.netty.http.multipart.MultipartParser.onNext(MultipartParser.java:105) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.http.multipart.MultipartParser.onNext(MultipartParser.java:43) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.pushToken(MultipartTokenizer.java:261) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.pushDelimiterToken(MultipartTokenizer.java:255) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.trailingCrLf(MultipartTokenizer.java:332) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.doNext(MultipartTokenizer.java:134) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.doNext(MultipartTokenizer.java:30) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:90) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:131) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onNext(FluxSubscribeOn.java:122) [reactor-core-3.0.0.RC1.jar:na]
    at reactor.ipc.netty.common.NettyChannelHandler$InboundSink.drain(NettyChannelHandler.java:707) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.common.NettyChannelHandler$InboundSink.next(NettyChannelHandler.java:616) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.common.NettyChannelHandler.doRead(NettyChannelHandler.java:134) [reactor-netty-0.5.0.M2.jar:na]
    at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:133) [reactor-netty-0.5.0.M2.jar:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1066) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:900) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) [netty-all-4.1.3.Final.jar:4.1.3.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]

I'm available to provide any further information.

cf-gitbot commented 8 years ago

We have created an issue in Pivotal Tracker to manage this:

https://www.pivotaltracker.com/story/show/127990109

The labels on this github issue will be updated when the story is started.

antechrestos commented 8 years ago

This time with 2.0.0.RELEASE, I wrote a program that does the following:

Here are the different errors I got

antechrestos commented 8 years ago

I reproduced the same behaviour with the https://api.run.pivotal.io instance (API version: 2.59.0) and 2.0.0.RELEASE.

antechrestos commented 8 years ago

@nebhale Working on my own branch, I implemented an EnvelopeHandler that is instantiated on each call to recentLogs. This handler proxies the input stream and keeps track of all the multipart parts read. When a problem is encountered, it throws a ReadEnvelopeError that contains all of the parts read so far.
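
Roughly, the idea looks like this (a simplified, illustrative sketch; the EnvelopeHandler and ReadEnvelopeError names come from the description above, the rest is hypothetical and not the actual branch code):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.cloudfoundry.dropsonde.events.Envelope;

    // Collects every multipart part read so that a decoding failure can be reported
    // together with the raw bytes that led to it.
    final class EnvelopeHandler {

        private final List<byte[]> partsRead = new ArrayList<>();

        Envelope decode(byte[] part) {
            partsRead.add(part);
            try {
                // Wire-generated adapter for the dropsonde Envelope message
                return Envelope.ADAPTER.decode(part);
            } catch (IOException e) {
                throw new ReadEnvelopeError(e, new ArrayList<>(partsRead));
            }
        }

        static final class ReadEnvelopeError extends RuntimeException {

            final List<byte[]> partsRead;

            ReadEnvelopeError(Throwable cause, List<byte[]> partsRead) {
                super(cause);
                this.partsRead = partsRead;
            }
        }
    }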

To get the error, we can launch a program such as this one

By letting it run on my machine I got the following errors

Looking at the files, we can see that there must be an issue in the multipart reader, as the last part (the one that is not read) is either empty or sometimes contains part of the boundary (see java.net.ProtocolException-1472468264265.bin for example).

Also, there is an error that appears (more rarely, but again inside the multipart handling):

reactor.core.Exceptions$BubblingException: java.lang.IllegalStateException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.bubble(Exceptions.java:96)
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:317)
    at reactor.core.publisher.FluxDrop$DropSubscriber.onNext(FluxDrop.java:119)
    at reactor.ipc.netty.http.multipart.MultipartParser.onNext(MultipartParser.java:114)
    at reactor.ipc.netty.http.multipart.MultipartParser.onNext(MultipartParser.java:43)
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.pushToken(MultipartTokenizer.java:326)
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.pushDelimiterToken(MultipartTokenizer.java:320)
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.trailingCrLf(MultipartTokenizer.java:397)
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.onNext(MultipartTokenizer.java:191)
    at reactor.ipc.netty.http.multipart.MultipartTokenizer.onNext(MultipartTokenizer.java:33)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:126)
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onNext(FluxSubscribeOn.java:121)
    at reactor.ipc.netty.common.NettyChannelHandler$InboundSink.drain(NettyChannelHandler.java:707)
    at reactor.ipc.netty.common.NettyChannelHandler$InboundSink.next(NettyChannelHandler.java:616)
    at reactor.ipc.netty.common.NettyChannelHandler.doRead(NettyChannelHandler.java:134)
    at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:133)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1066)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:900)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:411)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.handler.proxy.ProxyHandler.channelRead(ProxyHandler.java:253)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:252)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:121)
    at reactor.core.publisher.Flux.lambda$onBackpressureError$53(Flux.java:3497)
    at reactor.core.publisher.Flux$$Lambda$98/140469282.accept(Unknown Source)
    at reactor.core.publisher.FluxDrop$DropSubscriber.onNext(FluxDrop.java:117)
    ... 56 common frames omitted
nebhale commented 8 years ago

/cc @smaldini

gberche-orange commented 7 years ago

@smaldini @nebhale

The problem still reproduces with the latest released version of cf-java-client 2.0.2.RELEASE, as well as with updated dependencies netty 4.1.6.Final and okhttp 3.4.2.

The problem reproduces more frequently with large recent-logs buffers: with 300 entries in the buffer, it reproduces close to 100% of the time (it always fails, even after 100 retries).

I wonder how we can make progress on this issue. I'm starting to look into the possible root cause suggested by @antechrestos: a non-deterministic issue in the multipart decoder.

As I did not find unit tests for the multipart code in reactor-netty, I am considering capturing the byte array from a recentLogs endpoint with 300 log entries and trying to reproduce the problem in an "offline" unit or integration test.
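
For example, something along these lines (a sketch only; the capture file name, boundary value and URL path are assumptions, and `applicationId` is an application GUID):

    // Offline reproduction idea: replay a previously captured multipart body from a
    // local WireMock server instead of hitting a real doppler endpoint, then loop over
    // recentLogs against it to check whether decoding the same bytes is deterministic.
    WireMockServer doppler = new WireMockServer(wireMockConfig().dynamicPort());
    doppler.start();

    byte[] recordedBody = Files.readAllBytes(Paths.get("recent-logs-capture.bin"));

    doppler.stubFor(get(urlEqualTo("/apps/" + applicationId + "/recentlogs"))
        .willReturn(aResponse()
            .withHeader("Content-Type", "multipart/x-protobuf; boundary=loggregator-boundary")
            .withBody(recordedBody)));

    // Point the cf-java-client doppler endpoint at doppler.baseUrl() for the test.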

Any other suggestion or guidance you could bring on this?

nebhale commented 7 years ago

We've got some serious connection/network changes (including a multipart overhaul) brewing in reactor-netty at the moment which I think might address your issue. If you can at the very least get us a network stream that reliably reproduces the issue, I'll make sure that it gets into a test suite somewhere so we can reproduce and diagnose it definitively.

gberche-orange commented 7 years ago

thanks @nebhale

Would a cf-java-client PR with wiremock recordings, and possibly an associated unit test exercising recentLogs, be appropriate/useful for you to reproduce and diagnose the issue?

I have this almost ready (I'm still struggling with the token endpoint wiremock); I would need to polish and clean it up before submitting the PR.

nebhale commented 7 years ago

Yeah, that'll definitely help, but if you'd like, just getting the wiremock recordings would be sufficient. We can do the rest.

gberche-orange commented 7 years ago

Here is a wiremock recording obtained from the PWS recentLogs endpoint ([running cf v246](https://status.run.pivotal.io/)), with ~1200 access log entries from a simple static-buildpack app.

wiremock-issue-51.tar.gz

In this specific example, the protocol buffer parsing deterministically fails at the 197th envelope; see more details in issue-51-pws-stacktrace.txt

In some other cases, detailed in https://github.com/cloudfoundry-community/autosleep/issues/252#issuecomment-263351909, the multipart parsing seems to be non-deterministic and triggers different protocol buffer envelope parsing exceptions from the same recorded wire when executed in a retry loop.

Hope this helps nail down the issue. Please let me know if I can help in some other way.

gberche-orange commented 7 years ago

@nebhale would you have an ETA for the network changes you mentioned in https://github.com/cloudfoundry/cf-java-client/issues/551#issuecomment-263581917, or a pointer to the associated stories/GH issues?

nebhale commented 7 years ago

Working hard on it pretty much every day. I'm actually working on the multipart log message reception right at this moment. I believe we're aiming for no later than Dec 15 to meet another deadline, but we'll see.

gberche-orange commented 7 years ago

thanks for the update Ben.

nebhale commented 7 years ago

@gberche-orange and @antechrestos I'm looking at this today, and something strange is going on. Given the payload attached above, one of the messages isn't parsable, resulting in:

java.net.ProtocolException: Unexpected end group
    at com.squareup.wire.ProtoReader.nextTag(ProtoReader.java:147)
    at org.cloudfoundry.dropsonde.events.Envelope$ProtoAdapter_Envelope.decode(Envelope.java:512)
    at org.cloudfoundry.dropsonde.events.Envelope$ProtoAdapter_Envelope.decode(Envelope.java:463)
    at com.squareup.wire.ProtoAdapter.decode(ProtoAdapter.java:195)
    at com.squareup.wire.ProtoAdapter.decode(ProtoAdapter.java:177)
    at org.cloudfoundry.reactor.doppler.ReactorDopplerEndpoints.toEnvelope(ReactorDopplerEndpoints.java:83)

I'm at a loss as to why this might be true.

nebhale commented 7 years ago

Got it!

antechrestos commented 7 years ago

@nebhale awesome!!! Looking forward to getting the next release :smirk: @gberche-orange I will integrate this next version.

antechrestos commented 7 years ago

@nebhale thank you very much for it. Thank you!!

nebhale commented 7 years ago

@antechrestos If you can give your stuff a go with this latest snapshot before we release, I'd appreciate it. I'm pretty certain that we've solved the issue, but I'd like you to confirm that for me if you can.

shawn-nielsen commented 7 years ago

@nebhale We had experienced the same issues noted above, and the latest snapshot seems to have solved our issue. Thank you so much!

gberche-orange commented 7 years ago

@nebhale Unfortunately I still reproduce the problem, hopefully with the latest versions; see the traces, which include the version details: cf-java-client-traces.txt

Do you need other stream captures to diagnose and reproduce?

cf-gitbot commented 7 years ago

We have created an issue in Pivotal Tracker to manage this:

https://www.pivotaltracker.com/story/show/136288663

The labels on this github issue will be updated when the story is started.

nebhale commented 7 years ago

Yes please.

gberche-orange commented 7 years ago

Here are wiremock verbose traces along with the unit test output. I understand they show that the error is not systematic when parsing the same recorded stream: cf-java-client-traces-with-wiremock.txt wiremock-stream-files.tar.gz

If someone needs to capture more files, I used the following procedure to set up wiremock: https://github.com/cloudfoundry-community/autosleep/issues/252#issuecomment-263351909

antechrestos commented 7 years ago

Hi, sorry for the late reply, I have been busy. I don't know if this may help, but I dusted off my old recent-logs integration test and rebased it on the new code.

Basically, it tries to read the recent logs of a mere static application. Every 10 iterations, it switches the application state... Quite trivial.

What is surprising is that it goes well until the 9th iteration; from that moment on, it fails every time, without any exception being sent back to the subscriber. If I restart the process, it again goes fine until the 9th iteration.

I tried to put some breakpoints in the ReactorDopplerEndpoints.recentLogs method: it enters neither .flatMap(inbound -> inbound.addHandler(new MultipartDecoderChannelHandler(inbound)).receiveObject()) nor .map(ReactorDopplerEndpoints::toEnvelope)... I don't know if I implemented the calling code as it should be:

            // Wait for the recentLogs Flux to terminate (complete or error) before going on
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicLong count = new AtomicLong();
            final AtomicReference<Throwable> errorReference = new AtomicReference<>();
            dopplerClient.recentLogs(RecentLogsRequest.builder()
                .applicationId(applicationId)
                .build())
                .subscribe(envelope -> {
                        // onNext: log and count each decoded envelope
                        logEnvelope(envelope);
                        count.incrementAndGet();
                    },
                    throwable -> {
                        // onError: remember the failure and release the latch
                        errorReference.set(throwable);
                        latch.countDown();
                    },
                    // onComplete: release the latch
                    latch::countDown);
            log.debug("{} - getRemoteLogs - waiting", number);
            if (!latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)) {
                throw new IllegalStateException("Subscriber timed out");
            } else if (errorReference.get() != null) {
                throw errorReference.get();
            } else {
                log.debug("{} - getRemoteLogs - got {} envelope(s)", number, count.get());
            }
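
For comparison, an equivalent way to wait without the manual latch might be to block on the Flux directly (a sketch only, assuming the same dopplerClient, applicationId, logEnvelope, log, number and TIMEOUT as above):

    Long envelopeCount = dopplerClient.recentLogs(RecentLogsRequest.builder()
        .applicationId(applicationId)
        .build())
        .doOnNext(envelope -> logEnvelope(envelope))   // same per-envelope logging as above
        .count()                                       // Mono<Long> completing with the number of envelopes
        .block(TIMEOUT);                               // propagates errors and enforces the timeout

    log.debug("{} - getRemoteLogs - got {} envelope(s)", number, envelopeCount);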
nebhale commented 7 years ago

@gberche-orange and @antechrestos: Found the stupid and unnecessary optimization (sure, let's just reset the multipart state in the middle of receiving; I'm sure that won't be an issue at all) that was causing this issue. Have at it again, and sorry it's taking so long to get this sorted once and for all.

antechrestos commented 7 years ago

@nebhale I will give it a try tomorrow and let you know. No problem for the time spent; this does not seem to be an easy one. Let's hope we break its neck :smirk:

gberche-orange commented 7 years ago

thanks @nebhale for this fix and for allocating time to tackle this issue in busy times!

I confirm the test program is now able to read 6000k+ recent log entries without parsing errors.

Maybe an integration test in cf-java-client could protect against future regressions. Here is a possible scenario:
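
Purely as an illustration (not necessarily the scenario above), such a test could repeatedly drain recentLogs and fail on any decoding error or hang; `dopplerClient` and `applicationId` are assumed to come from the test setup, and assertThat is AssertJ:

    @Test
    public void recentLogsAreAlwaysDecodable() {
        // Loop against an application known to have a large recent-logs buffer.
        for (int i = 0; i < 100; i++) {
            assertThat(dopplerClient
                .recentLogs(RecentLogsRequest.builder()
                    .applicationId(applicationId)
                    .build())
                .collectList()
                .block(Duration.ofSeconds(30)))   // any decode error or hang fails this iteration
                .isNotEmpty();
        }
    }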

Thanks again,

Guillaume.

antechrestos commented 7 years ago

@nebhale I still have this issue as explained above; by looping on getRecentLogs and sleeping between calls, I get stuck on the 9th call. I suspect a connection pool that gets exhausted. Is that possible? Or did I miss anything in the code above?

gberche-orange commented 7 years ago

@nebhale We are able to reproduce the cf-java-client misbehaving when the recent-logs buffer is small or empty: the DopplerClient seems to enter a corrupted state and all subsequent client calls fail systematically with a subscriber timeout. This corrupted state does not recover as more recent logs get served subsequently.

We suspect an off-by-one/unhandled error condition bug in cf-java-client which breaks the reactive stream. To reproduce it once more with debug traces, we need to wait for the recent logs to expire from the doppler endpoint (pushing a fresh application always returns the staging logs). We'll then post the associated debug traces (any suggestion on the right level to enable?).

Also, we wonder whether the HTTP connection pooling exhibits some lazy cleanup behaviors of underlying TCP connections:

Looking at the open HTTP connections (netstat -an | grep api-ip-address), we observe inconsistent behaviors:

Enabling debug traces for reactor.ipc.netty we observe that the connection pool seems to be of size 1.

2016-12-20 11:57:49,653 reactor-http-nio-1 reactor.ipc.netty.resources.DefaultPoolResources Acquired [id: 0x2423d223, L:/10.0.2.15:33784 - R:api.nd-cfapi.itn.ftgroup/10.117.41.250:443], now 1 active connections
2016-12-20 11:57:49,727 reactor-http-nio-1 reactor.ipc.netty.resources.DefaultPoolResources Released [id: 0x2423d223, L:/10.0.2.15:33784 - R:api.nd-cfapi.itn.ftgroup/10.117.41.250:443], now 0 active connections
2016-12-20 11:57:50,731 reactor-http-nio-1 reactor.ipc.netty.resources.DefaultPoolResources Acquired [id: 0x2423d223, L:/10.0.2.15:33784 - R:api.nd-cfapi.itn.ftgroup/10.117.41.250:443], now 1 active connections
2016-12-20 11:57:50,794 reactor-http-nio-1 reactor.ipc.netty.resources.DefaultPoolResources Released [id: 0x2423d223, L:/10.0.2.15:33784 - R:api.nd-cfapi.itn.ftgroup/10.117.41.250:443], now 0 active connections
2016-12-20 11:57:51,795 reactor-http-nio-1 reactor.ipc.netty.resources.DefaultPoolResources Acquired [id: 0x2423d223, L:/10.0.2.15:33784 - R:api.nd-cfapi.itn.ftgroup/10.117.41.250:443], now 1 active connections
2016-12-20 11:57:51,867 reactor-http-nio-1 reactor.ipc.netty.resources.DefaultPoolResources Released [id: 0x2423d223, L:/10.0.2.15:33784 - R:api.nd-cfapi.itn.ftgroup/10.117.41.250:443], now 0 active connections

/CC @antechrestos

smaldini commented 7 years ago

I'm working on a similar issue directly in reactor-netty; I'll see with @nebhale if we can sync to deploy the netty fix.

It might stall because reactor-netty simply missed an instruction to read, or ignored one. Under the Reactive Streams principle, traffic that has not been requested is not the application's responsibility, and not reading the connection can lead to hard hangs that are difficult to diagnose. That's the specific part I want to isolate as much as possible.

I'm investigating an issue around pool releasing, but I believe CF uses the "elastic" strategy. Elastic means that there is no limit to the number of connections in the pool, but there is a throttling mechanism before too many connections are opened; more importantly, if a connection has just been released by another operation, the pool will try, on a best-effort basis, to re-use it. Sometimes pool releasing after a server close can also happen without notifying the underlying client flow, and that would cause a hang-up. Anyway, this is the specific issue I'm fighting with right now.

Websocket is interesting: it doesn't allow connection pooling as standard HTTP does. So we have to close the websocket connections regardless of them being part of a pool, which itself will, once upgraded (it cannot be downgraded back to HTTP and be reused by other operations), lazily renew itself on the next operations.

@nebhale It might be worth trying to disable the pool for websocket operations, since the only use they can make of it is to upgrade a pooled connection and never really release (close) it. It might be better to isolate them into dedicated connections; the cost of connecting is relatively transparent for reactor-netty if the rest of the flow is reactive-streams based, since you will receive things when they are ready anyway.

smaldini commented 7 years ago

Some context and an extra note: as @nebhale said, we are about to release 0.6 because of our internal changes and the Spring 5 M4 release, also planned within the next few days. We have allocated some priority time in January to deal with eventual patch releases in case you are still facing this after 0.6.

Note that in parallel we are testing with a load of setups, including the Spring web test suite and CF itself (http://projectreactor.io; our site is just a one-class reactor app using the server AND the client to proxy our javadoc repo: https://github.com/reactor/projectreactor.io/blob/master/src/main/java/io/projectreactor/Application.java). The pooling became an issue for the site, for instance, as the remote doc server we proxy tends to close connections aggressively; that's where I noticed that connections were not properly released, and websocket might suffer the same since the server can close after the last websocket frame.

antechrestos commented 7 years ago

@nebhale @smaldini I made a new test with an application without any activity (see log.txt )

From what I understand, the pool does not create a new connection when it reaches 8... It sees old websockets as open when they may have been closed?

/CC @gberche-orange

nebhale commented 7 years ago

Actually the pool size is fixed and calculated by Reactor. I believe it's the number of cores on your machine (something tells me your workstations are quad-core, hyper-threaded, making the value 8). I believe that @antechrestos is right that the connections are just not being closed, exhausting the pool. How do you think you're closing those connections?

nebhale commented 7 years ago

@gberche-orange and @antechrestos there are new snapshots of both reactor-netty and the client that improve the closing of WS connections. Specifically, the connections still count against the pool size (which defaults to the number of cores on a system, but can be configured in the ConnectionContext). However, you can now force those connections to close by using the Cancellation.dispose() method. So if you did dopplerClient.stream(...).dispose() it should open and close the connection immediately. In your testing, you'll need to ensure that method is called when you want to close the WS connection, as well as ensuring that you've got a large enough connection pool to handle all of the connections you might need to have open in parallel.
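
For example (a sketch; in Reactor 3.0.x, subscribe() returns the Cancellation referred to above, and dopplerClient/applicationId are assumed to be already configured):

    // Keep the Cancellation returned by subscribe() so the websocket connection
    // can be closed explicitly once the caller is done with the stream.
    Cancellation streaming = dopplerClient.stream(StreamRequest.builder()
        .applicationId(applicationId)
        .build())
        .subscribe(envelope -> System.out.println(envelope));

    // ... later, when the stream is no longer needed:
    streaming.dispose();   // forces the underlying websocket connection to close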

smaldini commented 7 years ago

I might add that any operation that cancels prematurely can help closing, e.g.

//sized take
dopplerClient.stream(...).take(4).subscribe(doSomething);

//timed take
dopplerClient.stream(...).takeUntil(Duration.ofSeconds(10)).subscribe(doSomething);

//manual reference take
DirectProcessor<Void> processor = DirectProcessor.create();
dopplerClient.stream(...).takeUntilOther(processor).subscribe(doSomething);
//...
processor.onComplete();

Note that Cancellation is scheduled for a naming review in Reactor Core 3.1 (Q1 2017); we do not forecast a lot of issues around most of the API, but we got feedback and requests to align with what Spring does (DisposableBean) or Rx (Disposable) and call it Disposable.

antechrestos commented 7 years ago

@nebhale @smaldini So far, so good: I have been getting recent logs 185 times without any envelope and no timeout. I will go on alternating start and stop, and let it run for a long period... :crossed_fingers: /CC @gberche-orange

antechrestos commented 7 years ago

@nebhale @smaldini I have just finished testing it. It runs like a charm. The test ends when the token becomes invalid; I thought it might refresh automatically, so I must check the documentation and will submit an issue if so.

See cf-java-client.log.txt. As you can see, it goes well for a while (5 seconds before each call to recent logs, one stop/start every ten calls).

At the end we get two subscriber timeouts (I suspect that an error is not properly propagated) and then the token is invalid.

Thank you again, I hope this problem is behind us. :santa:

/CC @gberche-orange

nebhale commented 7 years ago

Sounds good. We know that there's a problem with the error propagation in token negotiation. It's one of the first things we're going to tackle in 2.2.0.

gberche-orange commented 7 years ago

thanks @nebhale and @smaldini for your fixes (and sorry for the late feedback during the holiday period). This indeed greatly improves the situation: my test program is able to read up to 15k recent log entries, but I still observe timeouts.

2017-01-02 12:33:48,330 main                 org.cloudfoundry.reactor.doppler.RecentLogsTest got 14661 envelopes last_ts=2017-01-02T11:28:18.478093810Z elapsed=299022ns ns_per_enveloppe=20 
2017-01-02 12:33:50,403 main                 org.cloudfoundry.reactor.doppler.RecentLogsTest got 14661 envelopes last_ts=2017-01-02T11:28:18.478093810Z elapsed=538640ns ns_per_enveloppe=36 

So if you did dopplerClient.stream(...).dispose() it should open and close the connection immediately.

I added a call to dispose() on the subscription returned by the following call (see the full method in the gist):

            subscribedLogs = dopplerClient.recentLogs(RecentLogsRequest.builder()
                .applicationId(applicationId)
                .build())
                .subscribe(envelope -> {
                        logEnvelope(envelope);
                        count.incrementAndGet();
[...]
                    },
                    throwable -> {
                        errorReference.set(throwable);
                        latch.countDown();
                    },
                    latch::countDown);
[..]
          latch.await(TIMEOUT.getSeconds() ...)
           [...]
          subscribedLogs.dispose();

I then see the number of established TCP connections in this test vary from 5 down to 1.

However, I still sometimes (though seldom) reproduce the case where the stream times out (after 30s, and 300s in another test), and from that point on the doppler client systematically times out and does not recover:

2017-01-02 12:36:48,949 main                 org.cloudfoundry.reactor.doppler.RecentLogsTest got 14661 envelopes last_ts=2017-01-02T11:28:18.478093810Z elapsed=366185ns ns_per_enveloppe=24 
2017-01-02 12:36:51,163 main                 org.cloudfoundry.reactor.doppler.RecentLogsTest got 14661 envelopes last_ts=2017-01-02T11:28:18.478093810Z elapsed=375168ns ns_per_enveloppe=25 
2017-01-02 12:36:53,101 main                 org.cloudfoundry.reactor.doppler.RecentLogsTest got 14661 envelopes last_ts=2017-01-02T11:28:18.478093810Z elapsed=8657043ns ns_per_enveloppe=590 
2017-01-02 12:37:24,125 main                 org.cloudfoundry.reactor.doppler.RecentLogsTest Un-managed error java.lang.IllegalStateException: Subscriber timed out
    at org.cloudfoundry.reactor.doppler.RecentLogsTest.fetchLogsOrReturnError(RecentLogsTest.java:167) ~[test-classes/:na]
    at org.cloudfoundry.reactor.doppler.RecentLogsTest.getRemoteLogs(RecentLogsTest.java:121) ~[test-classes/:na]
    at org.cloudfoundry.reactor.doppler.RecentLogsTest.reproduce_recent_log_issue(RecentLogsTest.java:94) 
...
2017-01-02 12:42:03,634 main                 org.cloudfoundry.reactor.doppler.RecentLogsTest Un-managed error java.lang.IllegalStateException: Subscriber timed out
...

It seems that this "deadlock" situation is more frequent when the cf-java-client suffers from CPU starvation (i.e. when I artificially load my test machine with an unrelated ab run against a distinct app).

From https://github.com/cloudfoundry/cf-java-client/issues/551#issuecomment-268228566

It might stall because reactor-netty simply missed an instruction to read, or ignored one. Under the Reactive Streams principle, traffic that has not been requested is not the application's responsibility, and not reading the connection can lead to hard hangs that are difficult to diagnose. That's the specific part I want to isolate as much as possible.

I'm not sure whether additional traces can help the diagnosis. Here are reactor-netty debug traces during the problem: https://gist.github.com/gberche-orange/aa1e73b9a55274619ed2ccc8d06d0cd6, edited with line feeds around the occurrence of the timeout.

From https://github.com/cloudfoundry/cf-java-client/issues/551#issuecomment-268541626

I might add that any operation that cancels prematurely can help closing

@smaldini I understand the cf-java-client code at https://github.com/cloudfoundry/cf-java-client/blob/69b91b1c4700c7051aa647b496a56579c12faf41/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/doppler/ReactorDopplerEndpoints.java#L62 is currently using the .takeWhile(t -> MultipartDecoderChannelHandler.CLOSE_DELIMITER != t) syntax.

Do you mean the future cancellation strategy would be performed by the callers of the cf-java-client upon receiving a timeout?

gberche-orange commented 7 years ago

@nebhale should I submit a new issue for the remaining suspected race condition, or would you prefer to re-open this issue?

nebhale commented 7 years ago

@gberche-orange Unfortunately we've been dealing with some other issues last week. I hope to take another look at it this week.

cf-gitbot commented 7 years ago

We have created an issue in Pivotal Tracker to manage this:

https://www.pivotaltracker.com/story/show/137789303

The labels on this github issue will be updated when the story is started.

gberche-orange commented 7 years ago

thanks @nebhale

nebhale commented 7 years ago

@gberche-orange Actually, thinking about it, can you please open another issue with this info in it? This has already been closed out for another release, so let's get a fresh bit of tracking with the new one.

nebhale commented 7 years ago

Thanks!