googleapis / google-cloud-java

Google Cloud Client Library for Java
https://cloud.google.com/java/docs/reference
Apache License 2.0
1.89k stars 1.06k forks source link

Pub/Sub streamingPull subscriber: large number of duplicate messages, modifyAckDeadline calls observed #2465

Closed kir-titievsky closed 5 years ago

kir-titievsky commented 6 years ago

Large number of duplicate messages is observed with Pub/Sub streamingPull client library using code that pulls from a Pub/Sub subscription and inserts messages into BigQuery with a synchronous blocking operation, with flowControl set to max 500 outstanding messages. See [1] for code.

For the same code, we also observe an excessive number of modifyAckDeadline operations (>> streamingPull message operations). And tracing a single message, we see modifyAcks and Acks in alternating order for the same message (modAck, modAck, Ack, modAck, Ack) [2]. This suggest that the implementation might fail to remove Ack'ed messages from a queue of messages to process and keep re-processing messages already on the client. This also suggests that ack requests may not actually be sent.

[2] https://docs.google.com/spreadsheets/d/1mqtxxm0guZcOcRy8ORG0ri787XLQZNF_FLBujAiayFI/edit?ts=59cac0f0#gid=2139642597

[1]

package kir.pubsub;

import com.google.api.gax.batching.FlowControlSettings; import com.google.cloud.bigquery.*; import com.google.cloud.pubsub.v1.Subscriber; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.SubscriptionName; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.AckReplyConsumer;

import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.atomic.AtomicInteger;

public class Sub {

// Instantiate an asynchronous message receiver

public static void main(String... args) throws Exception {
    final String projectId = args[0];
    final String subscriptionId = args[1];

    final BigQuery bq = BigQueryOptions.getDefaultInstance().getService();
    final String datasetName = "pubsub_debug";
    final String tableName = "gke_subscriber";
    final AtomicInteger messageCount = new AtomicInteger(0);
    final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    MessageReceiver receiver = new MessageReceiver() {
                public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                    // handle incoming message, then ack/nack the received message
                    System.out.printf("%s,\t%s,\t%d\n",message.getData().toStringUtf8()
                            , message.getMessageId()
                            , messageCount.incrementAndGet());
                    Map<String,Object> row = new HashMap<String,Object>();
                    long timestampMs = message.getPublishTime().getSeconds()*1000 + message.getPublishTime().getNanos() / 1000000;
                    Date timestampDate = new Date(timestampMs);

                    row.put("messageId", message.getMessageId());
                    row.put("messageData", message.getData().toStringUtf8());
                    row.put("messagePublishTime", dateFormat.format(timestampDate));
                    // a version of this code without the bq.insert was ran, where consumer.ack() 
                    // was called immediately. The results were the same.
                    InsertAllResponse response = bq.insertAll(
                                InsertAllRequest.newBuilder(TableId.of(projectId, datasetName, tableName)).addRow(row).build()
                    );
                    if (response.hasErrors()) {
                        System.err.println("Error inserting into BigQuery " + response.toString() );
                        consumer.nack();
                    } else{
                        consumer.ack();
                    }
                }

            };

    SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
    Subscriber subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).setFlowControlSettings(
            FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build()
    ).build();
    subscriber.startAsync();
    subscriber.awaitRunning();
    System.out.println("Started async subscriber.");
    subscriber.awaitTerminated();
}

}

pongad commented 6 years ago

