apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

Problems with CamelDDBStreamSourceConnector 3.20.3 #1545

Open thuyhoang-gp opened 1 year ago

thuyhoang-gp commented 1 year ago

Hi, I am struggling to use CamelDDBStreamSourceConnector for DynamoDB change data capture. Could you please advise me on how to resolve the following problems?

  1. I have tried several common converters for Kafka Connect, but only one of them is 'usable'

With StringConverter, however, the message value is just the object reference of an InputStreamCache rather than the record content (see the attached screenshot).

Could you tell me which converter I can set as value.converter to get a readable value?

  2. The message key is currently blank. Could you populate it from the DDB Stream payload, for example from the ID field (see the attached screenshot)?

Again, thanks for the work!

thuyhoang-gp commented 1 year ago

Here is my source connector configuration:

{
        "connector.class": "org.apache.camel.kafkaconnector.awsddbstreamssource.CamelAwsddbstreamssourceSourceConnector",
        "camel.kamelet.aws-ddb-streams-source.accessKey": "XXX",
        "camel.kamelet.aws-ddb-streams-source.secretKey": "XXX",
        "tasks.max": "1",
        "camel.kamelet.aws-ddb-streams-source.uriEndpointOverride": "",
        "camel.kamelet.aws-ddb-streams-source.useDefaultCredentialsProvider": "false",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "session.timeout.ms": "60000",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "camel.kamelet.aws-ddb-streams-source.region": "XXX",
        "camel.kamelet.aws-ddb-streams-source.delay": "5000",
        "topics": "ddb-test",
        "database.history.kafka.bootstrap.servers": "kafka-headless.kafka-banzai:29092",
        "camel.kamelet.aws-ddb-streams-source.streamIteratorType": "FROM_START",
        "camel.kamelet.aws-ddb-streams-source.table": "ddb_test",
        "name": "ddb-consumer",
        "camel.kamelet.aws-ddb-streams-source.overrideEndpoint": "false",
        "snapshot.mode": "initial"
    }

jakubmalek commented 1 year ago

It looks like you have the same problem as the one reported in https://github.com/apache/camel-kafka-connector/issues/1543

The source connector returns the value as a Java InputStream, which the default converters cannot serialize.
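To illustrate the symptom, here is a minimal, standard-library-only sketch. It assumes that a string-based converter ends up calling `toString()` on the value, which would explain why the topic contains something like `InputStreamCache@...` instead of the record. The class `StreamPayloadDemo` and its methods are hypothetical names for illustration, and a plain `ByteArrayInputStream` stands in for Camel's `InputStreamCache`; this is not the connector's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class StreamPayloadDemo {

    // Assumption: roughly what a string-based converter does with an
    // arbitrary object value, i.e. it relies on toString().
    static String viaToString(Object value) {
        return String.valueOf(value);
    }

    // Reads the stream to the end and decodes the bytes as UTF-8,
    // which yields the actual payload.
    static String viaRead(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up payload standing in for a DynamoDB Streams record.
        String json = "{\"eventName\":\"INSERT\"}";
        InputStream payload =
                new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

        // Prints the class name plus an identity hash, not the content.
        System.out.println(viaToString(payload));
        // Prints the JSON text itself.
        System.out.println(viaRead(payload));
    }
}
```

Until the connector emits a serializable type, the stream would have to be read into a String or byte array somewhere before the converter sees it; where exactly that should happen in the connector is for the maintainers to decide.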

oscerd commented 1 year ago

We're looking into the InputStream behavior. @valdar is looking at that.