logstash-plugins / logstash-input-kafka

Kafka input for Logstash
Apache License 2.0

Tombstones are swallowed in an Avro topic: please give them to the pipeline, with [@metadata][kafka][tombstone]=true #344

Open slaout opened 1 month ago

slaout commented 1 month ago

Logstash information:

  1. Logstash version: 8.13.0
  2. Logstash installation source: "FROM docker.elastic.co/logstash/logstash:8.13.0"
  3. How is Logstash being run: docker/kubernetes deployment
  4. How was the Logstash Plugin installed: it's already present in the Docker image

JVM

It does not seem to matter here: we use the default JVM provided by the Docker image.

OS version

Same here.

Description of the problem including expected versus actual behavior:

When tombstones are sent to the input topic, they are swallowed by the plugin and never surface in the pipeline. Kafka tombstones are records with a key and a null value; they signal the deletion of the entity identified by that key. We use an Avro schema for the topic, which may be the source of the problem. See an example pipeline below.

Handling these null values is crucial because we use Logstash to ingest Kafka topics into Elasticsearch, indexing the entities carried by the events. Since the entities are streamed from a database, they can be deleted, and such deleted entities should be removed from the Elasticsearch index.

It seems like the problem lies in this file: https://github.com/logstash-plugins/logstash-input-kafka/blob/main/lib/logstash/inputs/kafka.rb#L258

The fix could go there: when the record value is null, trigger tombstone handling by emitting an empty event with a single metadata field, [@metadata][kafka][tombstone] => true.
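A minimal sketch of the proposed change, using a hypothetical `StubEvent` stand-in for `LogStash::Event` and `push_record` as an illustrative name (the real consumer loop in kafka.rb looks different; this only shows the intended null-value branching):

```ruby
require "ostruct"

# Hypothetical stand-in for LogStash::Event, for illustration only;
# the real class lives in logstash-core.
class StubEvent
  def initialize(data = {})
    @data = data
  end

  def set(key, value)
    @data[key] = value
  end

  def get(key)
    @data[key]
  end
end

# Sketch of the proposed handling: instead of dropping records whose
# value is nil (tombstones), push an empty event carrying
# [@metadata][kafka][tombstone] = true into the pipeline queue.
def push_record(record, queue)
  if record.value.nil?
    event = StubEvent.new
    event.set("[@metadata][kafka][tombstone]", true)
  else
    event = StubEvent.new("message" => record.value)
  end
  event.set("[@metadata][kafka][key]", record.key)
  queue << event
end

queue = []
push_record(OpenStruct.new(key: "user-42", value: nil), queue)       # tombstone
push_record(OpenStruct.new(key: "user-7",  value: "payload"), queue) # normal record
```

With this, a tombstone still reaches the pipeline as an event, and filters/outputs can branch on the metadata field.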

Steps to reproduce:

  1. Publish an event with a key, and a null value into a Kafka topic that is provided as the input of a Logstash pipeline
  2. Use a stdout output and see that nothing is printed for such an event
  3. Use a filter to add some metadata when a mandatory field is absent (a likely tombstone) and see that nothing is printed for such an event either

Provide logs (if relevant):

Not relevant, but here is a minimal pipeline showing that null events are not sent to the pipeline, along with a sketch of the use case we would like to achieve:

input {
  kafka {
    bootstrap_servers => ["${KAFKA_BROKER_URL}"]
    topics => ["${TOPIC}"]
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "${SASL_JAAS_CONFIG}"
    ssl_endpoint_identification_algorithm => "https"
    group_id => "${GROUP_ID}"
    auto_offset_reset => "${AUTO_OFFSET_RESET}"
    isolation_level => "read_committed"
    schema_registry_url => "${KAFKA_SCHEMA_REGISTRY_URL}"
    schema_registry_key => "${KAFKA_SCHEMA_REGISTRY_USERNAME}"
    schema_registry_secret => "${KAFKA_SCHEMA_REGISTRY_PASSWORD}"
    decorate_events => "basic"
  }
}

# Here, it's already too late: the event is not processed, so there is nothing we can filter on...
# filter {
# }

output {
  # Here, we'd like to do this:
  # if [@metadata][kafka][tombstone] {
  #   elasticsearch {
  #     action => "delete"
  #     ...
  #   }
  # } else {
  #   elasticsearch {
  #     action => "index"
  #     ...
  #   }
  # }

  stdout {
    codec => rubydebug { metadata => true }
  }
}
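For reference, once the marker exists, the desired output routing could look like the fragment below. This assumes `decorate_events` also populates `[@metadata][kafka][key]` (which it does today) plus the proposed `[@metadata][kafka][tombstone]` field; `ES_URL` and the `entities` index name are placeholders:

```
output {
  if [@metadata][kafka][tombstone] {
    elasticsearch {
      hosts       => ["${ES_URL}"]
      index       => "entities"
      action      => "delete"
      document_id => "%{[@metadata][kafka][key]}"
    }
  } else {
    elasticsearch {
      hosts       => ["${ES_URL}"]
      index       => "entities"
      action      => "index"
      document_id => "%{[@metadata][kafka][key]}"
    }
  }
}
```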