mostafa / xk6-kafka

k6 extension to load test Apache Kafka with support for various serialization formats, SASL, TLS, compression, Schema Registry client and beyond
Apache License 2.0

Load test with an Avro schema that contains references throws an error #293

Open SamyLegal opened 4 months ago

SamyLegal commented 4 months ago

Hi,

Currently, I am developing a load test with xk6-kafka that uses an Avro schema with references. When I try to use a schema with references, I get an error/exception. The stack trace of the error is at the end of this message.

Kafka Scenario

To better understand our load test, I will explain our Kafka use case.
We use Kafka Connect and the MongoDB Source Kafka Connector to retrieve data from Mongo collections and insert it into Kafka.
We use this connector for multiple collections, so we have multiple Avro schemas that share the MongoDB record schemas defined by this connector. To avoid duplicating these MongoDB schema elements, when I create an Avro schema I reference the shared schemas instead of including them inline.

Avro schema of my main entity

{
  "type": "record",
  "name": "PieceChangeStream",
  "namespace": "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece",
  "fields": [
    {
      "name": "ns",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.Ns"
      ],
      "default": null
    },
    {
      "name": "_id",
      "type": "string"
    },
    {
      "name": "operationType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "fullDocumentBeforeChange",
      "type": [
        "null",
        "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument"
      ],
      "default": null
    },
    {
      "name": "fullDocument",
      "type": [
        "null",
        "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument"
      ],
      "default": null
    },
    {
      "name": "to",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.To"
      ],
      "default": null
    },
    {
      "name": "documentKey",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.DocumentKey"
      ],
      "default": null
    },
    {
      "name": "updateDescription",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.UpdateDescription"
      ],
      "default": null
    },
    {
      "name": "clusterTime",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "txnNumber",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "lsid",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.Lsid"
      ],
      "default": null
    }
  ]
}

Example of a referenced schema

com.mongodb.kafka.connect.source.Ns

{
  "type": "record",
  "name": "Ns",
  "namespace": "com.mongodb.kafka.connect.source",
  "fields": [
    {
      "name": "db",
      "type": "string"
    },
    {
      "name": "coll",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

We use the Apicurio Schema Registry with its "Confluent compatibility" API so that it works with xk6-kafka. If I use a full schema without references, my script works.
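
For context, the referenced schemas are registered first, and the top-level schema is registered afterwards with a references array pointing at them. A minimal sketch of that registration step with xk6-kafka (assuming createSchema accepts the same references field that getSchema does; nsSchema and pieceChangeStreamSchema are placeholders for the JSON schema strings shown above) could look like this:

import { SchemaRegistry, SCHEMA_TYPE_AVRO } from "k6/x/kafka";

const registry = new SchemaRegistry({
  url: "http://localhost:8200/apis/ccompat/v6",
});

// Register the referenced schema first so the registry can resolve it by name.
registry.createSchema({
  subject: "com.mongodb.kafka.connect.source.Ns",
  schema: nsSchema, // JSON string of the Ns record schema above
  schemaType: SCHEMA_TYPE_AVRO,
});

// Then register the top-level schema, declaring the reference.
registry.createSchema({
  subject: "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceChangeStream",
  schema: pieceChangeStreamSchema, // JSON string of the PieceChangeStream schema above
  schemaType: SCHEMA_TYPE_AVRO,
  references: [
    {
      name: "com.mongodb.kafka.connect.source.Ns",
      subject: "com.mongodb.kafka.connect.source.Ns",
      version: 1,
    },
  ],
});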

K6 script

My k6 script is as follows. I use the "getSchema" method with the "references" property to retrieve my schema.

import { uuidv4 } from "https://jslib.k6.io/k6-utils/1.4.0/index.js";

import { check } from "k6";
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_AVRO,
  SCHEMA_TYPE_STRING,
  RECORD_NAME_STRATEGY,
} from "k6/x/kafka"; // import kafka extension

const brokers = ["localhost:9092"];
const topic = "piece";

const writer = new Writer({
  brokers: brokers,
  topic: topic,
  autoCreateTopic: true,
});

const reader = new Reader({
  brokers: brokers,
  topic: topic,
});

const connection = new Connection({
  address: brokers[0],
});

const schemaRegistry = new SchemaRegistry({
  url: "http://localhost:8200/apis/ccompat/v6",
});

if (__VU === 0) {
  connection.createTopic({ topic: topic });
}

const valueSchemaObject = schemaRegistry.getSchema({
  enableCaching: true,
  subject: "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceChangeStream",
  schemaType: SCHEMA_TYPE_AVRO,
  version: 1,
  references: [
    {
      name: "com.mongodb.kafka.connect.source.DocumentKey",
      subject: "com.mongodb.kafka.connect.source.DocumentKey",
      version: 1,
    },
    {
      name: "com.mongodb.kafka.connect.source.Lsid",
      subject: "com.mongodb.kafka.connect.source.Lsid",
      version: 1,
    },
    {
      name: "com.mongodb.kafka.connect.source.Ns",
      subject: "com.mongodb.kafka.connect.source.Ns",
      version: 1,
    },
    {
      name: "com.mongodb.kafka.connect.source.To",
      subject: "com.mongodb.kafka.connect.source.To",
      version: 1,
    },
    {
      name: "com.mongodb.kafka.connect.source.UpdateDescription",
      subject: "com.mongodb.kafka.connect.source.UpdateDescription",
      version: 1,
    },
    {
      name: "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument",
      subject: "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument",
      version: 1,
    },
  ],
});

export default function () {
  for (let index = 0; index < 200; index++) {
    const key = uuidv4();
    let messages = [
      {
        key: schemaRegistry.serialize({
          data: key,
          schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
          data: {
            _id: key,
            operationType: {
              string: "insert",
            },
            fullDocumentBeforeChange: null,
            fullDocument: {
              "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument":
                {
                  actif: null,
                  tenant: {
                    string: "test",
                  },
                  description: {
                    "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.DescriptionRecord":
                      {
                        lang: {
                          string: "fr",
                        },
                        value: "Piece",
                      },
                  },
                  origin: null,
                  libelle: {
                    "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.LibelleRecord":
                      {
                        lang: null,
                        value: "Piece",
                      },
                  },
                  id: {
                    string: "/referentiel-piece/api/tenants/test/piece/" + key,
                  },
                  reference: key,
                  kind: {
                    string: "piece",
                  },
                  title: null,
                  date: {
                    string: "2024-02-21T17:42:36.437Z",
                  },
                  expiration: {
                    string: "2024-02-21T17:42:36.437Z",
                  },
                  statut: {
                    string: "PUBLIE",
                  },
                  nature: {
                    string: "INSTRUCTION",
                  },
                  user: null,
                  version: null,
                  active: {
                    boolean: true,
                  },
                  _id: {
                    string: key,
                  },
                },
            },
            ns: {
              "com.mongodb.kafka.connect.source.Ns": {
                db: "pda",
                coll: {
                  string: "piece",
                },
              },
            },
            to: null,
            documentKey: {
              "com.mongodb.kafka.connect.source.DocumentKey": {
                _id: {
                  string: key,
                },
              },
            },
            updateDescription: null,
            clusterTime: null,
            txnNumber: null,
            lsid: null,
          },
          schema: valueSchemaObject,
          schemaType: SCHEMA_TYPE_AVRO,
        }),
      },
    ];
    writer.produce({ messages: messages });
  }

  // Consume the 200 messages produced above
  let messages = reader.consume({ limit: 200 });
  check(messages, {
    "200 messages returned": (msgs) => msgs.length === 200
  });
}

export function teardown(data) {
  if (__VU === 0) {
    // Delete the topic
    connection.deleteTopic(topic);
  }
  writer.close();
  reader.close();
  connection.close();
}

Stacktrace of the error

ERRO[0000] panic: runtime error: invalid memory address or nil pointer dereference
goroutine 103 [running]:
runtime/debug.Stack()
        runtime/debug/stack.go:24 +0x64
go.k6.io/k6/js/common.RunWithPanicCatching.func1()
        go.k6.io/k6@v0.50.0/js/common/util.go:82 +0x180
panic({0x10634cd00?, 0x107514980?})
        runtime/panic.go:770 +0x124
github.com/dop251/goja.(*Runtime).runWrapped.func1()
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/runtime.go:2442 +0xf4
panic({0x10634cd00?, 0x107514980?})
        runtime/panic.go:770 +0x124
github.com/dop251/goja.(*vm).handleThrow(0x14000952900, {0x10634cd00, 0x107514980})
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/vm.go:788 +0x3c0
github.com/dop251/goja.(*vm).try.func1()
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/vm.go:807 +0x48
panic({0x10634cd00?, 0x107514980?})
        runtime/panic.go:770 +0x124
github.com/dop251/goja.(*vm).handleThrow(0x14000952900, {0x10634cd00, 0x107514980})
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/vm.go:788 +0x3c0
github.com/dop251/goja.(*vm).runTryInner.func1()
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/vm.go:830 +0x48
panic({0x10634cd00?, 0x107514980?})
        runtime/panic.go:770 +0x124
github.com/linkedin/goavro/v2.(*Codec).NativeFromTextual(0x140009cd8f0?, {0x140026ba580, 0x515, 0x580})
        github.com/linkedin/goavro/v2@v2.12.0/codec.go:500 +0x24
github.com/mostafa/xk6-kafka.(*AvroSerde).Serialize(0x106331860?, {0x1063316e0?, 0x14002061ef0?}, 0x140009cd8f0)
        github.com/mostafa/xk6-kafka@v0.25.1/avro.go:14 +0x58
github.com/mostafa/xk6-kafka.(*Kafka).serialize(0x140013ba2c0, 0x14002061e90)
        github.com/mostafa/xk6-kafka@v0.25.1/serdes.go:46 +0xb8
github.com/mostafa/xk6-kafka.(*Kafka).schemaRegistryClientClass.func4({{0x106614ee8, 0x14001fcb110}, {0x1400168fcb8, 0x1, 0x25}})
        github.com/mostafa/xk6-kafka@v0.25.1/schema_registry.go:210 +0x108
github.com/dop251/goja.(*nativeFuncObject).vmCall(0x14001c1e3c0, 0x14000952900, 0x1)
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/func.go:563 +0x168
github.com/dop251/goja.call.exec(0x952900?, 0x14000952900)
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/vm.go:3375 +0x74
github.com/dop251/goja.(*vm).run(0x14000952900)
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/vm.go:582 +0x6c
github.com/dop251/goja.(*vm).runTryInner(0x14000952900?)
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/vm.go:834 +0x58
github.com/dop251/goja.(*baseJsFuncObject).__call(0x14002d36cc0, {0x1400178d320?, 0x1, 0x104ba84bc?}, {0x0, 0x0}, {0x1066151e0?, 0x1075bc940?})
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/func.go:426 +0x5d0
github.com/dop251/goja.(*baseJsFuncObject)._call(...)
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/func.go:442
github.com/dop251/goja.(*baseJsFuncObject).call(0x0?, {{0x1066151e0, 0x1075bc940}, {0x1400178d320, 0x1, 0x1}}, {0x0?, 0x0?})
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/func.go:450 +0x74
github.com/dop251/goja.(*baseJsFuncObject).Call(...)
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/func.go:382
github.com/dop251/goja.AssertFunction.func1.1()
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/runtime.go:2402 +0x68
github.com/dop251/goja.(*vm).try(0x14000952900, 0x14002cac728)
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/vm.go:811 +0x1e0
github.com/dop251/goja.(*Runtime).runWrapped(0x14001a07c08, 0x0?)
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/runtime.go:2446 +0x64
github.com/dop251/goja.AssertFunction.func1({0x1066151e0?, 0x1075bc940?}, {0x1400178d320?, 0x1?, 0x1?})
        github.com/dop251/goja@v0.0.0-20240220182346-e401ed450204/runtime.go:2401 +0x78
go.k6.io/k6/js.(*VU).runFn.func2.1()
        go.k6.io/k6@v0.50.0/js/runner.go:866 +0x4c
go.k6.io/k6/js/eventloop.(*EventLoop).Start(0x14002d27ef0, 0x14001fec780)
        go.k6.io/k6@v0.50.0/js/eventloop/eventloop.go:177 +0x160
go.k6.io/k6/js.(*VU).runFn.func2()
        go.k6.io/k6@v0.50.0/js/runner.go:865 +0xcc
go.k6.io/k6/js/common.RunWithPanicCatching({0x106619a18?, 0x14000341d80?}, 0x107513070?, 0x14000a0f1e0?)
        go.k6.io/k6@v0.50.0/js/common/util.go:86 +0x74
go.k6.io/k6/js.(*VU).runFn(0x14001b6a1e0, {0x106602ee8, 0x14000a0f0e0}, 0x1, 0x140001b64f8, 0x1400178d000, {0x1400178d320, 0x1, 0x1})
        go.k6.io/k6@v0.50.0/js/runner.go:864 +0x1d8
go.k6.io/k6/js.(*ActiveVU).RunOnce(0x14000a52540)
        go.k6.io/k6@v0.50.0/js/runner.go:797 +0x3d4
go.k6.io/k6/lib/executor.PerVUIterations.Run.getIterationRunner.func7({0x106602eb0, 0x14001fe3bf0}, {0x1065efd00?, 0x14000a52540?})
        go.k6.io/k6@v0.50.0/lib/executor/helpers.go:108 +0x44
go.k6.io/k6/lib/executor.PerVUIterations.Run.func5({0x1065fae18, 0x14001b6a1e0})
        go.k6.io/k6@v0.50.0/lib/executor/per_vu_iterations.go:228 +0x320
created by go.k6.io/k6/lib/executor.PerVUIterations.Run in goroutine 101
        go.k6.io/k6@v0.50.0/lib/executor/per_vu_iterations.go:241 +0x8cc

Do you have an idea of the origin of this error? Does this scenario work with xk6-kafka?

mostafa commented 4 months ago

Hey @SamyLegal,

Please see FAQ No. 12 in the README.

SamyLegal commented 4 months ago

Thank you for the answer, @mostafa.

I have read FAQ No. 12 in the README, and I tried my schema without references using the tool https://github.com/mostafa/nested-avro-schema; everything works fine.

I think my error is not related to nested Avro schemas, but to an Avro schema that contains references to other schemas in a schema registry, like this:

{
  "type": "record",
  "name": "PieceChangeStream",
  "namespace": "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece",
  "fields": [
    {
      "name": "ns",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.Ns"
      ],
      "default": null
    },
    {
      "name": "_id",
      "type": "string"
    },
    {
      "name": "operationType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "fullDocumentBeforeChange",
      "type": [
        "null",
        "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument"
      ],
      "default": null
    },
    {
      "name": "fullDocument",
      "type": [
        "null",
        "fr.mgdis.contract.kafka_connect.mongo.referentiel_piece.PieceDocument"
      ],
      "default": null
    },
    {
      "name": "to",
      "type": [
        "null",
        "com.mongodb.kafka.connect.source.To"
      ],
      "default": null
    },
   ...
  ]
}

I have not found a unit test in the project that covers this case. Do you think this is supposed to work?

The "srclient" library that you use in this project handles this case: https://github.com/riferrei/srclient/blob/baa74d8799c9533e679e30739a09346fbd8f6982/schemaRegistryClient_test.go#L291
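
For what it is worth, a minimal unit-test-style repro of the failure, distilled from the script above (the example.Parent and example.Child subjects here are hypothetical; any schema referencing another registered subject should trigger it), would be:

import { SchemaRegistry, SCHEMA_TYPE_AVRO } from "k6/x/kafka";

const registry = new SchemaRegistry({
  url: "http://localhost:8200/apis/ccompat/v6",
});

// Fetch a schema whose definition references another registered subject.
const schema = registry.getSchema({
  subject: "example.Parent", // hypothetical record with a ["null", "example.Child"] union field
  schemaType: SCHEMA_TYPE_AVRO,
  version: 1,
  references: [
    { name: "example.Child", subject: "example.Child", version: 1 },
  ],
});

// Serializing with the fetched schema panics with a nil pointer dereference
// instead of resolving the reference or returning a JS error.
registry.serialize({
  data: { child: { "example.Child": { id: "42" } } },
  schema: schema,
  schemaType: SCHEMA_TYPE_AVRO,
});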