aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

Add delete_records to the admin client #967

Closed · vmaurin closed 5 months ago

vmaurin commented 5 months ago

Describe the solution you'd like

We should have a method on the admin client to delete records: API key 21, https://kafka.apache.org/protocol#The_Messages_DeleteRecords

It is useful when doing stream processing with logic to clean up repartition/shuffle topics.
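
As an illustration, here is a minimal usage sketch of what such a method could look like, loosely mirroring the Java deleteRecords() API from KIP-204. The method name delete_records and its argument shape (a mapping of partition to cut-off offset) are assumptions for this sketch, not a merged aiokafka API:

import asyncio

from aiokafka import TopicPartition
from aiokafka.admin import AIOKafkaAdminClient


async def truncate_repartition_topic() -> None:
    admin = AIOKafkaAdminClient(bootstrap_servers="localhost:9092")
    await admin.start()
    try:
        # Hypothetical call: ask the broker to delete every record in
        # partition 0 of the shuffle topic below offset 1000, i.e. to
        # advance the partition's log start offset to 1000.
        await admin.delete_records({TopicPartition("my-app-repartition", 0): 1000})
    finally:
        await admin.close()


asyncio.run(truncate_repartition_topic())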

Additional context

It was added a while ago to the java client, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API

vmaurin commented 5 months ago

@ods I gave it a shot, but I can't get it working. I am a bit lost when it comes to debugging protocol issues (which seems to be what I am facing here). If you have a few tips, I would be happy to rework my copy.

https://github.com/aio-libs/aiokafka/pull/969

ods commented 5 months ago

@vmaurin I checked: your PR works with minimal modifications for v0 and v1, but fails with v2. It looks like something is wrong with tagged fields support in aiokafka. As a first step we could add it without v2 for now and keep debugging the problems with tagged fields.
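
For context on why v2 is special: DeleteRecords v2 is the first "flexible" version of this API. Per the Kafka protocol spec, flexible versions switch strings and arrays to compact encodings, append a tagged-fields section at every nesting level, and require request header v2. Below is a sketch of what the v2 request/response declarations could look like, following the declaration style of aiokafka's vendored protocol code; the exact class names, import paths, and the FLEXIBLE_VERSION flag are assumptions, not the PR's actual code:

from aiokafka.protocol.api import Request, Response
from aiokafka.protocol.types import (
    CompactArray, CompactString, Int16, Int32, Int64, Schema, TaggedFields,
)


class DeleteRecordsResponse_v2(Response):
    API_KEY = 21
    API_VERSION = 2
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        ("topics", CompactArray(
            ("name", CompactString("utf-8")),
            ("partitions", CompactArray(
                ("partition_index", Int32),
                ("low_watermark", Int64),
                ("error_code", Int16),
                ("tags", TaggedFields))),
            ("tags", TaggedFields))),
        ("tags", TaggedFields),
    )


class DeleteRecordsRequest_v2(Request):
    API_KEY = 21
    API_VERSION = 2
    RESPONSE_TYPE = DeleteRecordsResponse_v2
    FLEXIBLE_VERSION = True  # assumption: the flag that selects header v2
    SCHEMA = Schema(
        ("topics", CompactArray(
            ("name", CompactString("utf-8")),
            ("partitions", CompactArray(
                ("partition_index", Int32),
                ("offset", Int64),
                ("tags", TaggedFields))),
            ("tags", TaggedFields))),
        ("timeout_ms", Int32),
        ("tags", TaggedFields),
    )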

ods commented 5 months ago

For the record, the error in the log when sending DeleteRecordsRequest_v2 is:

[2024-01-29 15:53:59,551] ERROR Exception while processing request from 172.17.0.2:57466-192.168.65.1:21560-1 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 21
Caused by: java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:155)
        at java.nio.ByteBuffer.get(ByteBuffer.java:723)
        at org.apache.kafka.common.protocol.ByteBufferAccessor.readArray(ByteBufferAccessor.java:58)
        at org.apache.kafka.common.protocol.Readable.readUnknownTaggedField(Readable.java:52)
        at org.apache.kafka.common.message.RequestHeaderData.read(RequestHeaderData.java:135)
        at org.apache.kafka.common.message.RequestHeaderData.<init>(RequestHeaderData.java:84)
        at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:95)
        at kafka.network.Processor.parseRequestHeader(SocketServer.scala:999)
        at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1012)
        at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1008)
        at kafka.network.Processor.run(SocketServer.scala:893)
        at java.lang.Thread.run(Thread.java:750)
vmaurin commented 5 months ago

@ods Good catch. I copy-pasted the TaggedFields from other definitions, but they don't seem to be usable as-is. In the Java version, they just mark versions as "flexible", and then a different serializer seems to be used.

It doesn't seem to work without declaring TaggedFields either.

ods commented 5 months ago

What I've found so far: although aiokafka declares v1 headers, it doesn't actually use them. After fixing the request header, the broker doesn't complain anymore. But I'm still struggling to fix the response parsing part.
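
For anyone following along, here is a standalone sketch (plain struct packing, not aiokafka's actual serializer) of the header difference that appears to be at play. Flexible request versions must be sent with request header v2, which is header v1 plus a tagged-fields section (client_id stays a plain length-prefixed nullable string even in v2). If the client sends a v1 header for a flexible request, the broker expects the tagged-fields byte, reads into the body instead, and can underflow exactly as in the BufferUnderflowException trace above:

import struct


def encode_header_v1(api_key: int, api_version: int,
                     correlation_id: int, client_id: str) -> bytes:
    # Header v1: api_key INT16, api_version INT16, correlation_id INT32,
    # client_id as an INT16-length-prefixed string.
    cid = client_id.encode("utf-8")
    return struct.pack(">hhih", api_key, api_version, correlation_id, len(cid)) + cid


def encode_header_v2(api_key: int, api_version: int,
                     correlation_id: int, client_id: str) -> bytes:
    # Header v2 = header v1 + tagged fields; an empty tagged-fields
    # section is a single unsigned varint 0x00 (the field count).
    return encode_header_v1(api_key, api_version, correlation_id, client_id) + b"\x00"


# DeleteRecords is api key 21; v2 is flexible, so it needs header v2,
# while v0/v1 keep header v1.
print(encode_header_v2(21, 2, 1, "aiokafka").hex())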