googleapis / java-pubsub

Apache License 2.0

PubSub: Message ordering appears to cause race conditions and duplicate messages #1974

Open turneand opened 3 months ago

turneand commented 3 months ago

Environment details

  1. OS type and version: Windows/Unix
  2. Java version: 17
  3. version(s): 1.126.4 and 1.127.3 (confirmed issue on these two versions)

Steps to reproduce

These steps are similar to those in https://github.com/googleapis/java-pubsub/issues/1889

  1. Create a new Pub/Sub topic and a subscription with message ordering (and exactly-once delivery) enabled
  2. Create a subscriber with random sleeps, a parallel pull count of 2, and a max outstanding element count of 1000
  3. Publish 500 messages, with random sleeps between them
  4. The messages should all be processed in order, which is mostly true
  5. However, we are frequently inundated with "failed to send operations" errors - "Some acknowledgement ids in the request were sent out of order"

A workaround we have is to set "maxOutstandingElementCount" down to 1, which appears to effectively disable any batching. Although our application can tolerate a certain degree of instability in Pub/Sub, the rate at which we get these errors (hundreds per day) is excessive.

Code example

Note: this seems to be a race condition. The following code reproduces the issue fairly consistently, but it is not 100% guaranteed. On some executions only a couple of errors get logged, but on others I get hundreds.

In our live applications, we get this error when sending acks or nacks, which causes significant delays because we have to wait for the default timeouts to expire before delivery is attempted again.


var project =
var subscriptionName = ProjectSubscriptionName.of(project, "andrew-test-sub-order2");
var topicName = TopicName.of(project, "andrew_test");
var publisher = Publisher.newBuilder(topicName).setEnableMessageOrdering(true).build();

var subscriber = Subscriber.newBuilder(subscriptionName, (PubsubMessage message, AckReplyConsumer consumer) -> {
    try {
        long sleepFor = (int) (Math.random() * 200);
        System.err.println(Thread.currentThread().getName() + ";Received;" + message.getData().toStringUtf8() + ";" + sleepFor);
        Thread.sleep(sleepFor);
        consumer.ack();
    } catch (Exception e) {
        System.err.println("!!!!error - " + message.getData().toStringUtf8());
        e.printStackTrace();
    }
})
.setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).setMaxOutstandingRequestBytes(104857600L).build())
.setMaxAckExtensionPeriod(Duration.ofMinutes(60L))
.setMaxDurationPerAckExtension(Duration.ofMillis(0L))
.setParallelPullCount(2)
.build();

System.err.println("Starting subscriber");
subscriber.startAsync().awaitRunning();

try {
    var now = LocalDateTime.now();
    for (int i = 0; i <= 500; i++) {
        var data = "prefix," + now + "," + i;

        try {
            long sleepFor = (int) (Math.random() * 200);
            System.err.println(Thread.currentThread().getName() + ";Sending;" + data + ";" + sleepFor);
            Thread.sleep(sleepFor);
        } catch (Exception e) {
            e.printStackTrace();
        }

        var message = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(data))
                .setOrderingKey("defabc")
                .build();
        publisher.publish(message);
    }
} finally {
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
}

try {
    System.err.println("Awaiting termination");
    subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    subscriber.stopAsync();
}

Stack trace

com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were sent out of order.

External references such as API reference guides

Any additional information below

This is similar to https://github.com/googleapis/java-pubsub/issues/1889


Thanks!

turneand commented 3 months ago

Unfortunately my workaround is not correct: I have observed these errors, and therefore the associated duplicate messages, even when maxOutstandingElementCount is null or 0.

michaelpri10 commented 3 months ago

I was able to reproduce the INVALID_ARGUMENT: Some acknowledgement ids in the request were sent out of order errors, which seem to be caused by multiple outstanding acknowledgement requests. We are still investigating this issue, but as a workaround for now you can use the acknowledgement-with-response interface when receiving messages. Following the SubscribeWithExactlyOnceConsumerWithResponse sample lets you check the response of the acknowledgement call for each message. This prevents multiple outstanding acknowledgement requests, because the next message is processed only once the previous message has been successfully acknowledged, although it may slow down message processing.
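
For illustration, here is a minimal sketch of the idea behind that workaround: never have more than one acknowledgement in flight, and send the next one only after the previous one has completed. It uses only the JDK and does not call the Pub/Sub client. `SerialAcker`, `AckResult`, and the `sendAck` function are hypothetical names that stand in for whatever ack-with-response call the subscriber actually makes, so treat it as a model of the pattern rather than the library's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class SerialAckSketch {
    /** Hypothetical stand-in for the outcome of one acknowledgement call. */
    enum AckResult { SUCCESSFUL, FAILED }

    /** Chains acks so that each one is sent only after the previous one has completed. */
    static final class SerialAcker {
        // Assumed to be supplied by the caller: sends one ack and reports its result asynchronously.
        private final Function<String, CompletableFuture<AckResult>> sendAck;
        private CompletableFuture<AckResult> tail =
                CompletableFuture.completedFuture(AckResult.SUCCESSFUL);

        SerialAcker(Function<String, CompletableFuture<AckResult>> sendAck) {
            this.sendAck = sendAck;
        }

        synchronized CompletableFuture<AckResult> ack(String ackId) {
            // Map a failed previous ack to FAILED so one error does not stall the chain,
            // then start this ack only once the previous one has finished.
            tail = tail.exceptionally(t -> AckResult.FAILED)
                       .thenCompose(previous -> sendAck.apply(ackId));
            return tail;
        }
    }

    /** Sends `count` acks through a fake, randomly slow backend and returns their arrival order. */
    static List<String> demo(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> arrived = Collections.synchronizedList(new ArrayList<>());
        SerialAcker acker = new SerialAcker(id -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 5)); // simulated network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            arrived.add(id); // the fake backend records the order in which acks arrive
            return AckResult.SUCCESSFUL;
        }, pool));

        CompletableFuture<AckResult> last = CompletableFuture.completedFuture(AckResult.SUCCESSFUL);
        for (int i = 0; i < count; i++) {
            last = acker.ack("ack-" + i);
        }
        last.join(); // the last ack completes only after all earlier ones have
        pool.shutdown();
        return new ArrayList<>(arrived);
    }

    public static void main(String[] args) {
        // Despite the random delays, the ids arrive in submission order.
        System.out.println(demo(5));
    }
}
```

In a real subscriber, `sendAck` would wrap the ack-with-response call from the sample mentioned above. The trade-off is the one already noted: acknowledgement throughput is bounded by the round-trip time of a single ack.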