Open wstest123 opened 1 month ago
@anuchandy @conniey @lmolkova
Thank you for your feedback. Tagging and routing to the team member best able to assist.
Unfortunately, without a repro there is not enough information to understand what the problem is. This version of the library is ~3 years old, and we have made many reliability fixes since then.
Hi @wstest123. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.
2024-05-09-21_03_48.log @conniey I have attached the full logs for this issue; please take a look. We will also check whether it still reproduces after updating to the latest version.
@conniey Thanks for your help! Are the attached logs helpful for checking this issue? Thanks!
@conniey May I know if there has been any luck with this issue? I have attached the full log.
Hi @wstest123, were you able to test whether the issue persists after updating to the latest package version?
Describe the bug The Event Hubs consumer stops consuming messages until the consumer is restarted.
Exception or Stack Trace
2024-05-10 02:34:35.124 WARN 25378 --- [ parallel-2] c.a.m.e.PartitionBasedLoadBalancer : Load balancing for event processor failed - Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
2024-05-10 02:34:35.125 INFO 25378 --- [ parallel-2] c.e.l.IotOperationProcessService : Error occurred in partition processor for partition NONE, {}
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'filter' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295) ~[reactor-core-3.4.24.jar!/:3.4.24]
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280) ~[reactor-core-3.4.24.jar!/:3.4.24]
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419) ~[reactor-core-3.4.24.jar!/:3.4.24]
    at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162) ~[reactor-core-3.4.24.jar!/:3.4.24]
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271) ~[reactor-core-3.4.24.jar!/:3.4.24]
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286) ~[reactor-core-3.4.24.jar!/:3.4.24]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.24.jar!/:3.4.24]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.24.jar!/:3.4.24]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
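For readers unfamiliar with this error shape: the trace shows a timeout stage in Reactor (`FluxTimeout`) firing because nothing arrived from upstream within 60000 ms. The sketch below is only a JDK-only analogue of that behavior, using `CompletableFuture.orTimeout` (Java 9+) instead of Reactor. It is not the SDK's code, and the `TimeoutDemo` class and `awaitWithTimeout` method are hypothetical names used for illustration.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class: shows how a silent source turns into a TimeoutException. */
final class TimeoutDemo {

    /**
     * Waits for the source to complete within the limit.
     * Returns null on success, or the failure cause (a TimeoutException
     * when the source produced nothing in time).
     */
    static Throwable awaitWithTimeout(CompletableFuture<String> source, Duration limit) {
        try {
            // orTimeout completes the future exceptionally if it is still pending at the deadline.
            source.orTimeout(limit.toMillis(), TimeUnit.MILLISECONDS).join();
            return null;
        } catch (CompletionException e) {
            // join() wraps the original failure; unwrap it for the caller.
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        // A source that never completes, standing in for an upstream that went quiet.
        CompletableFuture<String> silent = new CompletableFuture<>();
        Throwable failure = awaitWithTimeout(silent, Duration.ofMillis(200));
        System.out.println("failure: " + failure);
    }
}
```

The point of the analogue is that the exception reports only that no signal arrived in time. It does not say why the upstream went quiet, which is why the full logs were requested.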
To Reproduce Steps to reproduce the behavior:
Code Snippet
import com.alibaba.fastjson.JSON;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.example.lgdcmaincontroller.biz.Biz;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

@Component
@Slf4j
@Order(1)
public class IotOperationProcessService implements ApplicationRunner {

// Logger.getLogger("com.azure.messaging.eventhubs.PartitionBasedLoadBalancer").setLevel(Level.OFF); // disable PartitionBasedLoadBalancer logging
// initEventBus();
initEventHub();

// if (eventData.getSequenceNumber() % 10 == 0) {
//     eventContext.updateCheckpoint();
// }
eventContext.updateCheckpoint();

// senderClient.sendMessage(new ServiceBusMessage(msg));
// create a batch
EventData eventData = new EventData(msg);
EventDataBatch eventDataBatch = hubProducerClient.createBatch();
if (eventDataBatch.tryAdd(eventData)) {
    hubProducerClient.send(eventDataBatch);
    log.info("Message sent successfully");
} else {
    log.error("Failed to create message");
}
} catch (Exception e) {
    log.error("Failed to send message: {}", e.getMessage());

}
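The snippet checkpoints after every event and has a commented-out variant that checkpoints when the sequence number is a multiple of 10. Below is a minimal sketch of that "every N events" idea as a standalone, SDK-independent helper. `CheckpointPolicy` and `shouldCheckpoint` are hypothetical names, not part of the Azure SDK, and the helper counts processed events per partition rather than inspecting sequence numbers.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not part of the Azure SDK): checkpoint every N events per partition. */
final class CheckpointPolicy {
    private final int interval;
    private final Map<String, AtomicLong> processedByPartition = new ConcurrentHashMap<>();

    CheckpointPolicy(int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    /** Counts one processed event for the partition; returns true on every interval-th event. */
    boolean shouldCheckpoint(String partitionId) {
        long count = processedByPartition
                .computeIfAbsent(partitionId, id -> new AtomicLong())
                .incrementAndGet();
        return count % interval == 0;
    }

    public static void main(String[] args) {
        CheckpointPolicy policy = new CheckpointPolicy(10);
        for (int i = 1; i <= 25; i++) {
            if (policy.shouldCheckpoint("0")) {
                System.out.println("checkpoint after event " + i);
            }
        }
    }
}
```

In the event handler this could be used as `if (policy.shouldCheckpoint(eventContext.getPartitionContext().getPartitionId())) { eventContext.updateCheckpoint(); }`. The trade-off is that events processed since the last checkpoint are delivered again after a restart, so the handler has to tolerate duplicates.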
Expected behavior I would like to know why the consumer stops consuming messages and how to fix it.
Screenshots If applicable, add screenshots to help explain your problem.
Setup (please complete the following information):
spring-boot:2.4.1
azure-core:1.47.0
azure-messaging-eventhubs:5.10.0
reactor-core:3.4.24
azure-messaging-eventhubs-checkpointstore-blob:1.16.1
If you suspect a dependency version mismatch (e.g. you see NoClassDefFoundError, NoSuchMethodError or similar), please check out Troubleshoot dependency version conflict article first. If it doesn't provide solution for the problem, please provide: mvn dependency:tree -Dverbose
Additional context Add any other context about the problem here.
Information Checklist Kindly make sure that you have added all the following information above and check off the required fields; otherwise we will treat the issue as an incomplete report