lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

S3 Sink connector, error during offset commit #865

Closed. YordanPavlov closed this issue 2 years ago.

YordanPavlov commented 2 years ago

What version of the Stream Reactor are you reporting this issue for?

kafka-connect-aws-s3-3.0.1-2.5.0-all.tar.gz

What is the expected behaviour?

Records to be written to AWS S3.

What was observed?

WARN [sink-s3|task-0] WorkerSinkTask{id=sink-s3-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask:390)
ERROR [sink-s3|task-0] WorkerSinkTask{id=sink-s3-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask:267)
java.lang.UnsupportedOperationException: empty.maxBy

What is your Connect cluster configuration (connect-avro-distributed.properties)?

- name: CONNECT_REST_PORT
  value: "8083"
- name: CONNECT_BOOTSTRAP_SERVERS
  value: "blockchain-kafka-kafka-0.default.svc.cluster.local:9092"
- name: CONNECT_GROUP_ID
  value: "kafka-connect"
- name: CONNECT_CONFIG_STORAGE_TOPIC
  value: "_connect-configs"
- name: CONNECT_OFFSET_STORAGE_TOPIC
  value: "_connect-offsets"
- name: CONNECT_STATUS_STORAGE_TOPIC
  value: "_connect-status"
- name: CONNECT_KEY_CONVERTER
  value: "org.apache.kafka.connect.storage.StringConverter"
- name: CONNECT_VALUE_CONVERTER
  value: "org.apache.kafka.connect.json.JsonConverter"
- name: CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE
  value: "false"
- name: CONNECT_REST_ADVERTISED_HOST_NAME
  value: "kafka-connect"
- name: CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN
  value: "[%d] %p %X{connector.context}%m (%c:%L)%n"
- name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
  value: "1"
- name: AWS_ACCESS_KEY_ID

What is your connector properties configuration (my-connector.properties)?

{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "topics": "test_yordan_kafka_connect_single_partition",
  "tasks.max": "1",
  "aws.auth.mode": "Default",
  "connect.s3.kcql": "insert into yordan-flink-savepoints-test-hz-stage:test-bucket select * from test_yordan_kafka_connect_single_partition `json` WITH_FLUSH_COUNT = 5000",
  "connect.s3.aws.region": "eu-central-1",
  "timezone": "UTC",
  "errors.log.enable": true
}

Please provide full log files (redact any sensitive information)

kafka-connect.log

davidsloan commented 2 years ago

This is fixed in the latest version. You can wait for us to release it over the next few weeks or you could build it from source from the master branch.

YordanPavlov commented 2 years ago

> This is fixed in the latest version. You can wait for us to release it over the next few weeks or you could build it from source from the master branch.

Thank you for the quick reply! I'll build from source and report back.

YordanPavlov commented 2 years ago

I have built the S3 connector from master as advised. Now on loading I see a new error:

[2022-06-29 10:40:07,601] ERROR [sink-s3|task-0] WorkerSinkTask{id=sink-s3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
java.lang.IllegalStateException: fatal:
Can't find fileNamingStrategy in config
Caused by: java.lang.IllegalStateException: Can't find fileNamingStrategy in config
    at io.lenses.streamreactor.connect.aws.s3.sink.FatalS3SinkError$.apply(Exceptions.scala:42)
    at io.lenses.streamreactor.connect.aws.s3.sink.S3WriterManager$.$anonfun$from$3(S3WriterManager.scala:305)
    at io.lenses.streamreactor.connect.aws.s3.sink.S3WriterManager.seekOffsetsForTopicPartition(S3WriterManager.scala:142)
    at io.lenses.streamreactor.connect.aws.s3.sink.S3WriterManager.$anonfun$open$1(S3WriterManager.scala:125)
    at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
    at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
    at scala.collection.immutable.Set$Set1.map(Set.scala:156)
    at io.lenses.streamreactor.connect.aws.s3.sink.S3WriterManager.open(S3WriterManager.scala:125)
    ... 23 more

Could you advise on what option I should add to my config? It wasn't obvious from the examples.

davidsloan commented 2 years ago

Hi @YordanPavlov,

I'm assuming you are still using the configuration above?

I can see a small error in it:

"connect.s3.kcql": "insert into yordan-flink-savepoints-test-hz-stage:test-bucket select * from test_yordan_kafka_connect_single_partition `json` WITH_FLUSH_COUNT = 5000",

You are missing the STOREAS keyword from your KCQL (STOREAS `json`).

https://docs.lenses.io/5.0/integrations/connectors/stream-reactor/sinks/s3sinkconnector/
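
For reference, the KCQL from the connector config above with the missing keyword added would look roughly like this (same bucket, topic and flush count as before; a sketch based on this comment, not a tested config):

"connect.s3.kcql": "insert into yordan-flink-savepoints-test-hz-stage:test-bucket select * from test_yordan_kafka_connect_single_partition STOREAS `json` WITH_FLUSH_COUNT = 5000"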

YordanPavlov commented 2 years ago

Thanks a lot for your reply again, I got that fixed but ran into another issue shortly after:

Caused by: org.jclouds.aws.AWSResponseException: request GET https://santiment-flink-savepoints-test-hz-stage.s3.amazonaws.com/?prefix=.indexes/sink-s3/test_yordan_kafka_connect_single_partition/00000/ HTTP/1.1 failed with code 400, error: AWSError{requestId='851A4C5527FVVREZ', requestToken='vc7W2cMKnKAyhN42S+6ttfTUrUZl6NF5NOuD1iySQTCNycRNdas0fEij2aqMwY6Pg26Yio4/0zA=', code='AuthorizationHeaderMalformed', message='The authorization header is malformed; the region 'us-east-1' is wrong; expecting 'eu-central-1'', context='{Region=eu-central-1, HostId=vc7W2cMKnKAyhN42S+6ttfTUrUZl6NF5NOuD1iySQTCNycRNdas0fEij2aqMwY6Pg26Yio4/0zA=}'}

Sorry to bother you again, I guess the error doesn't really point to what the problem is.

davidsloan commented 2 years ago

@YordanPavlov This is a common issue.

The jclouds library gives us problems here because it tries to work out the region but sometimes gets it wrong. Either:

  1. Add the configurations connect.s3.aws.client=AWS and connect.s3.aws.region=eu-west-1 (change the region to the relevant one); see the sketch after this list.

OR

  2. Define a custom URL to target a specific region, for example: connect.s3.custom.endpoint=https://s3.eu-west-1.amazonaws.com/ This second approach is mentioned in the README here: https://github.com/lensesio/stream-reactor/blob/master/kafka-connect-aws-s3/README-sink.md#connects3customendpoint
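
As an illustration, option 1 applied to the connector JSON from earlier in this thread would just add two properties (the region value below is the eu-central-1 already used in that config; a sketch, not a verified configuration):

  "connect.s3.aws.client": "AWS",
  "connect.s3.aws.region": "eu-central-1"

Option 2 would instead set a single property pointing at the regional endpoint, e.g.:

  "connect.s3.custom.endpoint": "https://s3.eu-central-1.amazonaws.com/"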

YordanPavlov commented 2 years ago

Thank you for your support, the connector works for me now.