mtth / avsc

Avro for JavaScript :zap:

Unable to consume messages produced by Java application with AVRO schema #453

Closed tommy38hk closed 8 months ago

tommy38hk commented 9 months ago

I am trying to consume a Kafka message produced by a Java application with an Avro schema definition.

const { Kafka, Partitioners } = require('kafkajs');
const axios = require('axios');
const avroRegistry = require('avro-schema-registry');
const avro = require('avsc');

...

await consumer.subscribe({ topic, fromBeginning: true });

async function handleMessage(message) {
  try {
    const messageAsString = JSON.stringify(message);
    const messageObject = JSON.parse(messageAsString);
    const avroMessage = messageObject.message.value;

    console.log('messageObject:', messageObject);
    console.log('avroMessage:', avroMessage);
    const buffer = Buffer.from(avroMessage.data);
    console.log('buffer:', buffer);

    // Use Avro to decode the buffer
    //let type = avro.Type.forSchema(yourAvroSchema);
    let decoded = type.fromBuffer(buffer);
  } catch (error) {
    if (error instanceof avro.AvroError) {
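      // NOTE: avsc does not export an AvroError class, so avro.AvroError is undefined here;
      // this check itself is what throws the "Right-hand side of 'instanceof' is not an object"
      // TypeError quoted in the logs below.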
      // Handle specific Avro decoding errors
      console.error('Avro decoding error:', error.message);
    } else {
      console.error('Unexpected error decoding message:', error);
    }
  }
}

await consumer.connect();
await consumer.run({
  eachMessage: handleMessage,
});

I can access the avroMessage variable but am not able to decode the buffer.

I get the following error:

{"level":"ERROR","timestamp":"2024-02-25T20:04:57.433Z","logger":"kafkage":"[Runner] Error when calling eachMessage","topic":"my-toipc","partition":0,"offset":"98","stack":"TypeError: Rside of 'instanceof' is not an object\n at Runner.handleMessage [asge] Note that the same message can be consumed in Java. And Java is using org.apache.avro.generic.GenericData.Record when producing the message.

mtth commented 8 months ago

This is typically caused by invalid data fed to avsc (encoding mismatch, data prefix, ...). It's hard to say more without data to reproduce it.
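
(A minimal sketch of one way to capture exactly what avsc is fed, assuming a kafkajs consumer like the one above; logging the raw value as hex usually makes an extra prefix or an encoding mismatch obvious:)

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const buf = message.value; // kafkajs already provides a Buffer here
    console.log('raw bytes:', buf.toString('hex'));
    // const decoded = type.fromBuffer(buf); // throws if the bytes carry a prefix
  },
});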

tommy38hk commented 8 months ago

I created a sample schema and message that produced the same error.

{
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "name": "name",
      "type": [
        "null",
        {
          "avro.java.string": "String",
          "type": "string"
        }
      ]
    },
    {
      "name": "email",
      "type": [
        "null",
        {
          "avro.java.string": "String",
          "type": "string"
        }
      ]
    },
    {
      "name": "timestamp",
      "type": [
        "null",
        "long"
      ]
    },
    {
      "name": "items",
      "type": [
        "null",
        {
          "items": {
            "fields": [
              {
                "name": "name",
                "type": [
                  "null",
                  {
                    "avro.java.string": "String",
                    "type": "string"
                  }
                ]
              },
              {
                "name": "price",
                "type": [
                  "null",
                  "double"
                ]
              }
            ],
            "name": "Item",
            "type": "record"
          },
          "type": "array"
        }
      ]
    }
  ],
  "name": "User",
  "namespace": "org.example.test.testkafka",
  "type": "record"
}

And a Java Kafka producer

package org.example.test.testkafka;

// Import libraries

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Service
public class KafkaProducerExample {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${kafka.client-id}")
    private String clientId;

    @Value("${kafka.username}")
    private String username;

    @Value("${kafka.password}")
    private String password;

    @Value("${kafka.schema-registry-url}")
    String schemaRegistryUrl;
    @Value("${kafka.schema-registry-user-info}")
    String schemaRegistryUserInfo;

    public void test(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("basic.auth.user.info", schemaRegistryUserInfo);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("schema.registry.url", schemaRegistryUrl);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

        try (Producer<String, GenericRecord> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
            // Create a User record
            User user = new User();
            user.setId(1L);
            user.setName("John Doe");
            user.setEmail("john.doe@example.com");
            user.setTimestamp(System.currentTimeMillis());

            // Create an Item record
            Item item = new Item();
            item.setName("Product A");
            item.setPrice(19.99);

            user.setItems(Collections.singletonList(item));

            // Produce the User record to Kafka
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("example", null, user);
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

And the sample message, as JSON:

{
  "id": 1,
  "name": "John Doe",
  "email": "john.doe@example.com",
  "timestamp": 1709667230821,
  "items": [
    {
      "name": "Product A",
      "price": 19.99
    }
  ]
}
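
(For reference, the posted schema and sample message agree with each other when used with avsc directly; a minimal round-trip sketch, assuming the schema above has been loaded as a plain object, with the userSchema name and file path purely illustrative:)

const avro = require('avsc');
const userSchema = require('./user-schema.json'); // the schema posted above

const type = avro.Type.forSchema(userSchema);

const sample = {
  id: 1,
  name: 'John Doe',
  email: 'john.doe@example.com',
  timestamp: 1709667230821,
  items: [{ name: 'Product A', price: 19.99 }],
};

// If this prints true and round-trips cleanly, any remaining decode failure
// comes from the bytes on the wire, not from the schema or the message.
console.log(type.isValid(sample));
console.log(type.fromBuffer(type.toBuffer(sample)));
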
mtth commented 8 months ago

The producer code doesn't have enough information; we need to see what data avsc tries to decode. Can you share the value of message in your example?

tommy38hk commented 8 months ago

I had already shared it; I am reposting it here:

{
  "id": 1,
  "name": "John Doe",
  "email": "john.doe@example.com",
  "timestamp": 1709667230821,
  "items": [
    {
      "name": "Product A",
      "price": 19.99
    }
  ]
}
mtth commented 8 months ago

That doesn't look like message; it doesn't have the right fields:

async function handleMessage(message) { // We need this message
  try {
    const messageAsString = JSON.stringify(message);
    const messageObject = JSON.parse(messageAsString);
    const avroMessage = messageObject.message.value;
    const buffer = Buffer.from(avroMessage.data); // So we can see the buffer avsc sees
    let type = avro.Type.forSchema(yourAvroSchema);
    let decoded = type.fromBuffer(buffer);
// ...
tommy38hk commented 8 months ago

I added some console.log calls in between, as follows:

        const messageAsString = JSON.stringify(message);
        const messageObject = JSON.parse(messageAsString);

        console.log('messageObject:', messageObject);
        const bufferData = Buffer.from(messageObject.message.value.data);

        const hexString = bufferData.toString('hex');
        const formattedString = hexString.match(/../g).join(' ');
        const decodedString = formattedString.toString("utf-8");
        const buffer2 = Buffer.from(decodedString)
        const decodedString2 = buffer2.toString("utf-8");
        console.log(decodedString2); 
        let decoded = type.fromBuffer(bufferData);

And the output is

node consume-schema-topic.js
messageObject: {
  topic: 'example',
  partition: 0,
  message: {
    magicByte: 2,
    attributes: 0,
    timestamp: '1711541805856',
    offset: '0',
    key: null,
    value: { type: 'Buffer', data: [Array] },
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '0',
      firstTimestamp: '1711541805856',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 0,
      producerId: '95946902',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1711541805856',
      timestampType: 0,
      magicByte: 2
    }
  }
}
00 00 01 87 1b 02 02 02 10 4a 6f 68 6e 20 44 6f 65 02 28 6a 6f 68 6e 2e 64 6f 65 40 65 78 61 6d 70 6c 65 2e 63 6f 6d 02 d6 fa ba fd cf 63 02 02 02 12 50 72 6f 64 75 63 74 20 41 02 3d 0a d7 a3 70 fd 33 40 00
{"level":"ERROR","timestamp":"2024-03-27T12:19:23.782Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"example","partition":0,"offset":"0","stack":"TypeError: Right-hand side of 'instanceof' is not an object\n    at Runner.handleMessage [as eachMessage] (C:\\Users\\myuserid\\testkafka\\consume-schema-topic.js:166:13)\n    at Runner.processEachMessage (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:231:20)\nat onBatch (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:447:20)\n    at Runner.handleBatch (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:461:11)\n    at handler (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\runner.js:58:30)\n    at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\worker.js:29:15\n    at Object.run (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\utils\\sharedPromiseTo.js:14:17)\n    at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\workerQueue.js:27:38\n    at Array.forEach (<anonymous>)\n    at Object.push (C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\consumer\\workerQueue.js:27:13)","error":{}}{"level":"ERROR","timestamp":"2024-03-27T12:19:28.065Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNonRetriableError: Right-hand side of 'instanceof' is not an object","groupId":"microservice1-consumer-group","stack":"KafkaJSNonRetriableError: Right-hand side of 'instanceof' is not an object\n    at C:\\Users\\myuserid\\testkafka\\node_modules\\kafkajs\\src\\retry\\index.js:55:18\nat process.processTicksAndRejections (node:internal/process/task_queues:95:5)"}{"level":"INFO","timestamp":"2024-03-27T12:19:28.135Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"microservice1-consumer-group"}
C:\Users\myuserid\testkafka>
mtth commented 8 months ago

Thanks, that's helpful. It looks like the system you are using is adding a prefix to the data; you'll need to strip it before decoding. In this example, it's 5 bytes:

> b = Buffer.from(/* above data: 000001871b... */, 'hex')
> t = avro.Type.forSchema(/* above schema */)
> t.fromBuffer(b.subarray(5))
User {
  id: 1,
  name: 'John Doe',
  email: 'john.doe@example.com',
  timestamp: 1711541804715,
  items: [ Item { name: 'Product A', price: 19.99 } ]
}
tommy38hk commented 8 months ago

Thanks @mtth. This works as suggested. I am just not sure whether the payload always starts at offset 5.

mtth commented 8 months ago

I'm afraid this is out of scope for avsc. You'll need to dig into the enclosing system's setup to figure out the prefix's properties.
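
(For reference, since the producer uses Confluent's KafkaAvroSerializer, the prefix most likely follows Confluent's wire format: one zero magic byte followed by a 4-byte big-endian schema registry ID, so it is always 5 bytes for that serializer. A minimal sketch under that assumption; decodeConfluent is a hypothetical helper, not part of avsc:)

function decodeConfluent(type, buf) {
  if (buf.length < 5 || buf[0] !== 0) {
    throw new Error('not a Confluent-framed Avro message');
  }
  const schemaId = buf.readInt32BE(1); // 0x0001871b in the hex dump above
  return { schemaId, value: type.fromBuffer(buf.subarray(5)) };
}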