schacko-samples / kafka-summit-2021-scst-kstream

StreamBridge error - Dispatcher has no subscriber. #3

Open michaelmok2021 opened 2 years ago

michaelmok2021 commented 2 years ago

I am also getting this error when I try to send a message using StreamBridge.

The relevant versions in my pom.xml are:

        <spring-boot.version>2.4.6</spring-boot.version>
        <spring-cloud-gcp.version>2.0.0</spring-cloud-gcp.version>
        <spring-cloud.version>2020.0.3</spring-cloud.version>

I keep seeing this in the log after I start Spring Boot.

07-10-2021 08:45:23.863 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.handle - [Consumer clientId=testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer, groupId=testmm-group] Attempt to heartbeat failed since group is rebalancing - tx.userId= tx.siteName=
07-10-2021 08:45:23.863 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest - [Consumer clientId=testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer, groupId=testmm-group] (Re-)joining group - tx.userId= tx.siteName=
07-10-2021 08:45:23.917 [testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.handle - [Consumer clientId=testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1-consumer, groupId=testmm-group] Successfully joined group with generation Generation{generationId=20, memberId='testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1-consumer-d664abf6-d2e5-4872-80a5-91c16abd7239', protocol='stream'} - tx.userId= tx.siteName=
07-10-2021 08:45:23.919 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.handle - [Consumer clientId=testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer, groupId=testmm-group] Successfully joined group with generation Generation{generationId=20, memberId='testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer-3e537db0-a0e8-4038-a127-ed5d7690e857', protocol='stream'} - tx.userId= tx.siteName=
07-10-2021 08:45:23.971 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.s.p.i.a.HighAvailabilityTaskAssignor.assign - Decided on assignment: {d47b1086-029f-4f68-b215-97c5fc0dbcaf=[activeTasks: ([0_0]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 1], 329cb0e2-577e-499f-ba46-7a9a6bfba9e8=[activeTasks: ([]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 0]} with no followup probing rebalance. - tx.userId= tx.siteName=
07-10-2021 08:45:23.971 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor.assignTasksToClients - stream-thread [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer] Assigned tasks to clients as 
329cb0e2-577e-499f-ba46-7a9a6bfba9e8=[activeTasks: ([]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 0]
d47b1086-029f-4f68-b215-97c5fc0dbcaf=[activeTasks: ([0_0]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) changelogOffsetTotalsByTask: ([]) taskLagTotals: ([]) capacity: 1 assigned: 1]. - tx.userId= tx.siteName=
07-10-2021 08:45:23.971 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor.computeNewAssignment - stream-thread [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer] Finished stable assignment of tasks, no followup rebalances required. - tx.userId= tx.siteName=
07-10-2021 08:45:23.971 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] WARN  o.a.k.c.c.i.ConsumerCoordinator.performAssignment - [Consumer clientId=testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer, groupId=testmm-group] The following subscribed topics are not assigned to any members: [tanalyst-invoice-received-dev]  - tx.userId= tx.siteName=
07-10-2021 08:45:23.972 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator.performAssignment - [Consumer clientId=testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer, groupId=testmm-group] Finished assignment for group at generation 20: {testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1-consumer-d664abf6-d2e5-4872-80a5-91c16abd7239=Assignment(partitions=[rerun-completed-dev-0], userDataSize=48), testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer-3e537db0-a0e8-4038-a127-ed5d7690e857=Assignment(partitions=[], userDataSize=40)} - tx.userId= tx.siteName=
07-10-2021 08:45:24.026 [testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.handle - [Consumer clientId=testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1-consumer, groupId=testmm-group] Successfully synced group in generation Generation{generationId=20, memberId='testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1-consumer-d664abf6-d2e5-4872-80a5-91c16abd7239', protocol='stream'} - tx.userId= tx.siteName=
07-10-2021 08:45:24.027 [testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1] INFO  o.a.k.c.c.i.SubscriptionState.checkAssignmentMatchedSubscription - [Consumer clientId=testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1-consumer, groupId=testmm-group] Assigned partition rerun-completed-dev-0 for non-subscribed topic; subscription is [tanalyst-invoice-received-dev] - tx.userId= tx.siteName=
07-10-2021 08:45:24.027 [testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1] WARN  o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete - [Consumer clientId=testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1-consumer, groupId=testmm-group] We received an assignment [rerun-completed-dev-0] that doesn't match our current subscription Subscribe(tanalyst-invoice-received-dev); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription - tx.userId= tx.siteName=
07-10-2021 08:45:24.027 [testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.sendJoinGroupRequest - [Consumer clientId=testmm-group-d47b1086-029f-4f68-b215-97c5fc0dbcaf-StreamThread-1-consumer, groupId=testmm-group] (Re-)joining group - tx.userId= tx.siteName=
07-10-2021 08:45:24.028 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator.handle - [Consumer clientId=testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer, groupId=testmm-group] Successfully synced group in generation Generation{generationId=20, memberId='testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer-3e537db0-a0e8-4038-a127-ed5d7690e857', protocol='stream'} - tx.userId= tx.siteName=
07-10-2021 08:45:24.028 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete - [Consumer clientId=testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer, groupId=testmm-group] Updating assignment with
    Assigned partitions:                       []
    Current owned partitions:                  []
    Added partitions (assigned - owned):       []
    Revoked partitions (owned - assigned):     []
 - tx.userId= tx.siteName=
07-10-2021 08:45:24.029 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator.invokeOnAssignment - [Consumer clientId=testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer, groupId=testmm-group] Notifying assignor about the new Assignment(partitions=[], userDataSize=40) - tx.userId= tx.siteName=
07-10-2021 08:45:24.029 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor.maybeScheduleFollowupRebalance - stream-thread [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule. - tx.userId= tx.siteName=
07-10-2021 08:45:24.029 [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] INFO  o.a.k.s.p.internals.TaskManager.handleAssignment - stream-thread [testmm-group-329cb0e2-577e-499f-ba46-7a9a6bfba9e8-StreamThread-1] Handle new assignment with:
    New active tasks: []
    New standby tasks: []
    Existing active tasks: []

Error


Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload={eventTime=Thu Oct 07 08:45:07 AWST 2021, siteName=testmm, eventType=LOGIN_SUCCESS, email=andrew@fastsoft.com.au}, headers={kafka_messageKey=testmm, id=03d81b96-b4c9-59c1-f7c5-8dc28c348d02, timestamp=1633567507857}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:215)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:156)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:136)
    at com.fastlane.billview.messaging.KafkaMessageProducer.sentRequest(KafkaMessageProducer.java:46)
    at com.fastlane.billview.messaging.LoginEventListener.processLoginEventMessage(LoginEventListener.java:51)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    ... 10 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 24 common frames omitted

sobychacko commented 2 years ago

@michaelmok2021 It looks like you have a custom configuration. Did you check out the code and then make changes? How can I reproduce this error?

michaelmok2021 commented 2 years ago

@sobychacko, I did not make any changes to the Spring Cloud Stream source code. Maybe it has to do with having the Spring Cloud Stream and Spring Cloud GCP dependencies alongside each other? I will try to create a sample project to reproduce the error.