Azure / azure-event-hubs-for-kafka

Azure Event Hubs for Apache Kafka Ecosystems
https://docs.microsoft.com/azure/event-hubs/event-hubs-for-kafka-ecosystem-overview
Other
231 stars 213 forks source link

Kafka producer with transaction enabled not able to publish event to event hub #209

Open ravindra-dh opened 2 years ago

ravindra-dh commented 2 years ago

Description

Kafka client without transaction able to publish the events to event hub. But when transaction enabled, it fails with error: an existing connection was forcibly closed by the remote host

How to reproduce

Sample code is provided as part of checklist

Has it worked previously?

First time trying this.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed or where adequate information has not been provided.

Please provide the relevant information for the following items:

If this is a question on basic functionality, please verify the following:

arkadius commented 1 year ago

@ravindra-dh Are you sure that it is related with transactions? Have you tried with transactions disabled? I'm trying to use Kafka transactions with Event Hub and I have other behavior. It fails on KafkaProducer.initTransactions() with such message:

2023-02-01 14:01:33.253 [kafka-producer-network-thread | azure-test] INFO o.apache.kafka.clients.NetworkClient - [Producer clientId=azure-test, transactionalId=azure-testfa6ac8d9-59f4-4dc8-98e4-96480aafee2d] Node 0 disconnected. 2023-02-01 14:01:33.253 [kafka-producer-network-thread | azure-test] INFO o.apache.kafka.clients.NetworkClient - [Producer clientId=azure-test, transactionalId=azure-testfa6ac8d9-59f4-4dc8-98e4-96480aafee2d] Cancelled in-flight FIND_COORDINATOR request with correlation id 245 due to node 0 being disconnected (elapsed time since creation: 49ms, elapsed time since send: 49ms, request timeout: 60000ms)

I would like to know if each tenant can behave differently in this area.

mgvinuesa commented 1 year ago

Hello @arkadius, same here, I have those logs too. And also I detect this error:


org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:162) ~[spring-kafka-3.0.2.jar:3.0.2]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400) ~[spring-tx-6.0.4.jar:6.0.4]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373) ~[spring-tx-6.0.4.jar:6.0.4]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:595) ~[spring-tx-6.0.4.jar:6.0.4]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:382) ~[spring-tx-6.0.4.jar:6.0.4]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-6.0.4.jar:6.0.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.4.jar:6.0.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:752) ~[spring-aop-6.0.4.jar:6.0.4]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:703) ~[spring-aop-6.0.4.jar:6.0.4]
    at com.ferrovial.architecture.sample.kafka.controller.PutMessageIntoTopicController$$SpringCGLIB$$0.sendMultipleAvroMessage(<generated>) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:207) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:152) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1080) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:973) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:731) ~[tomcat-embed-core-10.1.5.jar:6.0]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.0.4.jar:6.0.4]
    at org.springframework.test.web.servlet.TestDispatcherServlet.service(TestDispatcherServlet.java:72) ~[spring-test-6.0.4.jar:6.0.4]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:814) ~[tomcat-embed-core-10.1.5.jar:6.0]
    at org.springframework.mock.web.MockFilterChain$ServletFilterProxy.doFilter(MockFilterChain.java:165) ~[spring-test-6.0.4.jar:6.0.4]
    at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
    at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.4.jar:6.0.4]
    at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
    at org.springframework.test.web.servlet.MockMvc.perform(MockMvc.java:201) ~[spring-test-6.0.4.jar:6.0.4]
    at io.restassured.module.mockmvc.internal.MockMvcRequestSenderImpl.performRequest(MockMvcRequestSenderImpl.java:212) ~[spring-mock-mvc-5.2.1.jar:na]
    at io.restassured.module.mockmvc.internal.MockMvcRequestSenderImpl.sendRequest(MockMvcRequestSenderImpl.java:456) ~[spring-mock-mvc-5.2.1.jar:na]
    at io.restassured.module.mockmvc.internal.MockMvcRequestSenderImpl.post(MockMvcRequestSenderImpl.java:513) ~[spring-mock-mvc-5.2.1.jar:na]
    at io.restassured.module.mockmvc.internal.MockMvcRequestSenderImpl.post(MockMvcRequestSenderImpl.java:84) ~[spring-mock-mvc-5.2.1.jar:na]
    at com.ferrovial.architecture.sample.kafka.controller.it.KafkaProducerTransactionInEventHubTest.whenPutMultipleMessageInATransactionAndOneFail_ThenAnyOneAreConsumed(KafkaProducerTransactionInEventHubTest.java:130) ~[test-classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) ~[junit-platform-commons-1.9.2.jar:1.9.2]
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) ~[junit-platform-engine-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:95) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:91) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:60) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
    at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:756) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452) ~[.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210) ~[.cp/:na]
Caused by: org.springframework.kafka.KafkaException: initTransactions() failed
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateTxProducer(DefaultKafkaProducerFactory.java:873) ~[spring-kafka-3.0.2.jar:3.0.2]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createTransactionalProducer(DefaultKafkaProducerFactory.java:824) ~[spring-kafka-3.0.2.jar:3.0.2]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:737) ~[spring-kafka-3.0.2.jar:3.0.2]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:727) ~[spring-kafka-3.0.2.jar:3.0.2]
    at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:96) ~[spring-kafka-3.0.2.jar:3.0.2]
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:146) ~[spring-kafka-3.0.2.jar:3.0.2]
    ... 117 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting InitProducerId