googleapis / nodejs-pubsub

Node.js client for Google Cloud Pub/Sub: Ingest event streams from anywhere, at any scale, for simple, reliable, real-time stream analytics.
https://cloud.google.com/pubsub/
Apache License 2.0

pubsub-emulator throws errors when publishing 50k messages. #992

Closed mrdulin closed 4 years ago

mrdulin commented 4 years ago

Environment details

Description

When I try to publish 50k messages, the pubsub-emulator throws the error below over and over:

Error stack

```bash
[pubsub] May 06, 2020 2:47:20 PM io.grpc.netty.NettyServerHandler onStreamError
[pubsub] WARNING: Stream Error
[pubsub] io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
[pubsub]     at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:149)
[pubsub]     at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
[pubsub]     at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
[pubsub]     at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
[pubsub]     at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
[pubsub]     at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
[pubsub]     at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
[pubsub]     at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:518)
[pubsub]     at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:599)
[pubsub]     at io.netty.handler.codec.http2.Http2ConnectionHandler.processRstStreamWriteResult(Http2ConnectionHandler.java:872)
[pubsub]     at io.netty.handler.codec.http2.Http2ConnectionHandler.access$1000(Http2ConnectionHandler.java:66)
[pubsub]     at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:796)
[pubsub]     at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:793)
[pubsub]     at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:502)
[pubsub]     at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:476)
[pubsub]     at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:415)
[pubsub]     at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:540)
[pubsub]     at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:529)
[pubsub]     at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:101)
[pubsub]     at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
[pubsub]     at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703)
[pubsub]     at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258)
[pubsub]     at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
[pubsub]     at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
[pubsub]     at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
[pubsub]     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
[pubsub]     at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
[pubsub]     at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]     at io.netty.handler.logging.LoggingHandler.flush(LoggingHandler.java:265)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]     at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]     at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]     at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]     at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
[pubsub]     at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
[pubsub]     at io.grpc.netty.WriteQueue.flush(WriteQueue.java:118)
[pubsub]     at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
[pubsub]     at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
[pubsub]     at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
[pubsub]     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
[pubsub]     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
[pubsub]     at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
[pubsub]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[pubsub]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[pubsub]     at java.lang.Thread.run(Thread.java:748)
[pubsub]
[pubsub] May 06, 2020 2:47:20 PM io.grpc.netty.NettyServerHandler onStreamError
[pubsub] WARNING: Stream Error
[pubsub] io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
[pubsub]     ... (same stack frames as the trace above)
[pubsub]
```

My publisher also throws many instances of the same error:

{ Error: Retry total timeout exceeded before any response was received
    at repeat (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:83:23)
    at Timeout.setTimeout (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:124:13)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10) code: 4 }
{ Error: Retry total timeout exceeded before any response was received
    at repeat (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:83:23)
    at Timeout.setTimeout (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:124:13)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10) code: 4 }

My subscriber throws the errors below:

[2020-05-06T06:29:08.932Z]Received message 46689:
Data: message payload 3998
Attributes: {}
[2020-05-06T06:29:08.932Z]Received message 46690:
Data: message payload 3999
Attributes: {}
ERROR: Error: Failed to "acknowledge" for 500 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "acknowledge" for 100 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "acknowledge" for 200 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 400 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 500 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 100 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 200 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to connect to channel. Reason: Failed to connect before the deadline

I don't know whether this is a bug or whether I am using Pub/Sub incorrectly, so I decided to post it as a GitHub issue. I found some related questions on Stack Overflow and issues on GitHub, but didn't find a solution.

Steps to reproduce

I made a minimal code example that reproduces it. Could you please tell me what's going on? Thanks!

Repo: https://github.com/mrdulin/nodejs-gcp/tree/master/src/pubsub/pubsub-emulator

  1. Start pubsub-emulator:

    gcloud beta emulators pubsub start --project=$PROJECT_ID

  2. Create a topic:

    npx ts-node ./publisher.ts create pubsub-emulator-t1

  3. Create a subscription for the topic:

    npx ts-node ./subscriber.ts create pubsub-emulator-t1 pubsub-emulator-t1-sub

  4. Listen for the messages:

    npx ts-node ./subscriber.ts receive pubsub-emulator-t1-sub

  5. Publish 50k messages:

    npx ts-node ./publisher.ts publish pubsub-emulator-t1

Additional information

I also got the error "Retry total timeout exceeded before any response was received" in production environments, so it may not be a problem with pubsub-emulator.

@google-cloud/pubsub version: "^1.6.0"

That's why I am trying to build an example that reproduces it, but I can't figure out what's going on. The only way I can reproduce this error for now is to publish 50k messages.

Test cases: https://gist.github.com/mrdulin/79f1689a9baaafaef90fcad42646bf6d

feywind commented 4 years ago

@mrdulin Thanks for the detailed write-up! Just based on the Java error trace at the top, I'm guessing this is something with the Pub/Sub emulator, but I'll try out your repro steps.

mrdulin commented 4 years ago

@feywind OK, thanks. But I also hit this issue in the production environment. If I publish the 50k messages with a plain for loop and async/await instead of Promise.all, the issue goes away. However, awaiting each publish blocks the subsequent logic until all messages have been published.

const n = 50 * 1000;
for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    await topic.publish(dataBuffer)
}
// some logic ...
res.json(data);

Any suggestions on how to publish a large number of messages without blocking the subsequent logic? Do I need to use child_process or a queue such as bull to publish them in the background, outside the request/response workflow of the API? I need to respond to the front end as soon as possible, so publishing the 50k messages should be a background task.
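One possible middle ground between a single Promise.all over 50k calls and a strictly sequential await loop is to cap the number of publish calls in flight. The following is only a sketch under assumptions: `publishWithLimit`, `PublishFn` and `PublishSummary` are hypothetical names that do not come from the client library, and the caller is assumed to pass in a function that wraps the real call, for example `(data) => topic.publish(Buffer.from(data))`. It has not been verified against the emulator or production.

```typescript
// Hypothetical signature: the caller supplies the real publish call,
// e.g. (data) => topic.publish(Buffer.from(data)).
type PublishFn = (data: string) => Promise<string>;

interface PublishSummary {
  published: number;
  failed: number;
}

// Publishes every payload while keeping at most `limit` calls in flight.
async function publishWithLimit(
  payloads: string[],
  publish: PublishFn,
  limit: number
): Promise<PublishSummary> {
  const summary: PublishSummary = { published: 0, failed: 0 };
  let next = 0; // index of the next payload that has not been claimed yet

  // Each worker claims the next payload until none are left.
  async function worker(): Promise<void> {
    while (next < payloads.length) {
      const data = payloads[next++];
      try {
        await publish(data);
        summary.published++;
      } catch {
        // Count the failure and keep going; a real caller might log or retry.
        summary.failed++;
      }
    }
  }

  const workerCount = Math.max(1, Math.min(limit, payloads.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return summary;
}
```

With `limit` set to 1 this behaves like the sequential loop above; larger values trade ordering for throughput. A suitable value is not known from this thread and would need to be found by experiment.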

feywind commented 4 years ago

Realistically you should be able to just fire and forget (unless the Node process is terminated or something) so it does kind of seem like there's some library issue there. We're going to be pulling in some much newer versions of the pieces under the Pub/Sub library soon with the 2.0 release of the Pub/Sub library. I'm hoping that will help with some of these issues. I think the plan is to release that this week.
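As a rough illustration of what "fire and forget" could look like in a request handler, the sketch below starts a task without awaiting it and routes any failure to an error callback, so that a rejected promise is not left unhandled. `fireAndForget` is a hypothetical helper, not part of @google-cloud/pubsub, and the usage shown in the trailing comments (`publishAll`, `res.json`) is assumed for illustration only.

```typescript
// Schedules `task` on the next microtask and returns immediately.
// Any failure, whether a rejected promise or a synchronous throw,
// is passed to `onError` instead of becoming an unhandled rejection.
function fireAndForget(
  task: () => Promise<unknown>,
  onError: (err: unknown) => void
): void {
  Promise.resolve().then(task).catch(onError);
}

// Hypothetical usage inside an HTTP handler:
//   fireAndForget(() => publishAll(topic), (err) => console.error(err));
//   res.json({ accepted: true }); // responds without waiting for the publishes
```

The background work only completes if the Node process stays alive, which matches the caveat above about the process being terminated.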

feywind commented 4 years ago

@google-cloud/pubsub 2.0.0 is now released, so if you're on Node 10+, it might be worth trying that to see if the newer versions of gRPC/gax help anything.

mrdulin commented 4 years ago

@feywind ok. thanks. I will try it.

stephenplusplus commented 4 years ago

@mrdulin Just checking in to see if you're still experiencing these issues?

feywind commented 4 years ago

There hasn't been any movement on this for a while, so I'm going to go ahead and close it. (Please feel free to re-open if you need any other help with it.)

mrdulin commented 4 years ago

@stephenplusplus @feywind The issue went away after I upgraded the client library to "@google-cloud/pubsub": "^2.3.0".