AxonFramework / extension-kafka

Axon Framework extension for Kafka integration to publish and handle Event messages.
https://axoniq.io/
Apache License 2.0
67 stars 28 forks source link

File Leak #31

Closed mishkaechoes closed 3 years ago

mishkaechoes commented 4 years ago

There are two or three issues at play at the moment.

  1. After org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files the KafkaConsumer thread dies instead of crashing the entire service. This creates a zombie process which requires external healthchecks to monitor.

  2. After running multiple different tests and a file leak detector we are 100% certain the root cause is the following:

    #706 selector by thread:AsyncFetcher-0 on Tue Jan 07 15:27:59 EST 2020
    at java.nio.channels.spi.AbstractSelector.<init>(AbstractSelector.java:86)
    at sun.nio.ch.SelectorImpl.<init>(SelectorImpl.java:54)
    at sun.nio.ch.KQueueSelectorImpl.<init>(KQueueSelectorImpl.java:83)
    at sun.nio.ch.KQueueSelectorProvider.openSelector(KQueueSelectorProvider.java:42)
    at java.nio.channels.Selector.open(Selector.java:227)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:214)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:227)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:231)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:445)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:422)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:270)
    at org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory.createKafkaProducer(DefaultProducerFactory.java:184)
    at org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory.createTransactionalProducer(DefaultProducerFactory.java:178)
    at org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory.createProducer(DefaultProducerFactory.java:114)
    at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.send(KafkaPublisher.java:128)
    at org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher.handle(KafkaEventPublisher.java:77)
    at org.axonframework.eventhandling.SimpleEventHandlerInvoker.handle(SimpleEventHandlerInvoker.java:108)
    at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:89)
    at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:165)
    at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57)
    at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:65)
    at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55)
    at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$2(AbstractEventProcessor.java:173)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:86)
    at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:159)
    at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:143)
    at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:111)
    at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:35)
    at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:95)
    at org.axonframework.eventhandling.AbstractEventBus.lambda$prepareCommit$15(AbstractEventBus.java:229)
    at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891)
    at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404)
    at org.axonframework.eventhandling.AbstractEventBus.prepareCommit(AbstractEventBus.java:229)
    at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:64)
    at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:218)
    at org.axonframework.eventhandling.AbstractEventBus.lambda$null$8(AbstractEventBus.java:152)
    at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:71)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:155)
    at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:155)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:222)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:111)
    at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:159)
    at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:143)
    at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:111)
    at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:35)
    at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:95)
    at org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource.lambda$null$2(SubscribableKafkaMessageSource.java:177)
    at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891)
    at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404)
    at org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource.lambda$start$3(SubscribableKafkaMessageSource.java:177)
    at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:89)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    Creating a new KafkaProducer on every event seems incredibly wasteful. Either a cached queue of 10 or simply 1 would be desirable. Finally if the decision to use many (not advised) then the handles need to be closed.

In our cases we had set the max files (lsof) to 16k and repeatedly watched it exceed the threshold over a period of several minutes.

smcvb commented 4 years ago

Thanks for reporting this issue with us @mishkaechoes. Would you mind elaborating which version of the extension you are using?

Just within 2019 we released RC3 which brings a lot of adjustments towards the entirety of the Kafka Extension. If you are on RC2, I'd firstly recommend moving towards RC3 and verify if the same issue occurs again.

If the problem prevails, then it sounds like we have found a worthwhile bug to resolve prior to an actual release of the extension.

mishkaechoes commented 4 years ago

We are on c2346904deb6198d0f039e7c9d9c2194805db5b9 off master. This is a few commits behind but after reviewing those commits nothing appears to reflect the issue we are seeing.

mishkaechoes commented 4 years ago

Pulled latest and it appears to have stabilized. We're going to retest in a new environment with more load.

mishkaechoes commented 4 years ago

