streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] When KoP is enabled in Pulsar and a Kafka consumer (Python) consumes from Pulsar, it fails with an error. #1415

Open jiangtao7 opened 2 years ago

jiangtao7 commented 2 years ago

Describe the bug: When I enabled KoP in Pulsar and set up a Kafka consumer (Python) to consume from Pulsar, it failed with an error.

To Reproduce: Pulsar & KoP 2.9.2.12

Or run this code:

from confluent_kafka import Consumer

c = Consumer({'bootstrap.servers': 'pulsar-broker.pulsar-poc.svc.cluster.local:9092', 'group.id': 'my-kafka-group', 'auto.offset.reset': 'earliest'})

c.subscribe(['test-001'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

Error from the consumer:

2022-07-19T18:36:46.370588Z ERROR source{component_kind="source" component_id=kafka component_type=kafka component_name=kafka}: rdkafka::client: librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): pulsar-broker-2.pulsar-broker.pulsar-poc.svc.cluster.local:9092/317618533: Disconnected: verify that security.protocol is correctly configured, broker might require SASL authentication (after 1ms in state UP, 4 identical error(s) suppressed)

Expected behavior: The consumer should work without error.

Additional context

  1. The error occurs when setting up a consumer with either https://github.com/confluentinc/confluent-kafka-python or https://github.com/dpkp/kafka-python.
  2. No error occurs when running ./bin/kafka-console-consumer.sh --bootstrap-server pulsar-broker.pulsar-poc.svc.cluster.local:9092 --topic test-001.
  3. No error occurs when running a go-kafka consumer.

So far, this issue only happens with consumers written using the Kafka Python libraries.
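Not part of the original report, but a diagnostic sketch of the same consumer with librdkafka debugging and an explicit security.protocol, since the disconnect message above suggests checking that setting; the debug contexts and error callback are my assumptions about what would be useful to capture, not settings from the reporter's environment.

from confluent_kafka import Consumer, KafkaError

def on_error(err: KafkaError):
    # Called for global (non-message) errors such as broker transport failures.
    print("rdkafka error: {}".format(err))

c = Consumer({
    'bootstrap.servers': 'pulsar-broker.pulsar-poc.svc.cluster.local:9092',  # address from the report
    'group.id': 'my-kafka-group',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'PLAINTEXT',  # set explicitly; the error hints at a security.protocol mismatch
    'debug': 'broker,protocol',        # librdkafka debug contexts, printed to stderr
    'error_cb': on_error,
})
c.subscribe(['test-001'])
msg = c.poll(5.0)
print(msg if msg is None or msg.error() else msg.value().decode('utf-8'))
c.close()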

BewareMyPower commented 2 years ago

What's your broker's config? I tried to reproduce this issue with a KoP standalone, but it worked well.

Here are my configs in conf/standalone.conf:

messagingProtocols=kafka
kafkaListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor,org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
# Note that the latest Kafka clients require transactions to be enabled
brokerDeduplicationEnabled=true
kafkaTransactionCoordinatorEnabled=true
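(Not in the original comment.) With that config in place, a quick sketch to confirm the KoP listener actually answers Kafka metadata requests, assuming the same 127.0.0.1:9092 address as kafkaListeners above:

from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': '127.0.0.1:9092'})
md = admin.list_topics(timeout=5)  # raises KafkaException if the listener is unreachable
print("brokers:", md.brokers)
print("topics:", list(md.topics))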

Then, since rdkafka disables topic auto creation by default, you must create the topic in advance:

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-001 --partitions 1 --replication-factor 1
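If you prefer to do this from Python rather than the Kafka CLI, a roughly equivalent sketch with confluent-kafka's AdminClient (my illustration, reusing the topic name and settings from the command above):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
# create_topics() is asynchronous and returns one future per requested topic.
futures = admin.create_topics([NewTopic('test-001', num_partitions=1, replication_factor=1)])
futures['test-001'].result()  # raises if creation failed, e.g. the topic already exists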

Then I ran the following Python script:

from confluent_kafka import Consumer

c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'my-kafka-group', 'auto.offset.reset': 'earliest'})

c.subscribe(['test-001'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

It could receive the messages produced by the Kafka CLI tool as expected:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-001
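For parity with the Python consumer, here is a producer sketch in confluent-kafka that should behave like the console producer above (an illustration, not part of the original comment; the topic and bootstrap address are taken from this thread):

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

def on_delivery(err, msg):
    # Delivery report callback: err is set if the message could not be persisted.
    if err is not None:
        print("Delivery failed: {}".format(err))
    else:
        print("Delivered to {} [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))

for i in range(3):
    p.produce('test-001', value='hello-{}'.format(i).encode('utf-8'), callback=on_delivery)

p.flush(10)  # wait up to 10 seconds for outstanding delivery reports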

If you cannot reproduce it with a KoP standalone either, check your configs first. You can also enable debug-level logs in KoP and upload the log file.

sumeet-zuora commented 2 years ago

We are also facing similar issues. Below is the output of the broker logs:

[conf/broker.conf] Applying config allowAutoTopicCreationType = partitioned
[conf/broker.conf] Applying config brokerDeduplicationEnabled = true
[conf/broker.conf] Applying config brokerDeleteInactiveTopicsEnabled = false
[conf/broker.conf] Applying config allowAutoTopicCreationType = partitioned
[conf/broker.conf] Applying config brokerDeduplicationEnabled = true
[conf/broker.conf] Applying config brokerDeleteInactiveTopicsEnabled = false
[conf/broker.conf] Applying config brokerServicePort = 6650
[conf/broker.conf] Applying config clusterName = pulsar
[conf/broker.conf] Applying config configurationStoreServers = pulsar-zookeeper:2181
[conf/broker.conf] Applying config exposeTopicLevelMetricsInPrometheus = true
[conf/broker.conf] Applying config functionsWorkerEnabled = true
[conf/broker.conf] Applying config managedLedgerDefaultAckQuorum = 2
[conf/broker.conf] Applying config managedLedgerDefaultEnsembleSize = 3
[conf/broker.conf] Applying config managedLedgerDefaultWriteQuorum = 3
[conf/broker.conf] Applying config numHttpServerThreads = 8
[conf/broker.conf] Applying config statusFilePath = /pulsar/status
[conf/broker.conf] Applying config webServicePort = 8080
[conf/broker.conf] Applying config zooKeeperSessionTimeoutMillis = 30000
[conf/broker.conf] Applying config zookeeperServers = pulsar-zookeeper:2181
[conf/broker.conf] Updating config allowAutoTopicCreationType = partitioned
[conf/broker.conf] Updating config brokerDeduplicationEnabled = true
[conf/broker.conf] Updating config brokerDeleteInactiveTopicsEnabled = false
[conf/broker.conf] Adding config brokerEntryMetadataInterceptors = org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor,org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
[conf/broker.conf] Adding config kafkaAdvertisedListeners = PLAINTEXT://pulsar-broker-2.pulsar-broker.pulsar-poc.svc.cluster.local:9092
[conf/broker.conf] Adding config kafkaListeners = PLAINTEXT://pulsar-broker-2.pulsar-broker.pulsar-poc.svc.cluster.local:9092
[conf/broker.conf] Adding config kafkaTransactionCoordinatorEnabled = true
[conf/broker.conf] Adding config messagingProtocols = kafka
OpenJDK 64-Bit Server VM warning: Option AggressiveOpts was deprecated in version 11.0 and will likely be removed in a future release.
Connecting to pulsar-zookeeper:2181
get [-s] [-w] path
14:21:14.752 [main] ERROR org.apache.zookeeper.util.ServiceUtils - Exiting JVM with code 1
OpenJDK 64-Bit Server VM warning: Option AggressiveOpts was deprecated in version 11.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by io.netty.util.internal.ReflectionUtil (file:/pulsar/lib/io.netty-netty-common-4.1.77.Final.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of io.netty.util.internal.ReflectionUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
14:22:22.180 [cluster-service-coordinator-timer] ERROR org.apache.pulsar.functions.worker.MembershipManager - Failed to get status of coordinate topic persistent://public/functions/coordinate
org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted. Failed reason: pulsar-broker-1.pulsar-broker.pulsar-poc.svc.cluster.local
        at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:251) ~[io.streamnative-pulsar-client-admin-original-2.10.1.3.jar:2.10.1.3]
        at org.apache.pulsar.client.admin.internal.TopicsImpl$12.failed(TopicsImpl.java:664) ~[io.streamnative-pulsar-client-admin-original-2.10.1.3.jar:2.10.1.3]
        at org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:882) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
        at org.glassfish.jersey.client.ClientRuntime.processFailure(ClientRuntime.java:247) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
        at org.glassfish.jersey.client.ClientRuntime.processFailure(ClientRuntime.java:242) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
        at org.glassfish.jersey.client.ClientRuntime.access$100(ClientRuntime.java:62) ~[org.glassfish.jersey.core-jersey-client-2.34.jar:?]
        at org.glassfish.jersey.client.ClientRuntime$2.lambda$failure$1(ClientRuntime.java:178) ~[org.glassfish.jersey.core-j

We are trying to use the vector.dev aggregator agent, which throws the error below:

2022-07-21T14:24:25.846608Z ERROR source{component_kind="source" component_id=kafka component_type=kafka component_name=kafka}: vector::internal_events::kafka: Failed to read message. error=Message consumption error: CoordinatorLoadInProgress (Broker: Coordinator load in progress) error_code="reading_message" error_type="reader_failed" stage="receiving"
BewareMyPower commented 2 years ago
14:22:22.180 [cluster-service-coordinator-timer] ERROR org.apache.pulsar.functions.worker.MembershipManager - Failed to get status of coordinate topic persistent://public/functions/coordinate
org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted. Failed reason: pulsar-broker-1.pulsar-broker.pulsar-poc.svc.cluster.local
        at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:251) ~[io.streamnative-pulsar-client-admin-original-2.10.1.3.jar:2.10.1.3]

Something went wrong when the functions worker started. You should check whether the broker has started successfully. If it has, change the log level to debug to see more details.

sumeet-zuora commented 2 years ago

Debug logs from the broker:

2022-07-21T15:48:49,187+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,187+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7353): read 0 entries but only 0 entries are committed
2022-07-21T15:48:49,190+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.150.51:47298] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87703), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87703), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=6284]}, remoteAddress=/192.168.150.51:47298)
2022-07-21T15:48:49,190+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,190+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,194+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8710 And triggered send callback.
2022-07-21T15:48:49,194+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,194+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87703), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=6284]}, remoteAddress=/192.168.150.51:47298) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670039,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,200+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.111.246:53848] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314454), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314454), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=116858]}, remoteAddress=/192.168.111.246:53848)
2022-07-21T15:48:49,200+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,200+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,206+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8711 And triggered send callback.
2022-07-21T15:48:49,206+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,206+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314454), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=116858]}, remoteAddress=/192.168.111.246:53848) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670041,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,235+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.22.90:53926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126175), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126175), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=7807]}, remoteAddress=/192.168.22.90:53926)
2022-07-21T15:48:49,235+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,235+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,239+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8712 And triggered send callback.
2022-07-21T15:48:49,239+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,239+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126175), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=7807]}, remoteAddress=/192.168.22.90:53926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670083,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,287+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7353), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670039,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:49,287+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7354), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7354), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670039,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,287+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7354) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,287+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670039, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,287+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,287+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,287+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670039 .
2022-07-21T15:48:49,287+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670039 in cache. cache size: 0
2022-07-21T15:48:49,287+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8712
2022-07-21T15:48:49,287+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8712, size: 7806, ConsumerManager original offset: 15670039, lastEntryPosition: 469:8712, nextOffset: 15670086
2022-07-21T15:48:49,287+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670086
2022-07-21T15:48:49,287+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,287+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7354): read 3 entries but only 3 entries are committed
2022-07-21T15:48:49,289+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7354), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670039,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670086,last_stable_offset=15670086,log_start_offset=15670086,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670039, timestamp=1658418529080, key=0 bytes, value=3138 bytes)), (record=DefaultRecord(offset=15670040, timestamp=1658418529080, key=0 bytes, value=3067 bytes)), (record=DefaultRecord(offset=15670041, timestamp=1658418529169, key=0 bytes, value=2760 bytes)), (record=DefaultRecord(offset=15670042, timestamp=1658418529169, key=0 bytes, value=2745 bytes)), (record=DefaultRecord(offset=15670043, timestamp=1658418529169, key=0 bytes, value=2760 bytes)), (record=DefaultRecord(offset=15670044, timestamp=1658418529169, key=0 bytes, value=2745 bytes)), (record=DefaultRecord(offset=15670045, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670046, timestamp=1658418529169, key=0 bytes, value=2832 bytes)), (record=DefaultRecord(offset=15670047, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670048, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670049, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670050, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670051, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670052, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670053, timestamp=1658418529169, key=0 bytes, value=2832 bytes)), (record=DefaultRecord(offset=15670054, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670055, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670056, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670057, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670058, timestamp=1658418529169, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670059, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670060, timestamp=1658418529169, key=0 bytes, value=2762 bytes)), (record=DefaultRecord(offset=15670061, timestamp=1658418529169, key=0 bytes, value=2733 bytes)), (record=DefaultRecord(offset=15670062, timestamp=1658418529169, key=0 bytes, value=2747 bytes)), (record=DefaultRecord(offset=15670063, timestamp=1658418529169, key=0 bytes, value=2776 bytes)), (record=DefaultRecord(offset=15670064, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670065, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670066, timestamp=1658418529169, key=0 bytes, 
value=2769 bytes)), (record=DefaultRecord(offset=15670067, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670068, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670069, timestamp=1658418529169, key=0 bytes, value=2727 bytes)), (record=DefaultRecord(offset=15670070, timestamp=1658418529169, key=0 bytes, value=2731 bytes)), (record=DefaultRecord(offset=15670071, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670072, timestamp=1658418529169, key=0 bytes, value=2762 bytes)), (record=DefaultRecord(offset=15670073, timestamp=1658418529169, key=0 bytes, value=2733 bytes)), (record=DefaultRecord(offset=15670074, timestamp=1658418529169, key=0 bytes, value=2747 bytes)), (record=DefaultRecord(offset=15670075, timestamp=1658418529169, key=0 bytes, value=2776 bytes)), (record=DefaultRecord(offset=15670076, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670077, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670078, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670079, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670080, timestamp=1658418529169, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670081, timestamp=1658418529169, key=0 bytes, value=2728 bytes)), (record=DefaultRecord(offset=15670082, timestamp=1658418529169, key=0 bytes, value=2731 bytes)), (record=DefaultRecord(offset=15670083, timestamp=1658418528751, key=0 bytes, value=2345 bytes)), (record=DefaultRecord(offset=15670084, timestamp=1658418528751, key=0 bytes, value=3252 bytes)), (record=DefaultRecord(offset=15670085, timestamp=1658418528751, key=0 bytes, value=2120 bytes))]}]}]}
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: false
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: false
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: true
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: true
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7355), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7355), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670086,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7355) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670086, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670086 .
2022-07-21T15:48:49,289+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670086 in cache. cache size: 0
2022-07-21T15:48:49,290+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670086
2022-07-21T15:48:49,290+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,290+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7355): read 0 entries but only 0 entries are committed
2022-07-21T15:48:49,301+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.231.179:33140] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=69570), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=69570), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1842]}, remoteAddress=/192.168.231.179:33140)
2022-07-21T15:48:49,301+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,301+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,303+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.22.90:53926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126176), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126176), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=4139]}, remoteAddress=/192.168.22.90:53926)
2022-07-21T15:48:49,303+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,303+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,305+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8713 And triggered send callback.
2022-07-21T15:48:49,305+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,305+0000 [send-response-OrderedScheduler-3-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=69570), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1842]}, remoteAddress=/192.168.231.179:33140) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670086,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,307+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8714 And triggered send callback.
2022-07-21T15:48:49,307+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,307+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126176), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=4139]}, remoteAddress=/192.168.22.90:53926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670087,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,308+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.124.120:47578] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155672), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155672), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=7536]}, remoteAddress=/192.168.124.120:47578)
2022-07-21T15:48:49,308+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,308+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,312+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8715 And triggered send callback.
2022-07-21T15:48:49,312+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,312+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155672), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=7536]}, remoteAddress=/192.168.124.120:47578) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670089,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,331+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.111.246:53848] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314455), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314455), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=18045]}, remoteAddress=/192.168.111.246:53848)
2022-07-21T15:48:49,331+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,331+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,335+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8716 And triggered send callback.
2022-07-21T15:48:49,335+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,335+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314455), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=18045]}, remoteAddress=/192.168.111.246:53848) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670092,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,360+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.145.80:46926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100942), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100942), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1842]}, remoteAddress=/192.168.145.80:46926)
2022-07-21T15:48:49,360+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,360+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,364+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8717 And triggered send callback.
2022-07-21T15:48:49,364+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,364+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100942), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1842]}, remoteAddress=/192.168.145.80:46926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670099,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,390+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7355), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670086,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:49,390+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7356), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7356), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670086,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,390+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7356) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,390+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670086, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,390+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,390+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,390+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670086 .
2022-07-21T15:48:49,390+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670086 in cache. cache size: 0
2022-07-21T15:48:49,390+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8717
2022-07-21T15:48:49,390+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8717, size: 1829, ConsumerManager original offset: 15670086, lastEntryPosition: 469:8717, nextOffset: 15670100
2022-07-21T15:48:49,390+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670100
2022-07-21T15:48:49,390+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,390+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7356): read 5 entries but only 5 entries are committed
2022-07-21T15:48:49,391+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7356), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670086,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670100,last_stable_offset=15670100,log_start_offset=15670100,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670086, timestamp=1658418529232, key=0 bytes, value=1772 bytes)), (record=DefaultRecord(offset=15670087, timestamp=1658418522001, key=0 bytes, value=1950 bytes)), (record=DefaultRecord(offset=15670088, timestamp=1658418522001, key=0 bytes, value=2109 bytes)), (record=DefaultRecord(offset=15670089, timestamp=1658418529303, key=0 bytes, value=2435 bytes)), (record=DefaultRecord(offset=15670090, timestamp=1658418529303, key=0 bytes, value=2436 bytes)), (record=DefaultRecord(offset=15670091, timestamp=1658418529303, key=0 bytes, value=2577 bytes)), (record=DefaultRecord(offset=15670092, timestamp=1658418529286, key=0 bytes, value=2556 bytes)), (record=DefaultRecord(offset=15670093, timestamp=1658418529286, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670094, timestamp=1658418529286, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670095, timestamp=1658418529286, key=0 bytes, value=2584 bytes)), (record=DefaultRecord(offset=15670096, timestamp=1658418529286, key=0 bytes, value=2583 bytes)), (record=DefaultRecord(offset=15670097, timestamp=1658418529286, key=0 bytes, value=2655 bytes)), (record=DefaultRecord(offset=15670098, timestamp=1658418529286, key=0 bytes, value=2435 bytes)), (record=DefaultRecord(offset=15670099, timestamp=1658418529296, key=0 bytes, value=1772 bytes))]}]}]}
2022-07-21T15:48:49,391+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7357), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7357), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670100,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,391+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7357) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,391+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670100, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,391+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,391+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,391+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670100 .
2022-07-21T15:48:49,391+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670100 in cache. cache size: 0
2022-07-21T15:48:49,391+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670100
2022-07-21T15:48:49,391+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,391+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7357): read 0 entries but only 0 entries are committed
2022-07-21T15:48:49,440+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.124.120:47578] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155673), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155673), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=2696]}, remoteAddress=/192.168.124.120:47578)
2022-07-21T15:48:49,440+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,440+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,444+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8718 And triggered send callback.
2022-07-21T15:48:49,444+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,444+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155673), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=2696]}, remoteAddress=/192.168.124.120:47578) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670100,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,491+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7357), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670100,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:49,492+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7358), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7358), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670100,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,492+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7358) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,492+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670100, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,492+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,492+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,492+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670100 .
2022-07-21T15:48:49,492+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670100 in cache. cache size: 0
2022-07-21T15:48:49,492+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8718
2022-07-21T15:48:49,492+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8718, size: 2683, ConsumerManager original offset: 15670100, lastEntryPosition: 469:8718, nextOffset: 15670101
2022-07-21T15:48:49,492+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670101
2022-07-21T15:48:49,492+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,492+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7358): read 1 entries but only 1 entries are committed
2022-07-21T15:48:49,492+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7358), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670100,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670101,last_stable_offset=15670101,log_start_offset=15670101,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670100, timestamp=1658418529429, key=0 bytes, value=2626 bytes))]}]}]}
2022-07-21T15:48:49,492+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7359), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7359), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670101,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7359) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670101, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670101 .
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670101 in cache. cache size: 0
2022-07-21T15:48:49,493+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670101
2022-07-21T15:48:49,493+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,493+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7359): read 0 entries but only 0 entries are committed
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.105.134:44070] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166303), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166303), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1838]}, remoteAddress=/192.168.105.134:44070)
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,493+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,497+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8719 And triggered send callback.
2022-07-21T15:48:49,497+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,497+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166303), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1838]}, remoteAddress=/192.168.105.134:44070) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670101,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,593+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7359), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670101,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:49,593+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7360), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7360), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670101,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,593+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7360) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,593+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670101, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,593+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,593+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,593+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670101 .
2022-07-21T15:48:49,593+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670101 in cache. cache size: 0
2022-07-21T15:48:49,593+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8719
2022-07-21T15:48:49,593+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8719, size: 1825, ConsumerManager original offset: 15670101, lastEntryPosition: 469:8719, nextOffset: 15670102
2022-07-21T15:48:49,593+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670102
2022-07-21T15:48:49,593+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,593+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7360): read 1 entries but only 1 entries are committed
2022-07-21T15:48:49,594+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7360), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670101,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670102,last_stable_offset=15670102,log_start_offset=15670102,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670101, timestamp=1658418529285, key=0 bytes, value=1768 bytes))]}]}]}
2022-07-21T15:48:49,594+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7361), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7361), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670102,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,594+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7361) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,594+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670102, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,594+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,594+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,594+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670102 .
2022-07-21T15:48:49,594+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670102 in cache. cache size: 0
2022-07-21T15:48:49,594+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670102
2022-07-21T15:48:49,594+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,594+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7361): read 0 entries but only 0 entries are committed
2022-07-21T15:48:49,685+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.59:53034] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=114742), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=114742), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=2420]}, remoteAddress=/192.168.120.59:53034)
2022-07-21T15:48:49,685+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,685+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,690+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8720 And triggered send callback.
2022-07-21T15:48:49,690+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,690+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=114742), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=2420]}, remoteAddress=/192.168.120.59:53034) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670102,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,694+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7361), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670102,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7362), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7362), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670102,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7362) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670102, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670102 .
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670102 in cache. cache size: 0
2022-07-21T15:48:49,695+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8720
2022-07-21T15:48:49,695+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8720, size: 2407, ConsumerManager original offset: 15670102, lastEntryPosition: 469:8720, nextOffset: 15670103
2022-07-21T15:48:49,695+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670103
2022-07-21T15:48:49,695+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,695+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7362): read 1 entries but only 1 entries are committed
2022-07-21T15:48:49,695+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7362), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670102,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670103,last_stable_offset=15670103,log_start_offset=15670103,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670102, timestamp=1658418529637, key=0 bytes, value=2350 bytes))]}]}]}
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7363), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7363), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670103,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7363) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670103, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670103 .
2022-07-21T15:48:49,695+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670103 in cache. cache size: 0
2022-07-21T15:48:49,695+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670103
2022-07-21T15:48:49,695+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,695+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7363): read 0 entries but only 0 entries are committed
2022-07-21T15:48:49,710+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.150.51:47298] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87704), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87704), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=4210]}, remoteAddress=/192.168.150.51:47298)
2022-07-21T15:48:49,710+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,710+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,713+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8721 And triggered send callback.
2022-07-21T15:48:49,713+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,713+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87704), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=4210]}, remoteAddress=/192.168.150.51:47298) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670103,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,795+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7363), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670103,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7364), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7364), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670103,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7364) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670103, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670103 .
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670103 in cache. cache size: 0
2022-07-21T15:48:49,796+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8721
2022-07-21T15:48:49,796+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8721, size: 4197, ConsumerManager original offset: 15670103, lastEntryPosition: 469:8721, nextOffset: 15670104
2022-07-21T15:48:49,796+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670104
2022-07-21T15:48:49,796+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,796+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7364): read 1 entries but only 1 entries are committed
2022-07-21T15:48:49,796+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7364), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670103,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670104,last_stable_offset=15670104,log_start_offset=15670104,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670103, timestamp=1658418529500, key=0 bytes, value=4140 bytes))]}]}]}
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7365), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7365), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670104,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7365) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670104, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670104 .
2022-07-21T15:48:49,796+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670104 in cache. cache size: 0
2022-07-21T15:48:49,796+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670104
2022-07-21T15:48:49,796+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,796+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7365): read 0 entries but only 0 entries are committed
2022-07-21T15:48:49,823+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.22.90:53926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126177), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126177), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1994]}, remoteAddress=/192.168.22.90:53926)
2022-07-21T15:48:49,823+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,823+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,823+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.77.199:57620] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152569), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152569), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=7476]}, remoteAddress=/192.168.77.199:57620)
2022-07-21T15:48:49,823+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,823+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,826+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8722 And triggered send callback.
2022-07-21T15:48:49,826+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,826+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8723 And triggered send callback.
2022-07-21T15:48:49,826+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,827+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126177), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1994]}, remoteAddress=/192.168.22.90:53926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670104,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,827+0000 [send-response-OrderedScheduler-3-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152569), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=7476]}, remoteAddress=/192.168.77.199:57620) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670105,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,851+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.111.246:53848] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314456), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314456), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=26592]}, remoteAddress=/192.168.111.246:53848)
2022-07-21T15:48:49,851+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,851+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,855+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8724 And triggered send callback.
2022-07-21T15:48:49,855+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,855+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314456), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=26592]}, remoteAddress=/192.168.111.246:53848) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670108,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,881+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.145.80:46926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100943), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100943), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=54092]}, remoteAddress=/192.168.145.80:46926)
2022-07-21T15:48:49,881+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,881+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,885+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8725 And triggered send callback.
2022-07-21T15:48:49,885+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,885+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100943), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=54092]}, remoteAddress=/192.168.145.80:46926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670118,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,890+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.22.90:53926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126178), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126178), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1842]}, remoteAddress=/192.168.22.90:53926)
2022-07-21T15:48:49,890+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,890+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,894+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8726 And triggered send callback.
2022-07-21T15:48:49,894+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,894+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126178), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1842]}, remoteAddress=/192.168.22.90:53926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670139,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,897+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7365), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670104,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:49,897+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7366), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7366), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670104,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,897+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7366) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,897+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670104, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,897+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,897+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,897+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670104 .
2022-07-21T15:48:49,897+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670104 in cache. cache size: 0
2022-07-21T15:48:49,897+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8726
2022-07-21T15:48:49,897+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8726, size: 1829, ConsumerManager original offset: 15670104, lastEntryPosition: 469:8726, nextOffset: 15670140
2022-07-21T15:48:49,897+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670140
2022-07-21T15:48:49,897+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,897+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7366): read 5 entries but only 5 entries are committed
2022-07-21T15:48:49,898+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7366), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670104,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670140,last_stable_offset=15670140,log_start_offset=15670140,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670104, timestamp=1658418529752, key=0 bytes, value=1924 bytes)), (record=DefaultRecord(offset=15670105, timestamp=1658418529017, key=0 bytes, value=1772 bytes)), (record=DefaultRecord(offset=15670106, timestamp=1658418529017, key=0 bytes, value=2810 bytes)), (record=DefaultRecord(offset=15670107, timestamp=1658418529017, key=0 bytes, value=2804 bytes)), (record=DefaultRecord(offset=15670108, timestamp=1658418529837, key=0 bytes, value=2708 bytes)), (record=DefaultRecord(offset=15670109, timestamp=1658418529837, key=0 bytes, value=2709 bytes)), (record=DefaultRecord(offset=15670110, timestamp=1658418529837, key=0 bytes, value=2708 bytes)), (record=DefaultRecord(offset=15670111, timestamp=1658418529837, key=0 bytes, value=2556 bytes)), (record=DefaultRecord(offset=15670112, timestamp=1658418529837, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670113, timestamp=1658418529837, key=0 bytes, value=2598 bytes)), (record=DefaultRecord(offset=15670114, timestamp=1658418529837, key=0 bytes, value=2638 bytes)), (record=DefaultRecord(offset=15670115, timestamp=1658418529837, key=0 bytes, value=2668 bytes)), (record=DefaultRecord(offset=15670116, timestamp=1658418529837, key=0 bytes, value=2867 bytes)), (record=DefaultRecord(offset=15670117, timestamp=1658418529837, key=0 bytes, value=2435 bytes)), (record=DefaultRecord(offset=15670118, timestamp=1658418529750, key=0 bytes, value=2557 bytes)), (record=DefaultRecord(offset=15670119, timestamp=1658418529750, key=0 bytes, value=2558 bytes)), (record=DefaultRecord(offset=15670120, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670121, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670122, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670123, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670124, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670125, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670126, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670127, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670128, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670129, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670130, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670131, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670132, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670133, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670134, timestamp=1658418529750, key=0 bytes, value=2558 bytes)), (record=DefaultRecord(offset=15670135, timestamp=1658418529750, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670136, timestamp=1658418529750, key=0 bytes, value=2418 bytes)), (record=DefaultRecord(offset=15670137, timestamp=1658418529750, key=0 bytes, value=2726 bytes)), (record=DefaultRecord(offset=15670138, timestamp=1658418529750, key=0 bytes, value=2713 bytes)), (record=DefaultRecord(offset=15670139, timestamp=1658418529865, key=0 bytes, value=1772 bytes))]}]}]}
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: false
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: false
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: true
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: true
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7367), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7367), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670140,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7367) Fetch request. Size: 1. Each item:
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670140, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670140 .
2022-07-21T15:48:49,899+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670140 in cache. cache size: 0
2022-07-21T15:48:49,899+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670140
2022-07-21T15:48:49,899+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,899+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7367): read 0 entries but only 0 entries are committed
2022-07-21T15:48:49,917+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.111.246:53848] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314457), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314457), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=26557]}, remoteAddress=/192.168.111.246:53848)
2022-07-21T15:48:49,917+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,917+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,921+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8727 And triggered send callback.
2022-07-21T15:48:49,921+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,921+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314457), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=26557]}, remoteAddress=/192.168.111.246:53848) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670140,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,923+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.111.246:53848] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314458), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314458), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=5354]}, remoteAddress=/192.168.111.246:53848)
2022-07-21T15:48:49,923+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,923+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,925+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.22.90:53926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126179), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126179), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=10844]}, remoteAddress=/192.168.22.90:53926)
2022-07-21T15:48:49,925+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,925+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,927+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8728 And triggered send callback.
2022-07-21T15:48:49,927+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,927+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314458), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=5354]}, remoteAddress=/192.168.111.246:53848) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670150,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,929+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8729 And triggered send callback.
2022-07-21T15:48:49,929+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,929+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126179), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=10844]}, remoteAddress=/192.168.22.90:53926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670152,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,948+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.59:53034] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=114743), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=114743), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=76806]}, remoteAddress=/192.168.120.59:53034)
2022-07-21T15:48:49,948+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,948+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,953+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8730 And triggered send callback.
2022-07-21T15:48:49,953+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,953+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=114743), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=76806]}, remoteAddress=/192.168.120.59:53034) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670156,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,955+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.77.199:57620] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152570), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152570), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=5693]}, remoteAddress=/192.168.77.199:57620)
2022-07-21T15:48:49,955+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,955+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,959+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8731 And triggered send callback.
2022-07-21T15:48:49,959+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,959+0000 [send-response-OrderedScheduler-3-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152570), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=5693]}, remoteAddress=/192.168.77.199:57620) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670196,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,959+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.124.120:47578] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155674), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155674), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=24888]}, remoteAddress=/192.168.124.120:47578)
2022-07-21T15:48:49,959+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,959+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,963+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8732 And triggered send callback.
2022-07-21T15:48:49,963+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,963+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155674), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=24888]}, remoteAddress=/192.168.124.120:47578) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670198,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,972+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.150.51:47298] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87705), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87705), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1842]}, remoteAddress=/192.168.150.51:47298)
2022-07-21T15:48:49,972+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,972+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:49,975+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8733 And triggered send callback.
2022-07-21T15:48:49,975+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:49,975+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87705), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=1842]}, remoteAddress=/192.168.150.51:47298) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670207,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:49,999+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7367), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670140,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:49,999+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7368), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7368), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670140,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,000+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7368) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,000+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670140, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,000+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,000+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,000+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670140 .
2022-07-21T15:48:50,000+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670140 in cache. cache size: 0
2022-07-21T15:48:50,000+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8731
2022-07-21T15:48:50,000+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8731, size: 5687, ConsumerManager original offset: 15670140, lastEntryPosition: 469:8731, nextOffset: 15670198
2022-07-21T15:48:50,000+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670198
2022-07-21T15:48:50,000+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,000+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7368): read 5 entries but only 5 entries are committed
2022-07-21T15:48:50,001+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7368), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670140,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670208,last_stable_offset=15670208,log_start_offset=15670208,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670140, timestamp=1658418529896, key=0 bytes, value=2556 bytes)), (record=DefaultRecord(offset=15670141, timestamp=1658418529896, key=0 bytes, value=2554 bytes)), (record=DefaultRecord(offset=15670142, timestamp=1658418529896, key=0 bytes, value=2598 bytes)), (record=DefaultRecord(offset=15670143, timestamp=1658418529896, key=0 bytes, value=2638 bytes)), (record=DefaultRecord(offset=15670144, timestamp=1658418529896, key=0 bytes, value=2667 bytes)), (record=DefaultRecord(offset=15670145, timestamp=1658418529896, key=0 bytes, value=2867 bytes)), (record=DefaultRecord(offset=15670146, timestamp=1658418529896, key=0 bytes, value=2435 bytes)), (record=DefaultRecord(offset=15670147, timestamp=1658418529896, key=0 bytes, value=2697 bytes)), (record=DefaultRecord(offset=15670148, timestamp=1658418529896, key=0 bytes, value=2697 bytes)), (record=DefaultRecord(offset=15670149, timestamp=1658418529896, key=0 bytes, value=2697 bytes)), (record=DefaultRecord(offset=15670150, timestamp=1658418529918, key=0 bytes, value=2604 bytes)), (record=DefaultRecord(offset=15670151, timestamp=1658418529918, key=0 bytes, value=2671 bytes)), (record=DefaultRecord(offset=15670152, timestamp=1658418529904, key=0 bytes, value=3255 bytes)), (record=DefaultRecord(offset=15670153, timestamp=1658418529904, key=0 bytes, value=2120 bytes)), (record=DefaultRecord(offset=15670154, timestamp=1658418529904, key=0 bytes, value=3252 bytes)), (record=DefaultRecord(offset=15670155, timestamp=1658418529904, key=0 bytes, value=2120 bytes)), (record=DefaultRecord(offset=15670156, timestamp=1658418529814, key=0 bytes, value=1905 bytes)), (record=DefaultRecord(offset=15670157, timestamp=1658418529814, key=0 bytes, value=1861 bytes)), (record=DefaultRecord(offset=15670158, timestamp=1658418529814, key=0 bytes, value=1902 bytes)), (record=DefaultRecord(offset=15670159, timestamp=1658418529814, key=0 bytes, value=1902 bytes)), (record=DefaultRecord(offset=15670160, timestamp=1658418529814, key=0 bytes, value=1908 bytes)), (record=DefaultRecord(offset=15670161, timestamp=1658418529814, key=0 bytes, value=1908 bytes)), (record=DefaultRecord(offset=15670162, timestamp=1658418529814, key=0 bytes, value=1913 bytes)), (record=DefaultRecord(offset=15670163, timestamp=1658418529814, key=0 bytes, value=1896 bytes)), (record=DefaultRecord(offset=15670164, timestamp=1658418529814, key=0 bytes, value=1896 bytes)), (record=DefaultRecord(offset=15670165, timestamp=1658418529814, key=0 bytes, value=1896 bytes)), (record=DefaultRecord(offset=15670166, timestamp=1658418529814, key=0 bytes, value=1895 bytes)), (record=DefaultRecord(offset=15670167, timestamp=1658418529814, key=0 bytes, 
value=1904 bytes)), (record=DefaultRecord(offset=15670168, timestamp=1658418529814, key=0 bytes, value=1913 bytes)), (record=DefaultRecord(offset=15670169, timestamp=1658418529814, key=0 bytes, value=1911 bytes)), (record=DefaultRecord(offset=15670170, timestamp=1658418529814, key=0 bytes, value=1917 bytes)), (record=DefaultRecord(offset=15670171, timestamp=1658418529814, key=0 bytes, value=1915 bytes)), (record=DefaultRecord(offset=15670172, timestamp=1658418529814, key=0 bytes, value=1917 bytes)), (record=DefaultRecord(offset=15670173, timestamp=1658418529814, key=0 bytes, value=1928 bytes)), (record=DefaultRecord(offset=15670174, timestamp=1658418529814, key=0 bytes, value=1913 bytes)), (record=DefaultRecord(offset=15670175, timestamp=1658418529814, key=0 bytes, value=1927 bytes)), (record=DefaultRecord(offset=15670176, timestamp=1658418529814, key=0 bytes, value=1928 bytes)), (record=DefaultRecord(offset=15670177, timestamp=1658418529814, key=0 bytes, value=1929 bytes)), (record=DefaultRecord(offset=15670178, timestamp=1658418529814, key=0 bytes, value=1913 bytes)), (record=DefaultRecord(offset=15670179, timestamp=1658418529814, key=0 bytes, value=1917 bytes)), (record=DefaultRecord(offset=15670180, timestamp=1658418529814, key=0 bytes, value=1923 bytes)), (record=DefaultRecord(offset=15670181, timestamp=1658418529814, key=0 bytes, value=1917 bytes)), (record=DefaultRecord(offset=15670182, timestamp=1658418529814, key=0 bytes, value=1918 bytes)), (record=DefaultRecord(offset=15670183, timestamp=1658418529814, key=0 bytes, value=1881 bytes)), (record=DefaultRecord(offset=15670184, timestamp=1658418529814, key=0 bytes, value=1885 bytes)), (record=DefaultRecord(offset=15670185, timestamp=1658418529814, key=0 bytes, value=1909 bytes)), (record=DefaultRecord(offset=15670186, timestamp=1658418529814, key=0 bytes, value=1915 bytes)), (record=DefaultRecord(offset=15670187, timestamp=1658418529814, key=0 bytes, value=1917 bytes)), (record=DefaultRecord(offset=15670188, timestamp=1658418529814, key=0 bytes, value=1926 bytes)), (record=DefaultRecord(offset=15670189, timestamp=1658418529814, key=0 bytes, value=1909 bytes)), (record=DefaultRecord(offset=15670190, timestamp=1658418529814, key=0 bytes, value=1915 bytes)), (record=DefaultRecord(offset=15670191, timestamp=1658418529814, key=0 bytes, value=1921 bytes)), (record=DefaultRecord(offset=15670192, timestamp=1658418529814, key=0 bytes, value=1909 bytes)), (record=DefaultRecord(offset=15670193, timestamp=1658418529814, key=0 bytes, value=1921 bytes)), (record=DefaultRecord(offset=15670194, timestamp=1658418529814, key=0 bytes, value=1925 bytes)), (record=DefaultRecord(offset=15670195, timestamp=1658418529814, key=0 bytes, value=1880 bytes)), (record=DefaultRecord(offset=15670196, timestamp=1658418529916, key=0 bytes, value=2810 bytes)), (record=DefaultRecord(offset=15670197, timestamp=1658418529916, key=0 bytes, value=2804 bytes))]}]}]}
2022-07-21T15:48:50,001+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: false
2022-07-21T15:48:50,001+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: false
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: true
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: true
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7369), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7369), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670198,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7369) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670198, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670198 .
2022-07-21T15:48:50,002+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670198 in cache. cache size: 0
2022-07-21T15:48:50,002+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8733
2022-07-21T15:48:50,002+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8733, size: 1829, ConsumerManager original offset: 15670198, lastEntryPosition: 469:8733, nextOffset: 15670208
2022-07-21T15:48:50,002+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670208
2022-07-21T15:48:50,002+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,002+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7369): read 2 entries but only 2 entries are committed
2022-07-21T15:48:50,002+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7369), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670198,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670208,last_stable_offset=15670208,log_start_offset=15670208,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670198, timestamp=1658418529831, key=0 bytes, value=2745 bytes)), (record=DefaultRecord(offset=15670199, timestamp=1658418529831, key=0 bytes, value=2577 bytes)), (record=DefaultRecord(offset=15670200, timestamp=1658418529831, key=0 bytes, value=2577 bytes)), (record=DefaultRecord(offset=15670201, timestamp=1658418529831, key=0 bytes, value=2816 bytes)), (record=DefaultRecord(offset=15670202, timestamp=1658418529831, key=0 bytes, value=2799 bytes)), (record=DefaultRecord(offset=15670203, timestamp=1658418529831, key=0 bytes, value=2816 bytes)), (record=DefaultRecord(offset=15670204, timestamp=1658418529831, key=0 bytes, value=2799 bytes)), (record=DefaultRecord(offset=15670205, timestamp=1658418529831, key=0 bytes, value=2816 bytes)), (record=DefaultRecord(offset=15670206, timestamp=1658418529831, key=0 bytes, value=2799 bytes)), (record=DefaultRecord(offset=15670207, timestamp=1658418529919, key=0 bytes, value=1772 bytes))]}]}]}
2022-07-21T15:48:50,003+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7370), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7370), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670208,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,003+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7370) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,003+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670208, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,003+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,003+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,003+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670208 .
2022-07-21T15:48:50,003+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670208 in cache. cache size: 0
2022-07-21T15:48:50,003+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670208
2022-07-21T15:48:50,003+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,003+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7370): read 0 entries but only 0 entries are committed
2022-07-21T15:48:50,012+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.105.134:44070] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166304), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166304), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=3703]}, remoteAddress=/192.168.105.134:44070)
2022-07-21T15:48:50,012+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,012+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,015+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8734 And triggered send callback.
2022-07-21T15:48:50,015+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,016+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166304), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=3703]}, remoteAddress=/192.168.105.134:44070) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670208,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,058+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.22.90:53926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126180), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126180), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=46269]}, remoteAddress=/192.168.22.90:53926)
2022-07-21T15:48:50,058+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,058+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,063+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8735 And triggered send callback.
2022-07-21T15:48:50,063+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,063+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=126180), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=46269]}, remoteAddress=/192.168.22.90:53926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670209,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,087+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.77.199:57620] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152571), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152571), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=8040]}, remoteAddress=/192.168.77.199:57620)
2022-07-21T15:48:50,087+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,087+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,091+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.200.226:43228] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=135832), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=135832), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=6818]}, remoteAddress=/192.168.200.226:43228)
2022-07-21T15:48:50,091+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,091+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,091+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8736 And triggered send callback.
2022-07-21T15:48:50,091+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,091+0000 [send-response-OrderedScheduler-3-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152571), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=8040]}, remoteAddress=/192.168.77.199:57620) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670231,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,095+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8737 And triggered send callback.
2022-07-21T15:48:50,095+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,095+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=135832), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=6818]}, remoteAddress=/192.168.200.226:43228) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670234,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,103+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7370), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670208,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:50,103+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7371), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7371), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670208,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,103+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7371) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,103+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670208, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,103+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,103+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,103+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670208 .
2022-07-21T15:48:50,103+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670208 in cache. cache size: 0
2022-07-21T15:48:50,103+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8737
2022-07-21T15:48:50,103+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8737, size: 6819, ConsumerManager original offset: 15670208, lastEntryPosition: 469:8737, nextOffset: 15670237
2022-07-21T15:48:50,103+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670237
2022-07-21T15:48:50,103+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,103+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7371): read 4 entries but only 4 entries are committed
2022-07-21T15:48:50,104+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7371), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670208,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670237,last_stable_offset=15670237,log_start_offset=15670237,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670208, timestamp=1658418529773, key=0 bytes, value=3633 bytes)), (record=DefaultRecord(offset=15670209, timestamp=1658418529994, key=0 bytes, value=2030 bytes)), (record=DefaultRecord(offset=15670210, timestamp=1658418529994, key=0 bytes, value=2042 bytes)), (record=DefaultRecord(offset=15670211, timestamp=1658418529994, key=0 bytes, value=2048 bytes)), (record=DefaultRecord(offset=15670212, timestamp=1658418529994, key=0 bytes, value=2067 bytes)), (record=DefaultRecord(offset=15670213, timestamp=1658418529994, key=0 bytes, value=2065 bytes)), (record=DefaultRecord(offset=15670214, timestamp=1658418529994, key=0 bytes, value=2035 bytes)), (record=DefaultRecord(offset=15670215, timestamp=1658418529994, key=0 bytes, value=2033 bytes)), (record=DefaultRecord(offset=15670216, timestamp=1658418529994, key=0 bytes, value=2038 bytes)), (record=DefaultRecord(offset=15670217, timestamp=1658418529994, key=0 bytes, value=2115 bytes)), (record=DefaultRecord(offset=15670218, timestamp=1658418529994, key=0 bytes, value=2115 bytes)), (record=DefaultRecord(offset=15670219, timestamp=1658418529994, key=0 bytes, value=2115 bytes)), (record=DefaultRecord(offset=15670220, timestamp=1658418529994, key=0 bytes, value=2114 bytes)), (record=DefaultRecord(offset=15670221, timestamp=1658418529994, key=0 bytes, value=2115 bytes)), (record=DefaultRecord(offset=15670222, timestamp=1658418529994, key=0 bytes, value=2115 bytes)), (record=DefaultRecord(offset=15670223, timestamp=1658418529994, key=0 bytes, value=2114 bytes)), (record=DefaultRecord(offset=15670224, timestamp=1658418529994, key=0 bytes, value=2114 bytes)), (record=DefaultRecord(offset=15670225, timestamp=1658418529994, key=0 bytes, value=2115 bytes)), (record=DefaultRecord(offset=15670226, timestamp=1658418529994, key=0 bytes, value=2115 bytes)), (record=DefaultRecord(offset=15670227, timestamp=1658418529994, key=0 bytes, value=2074 bytes)), (record=DefaultRecord(offset=15670228, timestamp=1658418529994, key=0 bytes, value=2367 bytes)), (record=DefaultRecord(offset=15670229, timestamp=1658418529994, key=0 bytes, value=2031 bytes)), (record=DefaultRecord(offset=15670230, timestamp=1658418529994, key=0 bytes, value=2033 bytes)), (record=DefaultRecord(offset=15670231, timestamp=1658418530046, key=0 bytes, value=2338 bytes)), (record=DefaultRecord(offset=15670232, timestamp=1658418530046, key=0 bytes, value=2810 bytes)), (record=DefaultRecord(offset=15670233, timestamp=1658418530046, key=0 bytes, value=2804 bytes)), (record=DefaultRecord(offset=15670234, timestamp=1658418530041, key=0 bytes, value=2183 bytes)), (record=DefaultRecord(offset=15670235, timestamp=1658418530041, key=0 bytes, 
value=2319 bytes)), (record=DefaultRecord(offset=15670236, timestamp=1658418530041, key=0 bytes, value=2228 bytes))]}]}]}
2022-07-21T15:48:50,105+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7372), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7372), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670237,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,105+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7372) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,105+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670237, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,105+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,105+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,105+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670237 .
2022-07-21T15:48:50,105+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670237 in cache. cache size: 0
2022-07-21T15:48:50,105+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670237
2022-07-21T15:48:50,105+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,105+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7372): read 0 entries but only 0 entries are committed
2022-07-21T15:48:50,142+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.145.80:46926] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100944), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100944), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=6179]}, remoteAddress=/192.168.145.80:46926)
2022-07-21T15:48:50,142+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,142+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,145+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8738 And triggered send callback.
2022-07-21T15:48:50,146+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,146+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=100944), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=6179]}, remoteAddress=/192.168.145.80:46926) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670237,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,154+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.77.199:57620] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152572), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152572), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=5693]}, remoteAddress=/192.168.77.199:57620)
2022-07-21T15:48:50,154+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,154+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,158+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8739 And triggered send callback.
2022-07-21T15:48:50,158+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,158+0000 [send-response-OrderedScheduler-3-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=152572), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=5693]}, remoteAddress=/192.168.77.199:57620) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670239,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,185+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.111.246:53848] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314459), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314459), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=19384]}, remoteAddress=/192.168.111.246:53848)
2022-07-21T15:48:50,185+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,185+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,189+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8740 And triggered send callback.
2022-07-21T15:48:50,189+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,189+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314459), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=19384]}, remoteAddress=/192.168.111.246:53848) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670241,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,198+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.111.246:53848] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314460), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314460), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=177039]}, remoteAddress=/192.168.111.246:53848)
2022-07-21T15:48:50,198+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,198+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,204+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8741 And triggered send callback.
2022-07-21T15:48:50,204+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,204+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314460), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=177039]}, remoteAddress=/192.168.111.246:53848) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670248,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,205+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7372), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670237,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:50,205+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7373), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7373), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670237,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,205+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7373) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,205+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670237, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,205+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,205+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,205+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670237 .
2022-07-21T15:48:50,205+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670237 in cache. cache size: 0
2022-07-21T15:48:50,205+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8741
2022-07-21T15:48:50,205+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8741, size: 177467, ConsumerManager original offset: 15670237, lastEntryPosition: 469:8741, nextOffset: 15670312
2022-07-21T15:48:50,205+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670312
2022-07-21T15:48:50,205+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,205+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7373): read 4 entries but only 4 entries are committed
2022-07-21T15:48:50,208+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7373), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670237,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670312,last_stable_offset=15670312,log_start_offset=15670312,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670237, timestamp=1658418528648, key=0 bytes, value=3029 bytes)), (record=DefaultRecord(offset=15670238, timestamp=1658418528648, key=0 bytes, value=3070 bytes)), (record=DefaultRecord(offset=15670239, timestamp=1658418530119, key=0 bytes, value=2810 bytes)), (record=DefaultRecord(offset=15670240, timestamp=1658418530119, key=0 bytes, value=2804 bytes)), (record=DefaultRecord(offset=15670241, timestamp=1658418530172, key=0 bytes, value=2760 bytes)), (record=DefaultRecord(offset=15670242, timestamp=1658418530172, key=0 bytes, value=2745 bytes)), (record=DefaultRecord(offset=15670243, timestamp=1658418530172, key=0 bytes, value=2745 bytes)), (record=DefaultRecord(offset=15670244, timestamp=1658418530172, key=0 bytes, value=2760 bytes)), (record=DefaultRecord(offset=15670245, timestamp=1658418530172, key=0 bytes, value=2745 bytes)), (record=DefaultRecord(offset=15670246, timestamp=1658418530172, key=0 bytes, value=2760 bytes)), (record=DefaultRecord(offset=15670247, timestamp=1658418530172, key=0 bytes, value=2745 bytes)), (record=DefaultRecord(offset=15670248, timestamp=1658418530184, key=0 bytes, value=2832 bytes)), (record=DefaultRecord(offset=15670249, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670250, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670251, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670252, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670253, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670254, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670255, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670256, timestamp=1658418530184, key=0 bytes, value=2832 bytes)), (record=DefaultRecord(offset=15670257, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670258, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670259, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670260, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670261, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670262, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670263, timestamp=1658418530184, key=0 bytes, value=2832 bytes)), (record=DefaultRecord(offset=15670264, timestamp=1658418530184, key=0 bytes, 
value=2797 bytes)), (record=DefaultRecord(offset=15670265, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670266, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670267, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670268, timestamp=1658418530184, key=0 bytes, value=2797 bytes)), (record=DefaultRecord(offset=15670269, timestamp=1658418530184, key=0 bytes, value=2762 bytes)), (record=DefaultRecord(offset=15670270, timestamp=1658418530184, key=0 bytes, value=2734 bytes)), (record=DefaultRecord(offset=15670271, timestamp=1658418530184, key=0 bytes, value=2776 bytes)), (record=DefaultRecord(offset=15670272, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670273, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670274, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670275, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670276, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670277, timestamp=1658418530184, key=0 bytes, value=2751 bytes)), (record=DefaultRecord(offset=15670278, timestamp=1658418530184, key=0 bytes, value=2747 bytes)), (record=DefaultRecord(offset=15670279, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670280, timestamp=1658418530184, key=0 bytes, value=2747 bytes)), (record=DefaultRecord(offset=15670281, timestamp=1658418530184, key=0 bytes, value=2598 bytes)), (record=DefaultRecord(offset=15670282, timestamp=1658418530184, key=0 bytes, value=2638 bytes)), (record=DefaultRecord(offset=15670283, timestamp=1658418530184, key=0 bytes, value=2718 bytes)), (record=DefaultRecord(offset=15670284, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670285, timestamp=1658418530184, key=0 bytes, value=2762 bytes)), (record=DefaultRecord(offset=15670286, timestamp=1658418530184, key=0 bytes, value=2733 bytes)), (record=DefaultRecord(offset=15670287, timestamp=1658418530184, key=0 bytes, value=2747 bytes)), (record=DefaultRecord(offset=15670288, timestamp=1658418530184, key=0 bytes, value=2776 bytes)), (record=DefaultRecord(offset=15670289, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670290, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670291, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670292, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670293, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670294, timestamp=1658418530184, key=0 bytes, value=2728 bytes)), (record=DefaultRecord(offset=15670295, timestamp=1658418530184, key=0 bytes, value=2731 bytes)), (record=DefaultRecord(offset=15670296, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670297, timestamp=1658418530184, key=0 bytes, value=2762 bytes)), (record=DefaultRecord(offset=15670298, timestamp=1658418530184, key=0 bytes, value=2733 bytes)), (record=DefaultRecord(offset=15670299, timestamp=1658418530184, key=0 bytes, value=2747 bytes)), (record=DefaultRecord(offset=15670300, timestamp=1658418530184, key=0 bytes, value=2776 bytes)), (record=DefaultRecord(offset=15670301, 
timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670302, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670303, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670304, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670305, timestamp=1658418530184, key=0 bytes, value=2769 bytes)), (record=DefaultRecord(offset=15670306, timestamp=1658418530184, key=0 bytes, value=2728 bytes)), (record=DefaultRecord(offset=15670307, timestamp=1658418530184, key=0 bytes, value=2731 bytes)), (record=DefaultRecord(offset=15670308, timestamp=1658418530184, key=0 bytes, value=2748 bytes)), (record=DefaultRecord(offset=15670309, timestamp=1658418530184, key=0 bytes, value=2556 bytes)), (record=DefaultRecord(offset=15670310, timestamp=1658418530184, key=0 bytes, value=2435 bytes)), (record=DefaultRecord(offset=15670311, timestamp=1658418530184, key=0 bytes, value=2554 bytes))]}]}]}
2022-07-21T15:48:50,208+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: false
2022-07-21T15:48:50,208+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: false
2022-07-21T15:48:50,208+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: true
2022-07-21T15:48:50,208+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Channel writability has changed to: true
2022-07-21T15:48:50,209+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7374), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7374), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670312,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,209+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7374) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,209+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670312, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,209+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,209+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,209+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670312 .
2022-07-21T15:48:50,209+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670312 in cache. cache size: 0
2022-07-21T15:48:50,209+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670312
2022-07-21T15:48:50,209+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,209+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7374): read 0 entries but only 0 entries are committed
2022-07-21T15:48:50,220+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.111.246:53848] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314461), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314461), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=5614]}, remoteAddress=/192.168.111.246:53848)
2022-07-21T15:48:50,220+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,220+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,221+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.124.120:47578] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155675), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155675), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=2646]}, remoteAddress=/192.168.124.120:47578)
2022-07-21T15:48:50,221+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,221+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,223+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8742 And triggered send callback.
2022-07-21T15:48:50,223+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,223+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=314461), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=5614]}, remoteAddress=/192.168.111.246:53848) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670312,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,225+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8743 And triggered send callback.
2022-07-21T15:48:50,225+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,225+0000 [send-response-OrderedScheduler-2-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=155675), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=2646]}, remoteAddress=/192.168.124.120:47578) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670314,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,234+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.150.51:47298] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87706), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87706), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=12678]}, remoteAddress=/192.168.150.51:47298)
2022-07-21T15:48:50,234+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,234+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,238+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8744 And triggered send callback.
2022-07-21T15:48:50,238+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,238+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=87706), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=12678]}, remoteAddress=/192.168.150.51:47298) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670315,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,274+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.105.134:44070] Received kafka cmd RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166305), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166305), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=2891]}, remoteAddress=/192.168.105.134:44070)
2022-07-21T15:48:50,274+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,274+0000 [pulsar-ph-kafka-64-1] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,278+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessagePublishContext - Success write topic: persistent://public/default/test-02-partition-0, producerName KOP-PID-PREFIX--1--1 ledgerId: 469, entryId: 8745 And triggered send callback.
2022-07-21T15:48:50,278+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.storage.ReplicaManager - Complete handle appendRecords.
2022-07-21T15:48:50,278+0000 [send-response-OrderedScheduler-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=PRODUCE, apiVersion=6, clientId=rdkafka, correlationId=166305), request={acks=-1,timeout=30000,partitionSizes=[test-02-0=2891]}, remoteAddress=/192.168.105.134:44070) responseAndRequest content: {responses=[{topic=test-02,partition_responses=[{partition=0,error_code=0,base_offset=15670318,log_append_time=-1,log_start_offset=-1}]}],throttle_time_ms=0}
2022-07-21T15:48:50,309+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7374), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670312,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:50,309+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7375), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7375), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670312,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,309+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7375) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,309+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670312, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,309+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,309+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,309+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670312 .
2022-07-21T15:48:50,309+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670312 in cache. cache size: 0
2022-07-21T15:48:50,309+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Mark delete success for position: 469:8745
2022-07-21T15:48:50,309+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Topic test-02-0 success read entry: ledgerId: 469, entryId: 8745, size: 2878, ConsumerManager original offset: 15670312, lastEntryPosition: 469:8745, nextOffset: 15670319
2022-07-21T15:48:50,309+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670319
2022-07-21T15:48:50,309+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,309+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7375): read 4 entries but only 4 entries are committed
2022-07-21T15:48:50,310+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7375), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670312,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=15670319,last_stable_offset=15670319,log_start_offset=15670319,aborted_transactions=[]},record_set=[(record=DefaultRecord(offset=15670312, timestamp=1658418530214, key=0 bytes, value=2668 bytes)), (record=DefaultRecord(offset=15670313, timestamp=1658418530214, key=0 bytes, value=2867 bytes)), (record=DefaultRecord(offset=15670314, timestamp=1658418530216, key=0 bytes, value=2576 bytes)), (record=DefaultRecord(offset=15670315, timestamp=1658418521784, key=0 bytes, value=4222 bytes)), (record=DefaultRecord(offset=15670316, timestamp=1658418521784, key=0 bytes, value=4220 bytes)), (record=DefaultRecord(offset=15670317, timestamp=1658418521784, key=0 bytes, value=4146 bytes)), (record=DefaultRecord(offset=15670318, timestamp=1658418530169, key=0 bytes, value=2821 bytes))]}]}]}
2022-07-21T15:48:50,310+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7376), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7376), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670319,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,310+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7376) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,310+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670319, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,310+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,310+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,310+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670319 .
2022-07-21T15:48:50,310+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670319 in cache. cache size: 0
2022-07-21T15:48:50,310+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670319
2022-07-21T15:48:50,310+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,310+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7376): read 0 entries but only 0 entries are committed
2022-07-21T15:48:50,410+0000 [send-response-OrderedScheduler-1-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - Write kafka cmd to client. request content: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7376), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670319,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254) responseAndRequest content: {throttle_time_ms=0,error_code=0,session_id=0,responses=[{topic=test-02,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=-1,last_stable_offset=-1,log_start_offset=-1,aborted_transactions=null},record_set=[]}]}]}
2022-07-21T15:48:50,411+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - [/192.168.120.47:55254] Received kafka cmd RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7377), the request content is: KafkaHeaderAndRequest(header=RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7377), request={replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=0,epoch=-1,topics=[{topic=test-02,partitions=[{partition=0,fetch_offset=15670319,log_start_offset=-1,max_bytes=1048576}]}],forgotten_topics_data=[]}, remoteAddress=/192.168.120.47:55254)
2022-07-21T15:48:50,411+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=vector, correlationId=7377) Fetch request. Size: 1. Each item:
2022-07-21T15:48:50,411+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler -   Fetch request topic:test-02-0 data:(offset=15670319, logStartOffset=-1, maxBytes=1048576).
2022-07-21T15:48:50,411+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,411+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaRequestHandler - using public as tenant
2022-07-21T15:48:50,411+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.MessageFetchContext - Fetch for test-02-0: remove tcm to get cursor for fetch offset: 15670319 .
2022-07-21T15:48:50,411+0000 [pulsar-ph-kafka-64-2] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Get cursor for offset: 15670319 in cache. cache size: 0
2022-07-21T15:48:50,411+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] DEBUG io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager - [[id: 0xf76a13f2, L:/192.168.105.162:9092 - R:/192.168.120.47:55254]] Add cursor back kop-consumer-cursor-persistent://public/default/test-02-partition-0-414-2987-a2bc5bcbd3 for offset: 15670319
BewareMyPower commented 2 years ago

It looks okay from the logs. What's wrong with your application?

BTW, the CoordinatorLoadInProgress (Broker: Coordinator load in progress) error is not permanent; it only means the group coordinator (for consumers) or the transaction coordinator (for producers) is still loading metadata from its storage topic (__consumer_offsets or __transaction_state).

Demogorgon314 commented 2 years ago

I tried the https://github.com/dpkp/kafka-python client, and it works very well. @jiangtao7 Can you provide the broker config?

jiangtao7 commented 2 years ago

broker.conf.zip

Demogorgon314 commented 2 years ago

Are you running in a K8s environment and using Istio to expose KoP? If so, you need to configure the SSL certificates in the Python client and set 'security.protocol': 'SSL'.
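
A minimal sketch of what that consumer config could look like with SSL enabled (the certificate paths below are placeholders, not values from this deployment):

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'pulsar-broker.pulsar-poc.svc.cluster.local:9092',
    'group.id': 'my-kafka-group',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SSL',
    'ssl.ca.location': '/path/to/ca.crt',               # CA that signed the broker/gateway certificate
    'ssl.certificate.location': '/path/to/client.crt',  # only needed if the gateway requires mutual TLS
    'ssl.key.location': '/path/to/client.key',          # only needed if the gateway requires mutual TLS
}
c = Consumer(conf)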

BewareMyPower commented 2 years ago

@jiangtao7 I cannot find the transaction-related configs in your broker.conf:

brokerDeduplicationEnabled=true
kafkaTransactionCoordinatorEnabled=true

Did you miss the comments here: https://github.com/streamnative/kop/issues/1415#issuecomment-1191002775?

BewareMyPower commented 2 years ago

I also see that you configured the same address for both the advertised listener and the listener:

kafkaAdvertisedListeners=PLAINTEXT://pulsar-broker-0.pulsar-broker.pulsar-poc.svc.cluster.local:9092
kafkaListeners=PLAINTEXT://pulsar-broker-0.pulsar-broker.pulsar-poc.svc.cluster.local:9092

kafkaAdvertisedListeners corresponds to Kafka's advertised.listeners config, while kafkaListeners corresponds to Kafka's listeners config. In other words, you should configure an external (client-reachable) address for kafkaAdvertisedListeners and an internal (bind) address for kafkaListeners.

See https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic.
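
For example, a minimal sketch of the two settings in broker.conf (the bind address and external hostname here are placeholders for your environment):

# Address the broker binds to for the Kafka protocol (internal)
kafkaListeners=PLAINTEXT://0.0.0.0:9092
# Address returned to Kafka clients in metadata responses (external, reachable by clients)
kafkaAdvertisedListeners=PLAINTEXT://pulsar-broker-0.pulsar-broker.pulsar-poc.svc.cluster.local:9092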

sumeet-zuora commented 2 years ago

We did set the listeners as above; attaching the entire manifest and values.yaml. We are currently using Pulsar in the same K8s cluster, and it still fails to connect. Istio is disabled for this namespace.

values.yaml.txt

sumeet-zuora commented 2 years ago

manifest

manifests.yaml.txt

Demogorgon314 commented 2 years ago

# Istio is required for exposing Kafka service
istio:
  enable: true
  gateway:
    selector:
      istio: "ingress"

I'm not very familiar with K8s, but as far as I know, Istio is required in a K8s environment when using KoP. Can you try enabling Istio and configuring the SSL certificates in the Python client?

BewareMyPower commented 2 years ago

I see your config (https://github.com/streamnative/kop/issues/1415#issuecomment-1204129891) is:

    messagingProtocols: "kafka"
    protocolHandlerDirectory: "./protocols"
    kafkaListeners: "PLAINTEXT://127.0.0.1:9092"
    allowAutoTopicCreationType: "partitioned"
    brokerDeleteInactiveTopicsEnabled: "false"
    brokerDeduplicationEnabled: "true"
    kafkaTransactionCoordinatorEnabled: "true"

You didn't expose the kafkaAdvertisedListeners.
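
As a sketch in the same values format (the advertised hostname is a placeholder for whatever address your Kafka clients actually resolve):

    kafkaListeners: "PLAINTEXT://0.0.0.0:9092"
    kafkaAdvertisedListeners: "PLAINTEXT://pulsar-broker-0.pulsar-broker.pulsar-poc.svc.cluster.local:9092"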

BTW, I found it is not the same as the previous config (https://github.com/streamnative/kop/issues/1415#issuecomment-1192830410):

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor,org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
kafkaAdvertisedListeners=PLAINTEXT://pulsar-broker-0.pulsar-broker.pulsar-poc.svc.cluster.local:9092
kafkaListeners=PLAINTEXT://pulsar-broker-0.pulsar-broker.pulsar-poc.svc.cluster.local:9092
messagingProtocols=kafka

I'm confused: are you both using the same config? @Schaudhari7565 @jiangtao7

BewareMyPower commented 2 years ago

In addition, does this issue still only affect the Python client? Are you still able to connect to your cluster via the Go client or the Java CLI (kafka-console-consumer.sh)? If so, it would be better to upload your full broker logs (please attach a file instead of pasting).

sumeet-zuora commented 2 years ago

@BewareMyPower and @Demogorgon314 we are using the same manifest for deployment. If I try to enable KoP with TLS and Istio based on https://github.com/streamnative/charts/blob/master/examples/pulsar/values-kop-tls-istio.yaml, I get the following error in the manager (we are using version 2.10.1.3):

java.lang.IllegalArgumentException: Invalid argument syntax: --tls.keystore.password=
    at org.springframework.core.env.SimpleCommandLineArgsParser.parse(SimpleCommandLineArgsParser.java:75)
    at org.springframework.core.env.SimpleCommandLinePropertySource.<init>(SimpleCommandLinePropertySource.java:90)
    at org.springframework.boot.DefaultApplicationArguments$Source.<init>(DefaultApplicationArguments.jav

Demogorgon314 commented 2 years ago

@tuteng Can you help take look at this issue?

tuteng commented 2 years ago

@Schaudhari7565 Can you check this value: Values.tls.pulsar_manager.enabled?

sumeet-zuora commented 2 years ago

it is disabled/false @tuteng

tuteng commented 2 years ago

> it is disabled/false @tuteng

Can you try setting it to true? @Schaudhari7565
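
For example, a sketch of the values.yaml fragment (assuming the chart lays out its tls section this way):

tls:
  pulsar_manager:
    enabled: true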