kestra-io / plugin-kafka

https://kestra.io/plugins/plugin-kafka/
Apache License 2.0

Unable to produce an AVRO Record message #37

Closed by loicmathieu 1 year ago

loicmathieu commented 1 year ago

Expected Behavior

The task fails with the following error:

class java.util.HashMap cannot be cast to class org.apache.avro.generic.IndexedRecord (java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.avro.generic.IndexedRecord is in unnamed module of loader io.kestra.core.plugins.PluginClassLoader @338c99c8) in field identity
class java.util.HashMap cannot be cast to class org.apache.avro.generic.IndexedRecord (java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.avro.generic.IndexedRecord is in unnamed module of loader io.kestra.core.plugins.PluginClassLoader @338c99c8)

Error serializing Avro message

Possible cause:

The value in the flow is converted into a HashMap. When the value contains a field of type record, that field is also converted into a HashMap, but the schema expects a record type. This causes the cast from HashMap to IndexedRecord to fail.

Support ticket: https://support.kestra.io/a/tickets/38

Actual Behaviour

No response

Steps To Reproduce

No response

Environment Information

Example flow

id: produce
namespace: io.kestra.tests
inputs:
  - type: FILE
    name: file

tasks:
  - id: csvReader
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{ inputs.file }}"
  - id: fileTransform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csvReader.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet,
          "stat": {
              "followers_count": row.followers_count
          }
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result
  - id: produce
    type: io.kestra.plugin.kafka.Produce
    from: "{{ outputs.fileTransform.uri }}"
    keySerializer: STRING
    properties:
      bootstrap.servers: local:9092
    serdeProperties:
      schema.registry.url: http://local:8085
    topic: test_kestra
    valueAvroSchema: |
      {
        "type": "record",
        "name": "twitter_schema",
        "namespace": "io.kestra.examples",
        "fields": [
          {
            "name": "username",
            "type": "string"
          },
          {
            "name": "tweet",
            "type": "string"
          },
          {
            "name": "stat",
            "type": {
              "type": "record",
              "name": "stat",
              "fields": [
                {
                  "name": "followers_count",
                  "type": "long"
                }
              ]
            }
          }
        ]
      }
loicmathieu commented 1 year ago

https://github.com/kestra-io/plugin-kafka/pull/38 will fix the issue, but note that the message must have the correct types, which is not the case here: the file transform generates a string for followers_count, whereas the schema declares it as a long.

This will do the trick:

  - id: fileTransform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.csvReader.uri }}"
    script: |
      var result = {
        "key": row.id,
        "value": {
          "username": row.username,
          "tweet": row.tweet,
          "stat": {
              "followers_count": parseInt(row.followers_count, 10) // assumption: convert the CSV string to a number to match the schema's "long"
          }
        },
        "timestamp": row.timestamp,
        "headers": {
          "key": "value"
        }
      };
      row = result
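
The type requirement mentioned above can be checked before the message reaches the Produce task. The following is only a minimal sketch in plain JavaScript: `findTypeMismatches` and `describe` are hypothetical helpers, not part of Kestra or the Kafka plugin; only the `record`, `string`, `long` and `int` Avro types are handled; and the sample values (`alice`, `hello`, `42`) are made up for illustration. The schema object mirrors the `valueAvroSchema` from the example flow.

```javascript
// Hypothetical helpers for illustration only (not a Kestra or plugin API).

// Returns a readable type name; typeof null is "object", so handle it apart.
function describe(v) {
  return v === null ? "null" : typeof v;
}

// Walks a simplified Avro record schema and lists the fields whose
// JavaScript value does not match the declared Avro type.
function findTypeMismatches(schema, value, path) {
  path = path || "";
  var problems = [];
  schema.fields.forEach(function (field) {
    var fieldPath = path ? path + "." + field.name : field.name;
    var actual = value == null ? undefined : value[field.name];
    var type = field.type;
    if (type !== null && typeof type === "object" && type.type === "record") {
      // Nested record: the value must be an object, then check its fields.
      if (actual === null || typeof actual !== "object") {
        problems.push(fieldPath + ": expected record, got " + describe(actual));
      } else {
        problems = problems.concat(findTypeMismatches(type, actual, fieldPath));
      }
    } else if (type === "string") {
      if (typeof actual !== "string") {
        problems.push(fieldPath + ": expected string, got " + describe(actual));
      }
    } else if (type === "long" || type === "int") {
      // Values read from a CSV file are strings, so "42" is rejected here.
      if (typeof actual !== "number" || !isFinite(actual) || Math.floor(actual) !== actual) {
        problems.push(fieldPath + ": expected " + type + ", got " + describe(actual));
      }
    }
  });
  return problems;
}

// Same structure as the valueAvroSchema in the example flow.
var schema = {
  type: "record",
  name: "twitter_schema",
  fields: [
    { name: "username", type: "string" },
    { name: "tweet", type: "string" },
    { name: "stat", type: { type: "record", name: "stat", fields: [
      { name: "followers_count", type: "long" }
    ] } }
  ]
};

// Made-up sample rows: one as read from CSV, one with the count converted.
var fromCsv = { username: "alice", tweet: "hello", stat: { followers_count: "42" } };
var converted = { username: "alice", tweet: "hello", stat: { followers_count: parseInt("42", 10) } };

console.log(findTypeMismatches(schema, fromCsv));
console.log(findTypeMismatches(schema, converted));
```

With the unconverted row the helper reports `stat.followers_count`, the same field that the comment above singles out. Once the count is converted to a number, the list of mismatches is empty.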