logstash-plugins / logstash-input-kafka

Kafka input for Logstash
Apache License 2.0

The `record.value.to_s` call prevents the plain codec's `charset` setting from taking effect #337

Xztty closed this issue 3 years ago

Xztty commented 3 years ago

https://github.com/logstash-plugins/logstash-input-kafka/blob/66e6a26ce97b29beb03b4196316eb84852887d52/lib/logstash/inputs/kafka.rb#L258

Xztty commented 3 years ago

When a Kafka message's value isn't UTF-8 encoded (for example, a GBK-encoded message), I'd like to set `value_deserializer_class => "com.xxx.Latin1StringDeserializer"`.

The Latin1StringDeserializer implementation:

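    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;

    public class Latin1StringDeserializer implements Deserializer<String> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public String deserialize(String topic, byte[] data) {
            if (data == null)
                return null;
            // ISO-8859-1 maps every byte 0x00-0xFF to exactly one char, so no byte is lost
            return new String(data, StandardCharsets.ISO_8859_1);
        }

        @Override
        public void close() { }
    }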
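The idea behind a Latin-1 deserializer is that ISO-8859-1 maps each of the 256 byte values to exactly one `char`, so a `String` built this way still carries the original GBK bytes. A minimal, self-contained check of that round trip (the class `Latin1RoundTrip` and its method are hypothetical names for illustration):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Latin1RoundTrip {
    // Returns true if every byte value 0x00-0xFF survives
    // byte[] -> ISO-8859-1 String -> byte[] unchanged.
    static boolean roundTripsAllBytes() {
        byte[] all = new byte[256];
        for (int i = 0; i < 256; i++) {
            all[i] = (byte) i;
        }
        String s = new String(all, StandardCharsets.ISO_8859_1);
        byte[] back = s.getBytes(StandardCharsets.ISO_8859_1);
        return s.length() == 256 && Arrays.equals(all, back);
    }

    public static void main(String[] args) {
        System.out.println(roundTripsAllBytes()); // true
    }
}
```

The catch is that the bytes are only recoverable if whoever reads the `String` encodes it back with ISO-8859-1, which the codec path does not do.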

It returns the value as a string decoded from the raw bytes with "ISO-8859-1", and I want to configure the input as:

input {
    kafka {
        id => "test_tgk"
        bootstrap_servers => "192.168.1.1:9092"
        topics => ["test_gbk"]
        group_id => "test-gbk"
        value_deserializer_class => "com.xxx.Latin1StringDeserializer"
        codec => plain {
            charset => "GBK"
        }
    }
}

But it doesn't work, because the plugin calls `codec_instance.decode(record.value.to_s) do |event|`, so the codec decodes the string representation of the message value instead of its byte-array representation.
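To make the failure concrete, here is a small Java sketch. It assumes (not confirmed in this thread) that under JRuby the Java `String` produced by the custom deserializer reaches the codec as a UTF-8 Ruby string, which the sketch simulates with `getBytes(UTF_8)`; the codec then reads those UTF-8 bytes as GBK. `GbkMismatch` and its methods are hypothetical names:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class GbkMismatch {
    static final Charset GBK = Charset.forName("GBK");

    // Intended path: decode the raw GBK bytes from Kafka directly.
    static String decodeRaw(byte[] gbkBytes) {
        return new String(gbkBytes, GBK);
    }

    // Assumed actual path: the Latin-1 String is re-encoded as UTF-8,
    // and those (different) bytes are then decoded as GBK.
    static String decodeViaLatin1String(byte[] gbkBytes) {
        String latin1 = new String(gbkBytes, StandardCharsets.ISO_8859_1);
        byte[] utf8 = latin1.getBytes(StandardCharsets.UTF_8);
        return new String(utf8, GBK);
    }

    public static void main(String[] args) {
        String text = "\u4e2d\u6587";          // two CJK characters, U+4E2D U+6587
        byte[] gbkBytes = text.getBytes(GBK);   // D6 D0 CE C4 in GBK
        System.out.println(decodeRaw(gbkBytes).equals(text));             // true
        System.out.println(decodeViaLatin1String(gbkBytes).equals(text)); // false
    }
}
```

Each Latin-1 char above 0x7F becomes two UTF-8 bytes, so the codec sees 8 bytes instead of the original 4 and produces different characters.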

Xztty commented 3 years ago

It's resolved by this configuration, which uses Kafka's `ByteArrayDeserializer` so the codec receives the value's raw bytes:

input {
    kafka {
        id => "test_gbk"
        bootstrap_servers => "192.168.1.1:9092"
        topics => ["test_gbk_"]
        group_id => "cg-gbk"
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        codec => plain {
            charset => "GBK"
        }
    }
}
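With `ByteArrayDeserializer` the value's bytes are passed through untouched, so the only decoding step left is the one selected by `charset => "GBK"`. A rough standalone Java analogue of that single step follows; `RawBytesToText` and `decode` are hypothetical names, the replace-on-error policy is an assumption of this sketch, and Logstash's plain codec is implemented in Ruby, not like this:

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;

public class RawBytesToText {
    // Decodes raw message bytes with the named charset (e.g. "GBK"),
    // substituting U+FFFD for malformed or unmappable input instead of failing.
    static String decode(byte[] value, String charsetName) {
        CharsetDecoder decoder = Charset.forName(charsetName).newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        try {
            return decoder.decode(ByteBuffer.wrap(value)).toString();
        } catch (CharacterCodingException e) {
            // Not expected with REPLACE actions, but decode() declares it
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // GBK encoding of U+4E2D U+6587
        byte[] gbk = {(byte) 0xD6, (byte) 0xD0, (byte) 0xCE, (byte) 0xC4};
        System.out.println(decode(gbk, "GBK").equals("\u4e2d\u6587")); // true
    }
}
```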