mostafa / xk6-kafka

k6 extension to load test Apache Kafka with support for various serialization formats, SASL, TLS, compression, Schema Registry client and beyond
Apache License 2.0
155 stars 69 forks source link

Publishing message encoding issues with confluentinc/cp-kafka:6.2.0 #16

Closed saad1200 closed 3 years ago

saad1200 commented 3 years ago

There is a serialization issue when publishing one of the sample code from samples/test_avro.js to kafka confluent. The kafka-avro-console-consumer command crash as soon as it attempt to serialize this message. It seems there is encoding issue.

% kafka-avro-console-consumer --bootstrap-server localhost:29092 --topic mytopic.v0.avro --from-beginning --property print.key=true Processed a total of 1 messages parse error: Invalid numeric literal at line 1, column 12

We also have an elastic search sink connector configured and it also can not deserialize the kafka data. When we generate kafka messages using other tools like kafak-avro-console-producer or kafkajs things seems to work ok it's only when we're running a k6 tests that seeds data.

screen grabs from confluent control center showing the data.

Screen Shot 2021-07-30 at 22 41 22 Screen Shot 2021-07-30 at 22 41 02

.

Stack trace from kafka es connect ` [2021-07-30 21:21:59,217] ERROR WorkerSinkTask{id=local_elasticsearch-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)

at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)

at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)

at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)

at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)

at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)

at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)

at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)

at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mytopic.v0.avro to Avro:

at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)

at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)

at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)

at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)

... 13 more

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

[2021-07-30 21:21:59,229] INFO [Consumer clientId=connector-consumer-local_elasticsearch-0, groupId=connect-local_elasticsearch] Revoke previously assigned partitions mytopic.v0.avro-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

[2021-07-30 21:21:59,229] INFO [Consumer clientId=connector-consumer-local_elasticsearch-0, groupId=connect-local_elasticsearch] Member connector-consumer-local_elasticsearch-0-c062d254-c5d7-47b3-9c55-9b5b2962b76d sending LeaveGroup request to coordinator kafkabroker:9092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2021-07-30 21:21:59,241] INFO Publish thread interrupted for client_id=connector-consumer-local_elasticsearch-0 client_type=CONSUMER session= cluster=33W9Ct-mRbiv2QrIXn2TwA group=connect-local_elasticsearch (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

[2021-07-30 21:21:59,262] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-local_elasticsearch-0 client_type=CONSUMER session= cluster=33W9Ct-mRbiv2QrIXn2TwA group=connect-local_elasticsearch (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

[2021-07-30 21:21:59,263] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-local_elasticsearch-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)

[2021-07-30 21:21:59,278] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,279] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,280] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,283] INFO App info kafka.producer for confluent.monitoring.interceptor.connector-consumer-local_elasticsearch-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)

[2021-07-30 21:21:59,283] INFO Closed monitoring interceptor for client_id=connector-consumer-local_elasticsearch-0 client_type=CONSUMER session= cluster=33W9Ct-mRbiv2QrIXn2TwA group=connect-local_elasticsearch (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

[2021-07-30 21:21:59,284] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,284] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,284] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)

[2021-07-30 21:21:59,297] INFO App info kafka.consumer for connector-consumer-local_elasticsearch-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)

`

saad1200 commented 3 years ago

The issue was the serialize. The default serializer used is the json serializer. Switching to confluent kafka serializer fix the problem. Their is a sample code within samples folder to show how this can be done