hoppity / kafka-http-proxy

A proxy, written in NodeJS, that allows applications to use Kafka via HTTP
Apache License 2.0

Deserialization issue in java consuming json messages #42

Closed imanzano closed 7 years ago

imanzano commented 7 years ago

Hi,

I'm trying to consume a message produced through kafka-http-proxy, and the message that arrives at my Java consumer is somehow corrupted.

I do the following:

Publish a message using curl, as in the wiki example:

curl -X POST -H "Content-type: application/kafka.binary.v1+json" --data '{"records": [ { "value": "something"} ]}' http://localhost:8085/topics/topic

and I receive the following using the Java Kafka Consumer example from https://www.tutorialspoint.com/apache_kafka/apache_kafka_simple_producer_example.htm

offset = 7, key = , value = ��^

In the Java Kafka consumer I tried changing the deserialization from String to Bytes, but it didn't work.

Any idea what I'm doing wrong? I just want to produce a message using REST and consume it using Java.

I receive the same output using $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic.

This is the Java Kafka Consumer


import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import static java.util.Collections.singletonList;

public class SimpleConsumer {
    private SimpleConsumer() {
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Enter topic name");
            return;
        }
        // Kafka consumer configuration settings
        final String topicName = args[0];
        final Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:32769");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        // Subscribe to the list of topics.
        consumer.subscribe(singletonList(topicName));
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            final ConsumerRecords<String, byte[]> records = consumer.poll(100);
            for (final ConsumerRecord<String, byte[]> record : records) {
                // Print the offset, key and value for each consumer record.
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(),
                        new String(record.value(), StandardCharsets.UTF_8));
            }
        }
    }
}

Best regards, Ignacio

ducas commented 7 years ago

Hi Ignacio. Have you tried UTF8 encoding the value before publishing?

imanzano commented 7 years ago

The publish is made by the curl command. I have now set the charset to UTF-8, and the result is still the same:

curl -X POST -H "Content-type: application/kafka.binary.v1+json; charset=UTF-8" --data '{"records": [ { "value": "pepe"} ]}' http://localhost:8085/topics/topic

ducas commented 7 years ago

Hi. I mean, have you tried changing "pepe" to its UTF8 encoded value in your publisher? IIRC the proxy is just pumping it straight into the Kafka-node library which just throws bytes into a Kafka message.

imanzano commented 7 years ago

Hi,

I also tried using --data-ascii, with the same result:

curl -X POST -H "Content-type: application/kafka.binary.v1+json; charset=UTF-8" --data-ascii '{"records": [ { "value": "pepe"} ]}' http://localhost:8085/topics/topic

Could it be that the Docker instance is using a different encoding, and that's why the NodeJS server writes the wrong values to Kafka? Do you know how I can set the encoding in the NodeJS server?

I also tried with the default Kafka console consumer, and it shows the same output.

imanzano commented 7 years ago

Here is more information.

I create a consumer using kafka-http-proxy:

curl -X POST http://localhost:8085/consumers/test-group

Then I publish a new message using the API:

curl -X POST -H "Content-type: application/kafka.binary.v1+json; charset=UTF-8" --data '{"records": [ { "value": "pepe"} ]}' http://localhost:8085/topics/topic

As the last step, I consume that message using kafka-http-proxy:

curl http://localhost:8085/consumers/test-group/instances/9b28910b-a2ab-4e7d-ba24-857a9cabad75/topics/topic

and I get:

[{"topic":"topic","partition":3,"offset":7,"key":"","value":"77+977+9Xg=="}]

The value is encoded in base64, but if you decode 77+977+9Xg== from base64 you get ��^, not "pepe".
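For reference, the round trip can be reproduced in a shell (a sketch, assuming GNU `base64` and `xxd` are available). Decoding "pepe" as if it were base64 yields the raw bytes A5 EA 5E; the first two are invalid UTF-8, so a UTF-8 reader shows two replacement characters (EF BF BD) followed by '^' (5E), which re-encodes to exactly 77+977+9Xg==:

```shell
# Decoding "pepe" as if it were base64 gives three raw bytes: a5 ea 5e
echo -n 'pepe' | base64 -d | xxd -p

# The consumed value decodes to EF BF BD EF BF BD 5E: two UTF-8
# replacement characters followed by '^' (0x5E)
echo -n '77+977+9Xg==' | base64 -d | xxd -p
```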

In this example I use only kafka-http-proxy as both producer and consumer.

imanzano commented 7 years ago

I found the issue: the example in the wiki page is wrong.

The POST operation has to have the value encoded in base64. If I encode the value in base64, the message arrives successfully.

For example

curl -X POST -H 'Content-Type: application/kafka.binary.v1+json' -d '{"records":[{"value":"aG9sYQ=="}]}' http://localhost:8085/topics/topic
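A convenient way to build such a request (a sketch, assuming GNU `base64`; "hola" is just an example payload) is to base64-encode the value inline before constructing the JSON body:

```shell
# Base64-encode the payload before POSTing; 'hola' encodes to aG9sYQ==
VALUE=$(echo -n 'hola' | base64)
BODY="{\"records\":[{\"value\":\"$VALUE\"}]}"
echo "$BODY"

# Then POST it to the same endpoint as above:
#   curl -X POST -H 'Content-Type: application/kafka.binary.v1+json' \
#        -d "$BODY" http://localhost:8085/topics/topic
```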

I found this by debugging the topics.js file and enabling the logs; when a POST is made, the following message is logged:

[2017-02-08T17:25:16.863Z] DEBUG: kafka-http-proxy/206 on 8a80ab3dbec2(/usr/local/src/controllers/topics.js:90): ready to send the data (data=4) messages: [ { "topic": "topic", "messages": [ "�m�" ] } ]

So, all values in a POST have to be base64-encoded, or topics.js should not try to base64-decode a value that is not encoded.

Thanks, Ignacio