Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[QUERY] EventHubConsumerClient - exceeded the maximum number of allowed receivers per partition in a consumer group which is 5, for one single read thread #38117

Closed anlong465 closed 9 months ago

anlong465 commented 11 months ago

Library name and version

Azure.Messaging.EventHubs.Consumer 5.15.5

Query/Question

This exception has been giving me a headache for a couple of months: "Azure.Messaging.EventHubs.EventHubsException(QuotaExceeded): Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5."

I did some googling and found https://github.com/Azure/azure-sdk-for-net/issues/33938, but my scenario is a little different, so I have to create a new issue.

Anyway, in brief, my scenario is:

The weird things in the above exception message are: 1) all listed receivers share one and the same UUID; 2) the related exception stack trace doesn't include any lines of our own code, so we don't know how to catch such an exception.

Environment

No response

jsquire commented 11 months ago

Hi @anlong465. Thank you for reaching out and we regret that you're experiencing difficulties. The error message that you're seeing indicates that you have more than 5 different consumer types attempting to read from a partition using the same consumer group.

There are a few things here that we'll need to figure out before we can assist:

github-actions[bot] commented 11 months ago

Hi @anlong465. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

anlong465 commented 11 months ago

Hi @jsquire, thanks a lot for your timely response, and sorry for the mess. 1) I am using Java, and the related Maven dependency is:

    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.15.5</version>
    </dependency>

2) Regarding the full stack trace: all log4j errors are stored in a database, and from the database record I found the following:

    com.azure.core.amqp.exception.AmqpException: Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5. List of connected receivers - 5fed4790-f44b-4054-a44a-5942dcb616c3, 5fed4790-f44b-4054-a44a-5942dcb616c3, 5fed4790-f44b-4054-a44a-5942dcb616c3, 5fed4790-f44b-4054-a44a-5942dcb616c3, 5fed4790-f44b-4054-a44a-5942dcb616c3.
    TrackingId:d3ec969b-c2be-42ac-b064-1bdb23227f4d_B42, SystemTracker:brewdat-supply-eventhub-prod:eventhub:brewdat-supply-prod~16383, Timestamp:2023-12-08T06:14:40
    Reference:856b75db-575b-41ff-be39-682e4ba602de, TrackingId:09c995eb-a30c-4db8-98a7-e2b5db3a711e_B42, SystemTracker:brewdat-supply-eventhub-prod:eventhub:brewdat-supply-prod~16383|brewdat-iot-monitor, Timestamp:2023-12-08T06:14:40
    TrackingId:0f08adfb0dd5407eb9292108628952c7_G29, SystemTracker:gateway5, Timestamp:2023-12-08T06:14:40, errorContext[NAMESPACE: brewdat-supply-eventhub-prod.servicebus.chinacloudapi.cn. ERROR CONTEXT: N/A, PATH: brewdat-supply-prod/ConsumerGroups/brewdat-iot-monitor/Partitions/0, REFERENCE_ID: 0_aa5e3c_1702016080539, LINKCREDIT: 0]
        at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
        at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:88)
        at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:38)
        at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onLinkRemoteClose(ReceiveLinkHandler.java:228)
        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
        at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.ut

3) Regarding initialization:

    protected EventHubConsumerClient makeEventHubConsumerClient() {
        return new EventHubClientBuilder()
                .connectionString(connectionString)
                .consumerGroup(consumerGroup)
                .buildConsumerClient();
    }

    private final EventHubConsumerClient consumer = makeEventHubConsumerClient();

4) Regarding managing its lifetime: actually, I never close it, as my thread is one long-running thread, and apart from fetching messages I don't do any other management of it. My Thread.run() method was attached below as a screenshot.

The code that fetches messages from the Event Hub was also attached as a screenshot.

Please let me know if further clarification is required.

This issue is very important to us. Yesterday (Monday), when I came back to the office and did a routine check, I found the consumer had stopped two days earlier, on Saturday night, and I had to kill the pod manually. From the logs it looked like a zombie: no more log output was being generated, yet my k8s health probe kept passing. (The probe is a Linux shell command that compares the contents of two files, each storing the current epoch millis; one file is kept updated by my long-running thread, and after the comparison its content is copied into the other file.) Since the k8s health probe didn't kill my pod, the pod was still considered healthy, so I need to improve the probe logic. But the health probe is another story; what I need fixed is this weird behavior where all 5 Event Hub receivers share one and the same UUID.

anlong465 commented 11 months ago

BTW, because of the column length limitation, the exception stack trace above was truncated; a similar, more complete snapshot was attached as a screenshot.

jsquire commented 11 months ago

Hi @anlong465. Thank you for the context. Since you're using Java, I'm going to transfer this to the Azure SDK for Java repository, where the team members best able to assist will see it.

jsquire commented 11 months ago

//cc: @conniey, @lmolkova, @anuchandy

anlong465 commented 11 months ago

Dear Support Team,

In my dev environment the issue happened once again 20+ hours ago, and I only found out a few minutes ago this morning. Below is the full stack trace:

    2023-12-20 12:15:13,783 ERROR com.azure.core.util.logging.LoggingEventBuilder: {"az.sdk.message":"Error occurred in subscriber.","exception":"Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5. List of connected receivers - 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758. TrackingId:df04b2e6-37a6-4ffc-a709-e681dbeeb56a_B4, SystemTracker:brewdat-supply-eventhub-dev:eventhub:brewdat_dev~32766, Timestamp:2023-12-20T04:15:13 Reference:d1d205d4-ef76-497c-b8f3-4d6c6d3a528e, TrackingId:88e6e3bc-b68d-488a-b4e7-c257bef3f876_B4, SystemTracker:brewdat-supply-eventhub-dev:eventhub:brewdat_dev~32766|consmosdb-test, Timestamp:2023-12-20T04:15:13 TrackingId:360620342eea4964a91e48e96a6d365e_G16, SystemTracker:gateway5, Timestamp:2023-12-20T04:15:13, errorContext[NAMESPACE: brewdat-supply-eventhub-dev.servicebus.chinacloudapi.cn. ERROR CONTEXT: N/A, PATH: brewdat_dev/ConsumerGroups/consmosdb-test/Partitions/0, REFERENCE_ID: 0_9e4a23_1703045713152, LINK_CREDIT: 0]","subscriberId":"129110"}

    2023-12-20 12:15:13,785 ERROR reactor.util.Loggers$Slf4JLogger: Operator called default onErrorDropped
    com.azure.core.amqp.exception.AmqpException: Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5. List of connected receivers - 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758.
    TrackingId:df04b2e6-37a6-4ffc-a709-e681dbeeb56a_B4, SystemTracker:brewdat-supply-eventhub-dev:eventhub:brewdat_dev~32766, Timestamp:2023-12-20T04:15:13
    Reference:d1d205d4-ef76-497c-b8f3-4d6c6d3a528e, TrackingId:88e6e3bc-b68d-488a-b4e7-c257bef3f876_B4, SystemTracker:brewdat-supply-eventhub-dev:eventhub:brewdat_dev~32766|consmosdb-test, Timestamp:2023-12-20T04:15:13
    TrackingId:360620342eea4964a91e48e96a6d365e_G16, SystemTracker:gateway5, Timestamp:2023-12-20T04:15:13, errorContext[NAMESPACE: brewdat-supply-eventhub-dev.servicebus.chinacloudapi.cn. ERROR CONTEXT: N/A, PATH: brewdat_dev/ConsumerGroups/consmosdb-test/Partitions/0, REFERENCE_ID: 0_9e4a23_1703045713152, LINKCREDIT: 0]
        at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
        at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:124)
        at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:64)
        at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onLinkRemoteClose(ReceiveLinkHandler.java:228)
        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
        at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

    2023-12-20 12:16:02,784 ERROR com.alibaba.druid.pool.DruidDataSource: {conn-13453} discard

Any suggestion would be highly appreciated!

anlong465 commented 11 months ago

Dear Support,

This issue can easily be reproduced with the demo code below after a couple of minutes of running. The demo is very similar to my use case; the only difference is that my use case uses one single thread, while the demo uses 5 parallel threads. Everything else is the same, and it hits the same error message: the maximum of 5 receivers is exceeded, yet all 5 receiver ids are identical.

In my use case there is only one read thread, so the issue is not easy to reproduce; usually it takes days or weeks. But since the issue can happen with a single thread, increasing the number of parallel threads should also increase the probability of hitting it.

So I increased the thread count to 5. According to the Event Hubs product design this should not cause the issue, yet it happens after a couple of minutes of running, both in my environment and in the Azure China support environment last week.

In my opinion: 1) Obviously, no matter how many threads share one and the same EventHubConsumerClient object, to the Event Hubs server they all share one and the same receiver id, i.e. the id of the EventHubConsumerClient object. Because of this, the server cannot rely on distinct receiver ids to determine whether the maximum number is exceeded.

2) So the solution is straightforward: 2.1) when the client sends its own identification to the server, the id should be: receiver id = id of the EventHubConsumerClient object + thread.getId().

2.2) on the Event Hubs server side, it should tolerate both the old and the new receiver id formats (the old receiver id is a UUID, so its length is 36, while the new receiver id would be longer than 37 characters, so the length can be used to tell the old format from the new one):

    List<String> receiverIds = new ArrayList<>(); // assume this holds the receiver ids for one [partition number + consumer group]

    List<String> distinctedReceiverIds = new ArrayList<>();
    for (String one : receiverIds) {
        if (one.length() < 37) {
            // old-format id (plain UUID): always counted, as today
            distinctedReceiverIds.add(one);
        } else if (!distinctedReceiverIds.contains(one)) {
            // new-format id: each distinct value is counted only once
            distinctedReceiverIds.add(one);
        }
    }

Now, based on the size of distinctedReceiverIds, the issue would be fixed while tolerating different client and server versions.
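To make the proposal concrete, here is a small, self-contained sketch (the class, method name, and the quota check are illustrative, not existing service code) that combines the deduplication above with the check it would enable:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class ReceiverQuotaSketch {
        static final int MAX_RECEIVERS_PER_PARTITION = 5; // the documented Event Hubs limit

        // Counts old-format ids (plain 36-char UUIDs) individually and new-format ids once per
        // distinct value, then reports whether the resulting receiver count reaches the quota.
        static boolean wouldExceedQuota(List<String> receiverIds) {
            List<String> distinctedReceiverIds = new ArrayList<>();
            for (String one : receiverIds) {
                if (one.length() < 37 || !distinctedReceiverIds.contains(one)) {
                    distinctedReceiverIds.add(one);
                }
            }
            return distinctedReceiverIds.size() >= MAX_RECEIVERS_PER_PARTITION;
        }

        public static void main(String[] args) {
            String clientObjectId = "2f3c965a-42dc-4bda-b78e-665b146bb758"; // 36 chars: old format
            String threadScopedId = clientObjectId + "-thread-17";          // > 37 chars: new format

            // Old-format ids are never deduplicated, so the same UUID reported five times hits the quota.
            System.out.println(wouldExceedQuota(Arrays.asList(
                    clientObjectId, clientObjectId, clientObjectId, clientObjectId, clientObjectId))); // true

            // New-format ids are deduplicated, so the same id reported five times counts as one receiver.
            System.out.println(wouldExceedQuota(Arrays.asList(
                    threadScopedId, threadScopedId, threadScopedId, threadScopedId, threadScopedId))); // false
        }
    }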

================ demo source code =================

package iot;

import com.azure.core.util.IterableStream;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;

import java.time.Duration;
import java.time.Instant;

public class IotBrowser1 implements Runnable {
private static String connectionString = "Endpoint=sb://xxxxxxxx.chinacloudapi.cn/;SharedAccessKeyName=ems_consumer;SharedAccessKey=xxxxxxxx;EntityPath=xxxxx";
private static String consumerGroup = "xxxxxx";

private static EventHubConsumerClient consumer = makeEventHubConsumerClient();
public static void main(String[] args) {
    for(int i = 0; i < 5; i++) {
        new Thread(new IotBrowser1("client-" + i)).start();
    }
}

public IotBrowser1(String clientName) {
    this.clientName = clientName;
}
private String clientName;

@Override
public void run() {
    clientName += " thread[" + Thread.currentThread().getId() + "]";

    Instant startTime = Instant.now().minus(Duration.ofMinutes(1));
    EventPosition startingPosition = EventPosition.fromEnqueuedTime(startTime);
    Long lastSequenceNumber = null;

    long endMills = Instant.now().plus(Duration.ofDays(1)).toEpochMilli();

    int total = 0;
    long enqueuedMills = 0;
    while (true) {
        IterableStream<PartitionEvent> events = consumer.receiveFromPartition("0",
                100, startingPosition, Duration.ofMillis(1000));
        int count = 0;
        for (PartitionEvent partitionEvent : events) {
            EventData event = partitionEvent.getData();

            lastSequenceNumber = event.getSequenceNumber();
            try {
                String message = new String(event.getBody(), "UTF-8");
                total++;
                count++;

                if(total % 1000 == 0) {
                    System.out.println(Instant.now() + " [" + total + "] " + clientName + ": " + message);
                }
            } catch (Throwable e) {
                e.printStackTrace();
                break;
            }
        }

        if (enqueuedMills > endMills) break;
        if (lastSequenceNumber != null) {
            startingPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);
        }

        if (count < 50) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
        }
    }
}

protected static EventHubConsumerClient makeEventHubConsumerClient() {
    return new EventHubClientBuilder()
            .connectionString(connectionString)
            .consumerGroup(consumerGroup)
            .buildConsumerClient();
}

}

conniey commented 10 months ago

Hey,

I took a look at your sample code and ran it. To give some context, each EventHub*Client represents an AMQP connection (TCP) to the Event Hubs service. Each AMQP connection can contain multiple receive links. Event Hubs service counts each receive link to the service as a "receiver".

What your code is doing, in a tight loop, is creating receive links to the Event Hubs service and then quickly disposing of them after receiving either 100 events or 1 second's worth. With the 5 threads, you have 5 receive operations happening at any given moment. Add in some latency between your network and the service, and it is possible to request another receive link while an existing receive operation has not yet completely shut down.

Note

  • This is running in a while(true) loop.

    • Consider using EventProcessorClient, which continuously processes events from all partitions in an Event Hub.
    • For more granular control, consider EventHubConsumerAsyncClient.receiveFromPartition(....), which returns a Flux<PartitionEvent>. This Flux continuously receives events from the partition until the subscription is disposed/closed. This would use the same underlying receive link.
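As an illustration of the Flux-based option mentioned in the note, here is a minimal sketch (the connection string, consumer group, and partition id are placeholders, not values from this issue):

    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
    import com.azure.messaging.eventhubs.models.EventPosition;
    import reactor.core.Disposable;

    import java.util.concurrent.TimeUnit;

    public class FluxReceiveSketch {
        public static void main(String[] args) throws InterruptedException {
            EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
                    .connectionString("<connection-string-with-EntityPath>")
                    .consumerGroup("<consumer-group>")
                    .buildAsyncConsumerClient();

            // One subscription keeps one receive link open; events flow until it is disposed,
            // instead of opening and closing a link for every synchronous receive call.
            Disposable subscription = consumer.receiveFromPartition("0", EventPosition.latest())
                    .subscribe(
                            partitionEvent -> System.out.println(
                                    "Received: " + partitionEvent.getData().getBodyAsString()),
                            error -> System.err.println("Receive failed: " + error));

            TimeUnit.MINUTES.sleep(5);   // keep the demo process alive while events arrive

            subscription.dispose();
            consumer.close();
        }
    }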

github-actions[bot] commented 10 months ago

Hi @anlong465. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.

anlong465 commented 10 months ago

@conniey, thank you very much for your response. I have some additional comments below.

Hey,

I took a look at your sample code and ran it. To give some context, each EventHub*Client represents an AMQP connection (TCP) to the Event Hubs service. Each AMQP connection can contain multiple receive links. Event Hubs service counts each receive link to the service as a "receiver".

I understand that design point. But the id of a "receiver" or "receive link" is actually equal to the id of the EventHubConsumerClient (I verified this from my client-side print-outs). That is why, when the issue happens, the error message reported by the Event Hubs server contains the same receiver id repeated:

"Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5. List of connected receivers - 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758, 2f3c965a-42dc-4bda-b78e-665b146bb758.

In my opinion, the error message itself is also a bug: if one id cannot represent one "receive link", why is that id used to represent it? Furthermore, based on such a wrong id, the backend server rejects the client operation.

So my earlier suggestion is pretty simple: use one unique id to represent one "receiver".

Adding some time for latency between the service and your network, it is possible to request another receive link while an existing receive operation has not completely shut down, yet.

I agree with the underlying logic. But the 5 threads are just for my demo, to reproduce the issue easily; in production there is only one fixed thread handling one Kafka partition, forever. Why does the issue still happen for one single thread after a long time? One receiver being miscounted as two, I agree, is quite possible. But one being miscounted as 6 means the counting logic needs to be reviewed!

BTW, I cannot fully agree that this miscounting is caused by "network latency". 1) Sometimes I need to restart my backend processing from scratch (because of issues in my backend processing, I have restarted from scratch many times). 2) When starting from scratch, there are 7 days of messages in the queue to be handled (our Event Hub messages expire after 7 days); based on our analysis it takes about 7 hours to consume those 7 days of messages. 3) My backend processing logic is to fetch at most 100 messages from the queue per second; if the total message count is less than 50, the current thread sleeps 0.5 second. 4) In other words, during that 7-hour catch-up period the thread keeps working without any sleep time. 5) Yet the error never happens during that initial stage without sleep time. 6) It actually happens when the thread does have sleep time. 7) With a client-side sleep of 0.5 second, I would think the server has enough time to "completely shut down"!

Note

  • This is running in a while(true) loop.

    • Consider using EventProcessorClient which continuously processes events from all partitions in an Event Hub.
    • For more granular control, consider EventHubConsumerAsyncClient.receiveFromPartition(....) which returns a Flux<PartitionEvent>. This Flux continuously receives events from the partition until the subscription is disposed/closed. This would use the same underlying receive link.

Thanks for your suggestion regarding the Flux API; we will take some time to research this approach.
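For reference, a minimal sketch of the EventProcessorClient approach also mentioned in the note above; it assumes the separate azure-messaging-eventhubs-checkpointstore-blob dependency for checkpointing, and every connection value below is a placeholder:

    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;

    import java.util.concurrent.TimeUnit;

    public class ProcessorSketch {
        public static void main(String[] args) throws InterruptedException {
            // Per-partition progress (checkpoints) is persisted in a blob container.
            BlobContainerAsyncClient checkpointContainer = new BlobContainerClientBuilder()
                    .connectionString("<storage-connection-string>")
                    .containerName("<checkpoint-container>")
                    .buildAsyncClient();

            EventProcessorClient processor = new EventProcessorClientBuilder()
                    .connectionString("<event-hub-connection-string-with-EntityPath>")
                    .consumerGroup("<consumer-group>")
                    .checkpointStore(new BlobCheckpointStore(checkpointContainer))
                    .processEvent(eventContext -> {
                        System.out.println("Partition " + eventContext.getPartitionContext().getPartitionId()
                                + ": " + eventContext.getEventData().getBodyAsString());
                        eventContext.updateCheckpoint();
                    })
                    .processError(errorContext -> System.err.println(
                            "Error on partition " + errorContext.getPartitionContext().getPartitionId()
                                    + ": " + errorContext.getThrowable()))
                    .buildEventProcessorClient();

            processor.start();          // receives from all partitions in the background
            TimeUnit.MINUTES.sleep(5);  // let the demo run for a while
            processor.stop();
        }
    }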

github-actions[bot] commented 9 months ago

Hi @anlong465, since you haven’t asked that we /unresolve the issue, we’ll close this out. If you believe further discussion is needed, please add a comment /unresolve to reopen the issue.