ably / kafka-connect-ably

Kafka Connector for publishing data from Kafka to Ably
https://ably.com/solutions/extend-kafka-to-the-edge
Apache License 2.0

Support For Ably Subscription Filters in Connector #164

Closed: vnmchat13 closed this issue 1 year ago

vnmchat13 commented 1 year ago

Does the Connector support populating the extras.headers information for use in server-side filtering of messages?

To use Ably subscription filters, as described at https://ably.com/docs/channels, the extras.headers information needs to be populated, which the Connector doesn't currently appear to support. Ideally, headers set in Confluent would be converted into Ably extras.headers, separate from the data and name payload. For example, using the Confluent REST API:

{
    "headers": [
        { "name": "Header-1", "value": "SGVhZGVyLTE=" },
        { "name": "Header-2", "value": "SGVhZGVyLTI=" }
    ],
    "key": { "type": "STRING", "data": "acme-corp" },
    "value": { "type": "JSON", "data": { "msg": "notification message here" } }
}
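The header values in this example are base64-encoded bytes rather than plain text. A minimal Python sketch, using only the standard library, that decodes them shows what each header actually carries:

```python
import base64

# Header entries copied from the Confluent REST API example above.
headers = [
    {"name": "Header-1", "value": "SGVhZGVyLTE="},
    {"name": "Header-2", "value": "SGVhZGVyLTI="},
]

# Decode each base64 value back to its UTF-8 text.
decoded = {h["name"]: base64.b64decode(h["value"]).decode("utf-8") for h in headers}
print(decoded)  # {'Header-1': 'Header-1', 'Header-2': 'Header-2'}
```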

sync-by-unito[bot] commented 1 year ago

➤ Automation for Jira commented:

The link to the corresponding Jira issue is https://ably.atlassian.net/browse/SDK-3923

ttypic commented 1 year ago

Hello @vnmchat13, thank you for your interest in our Kafka connector! We appreciate your curiosity. The connector supports populating the extras.headers from Kafka headers (although it might not be very well documented). The only limitation we have now is that it supports only String values.
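As a rough illustration of the behavior described here (Kafka headers copied into extras.headers, String values only), the following is a hypothetical Python model. It is not the connector's own code; the function name `to_ably_extras` is invented for this sketch, and skipping values that are missing or not valid UTF-8 is an assumption, since this thread does not say what the connector does with such values.

```python
from typing import Iterable, Optional, Tuple


def to_ably_extras(kafka_headers: Iterable[Tuple[str, Optional[bytes]]]) -> dict:
    """Hypothetical model: copy Kafka record headers into an Ably extras.headers map.

    Only string values are modelled, matching the limitation mentioned above.
    Missing or non-UTF-8 values are skipped (an assumption of this sketch).
    """
    headers = {}
    for name, raw in kafka_headers:
        if raw is None:
            continue  # no value to copy
        try:
            headers[name] = raw.decode("utf-8")
        except UnicodeDecodeError:
            continue  # not representable as a string in this model
    return {"headers": headers}


# Example: two Kafka headers given as (name, bytes) pairs.
extras = to_ably_extras([("role", b"admin"), ("user", b"paul@velocitys3.com")])
print(extras)  # {'headers': {'role': 'admin', 'user': 'paul@velocitys3.com'}}
```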

vnmchat13 commented 1 year ago

@ttypic I cannot view the details of the Jira issue. Does this work with the cloud-hosted version of Confluent, whose hosted REST API treats the values in the header key/value array as bytes? Is a header.converter parameter needed as part of the Ably Connector config?

ttypic commented 1 year ago

@vnmchat13

  1. Yes, it should work with the hosted version of Confluent
  2. header.converter is not needed

vnmchat13 commented 1 year ago

I receive the following error when I change the header values to strings:

{ "error_code": 400, "message": "Illegal unquoted character ((CTRL-CHAR, code 10)): has to be escaped using backslash to be included in string value" }

POST: https://pkc-p11xm.us-east-1.aws.confluent.cloud:443/kafka/v3/clusters/lkc-o2o73y/topics/task-notification/records

{
    "headers": [
        { "name": "role", "value": "admin" },
        { "name": "user", "value": "paul@velocitys3.com" }
    ],
    "key": { "type": "STRING", "data": "account-name" },
    "value": {
        "type": "JSON",
        "data": {
            "extras": { "headers": { "name": "123" } },
            "type": "task-reminder",
            "meta": {
                "whatId": "SHMK-L231031-000086",
                "owner": "John Smith",
                "ownerId": "65472638734658734",
                "who": "Sally Mae",
                "whoId": "242347623846283746"
            },
            "importance": "medium",
            "interval": 15,
            "intervalType": "minutes",
            "msg": "Follow-up with Sally Mae for Inspection Date/Time"
        }
    }
}

vnmchat13 commented 1 year ago

Can someone show an example of using string headers when posting records to Confluent Cloud via the v3 records API: https://docs.confluent.io/platform/current/kafka-rest/api.html#records-v3

The examples they provide show the headers in binary (base64) format. To send headers in a custom format through the REST API, you must host the REST Proxy yourself, which defeats the purpose of using a fully managed service.

ttypic commented 1 year ago

Hi @vnmchat13, this problem might be best addressed by the Confluent Cloud REST API team. In the meantime, as a workaround, you can base64-encode your string values; online tools and standard programming libraries can do this for you. Your headers section will be:

{
  "headers": [
      {
          "name": "role",
          "value": "YWRtaW4="
      },
      {
          "name": "user",
          "value": "cGF1bEB2ZWxvY2l0eXMzLmNvbQ=="
      }
  ]
}

For a more permanent solution and detailed assistance, I recommend reaching out to the Confluent support team.
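The workaround amounts to base64-encoding the UTF-8 bytes of each header value. A minimal Python sketch using only the standard library; the helper name `encode_headers` is hypothetical. It produces the same values as the headers section above:

```python
import base64
import json


def encode_headers(plain: dict) -> list:
    """Turn {name: text} into a Confluent v3 headers list with base64 values."""
    return [
        {"name": name, "value": base64.b64encode(text.encode("utf-8")).decode("ascii")}
        for name, text in plain.items()
    ]


headers = encode_headers({"role": "admin", "user": "paul@velocitys3.com"})
print(json.dumps({"headers": headers}, indent=2))
```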

vnmchat13 commented 1 year ago

The entire reason for this post is that we were using binary values for the headers, and that part works great. But Ably DOES NOT convert the headers to extras.headers, so they cannot be used with Ably's subscription filtering; they are ignored. Your support team told us to use strings, which the fully managed REST API Proxy does not support. You can only do that with a self-managed REST API Proxy for Confluent.

Via the Confluent REST API and the Ably Connector: POST https://pkc-p11xm.us-east-1.aws.confluent.cloud:443/kafka/v3/clusters/lkc-o2o73y/topics/task-notification/records

{       
    "headers": [
        {
          "name": "role",
          "value": "YWRtaW4="
      },
      {
          "name": "user",
          "value": "cGF1bEB2ZWxvY2l0eXMzLmNvbQ=="
      }
    ],
    "key": {
        "type" : "STRING",
        "data": "SHMK"
    },
    "value": {
        "type" : "JSON",
        "data" : {
            "msg": "Follow-up with Sally Mae for Inspection Date/Time"
        }

    }
}

Ably Console Log (Headers are not being converted to extras.headers):

{
    "id": "avKEcQtIYB:0:0",
    "timestamp": 1699501693013,
    "data": "{msg=Follow-up with Sally Mae for Inspection Date/Time, intervalType=minutes, meta={owner=John Smith, whoId=242347623846283746, whatId=SHMK-L231031-000086, ownerId=65472638734658734, who=Sally Mae}, importance=medium, extras={headers={name=123}}, interval=15, type=task-reminder}",
    "name": "SHMK:task-notification"
}

Direct POST to Ably: https://rest.ably.io/channels/task-notification/messages

{
    "name" : "SHMK:task-notification",
    "data" : {
        "msg": "Follow-up with Sally Mae for Inspection Date/Time"
    },
    "extras" : {
        "headers": {
            "role" : "admin",
            "user": "1232322323"
        }
    }
}

Ably log (includes the extras.headers information):

{
    "id": "hTD0yzYpYe:0:0",
    "timestamp": 1699502145627,
    "encoding": "json",
    "extras": {
        "headers": {
            "role": "admin",
            "user": "1232322323"
        }
    },
    "data": "{\"msg\":\"Follow-up with Sally Mae for Inspection Date/Time\"}",
    "name": "SHMK:task-notification"
}

ttypic commented 1 year ago

Hi @vnmchat13, I'm sorry that I closed the issue earlier; I probably misunderstood something. Can you take a look at the video I recorded? Is this different from what you want?

https://github.com/ably/kafka-connect-ably/assets/16488867/13c3b5da-b178-42fc-9e54-8d30a77e25df

ttypic commented 1 year ago

One more thing: it looks like you don't use Schema Registry, so Ably doesn't know how the data is encoded. You need to send the special header "com.ably.encoding" with the value "json" (anNvbg== in base64). For the REST API call, it will be:

{
    "headers": [
      {
          "name": "role",
          "value": "YWRtaW4="
      },
      {
          "name": "user",
          "value": "cGF1bEB2ZWxvY2l0eXMzLmNvbQ=="
      },
      {
          "name": "com.ably.encoding",
          "value": "anNvbg=="
      }
    ]
}
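Putting the pieces of this thread together, here is a minimal Python sketch that builds a complete v3 produce request body: the user headers plus the com.ably.encoding header, each base64-encoded, with a STRING key and a JSON value. The helper names `b64` and `build_record` are hypothetical, and sending the request (endpoint, authentication) is deliberately left out. Serializing with `json.dumps` escapes any control characters inside string values, so the body is always valid JSON.

```python
import base64
import json


def b64(text: str) -> str:
    """Base64-encode a UTF-8 string, as used for header values in this thread."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def build_record(headers: dict, key: str, value: dict) -> dict:
    """Build a Confluent REST v3 produce body (hypothetical helper).

    Appends com.ably.encoding=json so the JSON value is treated as JSON
    when no Schema Registry is in use, as described above.
    """
    all_headers = dict(headers)
    all_headers["com.ably.encoding"] = "json"
    return {
        "headers": [{"name": n, "value": b64(v)} for n, v in all_headers.items()],
        "key": {"type": "STRING", "data": key},
        "value": {"type": "JSON", "data": value},
    }


record = build_record(
    {"role": "admin", "user": "paul@velocitys3.com"},
    key="SHMK",
    value={"msg": "Follow-up with Sally Mae for Inspection Date/Time"},
)
body = json.dumps(record)
print(body)
```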

vnmchat13 commented 1 year ago

This is now working as expected. Thanks @ttypic !