We are using spring stream framework to connect with kafka. In our Non Functional Testing, we obversed that if there is mismatch in partition count, then it throws an exception and keep trying to connect.
{"mdc":{},"timestamp":"2023-01-02 09:28:58.891","level":"ERROR","logger":"org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner","message":"Cannot initialize Binder","exception":"
java.lang.IllegalStateException: The number of expected partitions was: 10, but 1 has been found instead
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$6(KafkaTopicProvisioner.java:579)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:530)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:399)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:163)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:236)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:92)
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:323)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:288)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:297)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:301)
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:142)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303)
at com.amdocs.msnext.boot.Boot.runWithProperties(Boot.java:69)
at com.amdocs.msnext.boot.Boot.main(Boot.java:47)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
at com.amdocs.msnext.launcher.ModuleLauncher.main(ModuleLauncher.java:167)
"}
However, after sometime, it gives OOM exemption.
[7057.474s][error ][jvmti ] Posting Resource Exhausted event: unable to create native thread: possibly out of memory or process/resource limits reached
{"mdc":{"traceId":"861c21b360879796","spanId":"e74d6f2c16733128","sampled":"true","parentId":"0084189b4762b36d"},"timestamp":"2022-12-29 15:44:01.906","level":"ERROR","logger":"org.springframework.cloud.stream.binding.BindingService","message":"Failed to create producer binding; retrying in 30 seconds","exception":"
org.springframework.cloud.stream.binder.BinderException: Exception thrown while building outbound endpoint
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:251)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:92)
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:346)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:527)
at org.apache.kafka.clients.admin.Admin.create(Admin.java:143)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createAdminClient(KafkaTopicProvisioner.java:260)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:161)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:86)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:233)
... 11 common frames omitted
Caused by: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
at java.base/java.lang.Thread.start0(Native Method)
at java.base/java.lang.Thread.start(Thread.java:798)
at org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:580)
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:520)
... 17 common frames omitted
"}
[7059.991s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
One more obversation is it keeps spawning more threads for "micrometer-kafka-metrics". It should not keep creating such thread which is causing OOM issue
"micrometer-kafka-metrics" #102 daemon prio=5 os_prio=0 cpu=1.75ms elapsed=257.45s tid=0x00007faaafc44000 nid=0xa6 waiting on condition [0x00007fa97b5f4000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.16/Native Method)
parking to wait for <0x00000000da71d4b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.16/LockSupport.java:234)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.16/AbstractQueuedSynchronizer.java:2123)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.16/ScheduledThreadPoolExecutor.java:1182)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.16/ScheduledThreadPoolExecutor.java:899)
at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.16/ThreadPoolExecutor.java:1054)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.16/ThreadPoolExecutor.java:1114)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.16/ThreadPoolExecutor.java:628)
at java.lang.Thread.run(java.base@11.0.16/Thread.java:829)
We are using spring stream framework to connect with kafka. In our Non Functional Testing, we obversed that if there is mismatch in partition count, then it throws an exception and keep trying to connect.
{"mdc":{},"timestamp":"2023-01-02 09:28:58.891","level":"ERROR","logger":"org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner","message":"Cannot initialize Binder","exception":" java.lang.IllegalStateException: The number of expected partitions was: 10, but 1 has been found instead at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$6(KafkaTopicProvisioner.java:579) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209) at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:530) at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:399) at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:163) at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:236) at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:92) at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:323) at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:288) at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:297) at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:301) at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:142) at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) at com.amdocs.msnext.boot.Boot.runWithProperties(Boot.java:69) at com.amdocs.msnext.boot.Boot.main(Boot.java:47) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) at com.amdocs.msnext.launcher.ModuleLauncher.main(ModuleLauncher.java:167)
"}
However, after sometime, it gives OOM exemption.
[7057.474s][error ][jvmti ] Posting Resource Exhausted event: unable to create native thread: possibly out of memory or process/resource limits reached {"mdc":{"traceId":"861c21b360879796","spanId":"e74d6f2c16733128","sampled":"true","parentId":"0084189b4762b36d"},"timestamp":"2022-12-29 15:44:01.906","level":"ERROR","logger":"org.springframework.cloud.stream.binding.BindingService","message":"Failed to create producer binding; retrying in 30 seconds","exception":" org.springframework.cloud.stream.binder.BinderException: Exception thrown while building outbound endpoint at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:251) at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:92) at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:346) at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:527) at org.apache.kafka.clients.admin.Admin.create(Admin.java:143) at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49) at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createAdminClient(KafkaTopicProvisioner.java:260) at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:161) at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:86) at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:233) ... 11 common frames omitted Caused by: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached at java.base/java.lang.Thread.start0(Native Method) at java.base/java.lang.Thread.start(Thread.java:798) at org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:580)
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:520)
... 17 common frames omitted
"} [7059.991s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
One more obversation is it keeps spawning more threads for "micrometer-kafka-metrics". It should not keep creating such thread which is causing OOM issue
"micrometer-kafka-metrics" #102 daemon prio=5 os_prio=0 cpu=1.75ms elapsed=257.45s tid=0x00007faaafc44000 nid=0xa6 waiting on condition [0x00007fa97b5f4000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.16/Native Method)