tansu-io / tansu

An Apache Kafka compatible broker with PostgreSQL and S3 storage engines written in async 🚀 Rust 🦀
https://shortishly.com/blog/tansu-postgres/
GNU Affero General Public License v3.0

Issues using rdkafka with it #62

Open jeromegn opened 3 weeks ago

jeromegn commented 3 weeks ago

Excerpt from the client logs:

%5|1727900487.827|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [0]: preferred replica (722799104) is unknown: refreshing metadata
%5|1727900487.827|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [1]: preferred replica (32375808) is unknown: refreshing metadata
%4|1727900487.827|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [1]: Fetch response has both preferred read replica and non-zero message set size: 480: skipping messages
%3|1727900487.827|PROTOERR|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: Protocol parse failure for Fetch v11 at 664/1201 (rd_kafka_fetch_reply_handle_partition:419) (incorrect broker.version.fallback?)
%3|1727900487.827|PROTOERR|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [909130039]: invalid AbortedTxnCnt 2037409085
%5|1727900488.320|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [0]: preferred replica (722799104) lease changing too quickly (0s < 60s): possibly due to unavailable replica or stale cluster state: backing off next fetch
%5|1727900488.320|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [1]: preferred replica (32375808) lease changing too quickly (0s < 60s): possibly due to unavailable replica or stale cluster state: backing off next fetch
%4|1727900488.320|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [1]: Fetch response has both preferred read replica and non-zero message set size: 480: skipping messages
%3|1727900488.320|PROTOERR|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: Protocol parse failure for Fetch v11 at 664/1201 (rd_kafka_fetch_reply_handle_partition:419) (incorrect broker.version.fallback?)
%3|1727900488.320|PROTOERR|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [909130039]: invalid AbortedTxnCnt 2037409085
%4|1727900488.825|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [1]: Fetch response has both preferred read replica and non-zero message set size: 480: skipping messages
%3|1727900488.825|PROTOERR|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: Protocol parse failure for Fetch v11 at 664/1201 (rd_kafka_fetch_reply_handle_partition:419) (incorrect broker.version.fallback?)
%3|1727900488.825|PROTOERR|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [909130039]: invalid AbortedTxnCnt 2037409085
%4|1727900489.330|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [1]: Fetch response has both preferred read replica and non-zero message set size: 480: skipping messages
%3|1727900489.330|PROTOERR|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: Protocol parse failure for Fetch v11 at 664/1201 (rd_kafka_fetch_reply_handle_partition:419) (incorrect broker.version.fallback?)
%3|1727900489.330|PROTOERR|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [909130039]: invalid AbortedTxnCnt 2037409085
%4|1727900489.837|FETCH|rdkafka#consumer-1| [thrd:<node-addr>:9092/123]: <node-addr>:9092/123: test [1]: Fetch response has both preferred read replica and non-zero message set size: 480: skipping messages
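
The repeated PROTOERR lines suggest the client is reading the Fetch v11 partition fields at the wrong offsets. Viewed as big-endian bytes, the partition index 909130039 and the AbortedTxnCnt 2037409085 are the ASCII strings `6097` and `ype=`, which looks like record payload being parsed as header fields. The Rust sketch below is a hypothetical illustration, not tansu's or librdkafka's code. It encodes and decodes the per-partition header of a non-flexible Fetch v11 response (`partition_index`, `error_code`, `high_watermark`, `last_stable_offset`, `log_start_offset`, `aborted_transactions`, `preferred_read_replica`, then the length of the record bytes). It then shows that dropping one 8-byte field shifts every later field. It also mirrors the condition in the "Fetch response has both preferred read replica and non-zero message set size" warning. The names `encode`, `decode`, `Reader` and `records_ignored` are made up for this example.

```rust
// Hypothetical sketch, not tansu's or librdkafka's code: the per-partition
// header of a Kafka Fetch v11 response (non-flexible, big-endian integers).
#[derive(Debug, PartialEq)]
struct PartitionHeader {
    partition_index: i32,
    error_code: i16,
    high_watermark: i64,
    last_stable_offset: i64,
    log_start_offset: i64,
    aborted_transactions: Option<Vec<(i64, i64)>>, // (producer_id, first_offset); null = count -1
    preferred_read_replica: i32,                   // -1 means "no preferred replica"
    records_len: i32,                              // length prefix of the record bytes that follow
}

fn encode(h: &PartitionHeader) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(&h.partition_index.to_be_bytes());
    out.extend_from_slice(&h.error_code.to_be_bytes());
    for v in [h.high_watermark, h.last_stable_offset, h.log_start_offset] {
        out.extend_from_slice(&v.to_be_bytes());
    }
    match &h.aborted_transactions {
        None => out.extend_from_slice(&(-1i32).to_be_bytes()),
        Some(txns) => {
            out.extend_from_slice(&(txns.len() as i32).to_be_bytes());
            for (producer_id, first_offset) in txns {
                out.extend_from_slice(&producer_id.to_be_bytes());
                out.extend_from_slice(&first_offset.to_be_bytes());
            }
        }
    }
    out.extend_from_slice(&h.preferred_read_replica.to_be_bytes());
    out.extend_from_slice(&h.records_len.to_be_bytes());
    out
}

// Cursor that refuses to read past the end of the buffer.
struct Reader<'a> { buf: &'a [u8], pos: usize }

impl<'a> Reader<'a> {
    fn take<const N: usize>(&mut self) -> Result<[u8; N], String> {
        let bytes = self.buf.get(self.pos..self.pos + N).ok_or_else(|| format!("short read at {}", self.pos))?;
        self.pos += N;
        let mut arr = [0u8; N];
        arr.copy_from_slice(bytes);
        Ok(arr)
    }
    fn i16(&mut self) -> Result<i16, String> { self.take::<2>().map(i16::from_be_bytes) }
    fn i32(&mut self) -> Result<i32, String> { self.take::<4>().map(i32::from_be_bytes) }
    fn i64(&mut self) -> Result<i64, String> { self.take::<8>().map(i64::from_be_bytes) }
}

fn decode(buf: &[u8]) -> Result<PartitionHeader, String> {
    let mut r = Reader { buf, pos: 0 };
    let (partition_index, error_code) = (r.i32()?, r.i16()?);
    let (high_watermark, last_stable_offset, log_start_offset) = (r.i64()?, r.i64()?, r.i64()?);
    let count = r.i32()?;
    let aborted_transactions = if count < 0 {
        None
    } else if count as usize > (buf.len() - r.pos) / 16 {
        // Each aborted transaction takes 16 bytes, so a larger count is garbage.
        return Err(format!("invalid AbortedTxnCnt {}", count));
    } else {
        let mut txns = Vec::new();
        for _ in 0..count {
            txns.push((r.i64()?, r.i64()?));
        }
        Some(txns)
    };
    let (preferred_read_replica, records_len) = (r.i32()?, r.i32()?);
    Ok(PartitionHeader {
        partition_index, error_code, high_watermark, last_stable_offset,
        log_start_offset, aborted_transactions, preferred_read_replica, records_len,
    })
}

// Same condition as the client warning "Fetch response has both preferred read
// replica and non-zero message set size": such records are skipped.
fn records_ignored(h: &PartitionHeader) -> bool {
    h.preferred_read_replica != -1 && h.records_len > 0
}

fn main() {
    let header = PartitionHeader {
        partition_index: 0, error_code: 0, high_watermark: 42, last_stable_offset: 42,
        log_start_offset: 0, aborted_transactions: None, preferred_read_replica: -1, records_len: 480,
    };
    let buf = encode(&header);
    let decoded = decode(&buf).unwrap();
    assert_eq!(decoded, header);
    assert!(!records_ignored(&decoded));
    // A bogus replica id (like 722799104 in the log) makes the client skip the records.
    let bogus = PartitionHeader { preferred_read_replica: 722799104, ..decoded };
    assert!(records_ignored(&bogus));
    // Drop log_start_offset (bytes 22..30) but still parse as v11: every later
    // field is read 8 bytes early and the record length lands in the count.
    let misaligned: Vec<u8> = [&buf[..22], &buf[30..]].concat();
    assert_eq!(decode(&misaligned), Err("invalid AbortedTxnCnt 480".to_string()));
    // The garbage values in the log are printable ASCII as big-endian bytes.
    assert_eq!(&2037409085i32.to_be_bytes(), b"ype=");
    assert_eq!(&909130039i32.to_be_bytes(), b"6097");
}
```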

I assume the problem is somewhere in this code: https://github.com/tontinton/toshokan/blob/66c17da544faad61d2e1ff11cb6b1b2a519036a2/src/commands/sources/kafka_source.rs

(Sorry, I don't understand nearly enough about Kafka to know what's wrong here)

I've set up the test topic as shown in the tansu README. I'm using Tigris for S3 storage.

Then tansu started logging these errors in a loop:

2024-10-02T20:04:35.381599Z ERROR ThreadId(02) tansu_storage::s3: 458: error=Precondition { path: "clusters/app-logs/topics/test/partitions/0000000002/watermark.json", source: Client { status: 412, body: Some("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>PreconditionFailed</Code><Message>At least one of the pre-conditions you specified did not hold</Message><Resource>/tansu-app-logs/clusters/app-logs/topics/test/partitions/0000000002/watermark.json</Resource><RequestId>1727899475334820290</RequestId><Key>clusters/app-logs/topics/test/partitions/0000000002/watermark.json</Key><BucketName>tansu-app-logs</BucketName></Error>") } } location=Path { raw: "clusters/app-logs/topics/test/partitions/0000000002/watermark.json" }
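
An HTTP 412 PreconditionFailed from S3-compatible storage means a conditional request (for example one sent with `If-Match` or `If-None-Match`) did not hold. The `Precondition` error therefore suggests tansu writes `watermark.json` conditionally, though the log doesn't show how. The following is a minimal sketch of that optimistic-concurrency pattern, assuming ETag-based conditional writes. The hypothetical in-memory `Bucket` accepts a write only when the caller's ETag matches the stored one. `advance_watermark` re-reads the object before each attempt. The plain-integer body and all names (`Bucket`, `put_if_match`, `advance_watermark`, `PutError`) are assumptions, not tansu's storage format or API.

```rust
use std::collections::HashMap;

// Hypothetical error mirroring a 412 PreconditionFailed response.
#[derive(Debug, PartialEq)]
enum PutError {
    Precondition { path: String },
}

// Hypothetical in-memory stand-in for an S3 bucket with conditional writes:
// every successful write gives the object a new ETag.
#[derive(Default)]
struct Bucket {
    objects: HashMap<String, (u64, String)>, // path -> (etag, body)
    next_etag: u64,
}

impl Bucket {
    fn get(&self, path: &str) -> Option<(u64, String)> {
        self.objects.get(path).cloned()
    }

    // Write only if the stored ETag equals `if_match` (None: object must not exist yet).
    fn put_if_match(&mut self, path: &str, if_match: Option<u64>, body: String) -> Result<u64, PutError> {
        let current = self.objects.get(path).map(|(etag, _)| *etag);
        if current != if_match {
            return Err(PutError::Precondition { path: path.to_string() });
        }
        self.next_etag += 1;
        self.objects.insert(path.to_string(), (self.next_etag, body));
        Ok(self.next_etag)
    }
}

// Read-modify-write with retry: each attempt re-reads the latest ETag, so a lost
// race is retried against fresh state instead of reusing a stale ETag.
fn advance_watermark(bucket: &mut Bucket, path: &str, high: i64, attempts: u32) -> Result<i64, PutError> {
    let mut last_err = None;
    for _ in 0..attempts {
        let (etag, current) = match bucket.get(path) {
            Some((etag, body)) => (Some(etag), body.parse::<i64>().unwrap_or(0)),
            None => (None, 0),
        };
        let next = current.max(high); // in this sketch the watermark only moves forward
        match bucket.put_if_match(path, etag, next.to_string()) {
            Ok(_) => return Ok(next),
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.expect("attempts must be at least 1"))
}

fn main() {
    let path = "clusters/app-logs/topics/test/partitions/0000000002/watermark.json";
    let mut bucket = Bucket::default();
    assert_eq!(advance_watermark(&mut bucket, path, 10, 3), Ok(10));
    let (stale_etag, _) = bucket.get(path).unwrap();
    assert_eq!(advance_watermark(&mut bucket, path, 20, 3), Ok(20));
    // A writer still holding the old ETag is rejected, like the 412 above.
    let result = bucket.put_if_match(path, Some(stale_etag), "15".to_string());
    assert!(matches!(result, Err(PutError::Precondition { .. })));
}
```

Re-reading before every attempt is the point of the design. If a writer resends a conditional write with the same stale ETag, it fails with 412 every time.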

After I restarted tansu, the errors stopped on the server, but my client still reports errors.

shortishly commented 3 weeks ago

Can you share some instructions I can follow to set up your environment?