redpanda-data / redpanda

Redpanda is a streaming data platform for developers. Kafka API compatible. 10x faster. No ZooKeeper. No JVM!
https://redpanda.com
9.16k stars 558 forks source link

Consumer repeats messages #6069

Open mfelsche opened 1 year ago

mfelsche commented 1 year ago

Version & Environment

Redpanda version: (use rpk version): v22.1.7 (rev 9ddbb4d)

What went wrong?

In an integration test of ours, testing consuming from redpanda, we get repeated messages after following roughly the following steps:

  1. start the docker image with:

    docker run --rm -p 127.0.0.1:9092:9092 -p 127.0.0.1:9644:9644 vectorized/redpanda:v22.1.7 redpanda start --overprovisioned --smp=1 --memory=1G --reserve-memory=0M --node-id=0 --check=false --kafka-addr=0.0.0.0:9092 --advertise-kafka-addr=127.0.0.1:9092 --default-log-level=debug
  2. create a topic with 1 partition and replication-factor of 1 (no additional configs)
  3. start the consumer with the following config:

    "enable.auto.commit": "true", 
    "client.id": "tremor-pop-os-test::transactional_commit_offset_handling", 
    "auto.offset.reset": "beginning", 
    "bootstrap.servers": "127.0.0.1:9092", 
    "group.id": "group_commit_offset", 
    "enable.auto.offset.store": "false"
  4. wait until the consumer gets its assignment (1 partition).
  5. produce 3 messages.
  6. directly commit the offset of the last message +1 to the group-coordinator
  7. stop the consumer
  8. verify the broker has the correct offsets
  9. start another consumer and commit the offset of the second message, seek the client to that message and receive this message again
  10. stop the consumer
  11. verify the broker has the correct offsets (of the second message)
  12. start another consumer
  13. receive the second and third message
  14. ack the third message
  15. produce 2 more messages
  16. receive those 2 messages

Step 16 is where it goes south and we receive the second and third message again. The same behaviour has been tested against kafka 2.4, where this behaviour cannot be reproduced.

What should have happened instead?

In step 16. above, the consumer should have received the 2 freshly produced messages.

How to reproduce the issue?

This is the failing test: https://github.com/tremor-rs/tremor-runtime/blob/582ca747ddce436e83d1cef07402fe1dabe418eb/src/connectors/tests/kafka/consumer.rs#L1099-L1367

  1. clone the reprository at https://github.com/tremor-rs/tremor-runtime
  2. install rust and docker
  3. execute RUST_LOG=debug cargo test --lib --features=kafka-integration -- connectors::tests::kafka::consumer::transactional_commit_offset_handling --nocapture (It might need to be executed multiple times before the test fails, it is not consistent.)

Additional information

Unfortunately enabling rdkafka debug logs makes the issue disappear, so i have no client logs yet. redpanda is not showing anything useful.

JIRA Link: CORE-1003

mfelsche commented 1 year ago

Here is a debug log from executing the integration test mentioned above from a failing run, repeating 2 messages:

RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received ApiVersionResponse (v3, 184 bytes, CorrId 1, rtt 0.15ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: PROTOERR [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Protocol parse failure for ApiVersion v3(flex) at 3/184 (rd_kafka_handle_ApiVersion:2131) (incorrect broker.version.fallback?)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: PROTOERR [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: ApiArrayCnt -1 out of range
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: APIVERSION [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: ApiVersionRequest v3 failed due to UNSUPPORTED_VERSION: retrying with v0
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent ApiVersionRequest (v0, 70 bytes @ 0, CorrId 2)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received ApiVersionResponse (v0, 180 bytes, CorrId 2, rtt 0.09ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:main]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent MetadataRequest (v4, 75 bytes @ 0, CorrId 3)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent FindCoordinatorRequest (v2, 92 bytes @ 0, CorrId 4)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received MetadataResponse (v4, 84 bytes, CorrId 3, rtt 0.10ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received FindCoordinatorResponse (v2, 27 bytes, CorrId 4, rtt 0.13ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CLUSTERID [thrd:main]: 127.0.0.1:9092/0: ClusterId update "" -> "redpanda.da5dab49-1e26-47f8-a818-867637160c9d"
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:main]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:main]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:GroupCoordinator]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:GroupCoordinator]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CONNECTED [thrd:GroupCoordinator]: GroupCoordinator/0: Connected (#1)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: FEATURE [thrd:GroupCoordinator]: GroupCoordinator/0: Updated enabled protocol features +ApiVersion to ApiVersion
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:GroupCoordinator]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Sent FindCoordinatorRequest (v2, 92 bytes @ 0, CorrId 5)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent ApiVersionRequest (v3, 89 bytes @ 0, CorrId 1)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received ApiVersionResponse (v3, 184 bytes, CorrId 1, rtt 0.05ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: PROTOERR [thrd:GroupCoordinator]: GroupCoordinator/0: Protocol parse failure for ApiVersion v3(flex) at 3/184 (rd_kafka_handle_ApiVersion:2131) (incorrect broker.version.fallback?)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: PROTOERR [thrd:GroupCoordinator]: GroupCoordinator/0: ApiArrayCnt -1 out of range
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: APIVERSION [thrd:GroupCoordinator]: GroupCoordinator/0: ApiVersionRequest v3 failed due to UNSUPPORTED_VERSION: retrying with v0
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent ApiVersionRequest (v0, 70 bytes @ 0, CorrId 2)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Received FindCoordinatorResponse (v2, 27 bytes, CorrId 5, rtt 0.13ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received ApiVersionResponse (v0, 180 bytes, CorrId 2, rtt 0.08ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:GroupCoordinator]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:main]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent MetadataRequest (v4, 75 bytes @ 0, CorrId 3)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: JOIN [thrd:main]: Group "group_commit_offset": postponing join until up-to-date metadata is available
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Sent MetadataRequest (v4, 102 bytes @ 0, CorrId 6)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received MetadataResponse (v4, 84 bytes, CorrId 3, rtt 0.08ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Received MetadataResponse (v4, 144 bytes, CorrId 6, rtt 0.07ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: REJOIN [thrd:main]: Group "group_commit_offset": subscription updated from metadata change: rejoining group in state wait-metadata
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: REBALANCE [thrd:main]: Group "group_commit_offset" initiating rebalance (NONE) in state up (join-state wait-metadata) with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: REJOIN [thrd:main]: Group "group_commit_offset": Rejoining group without an assignment: Metadata for subscribed topic(s) has changed
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: JOIN [thrd:main]: 127.0.0.1:9092/0: Joining group "group_commit_offset" with 1 subscribed topic(s) and member id ""
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v4, 224 bytes @ 0, CorrId 4)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:GroupCoordinator]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v4, 113 bytes, CorrId 4, rtt 0.05ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: REJOIN [thrd:main]: Group "group_commit_offset": Rejoining group without an assignment: JoinGroup error: Broker: Group member needs a valid member ID
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: JOIN [thrd:main]: 127.0.0.1:9092/0: Joining group "group_commit_offset" with 1 subscribed topic(s) and member id "tremor-pop-os-test::transactional_commit_offset_handling-22dc4698-cd92-419e-8901-a1b1fd15ce4f"
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v4, 317 bytes @ 0, CorrId 5)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:GroupCoordinator]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v4, 351 bytes, CorrId 5, rtt 0.04ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent MetadataRequest (v4, 102 bytes @ 0, CorrId 6)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received MetadataResponse (v4, 144 bytes, CorrId 6, rtt 0.04ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGN [thrd:main]: Group "group_commit_offset" running range assignor for 1 member(s) and 1 eligible subscribed topic(s):
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGN [thrd:main]:  Member "tremor-pop-os-test::transactional_commit_offset_handling-22dc4698-cd92-419e-8901-a1b1fd15ce4f" (me) with 0 owned partition(s) and 1 subscribed topic(s):
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGN [thrd:main]:   tremor_test_commit_offset [-1]
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGN [thrd:main]: Group "group_commit_offset" range assignment for 1 member(s) finished in 0.034ms:
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGN [thrd:main]:  Member "tremor-pop-os-test::transactional_commit_offset_handling-22dc4698-cd92-419e-8901-a1b1fd15ce4f" (me) assigned 1 partition(s):
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGN [thrd:main]:   tremor_test_commit_offset [0]
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGNOR [thrd:main]: Group "group_commit_offset": "range" assignor run for 1 member(s)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent SyncGroupRequest (v3, 340 bytes @ 0, CorrId 7)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:GroupCoordinator]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received SyncGroupResponse (v3, 55 bytes, CorrId 7, rtt 0.43ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent HeartbeatRequest (v3, 192 bytes @ 0, CorrId 8)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGN [thrd:main]: Group "group_commit_offset": new assignment of 1 partition(s) in join-state wait-assign-call
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received HeartbeatResponse (v3, 6 bytes, CorrId 8, rtt 0.03ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CLEARASSIGN [thrd:main]: No current assignment to clear
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CONF [thrd:main]: Topic "tremor_test_commit_offset" configuration (default_topic_conf):
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: BROADCAST [thrd:127.0.0.1:9092/bootstrap]: Broadcasting state change
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CONF [thrd:main]:   auto.offset.reset = largest
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: ASSIGNMENT [thrd:main]: Added 1 partition(s) to assignment which now consists of 1 partition(s) where of 1 are in pending state and 0 are being queried
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: OFFSET [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent OffsetFetchRequest (v4, 130 bytes @ 0, CorrId 9)
[2022-08-17T12:34:59Z INFO  tremor_runtime::connectors::impls::kafka::consumer] [Source::test::transactional_commit_offset_handling] Partitions Assigned: [Topic: tremor_test_commit_offset, Partition: 0]
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received OffsetFetchResponse (v4, 57 bytes, CorrId 9, rtt 0.04ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: FETCH [thrd:main]: Partition tremor_test_commit_offset [0] start fetching at offset 1
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Sent FetchRequest (v11, 164 bytes @ 0, CorrId 7)
[2022-08-17T12:34:59Z DEBUG tremor_runtime::connectors::impls::kafka::consumer] [Source::test::transactional_commit_offset_handling] Consumer connected.
[2022-08-17T12:34:59Z INFO  tremor_runtime::connectors::source] [Source::test::transactional_commit_offset_handling] Connected.
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Received FetchResponse (v11, 227 bytes, CorrId 7, rtt 0.05ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: FETCH [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Topic tremor_test_commit_offset [0] MessageSet size 140, error "Success", MaxOffset 3, LSO 3, Ver 4/4
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CONSUME [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Enqueue 2 message(s) (4 bytes, 2 ops) on tremor_test_commit_offset [0] fetch queue (qlen 0, v4, last_offset 2, 0 ctrl msgs, uncompressed)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Sent FetchRequest (v11, 164 bytes @ 0, CorrId 8)
[2022-08-17T12:34:59Z INFO  tremor_runtime::connectors] [Connector::test::transactional_commit_offset_handling] Connected.
[2022-08-17T12:34:59Z DEBUG tremor_runtime::connectors::source] [Source::test::transactional_commit_offset_handling] starting stream 0
[2022-08-17T12:34:59Z INFO  tremor_runtime::connectors::source] [Source::test::transactional_commit_offset_handling] Circuit Breaker: Restore.
[2022-08-17T12:34:59Z DEBUG tremor_runtime::connectors::tests] Received signal: Some(Start(SourceId(ConnectorId(1))))
[2022-08-17T12:34:59Z DEBUG tremor_runtime::connectors::impls::kafka::consumer] [Source::test::transactional_commit_offset_handling] ACK 0 2
[2022-08-17T12:34:59Z DEBUG tremor_runtime::connectors::impls::kafka::consumer] [Source::test::transactional_commit_offset_handling] Committing offset tremor_test_commit_offset 0: Offset(3)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: COMMIT [thrd:main]: GroupCoordinator/0: Committing offsets for 1 partition(s) with generation-id 9 in join-state steady: manual
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:GroupCoordinator]: GroupCoordinator/0: Sent OffsetCommitRequest (v7, 245 bytes @ 0, CorrId 10)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:GroupCoordinator]: GroupCoordinator/0: Received OffsetCommitResponse (v7, 45 bytes, CorrId 10, rtt 0.43ms)
[2022-08-17T12:34:59Z DEBUG tremor_runtime::connectors::impls::kafka::consumer] [Source::test::transactional_commit_offset_handling] Offsets committed successfully
[2022-08-17T12:34:59Z DEBUG tremor_runtime::connectors::impls::kafka::consumer] [Source::test::transactional_commit_offset_handling] Offsets: [Topic: tremor_test_commit_offset, Partition: 0, Offset: Offset(3)]
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Received FetchResponse (v11, 157 bytes, CorrId 8, rtt 6.09ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: FETCH [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Topic tremor_test_commit_offset [0] MessageSet size 70, error "Success", MaxOffset 4, LSO 4, Ver 4/4
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CONSUME [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Enqueue 1 message(s) (2 bytes, 1 ops) on tremor_test_commit_offset [0] fetch queue (qlen 0, v4, last_offset 3, 0 ctrl msgs, uncompressed)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Sent FetchRequest (v11, 164 bytes @ 0, CorrId 9)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Received FetchResponse (v11, 297 bytes, CorrId 9, rtt 0.05ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: FETCH [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Topic tremor_test_commit_offset [0] MessageSet size 210, error "Success", MaxOffset 4, LSO 4, Ver 5/5
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CONSUME [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Enqueue 3 message(s) (6 bytes, 3 ops) on tremor_test_commit_offset [0] fetch queue (qlen 0, v5, last_offset 3, 0 ctrl msgs, uncompressed)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Sent FetchRequest (v11, 164 bytes @ 0, CorrId 10)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: RECV [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Received FetchResponse (v11, 157 bytes, CorrId 10, rtt 6.07ms)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: FETCH [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Topic tremor_test_commit_offset [0] MessageSet size 70, error "Success", MaxOffset 5, LSO 5, Ver 5/5
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: CONSUME [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Enqueue 1 message(s) (2 bytes, 1 ops) on tremor_test_commit_offset [0] fetch queue (qlen 0, v5, last_offset 4, 0 ctrl msgs, uncompressed)
[2022-08-17T12:34:59Z DEBUG librdkafka] [Source::test::transactional_commit_offset_handling] librdkafka: SEND [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/0: Sent FetchRequest (v11, 164 bytes @ 0, CorrId 11)