network-analytics / cosmos-candyfloss

Cosmos Candyfloss: Realtime JSON transformation
Apache License 2.0

Rebalancing causes shutdown #3

Open sgaragan opened 1 month ago

sgaragan commented 1 month ago

We hit an issue where we sometimes see the following message, although Candyfloss does not stop logging:

json-streams: 2024-09-26 12:30:23,643 [main] INFO com.swisscom.daisy.cosmos.candyfloss.CandyflossKStreamsApplication - Cosmos Candyfloss gracefully shutdown

Looking at the code, it seems that the state listener reacts to any change from RUNNING to a state other than RUNNING: that change counts down the latch and lets the runKafkaStreams() call return. However, REBALANCING is also "not RUNNING", and the client can go from REBALANCING back to RUNNING. So if a rebalance occurs for whatever reason, this path shuts down the processing; it seems to happen more often when using a larger number of threads. For some reason, Candyfloss keeps running anyway, and it looks as if it is still processing messages when, in reality, the consumer lag keeps growing.
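To make the failure mode concrete, here is a minimal, self-contained sketch that replays a routine rebalance through the same listener condition used in runKafkaStreams(). It uses a local `State` enum that only mirrors the names of `KafkaStreams.State` (an assumption, so it runs without the kafka-streams dependency); the class name `RebalanceLatchDemo` and its methods are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

class RebalanceLatchDemo {
    // Local mirror of the KafkaStreams.State names; NOT the real Kafka enum.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Same condition as the listener in runKafkaStreams(): fires on any RUNNING -> !RUNNING change.
    static void onChange(CountDownLatch latch, State newState, State oldState) {
        if (oldState == State.RUNNING && newState != State.RUNNING) {
            latch.countDown();
        }
    }

    // Feeds consecutive pairs of states to the listener and returns the remaining latch count.
    static long replay(State... states) {
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 1; i < states.length; i++) {
            onChange(latch, states[i], states[i - 1]);
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        // The client returns to RUNNING after the rebalance, but the latch is already released.
        System.out.println(replay(State.CREATED, State.REBALANCING, State.RUNNING,
                State.REBALANCING, State.RUNNING)); // prints 0
    }
}
```

A count of 0 means `latch.await()` would return and "gracefully shutdown" would be logged, even though the client is back in RUNNING.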

The code in question is in CandyflossKStreamsApplication:

  static void runKafkaStreams(final KafkaStreams streams) {
    final CountDownLatch latch = new CountDownLatch(1);
    streams.setStateListener(
        (newState, oldState) -> {
          if (oldState == KafkaStreams.State.RUNNING && newState != KafkaStreams.State.RUNNING) {
            latch.countDown();
          }
        });

    streams.start();

    try {
      latch.await();
      logger.info("Cosmos Candyfloss gracefully shutdown");
    } catch (final InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

The code should tolerate the REBALANCING state, but a change to PENDING_SHUTDOWN, ERROR, or NOT_RUNNING should result in Candyfloss shutting down.
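A minimal sketch of the condition proposed here, again using a local mirror of the state names (an assumption, so the example runs without Kafka). It decides on the new state alone and releases the latch only for states that never lead back to RUNNING. PENDING_ERROR is included on the assumption that the Kafka version in use has it (recent Kafka Streams releases pass through it on the way to ERROR). `ShutdownPredicate`, `SHUTDOWN_STATES` and `shouldShutDown` are hypothetical names.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

class ShutdownPredicate {
    // Local mirror of the KafkaStreams.State names; NOT the real Kafka enum.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // States from which the client never returns to RUNNING.
    static final Set<State> SHUTDOWN_STATES =
            EnumSet.of(State.PENDING_SHUTDOWN, State.NOT_RUNNING, State.PENDING_ERROR, State.ERROR);

    // Only the new state matters; REBALANCING no longer releases the latch.
    static boolean shouldShutDown(State newState) {
        return SHUTDOWN_STATES.contains(newState);
    }

    // Feeds each new state (after the first) to the predicate; returns the remaining latch count.
    static long replay(State... states) {
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 1; i < states.length; i++) {
            if (shouldShutDown(states[i])) {
                latch.countDown();
            }
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(replay(State.CREATED, State.REBALANCING, State.RUNNING,
                State.REBALANCING, State.RUNNING)); // prints 1: the rebalance keeps the app alive
        System.out.println(replay(State.RUNNING, State.PENDING_SHUTDOWN, State.NOT_RUNNING)); // prints 0
    }
}
```

With the real `KafkaStreams.State` enum, `!newState.isRunningOrRebalancing()` should express the same check inside the listener, since a listener never sees a transition into CREATED.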

(Image: KafkaStreams state diagram)
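The diagram did not survive here. As a rough stand-in, this sketch encodes the transitions as drawn in the state diagram of the KafkaStreams Javadoc for 3.x releases (an assumption: check it against the Kafka version Candyfloss actually uses) and derives which states can never lead back to RUNNING. The local `State` enum, the class name and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class StreamsStateDiagram {
    // Local mirror of the KafkaStreams.State names; NOT the real Kafka enum.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Allowed transitions as assumed from the 3.x Javadoc diagram.
    static final Map<State, Set<State>> NEXT = new EnumMap<>(State.class);
    static {
        NEXT.put(State.CREATED, EnumSet.of(State.REBALANCING, State.PENDING_SHUTDOWN));
        NEXT.put(State.REBALANCING, EnumSet.of(State.RUNNING, State.PENDING_SHUTDOWN, State.PENDING_ERROR));
        NEXT.put(State.RUNNING, EnumSet.of(State.REBALANCING, State.PENDING_SHUTDOWN, State.PENDING_ERROR));
        NEXT.put(State.PENDING_SHUTDOWN, EnumSet.of(State.NOT_RUNNING));
        NEXT.put(State.NOT_RUNNING, EnumSet.noneOf(State.class));
        NEXT.put(State.PENDING_ERROR, EnumSet.of(State.ERROR));
        NEXT.put(State.ERROR, EnumSet.noneOf(State.class));
    }

    static boolean canTransition(State from, State to) {
        return NEXT.get(from).contains(to);
    }

    // Breadth-first search: can the client get back to RUNNING from this state?
    static boolean canReachRunning(State from) {
        Set<State> seen = EnumSet.noneOf(State.class);
        Deque<State> todo = new ArrayDeque<>(NEXT.get(from));
        while (!todo.isEmpty()) {
            State s = todo.poll();
            if (s == State.RUNNING) {
                return true;
            }
            if (seen.add(s)) {
                todo.addAll(NEXT.get(s));
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<State> terminal = EnumSet.noneOf(State.class);
        for (State s : State.values()) {
            if (!canReachRunning(s)) {
                terminal.add(s);
            }
        }
        System.out.println(terminal); // [PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR]
    }
}
```

Under these assumptions the derived set is the states listed above plus PENDING_ERROR, while REBALANCING can always reach RUNNING again.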

ahassany commented 1 month ago

I think you're right on this one, and it could explain some weird behavior we observed before. I would probably go for a slightly different approach that does not handle the transitions explicitly, something like the pattern in https://kafka.apache.org/38/documentation/streams/tutorial :

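        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c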
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
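In this pattern the latch is released only by the shutdown hook, so if the client moved to ERROR on its own, nothing in the snippet would release it and the process would keep running. A minimal sketch of combining the hook with a listener that reacts only to states that cannot return to RUNNING follows. `FakeStreams` is a hypothetical stand-in that exposes just `setStateListener`, `start` and `close`, with a `BiConsumer` in place of `KafkaStreams.StateListener`; `runUntilStopped` returns an exit code instead of calling `System.exit`, so the logic can be tested. All names here are assumptions, not Candyfloss or Kafka API.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

class HookAndListenerRunner {
    // Local mirror of the KafkaStreams.State names; NOT the real Kafka enum.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    static final Set<State> SHUTDOWN_STATES =
            EnumSet.of(State.PENDING_SHUTDOWN, State.NOT_RUNNING, State.PENDING_ERROR, State.ERROR);
    static final Set<State> ERROR_STATES = EnumSet.of(State.PENDING_ERROR, State.ERROR);

    // Hypothetical stand-in for KafkaStreams: start() replays a scripted list of states.
    static final class FakeStreams {
        private final State[] script;
        private BiConsumer<State, State> listener = (newState, oldState) -> { };
        private State state = State.CREATED;

        FakeStreams(State... script) { this.script = script; }

        void setStateListener(BiConsumer<State, State> listener) { this.listener = listener; }

        synchronized void transition(State next) {
            State old = state;
            state = next;
            listener.accept(next, old); // (newState, oldState), like KafkaStreams.StateListener
        }

        void start() {
            for (State s : script) {
                transition(s);
            }
        }

        synchronized void close() {
            // Only a live client moves through PENDING_SHUTDOWN; otherwise this is a no-op.
            if (EnumSet.of(State.CREATED, State.REBALANCING, State.RUNNING).contains(state)) {
                transition(State.PENDING_SHUTDOWN);
                transition(State.NOT_RUNNING);
            }
        }
    }

    // Blocks until the client stops; 1 for error states, 0 otherwise.
    static int runUntilStopped(FakeStreams streams) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<State> reason = new AtomicReference<>();
        // Rebalances pass through; only states that cannot return to RUNNING release the latch.
        streams.setStateListener((newState, oldState) -> {
            if (SHUTDOWN_STATES.contains(newState) && reason.compareAndSet(null, newState)) {
                latch.countDown();
            }
        });
        // Control-C path from the tutorial: close the client, then release the latch.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }, "streams-shutdown-hook"));
        try {
            streams.start();
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 1;
        }
        return ERROR_STATES.contains(reason.get()) ? 1 : 0;
    }

    public static void main(String[] args) {
        FakeStreams streams = new FakeStreams(State.REBALANCING, State.RUNNING,
                State.REBALANCING, State.RUNNING, State.PENDING_ERROR, State.ERROR);
        // In the application this would be System.exit(runUntilStopped(streams)).
        System.out.println(runUntilStopped(streams)); // prints 1
    }
}
```

The rebalance in the middle of the script does not end the run; only the later PENDING_ERROR does, and it maps to a non-zero exit code so that a supervisor can restart the process.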

ahassany commented 1 month ago

Let me know if the new PR works better; I can't fully test it at the moment.