TIBCOSoftware / be-contribution

This repository is to add various contributions across different modules within TIBCO BusinessEvents. It will include contributions by TIBCO BusinessEvents team as well external users/customers/fields teams.
BSD 3-Clause "New" or "Revised" License
0 stars 9 forks source link

KafkaStreamsChannel : be-engine throws MissingSourceTopicException on startup. #117

Open tsaxena18 opened 1 year ago

tsaxena18 commented 1 year ago

Version Information

Software Version(s)
BusinessEvents 6.3.0
OS Type? Win
OS Version? Win 10

What is the expected behavior?

The kafka-streams engine from KafkaStreamsChannel project should start successfully.

What is the actual behavior?

The kafka-streams engine throws exception on startup as described below : stream-thread [be_kafka_count_streams-d1299413-3206-4f76-92a0-71b7bd3fd846-StreamThread-1-consumer] Caught an error in the task assignment. Returning an error assignment. org.apache.kafka.streams.errors.MissingSourceTopicException: Missing source topics. at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareRepartitionTopics(StreamsPartitionAssignor.java:522) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:388) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:698) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:736) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:112) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:640) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:603) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1270) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1245) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1323) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1254) [kafka-clients.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227) [kafka-clients.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1007) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:955) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:762) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) [kafka-streams-1.0.0.jar:?]

Please provide a unit test that demonstrates the bug.

  1. Create the Kafka Streams channel jar from repo. The jar is automatically added in BE_HOME/lib/ext/tpcl.
  2. Restart BE Studio and create channel of type Kafka Streams in BE project. Set all the fields in channel.
  3. Configure the project acc, and build the EAR file.
  4. Start kafka-streams engine using CDD and EAR, which throws exception as described above.

Please provide log files.

BE630v161.zip KafkaStreamsChannel.zip BE630-Kafka3.5.0.log

tsaxena18 commented 1 year ago

As discussed today, the issue seem to be old as it is reproducible on previous release of BE. It happens only when kafka-streams is started before kafka-producer. It is however cannot be seen with KafkaChannel example, if we start consumer before producer.