After updating to RC3 and pushing to an isolated environment in PCF we are observing the following:


   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT 2020-01-08 18:51:17.034 ERROR 17 --- [ing.producer]-0] o.a.eventhandling.LoggingErrorHandler    : EventListener [KafkaEventPublisher] failed to handle event [77e81f5b-e4bb-487b-9aa0-724e68f1496e] (com.aramark.ess.common.SyncModelCreated). Continuing processing with next listener
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT java.lang.IllegalStateException: Cannot perform operation after producer has been closed
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:863) ~[kafka-clients-2.3.0.jar!/:na]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:641) ~[kafka-clients-2.3.0.jar!/:na]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.tryBeginTxn(KafkaPublisher.java:157) ~[axon-kafka-4.0-RC3-ARAMARK.jar!/:4.0-RC3-ARAMARK]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher.send(KafkaPublisher.java:132) ~[axon-kafka-4.0-RC3-ARAMARK.jar!/:4.0-RC3-ARAMARK]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher.handle(KafkaEventPublisher.java:77) ~[axon-kafka-4.0-RC3-ARAMARK.jar!/:4.0-RC3-ARAMARK]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.SimpleEventHandlerInvoker.handle(SimpleEventHandlerInvoker.java:108) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:89) [axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:165) [axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:65) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.TrackingEventProcessor.lambda$new$1(TrackingEventProcessor.java:162) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$2(AbstractEventProcessor.java:173) [axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:86) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:159) [axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:412) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:275) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1071) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1183) ~[axon-messaging-4.2.jar!/:4.2]
   2020-01-08T13:51:17.03-0500 [APP/PROC/WEB/0] OUT     at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_202]```

Events are not being published to Kafka.
We've reset the environment and tried again multiple times each time the result appears to be the same.
mishkaechoes commented 4 years ago

We've also observed that in some instances the above stack is not visible but Axon still fails to publish with no errors or tracestack

mishkaechoes commented 4 years ago

When the service restarts it appears to restart with a kafka.producer bean not active

smcvb commented 4 years ago

Hey @mishkaechoes, we have had some time to digest and investigate on our part.

To summarize your problem statement, you say that (A) the DefaultProducerFactory in the extension creates a new KafkaProducer for every event coming through. Added, you would assume (B) a single instance or a cached pool of ten instances would be sufficient.

Having said that, the DefaultProducerFactory will either (1) create a ShareableProducer or (2) a PoolableProducer. Option two will only occur if you have defined the publishing side's "confirmation mode" to be TRANSACTIONAL, which from the stack trace I can deduce you have.

This then brings us to the private DefaultProducerFactory#createTransactionalProducer method on line 170, as this point where we create a KafkaProducer. Per reference, let me share said method:

private Producer<K, V> createTransactionalProducer() {
    Producer<K, V> producer = this.cache.poll();
    if (producer != null) {
        return producer;
    }
    Map<String, Object> configs = new HashMap<>(this.configuration);
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
    producer = new PoolableProducer<>(createKafkaProducer(configs), cache, closeTimeout);
    producer.initTransactions();
    return producer;
}

On the first line you can see we will retrieve a Producer from the cache, which if it isn't null will send it back right away. This cache is defined as a ArrayBlockingQueue, using the producerCacheSize (which defaults to 10) to define how much Producer will be maintained. Subsequently, a PoolableProducer will be returned to this cache by the KafkaPublisher on line 144, where it tries to close the given Producer.

Thus far, I see that from my short summarization of your issue, point (B) should be covered. Added, I feel this breaks assumption (A), that a new Producer would be created for every event.


Concluding, this brings me to a point that I am still pretty uncertain of what's failing in your system, @mishkaechoes... The KafkaPublisher#tryClose(Producer) method does log a debug message if closing fails. Might be worth a try to unable debug logging for the KafkaPublisher only and see if the line Unable to close producer comes by.

Additionally, maybe @lbilger can say something smart about what's happening? @lbilger, I am asking you as you've most recently made adjustments in the DefaultProducerFactory and are very, very likely using it in production somewhere. You should by no means feel pressured to help out, but if you do happen to have any insights for me and @mishkaechoes, that would be much appreciated.

mishkaechoes commented 4 years ago

This issue can be closed it was resolved with latest. The problem only manifested with c234690 . That being said we have been stuck with different issues #32 #33 to retest latest with load. We do observe that it no longer creates a new KafkaPublisher per event as previously observed.

mishkaechoes commented 4 years ago

After retesting this is still an issue. A new producer is created and close is not invoked.

mishkaechoes commented 4 years ago

Behavior is a result of using TRANSACTIONAL

smcvb commented 4 years ago

Concluding, this brings me to a point that I am still pretty uncertain of what's failing in your system, @mishkaechoes... The KafkaPublisher#tryClose(Producer) method does log a debug message if closing fails. Might be worth a try to unable debug logging for the KafkaPublisher only and see if the line Unable to close producer comes by.

Have you retested with debug logging turned on for all the publishing components from the kafka extension? If not, doing so and sharing it here would be highly beneficial to figure out the problem you are experiencing.

lbilger commented 4 years ago

@smcvb, @mishkaechoes I'm afraid I can't explain this problem either. We are using the extension in production but not using transactional mode, so we are not seeing this problem. I looked at the code again and I would expect it to do exactly what you described above, @smcvb. We try to get a producer from the pool and create a new one only if none is available. When the producer is closed, it is returned to the pool unless the pool is full, in which case the producer is properly closed. The only way I can think of to create thousands of producers would be to send lots of messages simultaneously so that new producers get created because none are available in the pool. Could that be the case here?

smcvb commented 3 years ago

Closing this issue due to inactivity and lack of reoccurrence at other users.

If somebody does encounter the same problem again, please do reply in this thread. When doing so, it would be beneficial if you can provide: