questdb / kafka-questdb-connector

QuestDB connector for Kafka.
https://github.com/questdb/kafka-questdb-connector/releases/
Apache License 2.0

Messages not being sent to DLQ on failure, causing the connector to error out #26

Open RishabhAcodes opened 2 months ago

RishabhAcodes commented 2 months ago

We're registering the QuestDB connector with Kafka Connect via curl using the following:

curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-dietlytics/config -d '{
  "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
  "topics": "dietlytics",
  "client.conf.string": "http::addr=questdb:9000;",
  "name": "questdb-dietlytics",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "'"$SCHEMA_REGISTRY_URL"'",
  "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
  "include.key": false,
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "'"$SCHEMA_REGISTRY_URL"'",
  "table": "dietlytics",
  "symbols": "event_type",
  "timestamp.field.name": "timestamp",
  "timestamp.string.format": "yyyy-MM-ddTHH:mm:ss.SSSZ",
  "value.converter.schemas.enable": true,
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.deadletterqueue.topic.name": "dead-letter-example"
}'
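
After registering, we confirm the connector was picked up via the standard Kafka Connect REST status endpoint (same host/port as above; shown here just for context):

# Check connector and task state through the Connect REST API.
curl -s http://localhost:8083/connectors/questdb-dietlytics/status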

Our Kafka broker already has both the main topic and the DLQ topic created.

Expected: When the producer sends an incorrectly formatted message (probably an incorrect timestamp in our case), Kafka Connect should reject it, log the error, send the message to the DLQ, and continue processing as normal.
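
(For context, this is roughly how we check whether anything lands in the DLQ; the broker address is from our local setup, and print.headers only works with a reasonably recent console consumer:)

# Consume the DLQ topic to see which records were rejected.
# Error context headers are only attached when errors.deadletterqueue.context.headers.enable is set.
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic dead-letter-example \
  --from-beginning \
  --property print.headers=true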

Actual: When the producer sends an incorrect timestamp format, Kafka Connect logs an Uncaught exception: ... (QuestDB connector error here), disconnects the node, and shuts down altogether.

Current fix: Reset the offset manually on the Kafka broker and restart Kafka Connect to restart the QuestDB connector service, which then starts processing as normal.
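
(Roughly the workaround we use today; the group name follows the default connect-<connector name> convention and the broker address is from our local setup:)

# Stop the Connect worker, skip past the bad record, then restart Connect.
# Offsets can only be reset while the consumer group is inactive.
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group connect-questdb-dietlytics \
  --topic dietlytics \
  --reset-offsets --to-latest \
  --execute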

jerrinot commented 2 months ago

thank you for reporting this!

RishabhAcodes commented 2 months ago

I feel this might be the problem, as the default is taken as 3, which eventually fails when the dead letter queue topic is created.

"errors.deadletterqueue.topic.replication.factor": 1

I haven't tested this yet, but if this is the problem then it should be specified in the docs as well.

jerrinot commented 2 months ago

indeed, that could very well be the cause. the test we have for this does set the replication factor to 1, since it only starts a single broker.

while we cannot assume the topology of your Kafka deployment, I agree the docs could warn about this. so, a good suggestion, thanks!
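
(One way to rule this out quickly: if the DLQ topic does not already exist on the cluster, Connect tries to create it with the configured replication factor, which defaults to 3. Pre-creating it yourself with a factor matching your cluster sidesteps that, roughly like this; the broker address is just an example:)

# Pre-create the DLQ topic so Connect does not try to create it with the default replication factor of 3.
kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic dead-letter-example \
  --partitions 1 \
  --replication-factor 1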

RishabhAcodes commented 2 months ago

I tested this by sending strings into a UUID field, and the Kafka QuestDB connector still throws an uncaught exception until the offset is manually reset.

Please help us sort this out, as this issue blocks our entire data processing.

jerrinot commented 2 months ago

I will have a deeper look later this week

jerrinot commented 2 months ago

hello @RishabhAcodes: I can confirm it's not working in the latest connector release. I have a WIP with a fix. Would you be able to test an unreleased snapshot version of the connector?

RishabhAcodes commented 2 months ago

Sure, would love to @jerrinot

RishabhAcodes commented 2 months ago

Any suggestions on how I can test this unreleased snapshot?

jerrinot commented 2 months ago

@RishabhAcodes: Excellent, that would be quite helpful! There are 2 options to test it. If you are a Java developer, building the connector yourself from source code might be quite simple:

  1. Clone this repository: git clone https://github.com/questdb/kafka-questdb-connector.git
  2. Go to the project directory and check out the branch with the fix candidate: git checkout jh_dlq_slow_mode
  3. Build it (w/o running tests since they depend on Docker and are rather long): mvn clean install -DskipTests
  4. The previous step creates a file kafka-questdb-connector-0.14-SNAPSHOT-bin.zip inside connector/target - that's the connector zip.

If the steps above are too complex or you do not have Maven installed, then you can just grab the zip file from: https://drive.google.com/file/d/1GvHvDxHhy0OsOSin37hYqL8JxXx-9Bga/view?usp=sharing

You install the connector from the zip as if it were a regular release. Make sure to delete the old version first.
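
(Rough install sketch; the plugin directory below is just an example, use whatever is on your worker's plugin.path:)

# Remove the previously installed connector version from the Connect plugin path.
rm -rf /opt/kafka/plugins/kafka-questdb-connector
# Unzip the snapshot into the same plugin path.
unzip kafka-questdb-connector-0.14-SNAPSHOT-bin.zip -d /opt/kafka/plugins/
# Then restart the Connect worker so it picks up the new plugin version.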

jerrinot commented 2 months ago

@RishabhAcodes I realized the snapshot version only covers some cases; not all types of issues are handled yet. I'll update it soon.

RishabhAcodes commented 2 months ago

Sure. Keep me in the loop :)

jerrinot commented 2 months ago

@RishabhAcodes you can try this snapshot: https://drive.google.com/file/d/1Bsd57jsGYN5a5O24SG36EFQEC_euaIRR/view?usp=sharing

Caveats:

  1. DLQ requires Kafka 2.6+
  2. There is a bug in QuestDB specifically related to inserting an invalid value into a UUID field. The ILP server responds with an incorrect HTTP status, prompting the client to retry. Eventually, the client gives up and reports an error, although this takes some time. Pending fix: https://github.com/questdb/questdb/pull/4934

edit: link updated

jerrinot commented 2 months ago

there is still an issue when a message contains an unsupported field (e.g. an array), has a null timestamp, etc. In general: when the connector cannot send it to the QuestDB server at all. Ideally, such messages should go to the DLQ too (when configured), but that's currently not the case. The fix is in progress, but depends on https://github.com/questdb/questdb/pull/4936