lf-edge / ekuiper

Lightweight data stream processing engine for IoT edge
https://ekuiper.org
Apache License 2.0
1.48k stars 416 forks source link

Kafka sink - key not filled in correctly in Version 2.0 #3389

Closed wduczkowski closed 3 days ago

wduczkowski commented 4 days ago

Environment:

The key attribute in the Kafka sink configuration is not correctly populated in eKuiper version 1.14.6+ when using a pattern-based key assignment. The issue was not present in version 1.14.6, where the key was correctly derived from the events generated by the SQL query.

Steps to Reproduce:

  1. Kafka Sink Configuration: The following Kafka sink configuration is used:
    
    {
    "kafka": {
        "batchSize": 1,
        "brokers": "192.168.xxx.xx:9092",
        "bufferLength": 1024,
        "enableCache": false,
        "format": "json",
        "insecureSkipVerify": false,
        "key": "{{.key}}",
        "maxAttempts": 1,
        "omitIfEmpty": false,
        "requiredACKs": 1,
        "resourceId": "kafka",
        "runAsync": false,
        "saslAuthType": "none",
        "sendSingle": true,
        "topic": "aggregation.room"
    }
    }
2. Event Stream Data: The SQL query produces events with the following structure:

{ "humidity": 48.2, "key": "TempHumPres", "topic": "TempHumPres/get/sensor/2" }

{ "key": "TempHumPres", "temperature": 22.4, "topic": "TempHumPres/get/sensor/1" }

{ "key": "TempHumPres", "pressure": 1005.7, "topic": "TempHumPres/get/sensor/3" }


 3. Kafka Topic Observation: The Kafka topic receives messages with  key: {{.key}} instead of the expected value derived from the key attribute in the events. Expected Key: TempHumPres

![image](https://github.com/user-attachments/assets/1788777e-c5bf-4f40-a738-8cadec1a4274)

4. Previous Behavior: In version 1.14.6, the same configuration correctly populated the Kafka key based on the key attribute in the event.

![image](https://github.com/user-attachments/assets/4442fed3-e664-4401-8337-beb8d425018b)