awslabs / kinesis-kafka-connector

kinesis-kafka-connector is a connector based on Kafka Connect that publishes messages to Amazon Kinesis streams or Amazon Kinesis Firehose.
Apache License 2.0

Create new distributed connector #46

Open atulrenapurkar opened 4 years ago

atulrenapurkar commented 4 years ago

Hi,

I am using the kinesis-kafka-connector in distributed mode.

I submitted a REST POST request like the one below to http://XYZ.com:8083/connectors/

request body

{
   "name":"connector-name",
   "config":{
      "connector.class":"com.amazon.kinesis.kafka.AmazonKinesisSinkConnector",
      "tasks.max":"6",
      "topics":"topic-name",
      "maxConnections":"200",
      "rateLimit":"7000",
      "region":"us-east-1",
      "flushSync":"true",
      "singleKinesisProducerPerPartition":"false",
      "pauseConsumption":"true",
      "outstandingRecordsThreshold":"500000",
      "sleepPeriod":"1000",
      "sleepCycles":"10",
      "maxBufferedTime":"100",
      "ttl":"60000",
      "metricsLevel":"detailed",
      "metricsGranuality":"shard",
      "metricsNameSpace":"KafkaKinesisStreamsConnector",
      "usePartitionAsHashKey":"true",
      "aggregration":"false",
      "streamName":"stream-name"
   }
}
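For reference, the same request can be scripted with curl. This is a minimal sketch: the host and connector name are the placeholders from the post above, and the payload is abbreviated to a few keys, so substitute the full config when running it.

```shell
# Connector config payload (abbreviated; use the full config from the post).
PAYLOAD='{"name":"connector-name","config":{"connector.class":"com.amazon.kinesis.kafka.AmazonKinesisSinkConnector","tasks.max":"6","topics":"topic-name","region":"us-east-1","streamName":"stream-name"}}'

# Sanity-check that the payload is valid JSON before sending it.
echo "$PAYLOAD" | python3 -m json.tool > /dev/null && echo "payload OK"

# Submit it to the Connect REST API (uncomment to actually send):
# curl -s -X POST -H "Content-Type: application/json" \
#      -d "$PAYLOAD" http://XYZ.com:8083/connectors/
```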

I get a response like the one below:

{
    "name": "connector-name",
    "config": {
        "connector.class": "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector",
        "tasks.max": "6",
        "topics": "topic-name",
        "maxConnections": "200",
        "rateLimit": "7000",
        "region": "us-east-1",
        "flushSync": "true",
        "singleKinesisProducerPerPartition": "false",
        "pauseConsumption": "true",
        "outstandingRecordsThreshold": "500000",
        "sleepPeriod": "1000",
        "sleepCycles": "10",
        "maxBufferedTime": "100",
        "ttl": "60000",
        "metricsLevel": "detailed",
        "metricsGranuality": "shard",
        "metricsNameSpace": "KafkaKinesisStreamsConnector",
        "usePartitionAsHashKey": "true",
        "aggregration": "false",
        "streamName": "stream-name",
        "name": "stream-name"
    },
    "tasks": [],
    "type": "sink"
}

But in the Kafka Connect log I can see the following error:

ERROR Found configuration for connector 'connector-name' in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:555)

Below are the details of my worker-connect-distributed.properties:

bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Copycat in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true

offset.storage.file.filename=/apps/kafka/kafka-connect/connect.offsets

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=5000

# Reload metadata faster too so consumer picks up new topics
consumer.metadata.max.age.ms=10000

name=kafka-connector-poc
plugin.path=/apps/kafka/kafka-connect

group.id=connect-cluster

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=3

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
status.storage.partitions=5

rest.advertised.host.name=localhost
rest.advertised.port=8083
bdesert commented 4 years ago

I'm checking the config.

bdesert commented 4 years ago

@atulrenapurkar, could you please provide the following details:

It looks like you are running into this issue: https://issues.apache.org/jira/browse/KAFKA-3988
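If that is the issue, the workaround commonly suggested in that ticket is to disable schemas on the internal converters in the worker properties, so that connector configs are stored as schemaless maps rather than Structs. This is a hedged sketch; verify the behavior against your Connect version before relying on it.

```properties
# Store internal offset/config/status data without embedded schemas,
# so KafkaConfigBackingStore reads plain maps instead of Structs.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
```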

atulrenapurkar commented 3 years ago

Sorry for the delay in replying.

I am using Kafka 0.11.0.2:

org.apache.kafka:connect-api:0.11.0.2
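For clarity, that coordinate corresponds to the following pom.xml dependency:

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-api</artifactId>
  <version>0.11.0.2</version>
</dependency>
```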