streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] `DeleteRecords` returns "Unknown Topic or Partition" for existing partition #1286

Open crepererum opened 2 years ago

crepererum commented 2 years ago

Describe the bug
KOP answers `DeleteRecords` requests with "Unknown Topic or Partition" even when the partition clearly exists.

To Reproduce
When using KOP based on the following Dockerfile:

FROM apachepulsar/pulsar:2.10.0

ENV CURL_FLAGS="--proto =https --tlsv1.2 -sSf" \
    KOP_VERSION="2.10.0.3" \
    KOP_SHA256="fa93f1fea1a1c3bd2103873a02cb291dd1f4e2239796b2df9856d8530767b094"

RUN env HOME=/pulsar curl ${CURL_FLAGS} -LO "https://github.com/streamnative/kop/releases/download/v${KOP_VERSION}/pulsar-protocol-handler-kafka-${KOP_VERSION}.nar" && \
    echo "${KOP_SHA256}  pulsar-protocol-handler-kafka-${KOP_VERSION}.nar" | sha256sum -c && \
    mkdir protocols && \
    mv "pulsar-protocol-handler-kafka-${KOP_VERSION}.nar" protocols && \
    cp conf/standalone.conf tmp.conf && \
    echo "messagingProtocols=kafka" >> tmp.conf && \
    echo "protocolHandlerDirectory=./protocols" >> tmp.conf && \
    echo "kafkaListeners=PLAINTEXT://0.0.0.0:9092" >> tmp.conf && \
    echo "kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092" >> tmp.conf && \
    echo "brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor" >> tmp.conf && \
    echo "brokerDeleteInactiveTopicsEnabled=false" >> tmp.conf && \
    echo "kafkaTransactionCoordinatorEnabled=true" >> tmp.conf

CMD ["bash", "-c", "cp tmp.conf conf/standalone.conf && bin/pulsar standalone"]

and the following preparation:

  1. create topic with 1 partition
  2. issue 3 produce requests:
    1. a single record
    2. two records, remembering the offset of the first (call it o)
    3. a single record
  3. issue a DeleteRecords request with offset o

Then I would expect the response to be OK (at least this works with the official Kafka implementation). However, the response is "Unknown Topic or Partition".
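The steps above can be modeled with a toy single-partition log in plain Python (an illustration of the expected Kafka semantics, not KoP or Kafka client code): DeleteRecords at offset o should succeed and advance the log start offset (the "low watermark") to o, rather than fail with an unknown-partition error.

```python
# Toy model of one Kafka partition log (illustration only).
class PartitionLog:
    def __init__(self):
        self.records = []          # list of payloads; index == offset
        self.log_start_offset = 0  # first readable offset

    def produce(self, *payloads):
        """Append records; return the offset of the first appended record."""
        first = len(self.records)
        self.records.extend(payloads)
        return first

    def delete_records(self, offset):
        """Kafka DeleteRecords semantics: trim everything before `offset`."""
        if offset > len(self.records):
            raise ValueError("OFFSET_OUT_OF_RANGE")
        self.log_start_offset = max(self.log_start_offset, offset)
        return self.log_start_offset  # the "low watermark" in the response

    def readable_offsets(self):
        return list(range(self.log_start_offset, len(self.records)))

log = PartitionLog()
log.produce(b"r0")             # 1. a single record
o = log.produce(b"r1", b"r2")  # 2. two records; o is the first one's offset
log.produce(b"r3")             # 3. a single record

low_watermark = log.delete_records(o)   # expected: OK, not "Unknown Topic or Partition"
print(low_watermark, log.readable_offsets())  # -> 1 [1, 2, 3]
```

Records at offsets >= o stay readable; only the prefix before o is removed.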

Here is a pcap dump of the network traffic: kop-delete-bug.zip

A codified version of that test can be found here:

https://github.com/influxdata/rskafka/blob/5d4245c219b500c414cfdf4dddb76bfd167e5bd2/tests/client.rs#L400-L500

Expected behavior
Deletion should work.

Additional context
The dumps were produced by the rskafka integration tests.

BewareMyPower commented 2 years ago

KoP doesn't support this request because the backing Pulsar broker doesn't support deleting individual messages either.

crepererum commented 2 years ago

That's reasonable. But then why is the feature advertised in ApiVersions? To reference some "prior art": redpanda also does NOT support deletes, but it clearly communicates that to clients by excluding this request type from its ApiVersions response.
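The idea can be sketched as follows. In the Kafka protocol, DeleteRecords is API key 21; a broker that does not implement it can simply omit that key from its ApiVersions response, so well-behaved clients never send the request. The dictionary below is illustrative (the max versions in particular are not meant to match any specific broker release):

```python
# Sketch: omit an unimplemented API from the ApiVersions response.
# API key numbers are from the Kafka protocol; version ranges are illustrative.
SUPPORTED_APIS = {
    0: (0, 9),    # Produce: (min_version, max_version)
    1: (0, 13),   # Fetch
    3: (0, 12),   # Metadata
    18: (0, 3),   # ApiVersions
    21: (0, 2),   # DeleteRecords -- advertised today, but unsupported in KoP
}

DELETE_RECORDS_API_KEY = 21

def api_versions_response(supported, implemented_keys):
    """Advertise only the APIs the broker actually implements."""
    return {k: v for k, v in supported.items() if k in implemented_keys}

resp = api_versions_response(
    SUPPORTED_APIS,
    implemented_keys=set(SUPPORTED_APIS) - {DELETE_RECORDS_API_KEY},
)
print(sorted(resp))  # DeleteRecords (21) is no longer advertised
```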

BewareMyPower commented 2 years ago

Good point. I think we should also adopt the same solution in KoP.

BewareMyPower commented 2 years ago

I'm sorry, I found that the DeleteRecords request is already supported, based on the truncateAsync API, in https://github.com/streamnative/kop/pull/871. I will take a look at this issue later.

BewareMyPower commented 2 years ago

The truncateAsync API cannot achieve the same goal as Kafka's deleteRecords method; the implementation in #871 was added only to make Kafka Streams work.

I think we still need to implement this API in the future. For now, maybe we just need a better error response.
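One hypothetical shape for that "better error response": when the partition exists but the operation is unsupported, answer with a code that says so instead of claiming the partition does not exist. The error code numbers below are from the Kafka protocol; the handler itself is purely illustrative, not KoP code.

```python
# Hypothetical sketch of a clearer error response for DeleteRecords.
# Error code numbers are from the Kafka protocol.
NONE = 0
UNKNOWN_TOPIC_OR_PARTITION = 3   # misleading here: the partition does exist
INVALID_REQUEST = 42             # clearer: the broker rejects the operation

def handle_delete_records(partition_exists, deletion_supported):
    if not partition_exists:
        return UNKNOWN_TOPIC_OR_PARTITION
    if not deletion_supported:
        return INVALID_REQUEST   # today KoP returns code 3 instead
    return NONE                  # perform the trim

# KoP's current situation: partition exists, deletion unsupported.
print(handle_delete_records(True, False))  # -> 42
```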

eolivelli commented 2 years ago

As @BewareMyPower pointed out, I had implemented deleteRecords with truncate. But this actually works by truncating the whole topic. This is the use case required by Kafka Streams and ksqlDB, and it works quite well. Currently it is not possible in Pulsar to delete only some records.

Raven888888 commented 2 years ago

@crepererum I found a workaround to remove some data in kop, using topic/namespace level retention of pulsar.

My use case is: there is a corrupt record in the middle of a KoP topic, and it breaks my Kafka consumer client. I am using Pulsar and KoP 2.10.

Since I am able to consume all the records prior to the corrupt one, I just need to set retention such that the data before and including the corrupt record is removed from Pulsar. Then my client is able to consume data from KoP again.

pulsar-admin namespaces set-retention public/default
pulsar-admin topics set-retention public/default/mytopic
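The commands above are abbreviated. A fuller invocation might look like the following; the `--time` and `--size` flags are pulsar-admin's retention options, but the values here are purely illustrative and must be tuned so that only the ledgers up to and including the corrupt record become eligible for deletion:

```shell
# Illustrative values only: temporarily shrink retention so the old
# ledgers are deleted, then restore your normal retention policy.
pulsar-admin namespaces set-retention public/default --time 1m --size 1M
pulsar-admin topics set-retention public/default/mytopic --time 1m --size 1M
```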

The catch is:

Hope this helps.

crepererum commented 2 years ago

It does not delete just a particular record; it deletes all records up to the particular one.

That's the semantics of the DeleteRecords Kafka API. The name is a bit misleading. It's technically a "trim" rather than a delete at a single offset.
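The distinction running through this thread can be shown with a small toy model (illustrative only, not Pulsar or KoP code): Kafka's DeleteRecords trims the prefix of the log before a given offset, while Pulsar's truncate drops everything currently in the topic.

```python
# Toy contrast between the two operations discussed above.

def delete_records(offsets, delete_offset):
    """Kafka DeleteRecords: trim the prefix; keep offsets >= delete_offset."""
    return [o for o in offsets if o >= delete_offset]

def truncate(offsets):
    """Pulsar truncate: drop everything currently in the topic."""
    return []

log = [0, 1, 2, 3]
print(delete_records(log, 2))  # -> [2, 3]: a prefix trim at offset 2
print(truncate(log))           # -> []: only whole-topic truncation
```

This is why #871's truncate-based implementation covers the Kafka Streams use case (clearing a whole changelog topic) but not arbitrary DeleteRecords offsets.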

eolivelli commented 2 years ago

We have to implement that in Pulsar first; then we can add it to KoP.