Azure / azure-service-bus-java

☁️ Java client library for Azure Service Bus
https://azure.microsoft.com/services/service-bus
MIT License

Exception acknowledging messages #293

Closed h5chauhan closed 5 years ago

h5chauhan commented 6 years ago

Describe the bug: We are seeing the error below when completing messages received with a receiver.receiveBatch() call.

Exception in thread "main" com.microsoft.azure.servicebus.primitives.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:4b16ce20-5d8d-42a4-874e-c6268b1507f3, TrackingId:70433eac0000315f000016045bac1079_G27_B18, SystemTracker:some:queue:some-space~207, Timestamp:9/26/2018 11:05:32 PM, errorContext[NS: <some-service>.servicebus.windows.net, PATH: <SecretPath>, REFERENCE_ID: caf290e16a084ff39ec49a53788d289e_G27, PREFETCH_COUNT: 100, LINK_CREDIT: 0, PREFETCH_Q_LEN: 10

Disabling prefetch had no effect. The batch size is set between 50 and 100. An example of how we create the client is shown below:

rec = ClientFactory.createMessageReceiverFromConnectionStringBuilder(
        new ConnectionStringBuilder(accessURI, entityPath, sharedAccessKeyName, sharedAccessKey), ReceiveMode.PEEKLOCK)
...
...
val messages = toScala(rec.receiveBatch(batchSize)).toList
ackBuff ++= messages.map(_.getLockToken)
... <do something>
...
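 ackBuff foreach { token =>
       // The service bus sometimes throws an error even when we try to complete messages in time, hence we swallow the error.
       // Note: Try(rec.complete(_)) would only wrap an uncalled function, so the token is passed explicitly.
       scala.util.Try(rec.complete(token))
       log.debug("Commit attempted for lock token : {}", token)
 }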

Expected behavior: The code above swallows the exception in order to continue processing, but this causes duplicates in our system.

yvgopal commented 6 years ago

First, you shouldn't swallow the exception on complete. If complete fails, you will receive the same message again. MessageLockLostException is rarely thrown when you complete a message in time. Please log/trace the lock token and the LockedUntilUtc time of each message. If you are getting too many LockLost exceptions even within LockedUntilUtc, I will investigate further.
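A minimal sketch of that logging, under stated assumptions: LockInfo and Completer are hypothetical stand-ins for the message's lock token / LockedUntilUtc and for receiver.complete(lockToken), so the snippet runs without the Service Bus client. It records how much lock time was left at the moment of the call and reports a failure to the caller instead of hiding it.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;

final class CompleteLogging {
    /** Hypothetical stand-in for the two message fields of interest. */
    static final class LockInfo {
        final UUID lockToken;
        final Instant lockedUntilUtc;

        LockInfo(UUID lockToken, Instant lockedUntilUtc) {
            this.lockToken = lockToken;
            this.lockedUntilUtc = lockedUntilUtc;
        }
    }

    /** Hypothetical stand-in for a call such as receiver.complete(lockToken). */
    interface Completer {
        void complete(UUID lockToken) throws Exception;
    }

    /** Formats the lock token, its expiry, and the time remaining at 'now'. */
    static String describe(LockInfo lock, Instant now) {
        Duration remaining = Duration.between(now, lock.lockedUntilUtc);
        return String.format("lockToken=%s lockedUntilUtc=%s remainingMs=%d expired=%b",
                lock.lockToken, lock.lockedUntilUtc, remaining.toMillis(), remaining.isNegative());
    }

    /** Attempts the completion, logs the lock context either way, and returns false on failure. */
    static boolean completeAndLog(LockInfo lock, Completer completer, Instant now) {
        String context = describe(lock, now);
        try {
            completer.complete(lock.lockToken);
            System.out.println("complete ok: " + context);
            return true;
        } catch (Exception e) {
            // Not swallowed: the caller learns that this message will be delivered again.
            System.err.println("complete FAILED: " + context + " cause=" + e);
            return false;
        }
    }
}
```

A failure logged with expired=false is the case worth reporting, since the lock should still have been valid.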

h5chauhan commented 6 years ago
  val sharedAccessKey: String = "secret"
  val entityPath: String = "my-entity-path"
  val accessURI: URI = URI.create("sb://my-service.servicebus.windows.net/")
  val sharedAccessKeyName = "All"
  val fw = new FileWriter("output.txt", true)
  val receiver = ClientFactory.createMessageReceiverFromConnectionStringBuilder(new ConnectionStringBuilder(accessURI,
    entityPath, sharedAccessKeyName, sharedAccessKey))
  receiver.setPrefetchCount(10)
  val objectMapper = new ObjectMapper()
  while (true) {

    val messages = receiver.receiveBatch(10)
    if (messages !=  null) {
      messages.asScala.foreach { m =>
        val body = new String(m.getBody)
        val properties = m.getProperties
        val tree = objectMapper.readTree(properties.get("JsonData"))
        val jsonString = objectMapper.writeValueAsString(tree)
        fw.append(jsonString)
        fw.append("\n")
        fw.flush()
        receiver.complete(m.getLockToken)
      }
    } else {
      println("Message is null")
    }
  }
}

Here is what I am doing to reproduce this. The error happens on the receiver.complete() call.

h5chauhan commented 6 years ago
2018-10-29 18:58:12,305 WARN || Received delivery '????4CB?,???' state 'Rejected{error=Error{condition=com.microsoft:message-lock-lost, description='The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:f27dfe2e-091c-4dc9-a1db-f2fcd5a5f4a4, TrackingId:70433eac0000315f000000595bd75640_G23_B6, SystemTracker:partner-prod:queue:my-queue~207, Timestamp:10/29/2018 6:58:12 PM', info=null}}' doesn't match expected state 'Accepted{}' [com.microsoft.azure.servicebus.primitives.CoreMessageReceiver]

I wonder if this will give more context to understand the issue?

yvgopal commented 6 years ago

What is the lock duration on your entity? Let's say you receive a batch of 10 messages. All messages will have the same lock duration, and you are completing them sequentially, one after the other. So the last message in your batch is completed only after the first 9 messages have been processed and completed. Depending on how long processing each message takes, the last message's lock might have expired by the time you call complete for it.
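The arithmetic can be sketched as follows. The figures (10 messages, 4 seconds of processing each, a 30-second lock) are illustrative assumptions, not values from this issue, and the model assumes every lock starts when the batch is received. With prefetch enabled the lock may start earlier, which only shortens the budget.

```java
final class LockBudget {
    /** Millis after receive at which the message at 'position' (1-based) is completed. */
    static long completionOffsetMillis(int position, long perMessageMillis) {
        return position * perMessageMillis;
    }

    /** First 1-based position completed after the lock expired, or -1 if the whole batch fits. */
    static int firstExpiredPosition(int batchSize, long perMessageMillis, long lockMillis) {
        for (int position = 1; position <= batchSize; position++) {
            if (completionOffsetMillis(position, perMessageMillis) > lockMillis) {
                return position;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Illustrative assumption: 10 messages, 4 s each, 30 s lock duration.
        int first = firstExpiredPosition(10, 4_000, 30_000);
        System.out.println("first message completed after lock expiry: " + first);
    }
}
```

Under these assumed numbers the seventh message is completed at 28 seconds, inside the lock, while the eighth is completed at 32 seconds and would fail with a lock-lost error.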

There are some options you can try:

  1. Increase the lock duration on your entity. Service Bus supports a maximum lock duration of 5 minutes.
  2. Check your message processing time. You are writing to a file in this sample code; real message processing may be doing something else. If processing takes time, receiving 1 message at a time is better than receiving a batch of messages.
  3. If you receive a batch, parallelize your message processing. Rather than processing and completing in a loop sequentially, distribute the received messages among multiple threads so the last message in the batch doesn't wait longer than the lock duration to be completed.
  4. You can try QueueClient.registerMessageHandler/SubscriptionClient.registerMessageHandler with the max concurrent calls setting.
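Option 3 could look roughly like the sketch below. It uses only the JDK: processAndComplete is a hypothetical callback standing in for "process the message, then call complete on its lock token", and the thread count is an assumption to tune. Failed messages are returned to the caller rather than swallowed, since those are the ones that will be delivered again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class ParallelBatch {
    /**
     * Runs processAndComplete for every message on a fixed pool of 'threads' workers
     * and returns the messages whose callback threw, in batch order.
     */
    static <T> List<T> processAll(List<T> batch, int threads, Consumer<T> processAndComplete) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (T message : batch) {
                tasks.add(() -> {
                    processAndComplete.accept(message);
                    return null;
                });
            }
            // invokeAll blocks until every task is done; futures come back in task order.
            List<Future<Void>> futures = pool.invokeAll(tasks);
            List<T> failed = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                try {
                    futures.get(i).get();
                } catch (ExecutionException e) {
                    failed.add(batch.get(i));
                }
            }
            return failed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing batch", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With a batch of 10 and, say, 5 workers, each message waits behind at most one other, so the worst-case delay before complete drops from ten processing times to about two.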