authorjapps / zerocode

A community-developed, free, opensource, automated testing framework for microservices APIs, Kafka(Data Streams) and Load testing. Zerocode Open Source enables you to create, change and maintain your automated test scenarios via simple JSON or YAML files. Visit documentation below:
https://zerocode-tdd.tddfy.com
Apache License 2.0
896 stars 400 forks source link

Send AVRO content to kafka #341

Open deblockt opened 4 years ago

deblockt commented 4 years ago

Hi,

We try to send kafka message using AVRO and schema registry.

Right now, we've been trying to do something like this:

on configuration

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

on test

{
      "name":"send avro message",
      "url":"kafka-topic:command",
      "operation":"produce",
      "request": {
        "recordType" : "JSON",
        "records": [
          {
            "key": "test",
            "value": {
              "property" : "001"
            }
          }
        ]
      },
      "verify": {
        "status": "Ok",
        "recordMetadata": "$NOT.NULL"
      }
    }

ZeroCode serialize this code using avro and send it. But it doesn't use the existing schema for the object. On the schema registry, we have this schema:

{
  "type" : "record",
  "name" : "TestKafkaMessage",
  "namespace" : "com.foo.bar",
  "fields" : [ {
    "name" : "property",
    "type" : "string"
  }]
}

Instead of use the existing schema, it create a new one command-value with the content :

"string"

Expected Behaviour:

Be able to specify a schema (using file, json, or schema registry reference) to serialize json using this schema.

Is this already possible? Is there any other way?

Thanks

authorjapps commented 4 years ago

@deblockt , apologies! we were slightly busy with other priority tickets and our day jobs. That's why couldn't address this issue. This looks like a feature request(or question?).

We will have a look soon!

anues28 commented 4 years ago

looking for the answer as well

rpapesch commented 3 years ago

Hi!

Currently, we are evaluating the framework to be used in a corporate context.

I would like to know if producing AVRO messages is working (with schema registry).

Is this already fixed and if yes, could you provide an example?

Thank you!

René

M3lkior commented 3 years ago

I have the same needs of all of you and i use a Custom Kafka Client to achieve that !

@rpapesch if my workaround in interesting you :

Here is an extract of my Custom client (not cleaned) :


public class KafkaProduceCostingRequestAvro extends BasicKafkaClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProduceCostingRequestAvro.class);
    // unfortunately not accessible from BasicKafkaClient
    private final ObjectMapper objectMapper = new ObjectMapperProvider().get();
    // unfortunately not accessible from BasicKafkaClient
    private final Gson gson = new GsonSerDeProvider().get();
    // unfortunately not accessible from BasicKafkaClient
    @Inject(optional = true)
    @Named("kafka.producer.properties")
    private String producerPropertyFile;

    @Override
    public String execute(String brokers, String topicName, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) {
        // just create an avro producer, returning your avro java pojo
        Producer<String, CostingRequestPayload> producer = createAvroProducer(brokers, producerPropertyFile);
        try {
            // read the actual zerocode configuration
            ProducerJsonRecords jsonRecords = objectMapper.readValue(requestJson, ProducerJsonRecords.class);
            List<ProducerJsonRecord> records = jsonRecords.getRecords();
            validateProduceRecord(records);
            AtomicReference<RecordMetadata> recordMetadata = new AtomicReference<>();
            records.forEach(producerJsonRecord -> {
                CostingRequestPayload crp = new CostingRequestPayload();
                // use Avro Deserializer to get json record from zerocode configuration
                DatumReader<CostingRequestPayload> reader
                        = new SpecificDatumReader<>(CostingRequestPayload.class);
                Decoder decoder;
                try {
                    decoder = DecoderFactory.get().jsonDecoder(
                            CostingRequestPayload.getClassSchema(), producerJsonRecord.getValue().toString());
                    crp = reader.read(null, decoder);
                } catch (IOException e) {
                    LOGGER.error("Deserialization error:" + e.getMessage());
                }

                RecordHeaders headers = new RecordHeaders();
                producerJsonRecord.getHeaders().forEach((o, o2) -> headers.add(o.toString(), o2.toString().getBytes(StandardCharsets.UTF_8)));
                // construct your avro record
                ProducerRecord<String, CostingRequestPayload> producerRecord = 
                        new ProducerRecord(topicName, null, null, null, crp, headers);
                try {
                    // and send it
                    RecordMetadata recordMetadata1 = producer.send(producerRecord, (metadata, e) -> {
                        if (metadata != null) {
                            // Record sent successfully. Exception == null and metadata != null
                            LOGGER.info("Message {}-{} successfully sent to topic={} part={} off={} at time={}",
                                    null,
                                    producerRecord,
                                    metadata.topic(),
                                    metadata.partition(),
                                    metadata.offset(),
                                    new Date(metadata.timestamp()));
                        } else {
                            // An error occurred. Exception != null and metadata == null
                            // Correctly handle the exception according to your needs
                            // /!\ If you don't process the exception, it is "fire-and-forget" like. Send or not or maybe :-)
                            LOGGER.error("An error occurred during send !", e);
                        }
                    }).get();
                    recordMetadata.set(recordMetadata1);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                    error.set(e.getMessage());
                } catch (ExecutionException e) {
                    e.printStackTrace();
                    error.set(e.getMessage());
                }
            });
            // dont forget to return a status for tests assertions
            return gson.toJson(new DeliveryDetails(OK, recordMetadata.get()));

        } catch (Exception e) {
            LOGGER.error("Unable to process json.", e);
            error.set(e.getMessage());
        }

        if (StringUtils.isBlank(error.get())) {
            return gson.toJson(new DeliveryDetails(FAILED, error.get()));

        }
        return gson.toJson(new DeliveryDetails(FAILED, ""));
    }

}

Because i used my generated avro pojo, no need to set a schema Because i used Avro Deserializer, the records content should be compliant - be careful to avro union. a field declated with ["null", "string"] type should be described as

"request": {
        "records": [
          {
            "key": null,
            "headers": {
              "REPLY_TOPIC": "TEST"
            },
            "value": {
              "ProductId": {
                "field": {
                  "string": "82306457"
                },
...

I hope this help !

@authorjapps i think that i could do a PR refactoring to allow the custom client to reuse some of your functions (switch fields from private to protected , etc) without the need to duplicate the code

erobertolima121 commented 3 years ago

Hi @M3lkior,

This example is in some repository so I can analyze it in more detail.

This alternative will help me a lot.

M3lkior commented 3 years ago

Hi @M3lkior,

This example is in some repository so I can analyze it in more detail.

This alternative will help me a lot.

Nop, it is a part from my enterprise application.

nishantworah commented 2 years ago

@authorjapps I am facing the same issue. Here is the log when i run the test:

2022-03-03 11:25:24,863 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - Cluster ID: lkc-oow0x
2022-03-03 11:25:25,649 [main] ERROR org.jsmart.zerocode.core.kafka.send.KafkaSender - Error in sending record. Exception : org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
2022-03-03 11:25:25,653 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

@erobertolima121 is this the same issue you are facing?

M3lkior commented 2 years ago

hi @erobertolima121 ;

If it can help, i created a gist with my helper classes in order to support AVRO in the produce step

https://gist.github.com/M3lkior/aa4f2b21a46f2d45c84b09b5b0331930