omerAllied opened this issue 6 years ago
I don't know if this is relevant to your issue, since I don't know the arguments used with the Avro console producer. What I do know is that Avro data produced through the Schema Registry differs from data produced with just the schema: when the registry is used, the Confluent libraries add an extra envelope (a magic byte plus a schema ID). Confluent's working assumption is that both the publisher and the subscriber use the registry, and that it is the same registry on both sides; otherwise the IDs don't match. In my opinion, a questionable design.
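For concreteness, the envelope is small: one zero "magic" byte, the schema ID as a 4-byte big-endian integer, and then the plain Avro binary body. A minimal Ruby sketch of that framing (a hypothetical helper, not code from this codec or from Confluent's libraries):

```ruby
# Wrap an already-Avro-encoded payload in the Confluent wire format:
# 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro binary body.
def confluent_frame(schema_id, avro_bytes)
  [0, schema_id].pack('CN') + avro_bytes
end
```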
Hi @vortex314
I don't think the issue here is a lack of understanding of the Confluent wire format. The source code for the Avro console producer parses JSON records, applies an AVSC schema to them, and encodes them.
From reading the Ruby code in this project, it seems to do roughly the same thing as Confluent's own AvroMessageReader: https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageReader.java
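In rough terms, both paths take a JSON record, Avro-encode it against the schema, and prepend the wire-format header. A sketch of that flow with the `avro` gem, assuming the hypothetical `confluent_frame` helper above and a registry-assigned schema ID (an illustration, not this project's implementation):

```ruby
require 'avro'
require 'json'
require 'stringio'

# Parse the AVSC, then Avro-encode a JSON record against it.
schema = Avro::Schema.parse(File.read('schema.avsc'))
datum  = JSON.parse('{"Customer": "BMORR", "Device": "logstash"}')

buffer  = StringIO.new
writer  = Avro::IO::DatumWriter.new(schema)
encoder = Avro::IO::BinaryEncoder.new(buffer)
writer.write(datum, encoder)

# The schema ID would normally come back from the registry; 1 is just
# the value visible in the captured message below.
wire_bytes = confluent_frame(1, buffer.string)
```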
I'm trying to publish a JSON input as a Kafka message from Logstash (in essence, trying to replicate the behavior of kafka-avro-console-producer). The message gets added to the topic, but it differs from the one produced by kafka-avro-console-producer.
Message produced by kafka-avro-console-producer:

```json
{ "topic": "BellData7", "key": "ée", "value": "\u0000\u0000\u0000\u0000\u0001\nBMORR\fCELTRR", "partition": 0, "offset": 1 }
```

Message produced by Logstash (with the configuration below):

```json
{ "topic": "BellData7", "key": "ée", "value": "\u0000\u0000\u0000\u0000\u0001\nBMORR\fCELTRR", "partition": 0, "offset": 1 }
```
I can see the magic bytes are clearly missing and the data appears to be encoded differently. What am I doing wrong?
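One way to check is to look at the first five bytes of each consumed value. In the console-producer message above, `\u0000\u0000\u0000\u0000\u0001` is the wire-format header (magic byte 0x00, schema ID 1), and the rest is the Avro body: `\n` is 0x0A, the zigzag-varint length prefix for the 5-character string "BMORR", and `\f` is 0x0C, the prefix for the 6-character string that follows. A small inspection sketch (again a hypothetical helper, not part of this codec):

```ruby
# Split a consumed message value into [schema_id, avro_body],
# or return nil if it is not in the Confluent wire format.
def confluent_unframe(bytes)
  return nil if bytes.bytesize < 5
  magic, schema_id = bytes.unpack('CN')
  return nil unless magic == 0
  [schema_id, bytes.byteslice(5, bytes.bytesize - 5)]
end
```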
The Logstash conf file:

```
input {
  stdin { }
}

filter {
  json {
    source       => "message"
    remove_field => [ "path", "@timestamp", "@version", "host", "message" ]
    remove_tag   => [ "message" ]
  }
}

output {
  kafka {
    bootstrap_servers => "172.17.0.3:9092"
    codec => avro_schema_registry {
      endpoint        => "http://172.17.0.3:8081"
      subject_name    => "MyTopic7-value"
      schema_uri      => "/usr/share/logstash/pipeline/schema.avsc"
      register_schema => true
      binary_encoded  => true
    }
    topic_id => "MyTopic7"
  }
}
```
I enter the following input on stdin:

```json
{"Customer": "BMORR", "Device": "logstash"}
```
schema.avsc is as below:

```json
{
  "namespace": "hello.avro",
  "type": "record",
  "name": "Hello",
  "fields": [
    { "name": "Customer", "type": "string" },
    { "name": "Device",   "type": "string" }
  ]
}
```