AxonFramework / extension-kafka

Axon Framework extension for Kafka integration to publish and handle Event messages.
https://axoniq.io/
Apache License 2.0
67 stars 28 forks

Support Streaming Processor Reset Operation #163

Closed leechedan closed 3 years ago

leechedan commented 3 years ago

Basic information

Steps to reproduce

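import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// IUserEvent, UserViewService and ValidationException are application classes (imports omitted).
@Component
@Slf4j
@ProcessingGroup("user-processor")
public class UserListener implements IUserEvent {

    @Autowired
    private UserViewService service;

    @Autowired
    private EventProcessingConfiguration epc;

    // Triggers a replay: stop the processor, reset its tokens, start it again.
    public void on() {
        Optional<TrackingEventProcessor> ret =
                epc.eventProcessor("user-processor", TrackingEventProcessor.class);

        if (ret.isPresent()) {
            TrackingEventProcessor proc = ret.get();
            proc.shutDown();
            proc.resetTokens();
            proc.start();
        } else {
            throw new ValidationException("Process not found.");
        }
    }
}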

Call userListener.on().
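
The order of calls in on() can be sketched without Axon on the classpath. This is only an illustration: ResettableProcessor is a hypothetical stand-in whose method names mirror the ones used in the snippet above, and RecordingProcessor is a fake that exists only to show the call order. It is not the framework's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the processor used in the issue; not Axon's actual type.
interface ResettableProcessor {
    boolean supportsReset();
    void shutDown();
    void resetTokens();
    void start();
}

final class ResetSketch {

    // Runs shutDown -> resetTokens -> start and restarts the processor even if the reset throws.
    static void reset(ResettableProcessor processor) {
        if (!processor.supportsReset()) {
            throw new IllegalStateException("Processor does not support reset");
        }
        processor.shutDown();
        try {
            processor.resetTokens();
        } finally {
            processor.start();
        }
    }

    // Fake processor that records which methods were called, in order.
    static final class RecordingProcessor implements ResettableProcessor {
        final List<String> calls = new ArrayList<>();

        @Override public boolean supportsReset() { return true; }
        @Override public void shutDown() { calls.add("shutDown"); }
        @Override public void resetTokens() { calls.add("resetTokens"); }
        @Override public void start() { calls.add("start"); }
    }

    public static void main(String[] args) {
        RecordingProcessor processor = new RecordingProcessor();
        reset(processor);
        System.out.println(processor.calls);
    }
}
```

The try/finally is a design choice of this sketch, so that a failed reset does not leave the processor stopped; the original snippet does not do this.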

Expected behaviour

Events are fetched correctly and the aggregate status is rebuilt.

Actual behaviour

`2021-07-17 06:12:24.241 INFO 1372 --- [nio-8090-exec-1] o.a.e.TrackingEventProcessor : Shutdown state set for Processor 'user-processor'.
2021-07-17 06:12:24.241 INFO 1372 --- [nio-8090-exec-1] o.a.e.TrackingEventProcessor : Processor 'user-processor' awaiting termination...
2021-07-17 06:12:24.521 INFO 1372 --- [er-processor]-0] o.a.e.k.e.consumer.FetchEventsTask : Closing down FetchEventsTask using Consumer [org.apache.kafka.clients.consumer.KafkaConsumer@1050b4f7]
2021-07-17 06:12:24.524 INFO 1372 --- [er-processor]-0] o.a.e.TrackingEventProcessor : Released claim
2021-07-17 06:12:24.524 INFO 1372 --- [er-processor]-0] o.a.e.TrackingEventProcessor : Worker for segment Segment[0/0] stopped.
2021-07-17 06:12:24.529 INFO 1372 --- [nio-8090-exec-1] o.a.e.tokenstore.AbstractTokenEntry : token:null ser:org.axonframework.serialization.json.JacksonSerializer@55b7f0d type:class [B
2021-07-17 06:12:24.539 INFO 1372 --- [er-processor]-1] o.a.e.TrackingEventProcessor : Worker assigned to segment Segment[0/0] for processing
2021-07-17 06:12:24.540 INFO 1372 --- [er-processor]-1] o.a.e.TrackingEventProcessor : Using current Thread for last segment worker: TrackingSegmentWorker{processor=user-processor, segment=Segment[0/0]}
2021-07-17 06:12:24.542 INFO 1372 --- [er-processor]-1] o.a.e.TrackingEventProcessor : Fetched token: null for segment: Segment[0/0]
2021-07-17 06:12:24.542 INFO 1372 --- [er-processor]-1] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
    auto.commit.interval.ms = 3000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = kafka-axon-user
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2021-07-17 06:12:24.549 WARN 1372 --- [er-processor]-1] o.a.k.clients.consumer.ConsumerConfig : The configuration 'some-key.0' was supplied but isn't a known config.
2021-07-17 06:12:24.549 INFO 1372 --- [er-processor]-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2021-07-17 06:12:24.549 INFO 1372 --- [er-processor]-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2021-07-17 06:12:24.552 WARN 1372 --- [er-processor]-1] o.a.kafka.common.utils.AppInfoParser : Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka-axon-user
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.8.0_241]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.8.0_241]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.8.0_241]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.8.0_241]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.8.0_241]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.8.0_241]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) [kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615) [kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596) [kafka-clients-2.0.1.jar:na]
    at org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory.createConsumer(DefaultConsumerFactory.java:67) [axon-kafka-4.0-RC3.jar:4.0-RC3]
    at org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource.openStream(StreamableKafkaMessageSource.java:122) [axon-kafka-4.0-RC3.jar:4.0-RC3]
    at org.axonframework.eventhandling.TrackingEventProcessor.doOpenStream(TrackingEventProcessor.java:508) [axon-messaging-4.2.2.jar:4.2.2]
    at org.axonframework.eventhandling.TrackingEventProcessor.lambda$ensureEventStreamOpened$14(TrackingEventProcessor.java:496) [axon-messaging-4.2.2.jar:4.2.2]
    at org.axonframework.common.transaction.TransactionManager.fetchInTransaction(TransactionManager.java:70) ~[axon-messaging-4.2.2.jar:4.2.2]
    at org.axonframework.eventhandling.TrackingEventProcessor.ensureEventStreamOpened(TrackingEventProcessor.java:495) [axon-messaging-4.2.2.jar:4.2.2]
    at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:290) [axon-messaging-4.2.2.jar:4.2.2]
    at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1092) ~[axon-messaging-4.2.2.jar:4.2.2]
    at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1206) ~[axon-messaging-4.2.2.jar:4.2.2]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_241]

2021-07-17 06:12:24.583 INFO 1372 --- [ AsyncFetcher-1] org.apache.kafka.clients.Metadata : Cluster ID: DQELf6cCSyiz1x8IhUAixg
2021-07-17 06:12:24.584 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] Discovered group coordinator kafka:9092 (id: 2147483646 rack: null)
2021-07-17 06:12:24.585 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] Revoking previously assigned partitions []
2021-07-17 06:12:24.585 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] (Re-)joining group
2021-07-17 06:12:24.634 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] Successfully joined group with generation 1
2021-07-17 06:12:24.634 INFO 1372 --- [ AsyncFetcher-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=kafka-axon-user, groupId=Axon.Streamable.Consumer-b620957d-22c5-4caf-9e0f-d7b15c094841] Setting newly assigned partitions [localevent-2, localevent-1, localevent-0]
2021-07-17 06:12:24.634 INFO 1372 --- [ AsyncFetcher-1] s.TrackingTokenConsumerRebalanceListener : Seeking topic-partition [localevent-2] with offset [0]
2021-07-17 06:12:24.635 INFO 1372 --- [ AsyncFetcher-1] s.TrackingTokenConsumerRebalanceListener : Seeking topic-partition [localevent-1] with offset [0]
2021-07-17 06:12:24.635 INFO 1372 --- [ AsyncFetcher-1] s.TrackingTokenConsumerRebalanceListener : Seeking topic-partition [localevent-0] with offset [0]
2021-07-17 06:12:25.495 INFO 1372 --- [ AsyncFetcher-0] o.a.e.k.e.consumer.FetchEventsTask : Fetch events task and used Consumer instance [org.apache.kafka.clients.consumer.KafkaConsumer@1050b4f7] have been closed `

leechedan commented 3 years ago

According to the stack trace, at org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory.createConsumer(DefaultConsumerFactory.java:67) [axon-kafka-4.0-RC3.jar:4.0-RC3], the reset operation creates another consumer and registers it with JMX, which conflicts with the consumer that is already registered.
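
The JMX conflict itself can be reproduced with the JDK alone. The sketch below is a minimal illustration under stated assumptions: it uses a private MBeanServer and the JDK's javax.management.timer.Timer as a placeholder bean, since Kafka's own AppInfo bean is not on the classpath here. The class and method names (JmxConflictSketch, tryRegister, unregister, consumerName) are hypothetical; only the ObjectName pattern is taken from the log above.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.timer.Timer;

final class JmxConflictSketch {

    // Builds the same kind of name that appears in the log: kafka.consumer:type=app-info,id=<client.id>
    static ObjectName consumerName(String clientId) {
        try {
            return new ObjectName("kafka.consumer:type=app-info,id=" + clientId);
        } catch (JMException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Tries to register a placeholder MBean; returns false if the name is already taken.
    static boolean tryRegister(MBeanServer server, ObjectName name) {
        try {
            // Timer is only a convenient JDK-provided MBean used as a placeholder.
            server.registerMBean(new Timer(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    // Removes the bean again, which is what closing the old consumer would allow.
    static void unregister(MBeanServer server, ObjectName name) {
        try {
            server.unregisterMBean(name);
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        ObjectName name = consumerName("kafka-axon-user");
        System.out.println(tryRegister(server, name)); // first consumer
        System.out.println(tryRegister(server, name)); // second consumer, same client.id
        unregister(server, name);                      // old consumer cleaned up
        System.out.println(tryRegister(server, name)); // registration succeeds again
    }
}
```

In the log above the failure surfaces only as a WARN from AppInfoParser, and the new consumer still goes on to join the group and seek to offset 0.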

smcvb commented 3 years ago

From your description, it sounds like Axon should remove the Consumer once the processor is shut down. It shouldn't be too hard to achieve.

I also notice a serialization issue in your stack trace somewhere. Just out of curiosity, but does this problem occur too when you use the XStreamSerializer?

Concluding, I'd rephrase this issue as "Support Streaming Processor Reset Operation", as it seems it doesn't do that. What do you think, @leechedan?
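
As a rough illustration of removing the Consumer once the processor is shut down, a message source could keep track of the consumers it opens and close them all on shutdown. This is a minimal sketch, not the extension's code: ConsumerTrackingSource and its methods are hypothetical names, and plain AutoCloseable stands in for the Kafka consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: remembers every consumer opened for a stream so they can be closed together.
final class ConsumerTrackingSource {

    private final List<AutoCloseable> openConsumers = new ArrayList<>();

    // Called when a stream is opened and a new consumer has been created for it.
    synchronized void track(AutoCloseable consumer) {
        openConsumers.add(consumer);
    }

    synchronized int openCount() {
        return openConsumers.size();
    }

    // Called on processor shutdown: close every tracked consumer and forget it.
    synchronized void closeAll() {
        for (AutoCloseable consumer : openConsumers) {
            try {
                consumer.close();
            } catch (Exception e) {
                // Keep closing the remaining consumers even if one fails.
            }
        }
        openConsumers.clear();
    }

    public static void main(String[] args) {
        ConsumerTrackingSource source = new ConsumerTrackingSource();
        source.track(() -> System.out.println("consumer closed"));
        source.closeAll();
        System.out.println(source.openCount());
    }
}
```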

leechedan commented 3 years ago

Yes, I confirmed that it works when using the XStreamSerializer, so the problem is that the reset fails when using the JacksonSerializer.

smcvb commented 3 years ago

Awesome, happy to hear that was the culprit. Honestly, I assumed the PR you did last week would've solved the problem.

Granted, it's not released yet, of course. Have you tried doing a reset with the fix from your pull request #158 applied, @leechedan?

leechedan commented 3 years ago

I cannot reproduce this bug anymore, so I suppose this issue can be closed.

smcvb commented 3 years ago

No worries there. Just happy to hear it's no longer occurring. If it does pop up again, be sure to reply to this issue!