Open edosrecki opened 5 months ago
@edosrecki Thanks for the issue. Our intent is that users will use the library's streaming pull facilities instead of calling the gapic-level functions directly. The service will periodically disconnect streams and requires the client side to manage that directly (reconnecting streams on failure, etc.), so it's a fair amount of code work.
Is there a use case that requires using the gapic-level streaming pull function? If so, we'd be interested in hearing about that to potentially adapt the higher level one to cover it.
We have been experiencing the same issue. We are using v1.SubscriberClient because we require more control over batch processing. It seems that until this bug is addressed, one must implement reconnects manually, because the subscriber client does not respect DEADLINE_EXCEEDED from retry.retryCodes.
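Until the retry codes are respected, a manual reconnect along these lines is one workaround. This is a hedged sketch, not the library's mechanism: the numeric status codes (4 = DEADLINE_EXCEEDED, 14 = UNAVAILABLE) follow the standard grpc status table, and openStream is a hypothetical stand-in for your own stream setup.

```typescript
// Sketch: decide whether a stream error warrants a silent reconnect.
// 4 = DEADLINE_EXCEEDED, 14 = UNAVAILABLE (standard grpc status codes).
const RETRIABLE_CODES = new Set([4, 14]);

function shouldReconnect(err: { code?: number }): boolean {
  return err.code !== undefined && RETRIABLE_CODES.has(err.code);
}

// Hypothetical wiring: re-open the stream when the error is retriable.
// function openStream(): void {
//   const stream = client.streamingPull();
//   stream.on('error', (err) => {
//     if (shouldReconnect(err)) openStream();
//     else throw err;
//   });
// }
```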
@ik-southpole Are you able to share some details about what's not working about batch processing as it is?
If you can't avoid the gapic layer, our handwritten implementation is here:
https://github.com/googleapis/nodejs-pubsub/blob/main/src/message-stream.ts
Maybe that would help figure out if there's something missing.
Our process requires buffering n messages before processing and acknowledging them all at once. However, the Subscription interface, which implements the EventEmitter interface, emits messages individually, while we need to handle them in batches as Message[]. Additionally, when using Subscription, the "Ack message count by delivery type" metric always showed pull instead of streaming_pull. To achieve the higher throughput recommended by the documentation, we opted for streaming_pull. With the same subscriber implementation, Subscription achieved 700 acks per second, whereas a custom consumer built on v1.SubscriberClient#streamingPull doubled our application's throughput.
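The buffering described above can be sketched as a small batcher that collects n messages from the event-emitter interface and hands them to a callback as an array. This is illustrative only: Message here is a minimal stand-in interface, not the library's Message class, and the batch size and flush callback are assumptions.

```typescript
// Stand-in for the library's Message class (assumption for this sketch).
interface Message {
  ackId: string;
  data: string;
}

class MessageBatcher {
  private buffer: Message[] = [];

  constructor(
    private readonly batchSize: number,
    private readonly onBatch: (batch: Message[]) => void,
  ) {}

  add(message: Message): void {
    this.buffer.push(message);
    if (this.buffer.length >= this.batchSize) {
      const batch = this.buffer;
      this.buffer = [];
      // Process and ack the whole batch at once.
      this.onBatch(batch);
    }
  }
}

// Hypothetical wiring against the high-level client:
// subscription.on('message', (m) => batcher.add(m));
```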
@ik-southpole Thanks for the info. I know there's a desire to provide a way to deliver chunks of messages as Message[] to user callbacks, but I'm not sure where it is on the roadmap.
@kamalaboulhosn Do you have thoughts on this?
We use the v1.SubscriberClient#streamingPull method to create a bidirectional stream, which we use to consume and ack Pub/Sub messages. After 15 minutes, the client throws the DEADLINE_EXCEEDED (code 4) error. We have put a lot of effort into investigating why and where this error is thrown.

There is a default 15-minute timeout for the streamingPull RPC call, which is defined in the @google-cloud/pubsub library here. It is possible to override this timeout by providing a different value when calling the streamingPull method. However, that only postpones the throwing of the DEADLINE_EXCEEDED error.

We have tracked down where this error is thrown: it happens in the @grpc/grpc-js library, in the ResolvingCall class's runDeadline method. The timer that is set when a ResolvingCall is created should be cleared when the response is received, in the outputStatus method. The outputStatus method is called from the onReceiveStatus handler, or from the onReceiveMessage handler if certain conditions are true.

However, the onReceiveStatus handler is NEVER called, which can be seen by turning on debug logs. In the logs you can clearly see that a resolving_call is instantiated and that its timer is created, and you see a corresponding entry when a message is received. But you never see a 'Received status' log entry, which means that the outputStatus method never gets called to reset the DEADLINE_EXCEEDED timer.

I believe this is a bug. Keep in mind that it only happens with v1.SubscriberClient; the latest Subscription class works correctly. When using that class, the grpc debug output looks different and 'Received status' logs do appear, which means the timer gets restarted every time a message is received.

Environment details

@google-cloud/pubsub version: 4.3.0

Steps to reproduce
Check the repository with the code and instructions to reproduce the issue:
https://github.com/emartech/pub-sub-deadline-exceeded-repro
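For reference, the timeout override described in the issue would look roughly like this. It is a hedged sketch: the timeout field (in milliseconds) on the gax CallOptions passed to the gapic method is the assumed knob, and the client wiring is shown as comments because it requires credentials.

```typescript
// One hour instead of the default 15 minutes (900000 ms). As the issue notes,
// this only postpones the DEADLINE_EXCEEDED error; it does not fix the bug.
const callOptions = { timeout: 60 * 60 * 1000 };

// Hypothetical usage against the gapic client:
// import { v1 } from '@google-cloud/pubsub';
// const client = new v1.SubscriberClient();
// const stream = client.streamingPull(callOptions);
```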