jdbc-observations / datasource-micrometer

Micrometer Observation API instrumentation and Spring Boot 3 Auto-Configuration for JDBC APIs
Apache License 2.0

datasource-micrometer-spring-boot can not be used with Kafka streams and Spring Boot 3.1.x #28

Closed kaki877 closed 7 months ago

kaki877 commented 7 months ago

After adding the dependency

net.ttddyy.observation:datasource-micrometer-spring-boot:1.0.2

to my pom, the application always fails with:

14:12:22.380 [scheduling-1] [,] ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@7c6cb3e1], failedMessage=GenericMessage [payload=net.ttddyy.dsproxy.proxy.GlobalConnectionIdManager@286f419a, headers={id=029ae2a8-3943-c4f8-67c2-70a47dfe8c94, timestamp=1701263542379}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler.handleMessage(KafkaMessageChannelBinder.java:1594)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1185)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:217)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:199)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:299)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:474)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:460)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
    at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:212)
    at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:160)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:66)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1015)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1062)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:785)
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:754)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:564)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:532)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    ... 50 more

Kafka is used with the latest version (for Spring 3.x), 7.5.1.

If I remove the dependency from pom.xml, or disable the feature with jdbc.datasource-proxy.enabled=false, the application runs again.

So I guess something goes wrong when it is used together with Kafka (streams).

I'm using:

Spring Boot 3.1.6, Spring Cloud 2022.0.4, datasource-micrometer-spring-boot 1.0.2

Any idea?

kaki877 commented 7 months ago

net.ttddyy.dsproxy.proxy.GlobalConnectionIdManager is set as the message payload, which leads to:

Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
    at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:212)
    at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:160)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:66)

ttddyy commented 7 months ago

datasource-micrometer doesn't do anything with Kafka. Based on the stack trace, your message contains a GlobalConnectionIdManager, and serializing it fails. I'm not sure why you are sending an object that contains GlobalConnectionIdManager, which probably comes from a proxied JDBC object. In general, you do not send JDBC objects over the wire. I suggest taking a look at what message you are sending via Kafka.

Closing as it is not related to the datasource-micrometer-spring-boot project.

kaki877 commented 7 months ago

The problem is specific to the Spring Cloud Stream Kafka stack: as soon as any producer or consumer from that stack is enabled, datasource-proxy doesn't work. The problem also occurs on the Spring 2 stack, so we cannot use datasource-proxy together with Kafka anywhere. We (a number of developers) suspect that your stack misuses a Spring message interface that the Spring Cloud Stream Kafka stack also uses.

The problem is triggered by Spring's internal use of Message<?> while the Kafka stack starts up. At that point none of our own code is processing Kafka events. It happens during application startup.

P6Spy works fine. We would be glad if you investigated this further, because P6Spy is not going to be supported for Spring 3. If this problem is not fixed, we will have to look for another JDBC plugin, because about 80% of Java Spring projects currently involve Kafka.

Best regards

Tilo

ttddyy commented 7 months ago

Hi @kaki877, as stated in my previous comment, datasource-micrometer has nothing to do with Kafka. It neither implements the Message interface nor holds any reference to it. Also, JDBC objects are local resources, and sending them over the network doesn't make sense. I suggest checking what objects you are sending over the stream and removing any proxied JDBC objects (assuming GlobalConnectionIdManager comes from them).

If you think it is an issue with Spring Cloud Stream Kafka, I also suggest opening a ticket with that project, including a minimal reproducer.
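
As a rough aid for the "check what objects you are sending" suggestion above, here is a minimal Java sketch of a payload type check. It only mirrors the list of types named in the "Unsupported Avro type" message from the stack trace. It is not Confluent's actual implementation, and the class name AvroPayloadCheck and its methods are hypothetical names introduced for illustration. IndexedRecord is matched by its fully qualified name (assumed to be org.apache.avro.generic.IndexedRecord) so the sketch compiles without Avro on the classpath.

```java
import java.util.Set;

// Hypothetical helper, not part of datasource-micrometer, Spring, or Confluent.
final class AvroPayloadCheck {

    // Concrete types listed in the "Unsupported Avro type" error message.
    private static final Set<Class<?>> SUPPORTED = Set.of(
            Boolean.class, Integer.class, Long.class,
            Float.class, Double.class, String.class, byte[].class);

    // Assumed fully qualified name of Avro's IndexedRecord interface.
    private static final String INDEXED_RECORD = "org.apache.avro.generic.IndexedRecord";

    // Returns true if the payload's type appears in the error message's list.
    static boolean isSupported(Object payload) {
        if (payload == null) {
            return true; // null is listed as supported
        }
        Class<?> type = payload.getClass();
        return SUPPORTED.contains(type) || implementsByName(type, INDEXED_RECORD);
    }

    // Walks superclasses and their interfaces, comparing interface names.
    private static boolean implementsByName(Class<?> type, String interfaceName) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            for (Class<?> i : c.getInterfaces()) {
                if (i.getName().equals(interfaceName) || implementsByName(i, interfaceName)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Builds a one-line description that could be logged before a send.
    static String describe(Object payload) {
        String name = payload == null ? "null" : payload.getClass().getName();
        return (isSupported(payload) ? "OK: " : "NOT IN SUPPORTED LIST: ") + name;
    }

    public static void main(String[] args) {
        System.out.println(describe("hello"));
        System.out.println(describe(42L));
        // Stand-in for an arbitrary object such as a JDBC proxy helper.
        System.out.println(describe(new Object()));
    }
}
```

Calling describe(...) on a payload just before it is handed to the producer would show whether an unexpected type, such as the GlobalConnectionIdManager seen in the failed message, is what reaches the serializer.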