@kir-titievsky Do you have a workload that can reliably reproduce this? If you do, could you try removing the flow control (hopefully the workload isn't so large that it crashes your machine)? If the problem goes away, this is probably a dup of #2452; the symptoms are nearly identical.

If this doesn't fix the problem, could you share the repro workload with me?

EDIT: If the workload is too large to remove flow control, the fix for the linked issue is already in master, so we can to test with that version. Slightly less convenient as we'll need to compile from source.

robertsaxby commented 6 years ago

The behaviour observed with #2452 was seen with the older non streaming pull implementation (0.21.1-beta). This issue came about when using the newer client library. It might also be worth noting that with the Flow Control set to 1000 max messages on the older library duplicates where not seen, just the stuck messages.

pongad commented 6 years ago

@robertsaxby That makes sense. I can reproduce the messages getting stuck, but not redelivery.

@kir-titievsky I need a little help understanding the spreadsheet. Are all rows for the same message? I assume that stream_id uniquely identifies a StreamingPull stream? Ie, one physical computer can have multiple IDs by opening many streams, but one stream ID is unique to one computer?

FWIW, I have a PR opened to address a potential race condition in streaming. It's concievable that the race condition causes this problem.

If you could set up a reproduction, please let me know.

kir-titievsky commented 6 years ago

@pongad You are right on all counts about the spreadsheet. All rows are for the same message, including traffic from several bidi streams, that had been opened at different times.

kir-titievsky commented 6 years ago

@pongad Did a couple experiments:

pongad commented 6 years ago

To summarize: This is an issue on the server side. Currently the client library does not have enough information to properly handle ack deadlines.

In the immediate term, consider using v0.21.1. That version uses a different (slower) pubsub endpoint, that isn't affected by this problem.

pongad commented 6 years ago

Pubsub team is working on a server-side mitigation for this. The client lib will need to be updated to take advantage of it. Fortunately, this new feature "piggybacks" on an already existing one, so the work on client lib can progress right away.

I hope to create a PR for this soon.

pongad commented 6 years ago

Update: the server side release should happen this week. The feature should be enabled next week. The client (in master) has already been modified to take advantage of this new feature.

When the server feature is enabled, we'll test to see how this helps.

pongad commented 6 years ago

The server-side fix has landed. If you are affected, could you try again and see if you observe fewer duplicates?

While we expect the fix to help reduce duplication on older client libs, I'd encourage moving to latest release (v0.30.0) since more fixes has landed during that time.

ericmartineau commented 6 years ago

We are seeing this behavior currently. We are using v0.30.0-beta of the pubsub library, and our subscriptions are all set to a 60s ack deadline. We have a subscription that is currently extended the deadline for over 750K unacked messages:

Screenshot of stackdriver below: https://screencast.com/t/KogMzx0q7f5

Our receiver always performs either ack() or nack(), and the occasional message that sneaks through when symptoms look like this complete within 1s, usually faster.

I deleted the subscription and recreated it, and saw the modack calls drop to zero, only to climb back almost instantly to where they were before.

Is there anything else that will help you/us troubleshoot this issue?

kir-titievsky commented 6 years ago

Eric, how does the mod ack rate compare to your Publish and ack rates? On Mon, Dec 11, 2017 at 9:19 AM Eric Martineau notifications@github.com wrote:

We are seeing this behavior currently. We are using v0.30.0-beta of the pubsub library, and our subscriptions are all set to a 60s ack deadline. We have a subscription that is currently extended the deadline for over 750K unacked messages:

Screenshot of stackdriver below: https://screencast.com/t/KogMzx0q7f5

Our receiver always performs either ack() or nack(), and the occasional message that sneaks through when symptoms look like this complete within 1s, usually faster.

I deleted the subscription and recreated it, and saw the modack calls drop to zero, only to climb back almost instantly to where they were before.

Is there anything else that will help you/us troubleshoot this issue?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2465#issuecomment-350793285, or mute the thread https://github.com/notifications/unsubscribe-auth/ARrMFhZdtKQEgS0G7y0c3GvvhizKQlzHks5s_WQXgaJpZM4Pk8_U .

--

Kir Titievsky | Product Manager | Google Cloud Pub/Sub

ericmartineau commented 6 years ago

This is the ack rate during the same window: https://screencast.com/t/FR2utgEau

The publish rate: https://screencast.com/t/2bT4ZcMkJwG

We have a backlog of 1MM messages. Also, we have three separate identical environments with roughly the same usage, and we're seeing this issue on two of them. A third environment seems unaffected.

kir-titievsky commented 6 years ago

Should have asked this earlier: but is your streamingPull operations < 10 messages/second?

On Mon, Dec 11, 2017 at 2:19 PM Eric Martineau notifications@github.com wrote:

This is the ack rate during the same window: https://screencast.com/t/FR2utgEau

The publish rate: https://screencast.com/t/2bT4ZcMkJwG

We have a backlog of 1MM messages. Also, we have three separate identical environments with roughly the same usage, and we're seeing this issue on two of them. A third environment seems unaffected.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2465#issuecomment-350828403, or mute the thread https://github.com/notifications/unsubscribe-auth/ARrMFjXRxZ81Jhe-5kUCdZ6KFJpGe0Faks5s_YAsgaJpZM4Pk8_U .

-- Kir Titievsky | Product Manager | Google Cloud Pub/Sub https://cloud.google.com/pubsub/overview

ericmartineau commented 6 years ago

@kir-titievsky https://screencast.com/t/Nrlxws3v (the drop at the end is when I deleted and recreated the topic)

kir-titievsky commented 6 years ago

looks like you are not acking most messages, which would leave them stuck being modAcked, no?

On Mon, Dec 11, 2017 at 3:32 PM Eric Martineau notifications@github.com wrote:

https://screencast.com/t/Nrlxws3v (the drop at the end is when I deleted and recreated the topic)

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2465#issuecomment-350849266, or mute the thread https://github.com/notifications/unsubscribe-auth/ARrMFiq-5_4qqk0oAm1fgABqmP5dKeUjks5s_ZFXgaJpZM4Pk8_U .

-- Kir Titievsky | Product Manager | Google Cloud Pub/Sub https://cloud.google.com/pubsub/overview

ericmartineau commented 6 years ago

We're either acking or nacking every message. My assumption is that nacking would cause the message to become available for immediately redelivery.

kir-titievsky commented 6 years ago

right. It looks like you are nacking most messages. The question is why.

On Mon, Dec 11, 2017 at 6:14 PM Eric Martineau notifications@github.com wrote:

We're either acking or nacking every message. My assumption is that nacking would cause the message to become available for immediately redelivery.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2465#issuecomment-350889875, or mute the thread https://github.com/notifications/unsubscribe-auth/ARrMFkAGocawH5OD5S3svQxmCXPO9_4Eks5s_bdAgaJpZM4Pk8_U .

-- Kir Titievsky | Product Manager | Google Cloud Pub/Sub https://cloud.google.com/pubsub/overview

ericmartineau commented 6 years ago

I've been reading more this morning... and I realized that a nack() in our code would show up as a "Modify Ack Deadline Operation" (because it effectively sets the ack deadline to 0). Is that correct?
Is there any possibility for the "Ack Padding" (my understanding is that the pubsub client will request a deadline extension for every message to pad for latency) to conflict with a nack?

FWIW - when we're seeing the hundreds of thousands of modacks (the screenshot I posted above), our code is effectively idle - specifically that our MessageReceiver is not receiving any messages from the dispatcher.

Other than that, is there any way to see the pubsub client's 'deadline extension' calls vs nack()? I've posted the code below as well in case there's something we could do differently on our side.

return (message, acker) -> {
   // isRunning() is an internal check for application shutdown state to not attempt processing 
   // the message due to the likelihood that resources needed to process won't be available
    if (!isRunning()) {
        acker.nack();
        log.info("Skipping message {} because we're shutting down", message.getMessageId());
    }

    try {
        final ImportResult importResult = handleImport(message.getData().toStringUtf8());

        //... truncated - log to metrics system ...

        if (!importResult.isRetryable()) {
            acker.ack();
        } else {
            // We do pessimistic resource checks, and flag the message as retryable if it's unable 
            // to obtain a "lock" on a resource in a short period of time 
            acker.nack();
        }
    } catch (Throwable e) {
        // We're trapping all exceptions in the block above - this expands to catch `Throwable`
        acker.ack();
        log.error("sync/subscription/unexpected Unexpected exception processing message", e);
    } 
};
pongad commented 6 years ago

It's a little annoying, but we log to logger named "com.google.cloud.pubsub.v1.MessageDispatcher". If you turn logging to FINER, we log "Sending NNN nacks". We send it very often though.

If your code is idle, this sounds like a serious problem. If there are messages pending, they should be scheduled immediately. I have a few guesses.

  1. Flow control leak. The client keeps track of messages in progress. If the user code does not call ack or nack, the client will think that these messages are still running and not execute more messages. From @smartytime 's response, this seems unlikely.

  2. Maybe the client is somehow not properly releasing the flow control, or failing to schedule new tasks despite the flow control being free.

  3. Maybe the executor is deadlocked. This could only happen if you provide your own executor for client lib to use, some task adds more job to the executor, then waits for the new job to finish. Eventually all threads just wait for tasks to finish so no one actually gets to execute the new tasks.

To rule out (3), could you try running jstack on a running JVM? It will contain stack trace of all threads, so it should help us see if threads are deadlocking. It might contain confidential info though.

To help diagnose (2), could you share with us how you're creating the Subscriber? Maybe your settings are tickling a bug we haven't seen before, etc.

ericmartineau commented 6 years ago

pongad - I've been trying to prep more intel, but I'll throw out a few more things that might be useful:

       return Subscriber.newBuilder(subscriptionPath, receiver)
                .setParallelPullCount(1)
                .setMaxAckExtensionPeriod(Duration.ZERO)
                .setFlowControlSettings(FlowControlSettings.newBuilder()
                        .setMaxOutstandingElementCount(10L)
                        .setMaxOutstandingRequestBytes(null)
                        .setLimitExceededBehavior(LimitExceededBehavior.Block)
                        .build())
                .setCredentialsProvider(fixedCredentialsProvider)
                .setExecutorProvider(executorProvider)
           .build();

1) we are running this in a cluster against two topics/subscriptions. Each instance has a single Subscriber for each subscription, and each Subscriber uses a dedicated ScheduledThreadPoolExecutor using FixedExecutorProvider. 2) Local dev (unable to reproduce) runs oracle jdk 1.8.0_144, production runs openjdk 8u111 3) I can connect development instances to the prod subscription without any issues (as of yet). 4) We rolled out a change that allowed us to start/stop Subscriber instances, modify the # of available threads, etc. If I shut down all subscribers, the modacks, pulls all drop to 0 (so, no zombie instances). When this odd behavior is occuring, if I start even a single thread on a single instance for a single subscription, the modacks will jump up to where they were before.

