This plugin is used to serialize Logstash events as Avro datums, as well as to deserialize Avro datums into Logstash events.

It decodes/encodes Avro records as Logstash events using the associated Avro schema from a Confluent schema registry (https://github.com/confluentinc/schema-registry).
Install the plugin with:

```
logstash-plugin install logstash-codec-avro_schema_registry
```
When this codec is used to decode the input, you may pass the following options:
- `endpoint` - always required.
- `username` - optional.
- `password` - optional.
- `tag_on_failure` - tag events with `_avroparsefailure` when decoding fails.

If the input stream is binary encoded, you should use the `ByteArrayDeserializer` in the Kafka input config, as in the sketch below.
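For example, a decode configuration that authenticates to the registry and tags events that fail to parse might look like the following sketch. The endpoint and credentials are placeholder values, and `tag_on_failure => true` assumes the option is a boolean flag:

```
input {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"  # placeholder registry URL
      username => "registry_user"               # placeholder credential
      password => "registry_pass"               # placeholder credential
      tag_on_failure => true                    # assumed boolean; failed decodes get _avroparsefailure
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}
```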
This codec uses the Confluent schema registry to register a schema and to encode the data in Avro using `schema_id` lookups.
When this codec is used to encode, you may pass the following options:
- `endpoint` - always required.
- `username` - optional.
- `password` - optional.
- `schema_id` - when provided, no other options are required.
- `subject_name` - required when there is no `schema_id`.
- `schema_version` - when provided, the schema will be looked up in the registry.
- `schema_uri` - when provided, the JSON schema is loaded from a URL or file.
- `schema_string` - required when there is no `schema_id`, `schema_version`, or `schema_uri` (see the sketch after this list).
- `check_compatibility` - will check schema compatibility before encoding.
- `register_schema` - will register the JSON schema if it does not exist.
- `binary_encoded` - will output the encoded event as a ByteArray. Requires the `ByteArraySerializer` to be set in the Kafka output config.
- `base64_encoded` - will output the encoded event as a base64 string.
- `client_certificate` - client TLS certificate for mutual TLS.
- `client_key` - client TLS key for mutual TLS.
- `ca_certificate` - CA certificate.
- `verify_mode` - SSL verify mode. Valid options are `verify_none`, `verify_peer`, `verify_client_once`, and `verify_fail_if_no_peer_cert`. Default is `verify_peer`.
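As a sketch of the inline-schema path, the output below passes the schema as `schema_string`, checks compatibility, and registers the schema if it is missing. The subject name and schema JSON are illustrative, and this option combination is an assumption based on the list above rather than a verified recipe:

```
output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"  # placeholder registry URL
      subject_name => "example_subject"         # placeholder subject
      # illustrative Avro record schema with a single string field
      schema_string => '{"type":"record","name":"Example","fields":[{"name":"message","type":"string"}]}'
      check_compatibility => true               # verify compatibility before encoding
      register_schema => true                   # register the schema if it does not exist
      binary_encoded => true                    # emit a ByteArray; pair with ByteArraySerializer
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
```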
Basic usage with Kafka input and output:

```
input {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}

filter {
  ...
}

output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      subject_name => "my_kafka_subject_name"
      schema_uri => "/app/my_kafka_subject.avsc"
      register_schema => true
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
```
Using a signed certificate for registry authentication:

```
output {
  kafka {
    ...
    codec => avro_schema_registry {
      endpoint => "http://schemas.example.com"
      schema_id => 47
      client_key => "./client.key"
      client_certificate => "./client.crt"
      ca_certificate => "./ca.pem"
      verify_mode => "verify_peer"
    }
    value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
  }
}
```