confluentinc / kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)

Thread block when using a high flush.size #107

Closed mavdi closed 6 years ago

mavdi commented 7 years ago

I'm setting up a Kafka connect S3 sink with the following configuration

{
    "name": "kafka-s3-sink",
    "config": {
        "tasks.max": 5,
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "example-topic",
        "flush.size": 100000,
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "schema.compatibility": "NONE",
        "s3.bucket.name": "foo",
        "s3.region": "us-east-1",
        "s3.part.size": 5242880,
        "rotate.interval.ms": 86400000,
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "partition.duration.ms": 3600000,
        "locale": "en",
        "timezone": "UTC",
        "path.format": "YYYY-MM-dd-HH-mm-ss",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator"
    }
}

When setting flush.size to about 10000, everything works, but any number greater than that seems to hang the connector altogether. I say this because I get the following error:

[2017-10-17 11:05:46,812] WARN failed to publish monitoring message (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:499)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
    at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.publishMonitoringMessage(MonitoringInterceptor.java:255)
    at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.publishMetrics(MonitoringInterceptor.java:240)
    at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.publishMetrics(MonitoringInterceptor.java:206)
    at io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.run(MonitoringInterceptor.java:160)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:156)
    at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:548)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:450)

Is this a known issue? Any workarounds?

kkonstantine commented 7 years ago

Hi @mavdi

At first glance, this seems to be an issue with the monitoring interceptors and not with the S3 connector itself.
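For anyone hitting the same symptom: the monitoring interceptors are typically enabled through the Connect worker's client overrides, so one way to test this theory is to disable them there. A sketch of the relevant worker properties (the exact keys in your deployment may differ, e.g. if they were set via Control Center tooling):

```properties
# connect-distributed.properties (or connect-standalone.properties)
# Commenting out (or removing) these lines disables Confluent's
# monitoring interceptors for the embedded producer and consumer:
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
```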

kkonstantine commented 7 years ago

And some additional information:

mavdi commented 7 years ago

I've removed the interceptors and it seems to have allowed me to increase flush.size to 40000.

Anything beyond that still fails. I get a strange error:

 WARN Commit of WorkerSinkTask{id=kafka-s3-sink-time20-0} offsets timed out
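For reference, offset-commit timeouts like this one are governed by the worker-level setting `offset.flush.timeout.ms` (default 5000 ms). With a large `flush.size`, the task can still be busy writing buffered records when the commit deadline hits. A hedged sketch of raising it, assuming buffering time is indeed the cause:

```properties
# connect-distributed.properties
# Allow more time for offset commits while large files are being flushed.
# 60000 is an illustrative value, not a recommendation.
offset.flush.timeout.ms=60000
```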
mavdi commented 7 years ago

Again, hit-and-miss errors. I can't form a clear picture of what's going on.

[2017-10-18 09:29:18,749] WARN Aborting multi-part upload with id 'z5wjp1k4gZvSEHiGAh3OQC_bhI9CUOybf8x._xvuWtOqOOkgc8gVeX1h3AEUrlgjWLQHe9.GXAI9OKrx3jwV72xNgV9vzGXu0R83IughQLs9gdbWU19BTQatOg9QEsUazu_V1H8B37ncZtKNhyMgp77wk1TJ2dWGYUUmIDdFByo-' (io.confluent.connect.s3.storage.S3OutputStream)
kkonstantine commented 6 years ago

A higher flush.size is definitely attainable by the connector. Aborted multi-part uploads are more likely related to connectivity issues. Tuning s3.part.size could help diagnose such issues at different sizes. Closing because it's been a while since this issue was discussed, but feel free to re-open if it remains a problem.
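A sketch of the tuning suggested above. The 25 MB value is illustrative, not a recommendation; S3 multipart uploads require parts of at least 5 MB (except the last part), and each open partition buffers roughly one part in memory, so larger parts trade heap usage for fewer upload requests:

```json
{
    "s3.part.size": 26214400,
    "flush.size": 100000
}
```

These keys would be merged into the connector's existing `"config"` block rather than used standalone.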