For example, there's a single thread for a single subscriber across the entire cluster that's currently "working":

State: waiting
Wait Count: 188852
Block Count: 21
------------------------
    -sun.misc.Unsafe.park
    -java.util.concurrent.locks.LockSupport.park
    -java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await
    -java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take
    -java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take
    -java.util.concurrent.ThreadPoolExecutor.getTask
    -java.util.concurrent.ThreadPoolExecutor.runWorker
    -java.util.concurrent.ThreadPoolExecutor$Worker.run
    -java.lang.Thread.run

Our logs show no task completion (we log whenever we ack/nack), and stackdriver shows ~90 modack op/s, and ~45 pull op/s.

The TRACE logs look like this (I've removed all the Sending 0 xxx.

Sending 10 receipts
2017-12-14 16:38:41.974
Sending 10 nacks
2017-12-14 16:38:41.973
Sending 6 nacks
2017-12-14 16:38:41.873
Sending 10 receipts
2017-12-14 16:38:41.773
Sending 4 nacks
2017-12-14 16:38:41.772
Sending 2 receipts
2017-12-14 16:38:41.672
Sending 2 nacks
2017-12-14 16:38:41.672
Sending 10 receipts
2017-12-14 16:38:41.471
Sending 10 nacks
2017-12-14 16:38:41.471
Sending 2 acks
2017-12-14 16:38:41.420
Sending 7 nacks
2017-12-14 16:38:41.371
Sending 2 receipts
2017-12-14 16:38:41.320
Sending 1 receipts
2017-12-14 16:38:41.319
Sending 1 acks
2017-12-14 16:38:41.319
Sending 10 receipts
2017-12-14 16:38:41.270
Sending 3 nacks
2017-12-14 16:38:41.270
Sending 3 receipts
2017-12-14 16:38:41.170
Sending 3 nacks
2017-12-14 16:38:41.169
Sending 10 receipts
2017-12-14 16:38:40.969
Sending 10 nacks
pongad commented 6 years ago

@smartytime Thank you for the info! From this, I believe the problem is not with the client lib. Reasoning below:

The stack dump of your thread shows that it's waiting for things to do. The executor is definitely not deadlocked.

However, our logs seem to disagree: You said

Our logs show no task completion (we log whenever we ack/nack)

From the MessageDispatcher log, the subscriber is actually quite active. In the window of 1 second, it has received 58 messages (the "receipts" [1]), nacked 55, and acked only 3. Unless the client lib is doing something incredibly wrong, it looks like you're either calling consumer.ack, consumer.nack, or throwing exception from receiveMessage (exceptions count as nack). I think this explains the message backlog: most messages are getting nacked and redelivered.

[1] The "receipts" feature is added to help with message duplication problem. The client lib sends modacks to let the server know it has received the messages. This might explain why we're seeing a lot of modacks. Unfortunately the receipts are confusing the metric.

Lastly, I think setMaxAckExtensionPeriod(Duration.ZERO) is not the right thing to do. The client lib periodically sends modacks to keep your messages alive; this is potentially important even if you process the messages quickly. For example, you take only 1s to process a message and the message deadline is 60s. The server might send us 100 messages in one batch. You'd only be able to process 60 of them; the other 40 will get redelivered. Judging from the dispatcher log, I don't think this is the problem, but I can't completely rule it out.

ericmartineau commented 6 years ago

@pongad - Much thanks. I've been trying to read through the pubsub code so you don't have to explain it all... feel like I'm slowly catching up.

Knowing about the receipts helps - in an ideal world where messages are processed within their original lease, should we see roughly a 1/1 modack/message ratio? (one receipt modack per message)

What is an appropriate value for maxAckExtensionPeriod? And does that value represent the time beyond the original deadline? Or is it inclusive of the original deadline? In your example above, if I wanted the client to release any messages not processed within 5 minutes would I set the maxAckExtensionPeriod to 5 minutes, or 4 minutes?

There was another thing I was looking into that was confusing me. I see this occasionally in thread dumps for subscriber executor threads. :

Thread Name: Pubsub Subscriber:  sync-events-alexandria
State: runnable
Wait Count: 173806
Block Count: 34
------------------------
    -java.math.BigInteger.oddModPow
    -java.math.BigInteger.modPow
    -sun.security.rsa.RSACore.crtCrypt
    -sun.security.rsa.RSACore.rsa
    -sun.security.rsa.RSASignature.engineSign
    -java.security.Signature$Delegate.engineSign
    -java.security.Signature.sign
    -com.google.api.client.util.SecurityUtils.sign
    -com.google.api.client.json.webtoken.JsonWebSignature.signUsingRsaSha256
    -com.google.auth.oauth2.ServiceAccountJwtAccessCredentials.getJwtAccess
    -com.google.auth.oauth2.ServiceAccountJwtAccessCredentials.getRequestMetadata
    -io.grpc.auth.GoogleAuthLibraryCallCredentials$1.run
    -java.util.concurrent.Executors$RunnableAdapter.call
    -java.util.concurrent.FutureTask.run
    -java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201
    -java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run
    -java.util.concurrent.ThreadPoolExecutor.runWorker
    -java.util.concurrent.ThreadPoolExecutor$Worker.run
    -java.lang.Thread.run

Normally, I'd expect something like this:

    -[our message handler here]
    -com.google.cloud.pubsub.v1.MessageDispatcher$4.run
    -java.util.concurrent.Executors$RunnableAdapter.call
pongad commented 6 years ago

@smartytime I'll answer out of order :)

The stack trace looks like credentials being refreshed. Once in a while (I believe one hour), a gRPC connection needs to refresh credentials, then the credentials is cached and usable for another hour etc. By default, we open a few connections, so I think it makes sense that you're seeing this once in a while.

Ideally, you'd observe < 2 modack/msg in the current implementation. One of the them is the receipt. In the current implementation, deadline extension isn't very smart. When it runs, it extends deadline for all pending messages. Eg, if we pull a message one second before the extension is scheduled to run, we send a modack for that message anyway even if we have a while before the message expires. This accounts for some messages getting a second one even though you ack quickly. It's definitely possible to make this smarter in the future. It's a little finicky and caused bugs in the past though, so we're being a little cautious with it.

That said, we're not being that inefficient. We keep a distribution of how long you take to ack messages and periodically adjust the stream's deadline to 99.9%th percentile so for the vast majority of the messages, you should get fewer than 2 modacks.

Right now if you set the max extension to 5 minutes, we might extend it to more than 5. This is a bug: if you can tolerate it, you can set it to 5 now. I'll work on a fix for this. Note that you should generally set the max extension "unreasonably high" since setting it too low will result in duplicate messages. This feature was actually meant as a stopgap to make sure messages aren't stuck forever if we forget to ack them.

alex-kisialiou-sciencesoft commented 6 years ago

I found this: https://cloud.google.com/pubsub/docs/reference/rest/ and i guess it'll take a day or so to create your own client that covers your needs.

kir-titievsky commented 6 years ago

@alex-kisialiou-sciencesoft You might find this useful: https://developers.google.com/api-client-library/java/apis/pubsub/v1, but you could try the lower level auto-generated client for gRPC as well (see the synchronous pull example https://cloud.google.com/pubsub/docs/pull#pubsub-pull-messages-sync-java)

yonran commented 6 years ago

We ran into this issue on our GCS Object Change Notification (very small messages) subscribers and repeated it in a test program. After upgrading google-cloud-java from before 0.21.1-beta to after 0.42.0-beta, the subscriber works fine until it falls behind. Once it falls behind, the PubSub server starts sending duplicates about every hour, so that over half of messages are duplicates. The Subscriber can easily fall further and further behind. Therefore, the Subscriber either needs to process messages very quickly, or it needs to cache several hours of message ids to ack duplicates quickly. After contacting GCP support, they told me that this is a known bug with StreamingPull of small messages, and that a workaround is to downgrade to google-cloud-pubsub 0.21.1-beta.

While we wait for a fix to the StreamingPull server API, would it be possible for google-cloud-pubsub library to offer an option to use the Pull API instead of StreamingPull?

kir-titievsky commented 6 years ago

Yonatan, You can find an example of using a synchronous pull here: https://cloud.google.com/pubsub/docs/pull .

That said, you say that messages get re-delivered every hour. This suggests a pattern we have not considered. Might you say why you take more than an hour to acknowledge a message once you get it?

On Tue, Jun 12, 2018 at 6:56 PM Yonathan Randolph notifications@github.com wrote:

We ran into this issue on our GCS Object Change Notification https://cloud.google.com/storage/docs/object-change-notification (very small messages) subscribers and repeated it in a test program https://github.com/yonran/pubsubfallingbehindbug/. After upgrading google-cloud-java from before 0.21.1-beta to after 0.42.0-beta, the subscriber works fine until it falls behind. Once it falls behind, the PubSub server starts sending duplicates about every hour, so that over half of messages are duplicates. The Subscriber can easily fall further and further behind. Therefore, the Subscriber either needs to process messages very quickly, or it needs to cache several hours of message ids to ack duplicates quickly. After contacting GCP support, they told me that this is a known bug https://cloud.google.com/pubsub/docs/pull#dealing-with-large-backlogs-of-small-messages with StreamingPull of small messages, and that a workaround is to downgrade to google-cloud-pubsub 0.21.1-beta.

While we wait for a fix to the StreamingPull server API, would it be possible for google-cloud-pubsub library to offer an option to use the Pull API instead of StreamingPull?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2465#issuecomment-396760324, or mute the thread https://github.com/notifications/unsubscribe-auth/ARrMFsJUX1rvxFVrMw1xezvfgis_pSY2ks5t8EcegaJpZM4Pk8_U .

-- Kir Titievsky | Product Manager | Google Cloud Pub/Sub https://cloud.google.com/pubsub/overview

yonran commented 6 years ago

@kir-titievsky, thank you for the synchronous pull sample using the Pull GRPC API directly. However, we have written a number of MessageReceiver/Subscribers which used to work and we want to avoid rewriting them.

Might you say why you take more than an hour to acknowledge a message once you get it?

My client-side code acknowledges all messages in a matter of seconds (In my RawPubSub.java version, I request one more message at a time from the server). The client never holds on to a message for more than a few seconds. I believe this is a server-side bug in the StreamingPull API which results in duplicate messages.

kir-titievsky commented 6 years ago

That sounds like a bug somewhere. The re-delivery after an hour, in particular, makes me suspicious. If you could, might you file a separate bug with a reproduction? If not,

  1. Might you open a support case with GCP support, if you have a support plan, detailing when you observed this on what project and subscription?
  2. If not, could you send the same to cloud-pubsub@google.com? No guarantees, but I might be able to take a look.

An alternative explanation for this behavior is that the acks never succeed. Which might make this a client-side bug. But hard to tell.

luann-trend commented 6 years ago

We have experienced similar kind of issue here with the java google-cloud-pubsub lib GA version 1.31.0. We don't get the duplicate messages but the messages seem stuck in the queue even though we send ack back. After restarts the clients, the stuck messages got cleared up.

pongad commented 6 years ago

@luann-trend What does "stuck" here mean? Are new messages not being processed?

luann-trend commented 6 years ago

@pongad The new messages still being processed, only there couple hundreds message keep get redelivered but not able to process for some reason. We have experienced the same issues on 2 different cluster environments after about 4-5 days start using the new Java Google-pubsub client GA version.

pongad commented 6 years ago

@luann-trend This is interesting. Is it possible that the messages are causing you to throw exception? We catch exception and nack messages automatically, assuming that user code failed to process it. Do you know the duration of time between redelivers?

chingor13 commented 5 years ago

Should have been fixed in #3743