googleapis / google-cloud-java

Google Cloud Client Library for Java
https://cloud.google.com/java/docs/reference
Apache License 2.0

Pub Sub messages failed due to DEADLINE_EXCEEDED #3867

Closed ayushj158 closed 5 years ago

ayushj158 commented 5 years ago

Thanks for stopping by to let us know something could be better!

Please include as much information as possible:

Environment details

Steps to reproduce

  1. Publishing 20M records, which post messages to Pub/Sub continuously

Stacktrace

io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED:
[grpc-default-worker-ELG-2-2] 7267 --- INFO i.g.n.s.i.n.h.c.h.DefaultHttp2ConnectionDecoder - [id: 0x204dac20, L:/10.8.61.36:58898 - R:pubsub.googleapis.com/172.217.20.74:443] ignoring HEADERS frame for stream RST_STREAM sent. {}

Code snippet

com.google.cloud.pubsub.v1.Publisher.publish(com.google.pubsub.v1.PubsubMessage)

Any additional information below

We are publishing messages to Pub/Sub in bulk, and intermittently we keep seeing a few messages fail with:

I checked one of the previous issues, googleapis/google-cloud-java#2722, which states that the issue is resolved, but I am not sure what the resolution actually was.

Will just upgrading to a newer version fix the issue, or do we need to add some custom timeout settings (and if so, where: RetrySettings?)? @pongad @kir-titievsky

Following these steps will guarantee the quickest resolution possible.

Thanks!

pongad commented 5 years ago

Publisher itself doesn't have any flow control. If you publish too many messages at once, the network eventually becomes congested and some of the messages won't be delivered. Eventually gRPC has to give up on the message, otherwise the messages will build up and crash your machine.

Could you tell us what your application should do in these circumstances? Knowing that will make it easier to make recommendations; I think some form of load shedding is generally still required, though.

ayushj158 commented 5 years ago

@pongad Our application is trying to dump millions of data objects to another system as part of a migration, and we use Pub/Sub to publish all of those objects. If the application crashes, the only thing we can do is restart the dump: the migrated data is critical and not even a single record can be missed, and given the amount of data (over 50M records) it is impractical to read only the delta from the database (the queries become very slow with the necessary where clauses), so the only option is to restart the full dump.

The good news is that in the meantime I tried a new approach that puts some brakes on the messages being posted to Pub/Sub and gives it some breathing space. Basically, we maintain a count that is the difference between messages posted and messages delivered (success or failure). Whenever that count is greater than 800, we sleep all incoming threads (for 12 seconds, since the RPC total timeout is 10 seconds) to allow Pub/Sub to clear its outstanding messages. So far I have processed around 30M records this way with 0 failures, versus roughly 10% of messages failing earlier.
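Roughly, the workaround looks like this (a simplified sketch of our approach, not library code; the class name, the 800 threshold, and the 12-second sleep are just the values we chose):

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.atomic.AtomicInteger;

// Wraps a Publisher and pauses publishing threads while too many messages are in flight.
class ThrottledPublisher {
  private static final int MAX_OUTSTANDING = 800;      // messages posted minus messages delivered
  private static final long BACKOFF_MILLIS = 12_000L;  // a bit longer than the 10s RPC total timeout

  private final Publisher publisher;
  private final AtomicInteger outstanding = new AtomicInteger();

  ThrottledPublisher(Publisher publisher) {
    this.publisher = publisher;
  }

  ApiFuture<String> publish(PubsubMessage message) throws InterruptedException {
    // Give the client breathing space until its outstanding messages clear.
    while (outstanding.get() > MAX_OUTSTANDING) {
      Thread.sleep(BACKOFF_MILLIS);
    }
    outstanding.incrementAndGet();
    ApiFuture<String> future = publisher.publish(message);
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<String>() {
          @Override
          public void onSuccess(String messageId) {
            outstanding.decrementAndGet();
          }

          @Override
          public void onFailure(Throwable t) {
            outstanding.decrementAndGet();
          }
        },
        MoreExecutors.directExecutor());
    return future;
  }
}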

Could something similar be added to the Pub/Sub client out of the box, so that it recognizes when Pub/Sub is overloaded and gives it some breathing space until the outstanding messages clear?

pongad commented 5 years ago

This makes sense. I think we added something like this in the beginning, but then found that different people want different behavior: if you have a batch job you want to slow down, whereas for something interactive it might make more sense to return an error, etc. I think it might be better to add some kind of adapter on top, like:

Publisher pub = Publisher.newBuilder().....
BlockingPublisher blockPub = new BlockingPublisher(pub, 1000); // at most 1000 in flight at once.

for (int i = 0; i < 1000; i++) {
  blockPub.publish(someMessage); // these don't block.
}
blockPub.publish(anotherMessage); // blocks until one of the above finishes.
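
Something along these lines could back that adapter (a rough sketch only; BlockingPublisher is not an existing class in the library, and a Semaphore is just one way to get the blocking behavior shown above):

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.Semaphore;

// Hypothetical adapter: caps the number of in-flight publishes by blocking callers.
class BlockingPublisher {
  private final Publisher publisher;
  private final Semaphore inFlight;

  BlockingPublisher(Publisher publisher, int maxInFlight) {
    this.publisher = publisher;
    this.inFlight = new Semaphore(maxInFlight);
  }

  ApiFuture<String> publish(PubsubMessage message) throws InterruptedException {
    inFlight.acquire();  // blocks once maxInFlight messages are outstanding
    ApiFuture<String> future = publisher.publish(message);
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<String>() {
          @Override
          public void onSuccess(String messageId) {
            inFlight.release();
          }

          @Override
          public void onFailure(Throwable t) {
            inFlight.release();
          }
        },
        MoreExecutors.directExecutor());
    return future;
  }
}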

@JesseLovelace @chingor13 WDYT? We should get pubsub team to sign off before actually doing this.

sholavanalli commented 5 years ago

I am seeing this error as well. @pongad @ayushj158 can you both explain what exactly is going on here? I don't understand the finer details of the bug by reading your comments above. Can one of you elaborate on this please? Where and why is the DEADLINE_EXCEEDED exception generated? Is it generated at the pubsub client before sending the messages or is it at the pubsub server? I have all the flow control settings configured in my pubsub client app. Here is how it looks:

messageCountThreshold: 1000
requestKiloBytesThreshold: 64000 
publishDelayInSeconds: 5

I see DEADLINE_EXCEEDED errors in the logs at random intervals. I am not ingesting much traffic either, less than 1 MB per second at the moment. The version of the client I am using is:

group: 'com.google.cloud', name: 'google-cloud-pubsub', version: '1.40.0'

Here is the complete exception stacktrace:

2018-11-28 20:19:47 [pool-1-thread-16] ERROR c.d.s.pubsub.PubsubMessageDispatcher - Failed to publish to pubsub. Status code: 504, is retryable: TRUE
com.google.api.gax.rpc.DeadlineExceededException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded: -50340118066 ns from now
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:51)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
    at com.google.common.util.concurrent.Futures$4.run(Futures.java:1123)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:435)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:900)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:811)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:675)
    at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:493)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:468)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:684)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:403)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded: -50340118066 ns from now
    at io.grpc.Status.asRuntimeException(Status.java:526)
    ... 23 common frames omitted

Appreciate your help. Thanks.

pongad commented 5 years ago

@sholavanalli where are you setting those parameters? I haven't worked on this code for a while, but I don't think Publisher supports those configurations.

Currently Publisher batches multiple calls to publish into one RPC to send to the server for efficiency, but it does not rate-limit the publishes. If your network connection or the Pub/Sub backend can only handle 100K messages per second (for example) and you try to publish 200K per second, eventually some messages will time out.
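
For reference, the settings Publisher does expose on its builder are batching settings (how calls get combined into RPCs) and retry settings (how long a publish RPC may run before failing with DEADLINE_EXCEEDED). A rough sketch of configuring both; the topic name and values are placeholders, and with clients of this era Duration comes from org.threeten.bp:

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.TopicName;
import org.threeten.bp.Duration;

Publisher publisher =
    Publisher.newBuilder(TopicName.of("my-project", "my-topic"))
        // Batching: how many messages/bytes to buffer before sending one publish RPC.
        .setBatchingSettings(
            BatchingSettings.newBuilder()
                .setElementCountThreshold(1000L)
                .setRequestByteThreshold(64_000L)
                .setDelayThreshold(Duration.ofSeconds(5))
                .build())
        // Retry/timeout: how long a publish RPC may run before it fails with DEADLINE_EXCEEDED.
        .setRetrySettings(
            RetrySettings.newBuilder()
                .setInitialRpcTimeout(Duration.ofSeconds(10))
                .setMaxRpcTimeout(Duration.ofSeconds(30))
                .setTotalTimeout(Duration.ofSeconds(60))
                .build())
        .build();

Neither of these throttles the rate at which your application calls publish, which is why some form of load shedding is still needed on the caller's side.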

ayushj158 commented 5 years ago

@sholavanalli As pongad mentioned, I am also not aware of those settings being respected by the Pub/Sub client. As for your question: it fails at the Pub/Sub client because of the timeout, which occurs when the load you put on the client exceeds what it is able to send to the Pub/Sub server; basically you are overloading the client with more than it can handle. In our case we used load shedding to give Pub/Sub some breathing space as soon as we detect it is getting clogged. I have used load shedding to send more than 130M records and it worked seamlessly without a single failure, though it does degrade performance a little.

If you need that solution until GCP ships an out-of-the-box one, I can help you there.

sholavanalli commented 5 years ago

@pongad I set those parameters as the flow control settings on the Pub/Sub client. The client batches messages to Pub/Sub when one of the three thresholds is reached. @ayushj158 I like your idea of load shedding. We are also looking to set up some monitoring around these errors. Thanks for your help.

ajaaym commented 5 years ago

This needs throttling, which will be available once we implement the new batcher with flow control. Closing this here; we will track it in #3003