thomaskwscott / kafka-connect-shell-sink

A Kafka connector that invokes shell commands as a result of messages from Kafka.
Apache License 2.0

Connector doesn't respect converters. Does not support org.apache.kafka.connect.json.JsonConverter #2

Open alberttwong opened 1 year ago

alberttwong commented 1 year ago

Here's what I need to submit

curl -v --location-trusted -u <username>:<password> -H "strict_mode: true" \
    -H "format: json" -H "jsonpaths: [\"$.name\", \"$.code\"]" \
    -H "columns: city,tmp_id, id = tmp_id * 100" \
    -T example2.json -XPUT \
    http://someserver:<443>/api/test_db/table2/_stream_load

or

curl -v --location-trusted -u <username>:<password> -H "strict_mode: true" \
    -H "format: json" -H "jsonpaths: [\"$.name\", \"$.code\"]" \
    -H "columns: city,tmp_id, id = tmp_id * 100" \
    -d 'json_data' -XPUT \
    http://someserver:<443>/api/test_db/table2/_stream_load

Is there a way to get the entire payload? I can probably hard-code everything except example2.json. I think I need two shell commands: one to write the payload to a temp file, and one to run curl against that file. The alternative is a single curl command with the payload passed inline via -d, but that also needs the entire payload. Thoughts?
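The two-step idea (write the payload to a temp file, then run curl) could be wrapped in one script so that the connector only needs a single shell command. The following is a minimal sketch, not a tested integration: it assumes the connector passes the record value to the script as its first argument, and the function names stage_payload and upload_payload, as well as the environment variables LOAD_URL, LOAD_USER and LOAD_PASSWORD, are hypothetical. Only two of the headers from the curl commands above are included; the jsonpaths and columns headers would be added the same way.

```shell
#!/bin/sh
# Hypothetical wrapper script: stage the record value in a temp file, then upload it.
set -eu

# Write the first argument to a fresh temp file and print the file's path.
stage_payload() {
    file="$(mktemp)"
    printf '%s' "$1" > "$file"
    printf '%s\n' "$file"
}

# Upload a staged file with a stream-load style PUT request.
# LOAD_URL, LOAD_USER and LOAD_PASSWORD are assumed environment variables.
upload_payload() {
    curl --location-trusted -u "${LOAD_USER}:${LOAD_PASSWORD}" \
        -H "strict_mode: true" -H "format: json" \
        -T "$1" -XPUT "${LOAD_URL}"
}

# Only do work when a payload argument was supplied.
if [ "$#" -ge 1 ]; then
    payload_file="$(stage_payload "$1")"
    # Remove the temp file when the script exits, even if curl fails.
    trap 'rm -f "$payload_file"' EXIT
    upload_payload "$payload_file"
fi
```

Staging the value in a file keeps the curl invocation fixed and avoids placing the payload on the curl command line. It does not change what the connector hands to the script, so the file will contain whatever text the connector substitutes for the value.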

Here's what I submit to Kafka (it differs from the examples above, and as you can see it's more complex).

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "string",
                "optional": true,
                "field": "ip"
            },
            {
                "type": "int16",
                "optional": false,
                "field": "userid"
            },
            {
                "type": "string",
                "optional": false,
                "field": "time"
            },
            {
                "type": "int64",
                "optional": true,
                "name": "org.apache.kafka.connect.data.Timestamp",
                "version": 1,
                "field": "_time"
            },
            {
                "type": "string",
                "optional": true,
                "field": "request"
            },
            {
                "type": "string",
                "optional": true,
                "field": "status"
            },
            {
                "type": "int16",
                "optional": true,
                "field": "bytes"
            },
            {
                "type": "string",
                "optional": true,
                "field": "referrer"
            },
            {
                "type": "string",
                "optional": true,
                "field": "agent"
            }
        ],
        "optional": false,
        "name": "somerecord"
    },
    "payload": {
        "ip": "122.152.45.245",
        "userid": 9,
        "time": "5631",
        "_time": 5631,
        "request": "GET /site/user_status.html HTTP/1.1",
        "status": "407",
        "bytes": "278",
        "referrer": "-",
        "agent": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
    }
}
alberttwong commented 1 year ago

with

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

I get

[root@ip-172-31-9-192 kafka_2.13-3.5.0]# cat /root/out.file
Struct{ip=122.152.45.245,userid=9,time=5631,_time=Thu Jan 01 00:00:05 UTC 1970,request=GET /site/user_status.html HTTP/1.1,status=407,bytes=0,referrer=-,agent=Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)}

using

[root@ip-172-31-9-192 kafka_2.13-3.5.0]# curl -X POST -H "Content-Type: application/json" --data '{ "name": "shell-sink3", "config": {"connector.class":"uk.co.threefi.connect.shell.ShellSinkConnector", "tasks.max":"1", "shell.command":"echo \"${value}\" >> /root/out.file", "topics":"a3" }}' http://localhost:8083/connectors
alberttwong commented 1 year ago

I don't understand why ${value} is not JSON.

alberttwong commented 1 year ago

Even with the converters set in the connector config, the data still arrives as a plain string:

curl -X POST -H "Content-Type: application/json" --data '{
    "name": "shell-sinkc1",
    "config": {
        "connector.class": "uk.co.threefi.connect.shell.ShellSinkConnector",
        "tasks.max": "1",
        "shell.command": "/opt/connectors/send.sh \"${value}\"",
        "topics": "c1",
        "value.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}' http://localhost:8083/connectors
alberttwong commented 1 year ago

So it looks like the values are just the result of object.toString(); the connector does not implement converter handling. See https://github.com/aiven/http-connector-for-apache-kafka/blob/main/src/main/java/io/aiven/kafka/connect/http/recordsender/SingleRecordSender.java for an example of how another connector handles conversion.