tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

How to delete records from Kafka table #9

Closed: vijaymandave closed this issue 1 year ago

vijaymandave commented 2 years ago

Hi,

How can I delete records from a Kafka table using Kafka.DotNet.ksqlDB?

Is there any documentation to refer to for a sample implementation?

Please suggest.

tomasfabian commented 2 years ago

Hi @vijaymandave, it is not supported in ksqlDB: https://github.com/confluentinc/ksql/issues/7073

Here is a workaround

Tomas

tomasfabian commented 2 years ago

@vijaymandave in the meantime you can also try to create a tombstone by producing a message with a null value directly into your underlying Kafka topic:

Install-Package Kafka.DotNet.InsideOut

using System.Threading.Tasks;
using Confluent.Kafka;
using Kafka.DotNet.InsideOut.Producer;

public async Task CreateTombstoneAsync(int id)
{
  string bootstrapServers = "localhost:29092";

  var producerConfig = new ProducerConfig
  {
    BootstrapServers = bootstrapServers,
    Acks = Acks.Leader
  };

  // Model is the value type of the records in your topic.
  using var producer = new KafkaProducer<int, Model>(topicName: "your_topic", producerConfig);

  // Produces a message with a null value (a tombstone) for the given key.
  var deliveryResult = await producer.DeleteMessageAsync(id);
}
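If you prefer not to depend on Kafka.DotNet.InsideOut, a minimal sketch of the same idea with plain Confluent.Kafka could look like the following (the topic name, key type, and broker address are assumptions carried over from the snippet above):

using System.Threading.Tasks;
using Confluent.Kafka;

public async Task CreateTombstoneAsync(int id)
{
  var producerConfig = new ProducerConfig
  {
    BootstrapServers = "localhost:29092",
    Acks = Acks.Leader
  };

  // string is used as the value type only for illustration; any value
  // serializer that serializes null to a null payload produces a valid tombstone.
  using var producer = new ProducerBuilder<int, string>(producerConfig).Build();

  // A record with a null value is the tombstone for the given key.
  var deliveryResult = await producer.ProduceAsync(
    "your_topic", new Message<int, string> { Key = id, Value = null });
}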
vijaymandave commented 2 years ago

Hello @tomasfabian ,

The above solution is not working; I think tombstone entries are not being created for the topic.

As per the documentation (https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream/):

NULL message values are treated as a tombstone. Any existing row with a matching key is deleted.

tomasfabian commented 2 years ago

@vijaymandave which part doesn't work: the insertion of NULL messages (tombstones) or the deletion of rows with a matching key?

UPDATE: For the former, check the delivery result or examine the underlying topic directly; it should contain a record with a null value for the provided key.
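For illustration, continuing the earlier snippet, the delivery result could be checked like this (this assumes DeleteMessageAsync surfaces Confluent.Kafka's DeliveryResult, as the variable name above suggests):

var deliveryResult = await producer.DeleteMessageAsync(id);

// PersistenceStatus.Persisted means the broker acknowledged the write.
if (deliveryResult.Status != PersistenceStatus.Persisted)
  Console.WriteLine($"Tombstone for key {id} was not persisted: {deliveryResult.Status}");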

Could the latter be caused by the fact that rows are deleted eventually (not immediately)? AFAIK ksqlDB tables are backed by compacted Kafka topics, so it can depend on the compaction configs, too. These are just some thoughts; you will probably have to figure it out yourself in your environment.

Hopefully some related/helpful articles: https://kafka.apache.org/documentation/#compaction

Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition.

Log compaction guarantees
Configuring log cleaner

Let us know your solution if you finally solve it, please.

Regards Tomas

vijaymandave commented 2 years ago

Hi @tomasfabian,

Insertion of NULL values works using the ksqlDB CLI, but deletion of rows with a matching key using the above C# code is not working, and I am getting the error below.

Message = "Local: Message timed out"
Source = "Confluent.Kafka"
StackTrace = "   at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)\r\n   at Kafka.DotNet.InsideOut.Producer.KafkaProducer`2.ProduceMessageAsync(TKey key, TValue value, CancellationToken cancellationToken)\r\n   at Kafka.DotNet.InsideOut.Producer.KafkaProducer`2.DeleteMessageAsync(TKey id)\r\n   at TestConsoleApp.Program.Delete[T](String id, String topicName) in D:\\TestConsoleApp\\TestConsoleApp\\Program.cs:line 130"

I have set the retention policy at the server level (1 hour) and the message level (5 minutes) as below.

Server Config:

log.retention.ms=3600000
log.segment.bytes=1073741824
log.retention.check.interval.ms=3600000
log.cleaner.enable=true

Message Config:

retention.ms=300000
cleanup.policy=delete 
delete.retention.ms=300000
segment.bytes=10000
segment.ms=300000

With the above configuration, the messages are not being deleted. Kafka deletes messages eventually (after 2 days or 7 days, it is not fixed), so under what conditions does Kafka delete messages?

I am running Kafka with 500 GB of hard disk space. Should I be concerned about cleaning up Kafka messages over the long run, so that the system does not exhaust the available disk space?

Please suggest.

Thanks, Vijay

tomasfabian commented 2 years ago

Hi @vijaymandave, I would like to stress that the above-mentioned examples are workarounds. We must wait for a proper solution in ksqlDB.

Try adding the following config for more detailed Kafka logs:

producerConfig.Debug = "broker,topic,msg";
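If you build the producer with plain Confluent.Kafka instead of the wrapper, note that the debug output arrives through the log handler rather than stdout; a minimal sketch (the broker address is an assumption):

using System;
using Confluent.Kafka;

var producerConfig = new ProducerConfig
{
  BootstrapServers = "localhost:29092",
  Debug = "broker,topic,msg" // verbose librdkafka diagnostics
};

// librdkafka routes debug lines to the log handler.
using var producer = new ProducerBuilder<int, string>(producerConfig)
  .SetLogHandler((_, logMessage) =>
    Console.WriteLine($"{logMessage.Level}|{logMessage.Facility}: {logMessage.Message}"))
  .Build();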

How aggressively you delete your messages depends on how valuable they are to you. Compacted Kafka topics are not deleted AFAIK; they are only compacted down to at least the last value per key. The default cleanup policy for them is:

cleanup.policy=compact

In order to get time-based or size-based retention in addition to compaction, you should set the cleanup policy in this way:

cleanup.policy=compact,delete

Please look at these topic configs, too:

min.cleanable.dirty.ratio
min.compaction.lag.ms
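For reference, a hedged example of applying such settings per topic with the standard kafka-configs.sh tool (the topic name and values are placeholders; the brackets around the cleanup.policy value are required because it contains a comma):

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic \
  --alter --add-config cleanup.policy=[compact,delete],min.cleanable.dirty.ratio=0.5,min.compaction.lag.ms=0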

Tomas

tomasfabian commented 2 years ago

... and, as always, verify your topic config somehow, e.g.:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

It looks like there was a breaking change:

ksqlDB now creates windowed tables with cleanup policy compact,delete, rather than compact. Also, topics that back streams are always created with cleanup policy delete, rather than the broker default (by default, delete).