redpanda-data / redpanda

Redpanda is a streaming data platform for developers. Kafka API compatible. 10x faster. No ZooKeeper. No JVM!
https://redpanda.com

Kafka Streams GlobalStores example fails with 3-node RP cluster #2567

Open · NyaliaLui opened this issue 2 years ago

NyaliaLui commented 2 years ago

The Kafka Streams examples include an example, GlobalStoresExample, that shows how to join data from a KStream with a global state store.

The example runs successfully against a single Redpanda node, which matches the example's intended setup. However, it fails against a 3-node Redpanda cluster.

To reproduce:

Requires: follow the build steps for kafka-streams-examples; you'll need Apache Maven and Java 8+ (I use Java 11).

My redpanda .yaml files: found here

Run the three Redpanda brokers, each in its own terminal:

<path to build root>/release/clang/bin/redpanda --redpanda-cfg ~/local-cluster/node0/redpanda.yaml --smp=1
<path to build root>/release/clang/bin/redpanda --redpanda-cfg ~/local-cluster/node1/redpanda.yaml --smp=1
<path to build root>/release/clang/bin/redpanda --redpanda-cfg ~/local-cluster/node2/redpanda.yaml --smp=1
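All three brokers must be running at the same time. As a convenience, the broker commands can be wrapped in a small Bash function that launches them in the background and waits on them. This is a minimal sketch, not part of the original repro: the function name `start_brokers` is hypothetical, and it assumes the same `~/local-cluster/nodeN/redpanda.yaml` layout, with the build root passed as the first argument.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original issue): start the three local
# Redpanda brokers used in the repro, one per config directory.
# Usage: start_brokers <path to build root>
start_brokers() {
  local build_root="${1:?usage: start_brokers <path to build root>}"
  local node
  local pids=()
  for node in 0 1 2; do
    # Same flags as the manual commands: one config per node, one core each.
    "$build_root/release/clang/bin/redpanda" \
      --redpanda-cfg "$HOME/local-cluster/node${node}/redpanda.yaml" \
      --smp=1 &
    pids+=("$!")
  done
  # Bash starts background jobs with SIGINT ignored when job control is off,
  # so forward Ctrl-C (and SIGTERM) to the brokers explicitly.
  trap 'kill "${pids[@]}" 2>/dev/null' INT TERM
  # Block until all three broker processes exit.
  wait
  trap - INT TERM
}
```

Source the file and call, for example, `start_brokers ~/redpanda` (path hypothetical). Running the brokers in separate terminals, as in the original steps, keeps each broker's log output separate, which is useful when comparing node0's errors with the others.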

Create topics:

rpk topic create order --brokers 0.0.0.0:9092,0.0.0.0:9093,0.0.0.0:9094 -p 4 -r 1
rpk topic create customer --brokers 0.0.0.0:9092,0.0.0.0:9093,0.0.0.0:9094 -p 3 -r 1
rpk topic create product --brokers 0.0.0.0:9092,0.0.0.0:9093,0.0.0.0:9094 -p 2 -r 1
rpk topic create enriched-order --brokers 0.0.0.0:9092,0.0.0.0:9093,0.0.0.0:9094 -p 4 -r 1
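The four topic commands differ only in topic name and partition count, so they can also be driven from a single list, which makes it easy to change the broker list in one place. A minimal Bash sketch, assuming `rpk` is on your PATH; the function name `create_topics` and the `BROKERS`/`TOPICS` variables are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: create the topics used by the GlobalStores example.
# Broker addresses and partition counts mirror the rpk commands in the repro.
BROKERS="0.0.0.0:9092,0.0.0.0:9093,0.0.0.0:9094"
# topic:partitions pairs; every topic uses replication factor 1.
TOPICS="order:4 customer:3 product:2 enriched-order:4"

create_topics() {
  local spec name partitions
  for spec in $TOPICS; do
    name="${spec%%:*}"        # text before the colon
    partitions="${spec##*:}"  # text after the colon
    rpk topic create "$name" --brokers "$BROKERS" -p "$partitions" -r 1
  done
}
```

Source the file and call `create_topics`. Keeping `-r 1` matches the original repro, so each partition still lives on exactly one of the three brokers.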

From within the kafka-streams-examples directory, run the GlobalStores example (it blocks until the next step):

java -cp target/kafka-streams-examples-6.2.0-standalone.jar io.confluent.examples.streams.GlobalStoresExample 0.0.0.0:9092,0.0.0.0:9093,0.0.0.0:9094 http://0.0.0.0:8081,http://0.0.0.0:8082,http://0.0.0.0:8083

In another terminal, run the example's driver, which generates load:

java -cp target/kafka-streams-examples-6.2.0-standalone.jar io.confluent.examples.streams.GlobalKTablesAndStoresExampleDriver 0.0.0.0:9092,0.0.0.0:9093,0.0.0.0:9094 http://0.0.0.0:8081,http://0.0.0.0:8082,http://0.0.0.0:8083

Results

The example fails with an error serializing an Avro message and then crashes:

[2021-10-07 08:55:49,514] ERROR [global-stores-example-client-StreamThread-1] stream-thread [global-stores-example-client-StreamThread-1] Failed to process stream task 0_0 due to the following error: (org.apache.kafka.streams.processor.internals.TaskManager)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic enriched-order for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:177)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)

Meanwhile, the kafka/client logs on Redpanda node0 report that the broker is not available:

WARN  2021-10-07 08:55:49,052 [shard 0] kafka/client - client.cc:148 - broker_error: kafka::client::broker_error ({ node: -1 }, { error_code: broker_not_available [8] })
INFO  2021-10-07 08:55:49,052 [shard 0] kafka/client - broker.cc:41 - connected to broker:-1 - 0.0.0.0:9092
INFO  2021-10-07 08:55:49,053 [shard 0] kafka/client - broker.cc:41 - connected to broker:0 - 0.0.0.0:9092
INFO  2021-10-07 08:55:49,053 [shard 0] kafka/client - broker.cc:41 - connected to broker:1 - 0.0.0.0:9093
INFO  2021-10-07 08:55:49,053 [shard 0] kafka/client - broker.cc:41 - connected to broker:2 - 0.0.0.0:9094
WARN  2021-10-07 08:55:49,452 [shard 0] kafka/client - client.cc:148 - broker_error: kafka::client::broker_error ({ node: -1 }, { error_code: broker_not_available [8] })
INFO  2021-10-07 08:55:49,453 [shard 0] kafka/client - broker.cc:41 - connected to broker:-1 - 0.0.0.0:9092

JIRA Link: CORE-750

NyaliaLui commented 2 years ago

I retried this example because many updates have since been made to dev. The example now sometimes fails with the same exception on the first run, but produces the expected output on subsequent runs.

emaxerrno commented 2 years ago

Looks like a metadata issue

jcsp commented 1 year ago

@NyaliaLui is this ticket still relevant?

kecci commented 7 months ago

I also sometimes hit this issue:

[DEBUG] wrote Produce v7; broker: 1, bytes_written: 221, write_wait: 121.458µs, time_to_write: 44µs, err: <nil>
[DEBUG] read Produce v7; broker: 1, bytes_read: 103, read_wait: 34.542µs, time_to_read: 47.187458ms, err: <nil>
[DEBUG] retry batches processed; wanted_metadata_update: true, triggering_metadata_update: true, should_backoff: false
[DEBUG] produced; broker: 1, to: topicDummy[6{retrying@-1,1(BROKER_NOT_AVAILABLE: The broker is not available.)}]
[INFO] metadata update triggered; why: produce request had retry batches