strimzi / strimzi-kafka-operator

Apache Kafka® running on Kubernetes
https://strimzi.io/
Apache License 2.0

[Question] Kafkaconnector runs very slow and encounters some errors #3231

Closed saranyaeu2987 closed 4 years ago

saranyaeu2987 commented 4 years ago

I have a Kafka topic which currently has 97k messages across 10 partitions. The connector runs very slowly, processing only 150 to 200 messages per minute.

K8s cluster details: 3-node m5.large EKS cluster

KafkaConnector.yaml (currently running on only one m5.large instance):

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-emd-locc-connector
  labels:
    strimzi.io/cluster: emd-kafka-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  tasksMax: 10
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: DB.EMD.LOCATION.0
    camel.sink.url: aws2-s3://test123?keyName=emd_loc_temp/${date:now:yyyy-MM-dd}/${date:now:yyyyMMdd-HHmmssSSS}
    camel.component.aws2-s3.configuration.autocloseBody: false
    camel.component.aws2-s3.useIAMCredentials: true

It takes a very long time to consume messages and occasionally encounters the following errors, but keeps proceeding.

  1. Info message 1 (occurs frequently)

2020-06-23 14:12:44,024 INFO [Consumer clientId=connector-consumer-s3-emd-locc-connector-0, groupId=connect-s3-emd-locc-connector] Attempt to heartbeat failed since group is rebalancing

  2. Info message 2 (occurs frequently)

    INFO [Consumer clientId=connector-consumer-heb-s3-emd-locc-connector-5, groupId=connect-heb-s3-emd-locc-connector] Member connector-consumer-heb-s3-emd-locc-connector-5-800c1ffa-838d-4d6c-8435-54217b19f832 sending LeaveGroup request to coordinator w2capl0084101.heb.com:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

A. Error 1 (occurred multiple times but proceeded)

(org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-heb-s3-emd-locc-connector-1]
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1109)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:976)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1511)

B. Error 2 (occurred 2 times but proceeded)

ERROR WorkerSinkTask{id=s3-emd-locc-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-heb-s3-emd-locc-connector-0]
org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)

Questions:

  1. Can tasksMax be only 10 (the number of partitions), or can it be more than that?
  2. What causes the above errors? Can they be avoided by increasing max.poll.interval.ms from 300000 to a bigger value? If so, how do I update max.poll.interval.ms? I see the following default settings in my KafkaConnect deployment: max.poll.interval.ms = 300000, max.poll.records = 500
  3. How can I speed up KafkaConnector processing?

scholzj commented 4 years ago

This seems more like a Kafka Connect / Camel Connector question than a Strimzi question.

1) In general, you can set even higher numbers there. But it depends on the connector whether it can run more than one task, and in most cases more tasks than partitions makes little sense.

2) Judging from the errors, you may want to make max.poll.interval.ms smaller rather than bigger. You should be able to set it in KafkaConnect.spec.config or in KafkaConnector.spec.config.

3) I guess you need to first find out which part is slow - Kafka brokers? Network? Connect? Connector?
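For point 2, a per-connector setting might look like the sketch below. It assumes Kafka Connect 2.3 or newer, where a connector can override the consumer settings of its own tasks with the consumer.override. prefix once the worker's connector.client.config.override.policy allows it; a plain max.poll.records key in KafkaConnector.spec.config is handed to the connector itself, not to the consumer. The values are illustrative only, and all other fields are omitted.

```yaml
# Worker side (KafkaConnect): permit per-connector client overrides.
# Assumption: Kafka Connect 2.3+ and that Strimzi passes this key through unchanged.
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: emd-kafka-cluster
spec:
  config:
    connector.client.config.override.policy: All
---
# Connector side (KafkaConnector): override the consumer used by this connector's tasks.
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-emd-locc-connector
  labels:
    strimzi.io/cluster: emd-kafka-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  tasksMax: 10
  config:
    topics: DB.EMD.LOCATION.0
    # Illustrative values, not recommendations.
    consumer.override.max.poll.records: 100
    consumer.override.max.poll.interval.ms: 300000
```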

saranyaeu2987 commented 4 years ago

@scholzj

  1. Values are not picked up from KafkaConnect.spec.config; only the default value is used. Here is my KafkaConnect:
    
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: emd-kafka-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
      image: 202991147671.dkr.ecr.us-east-1.amazonaws.com/strimzicamel-kafka-connector:1.0.1
      logging:
        type: inline
        loggers:
          connect.root.logger.level: "INFO"
      replicas: 1
      bootstrapServers: <broker>
      authentication:
        type: plain
        username: emdconsumer
        passwordSecret:
          secretName: sasl-user-pass
          password: password
      tls:
        trustedCertificates:
          - secretName: hebcert
            certificate: HEBcert.crt
      config:
        group.id: emdconsumer-group-1
        offset.storage.topic: emd-connect-offsets
        config.storage.topic: emd-connect-configs
        status.storage.topic: emd-connect-status
        config.providers: file
        config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
        key.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: false
        value.converter.schemas.enable: false
        max.poll.records: 100.   <--- see this value??


I attached the log, which shows that the default value **max.poll.records = 500** is set instead of 100.
[container-location1.log](https://github.com/strimzi/strimzi-kafka-operator/files/4825803/container-location1.log)

2. Also, I see more messages at the consumer end than were produced.
How does Strimzi KafkaConnect ensure "**exactly once processing**"?

3.
> I guess you need to first find out which part is slow - Kafka brokers? Network? Connect? Connector?

The Kafka brokers are not slow, because a standalone consumer consumes 97k messages in 22 seconds.
How can I check whether Connect or the connector is slow? Suggestions please.

scholzj commented 4 years ago

Strimzi is only an orchestration layer. Inside is a regular Apache Kafka release, with exactly the same binaries you can download from kafka.apache.org. So there is no real "Strimzi Kafka Connect"; we just deploy Kafka Connect for you. The configs from KafkaConnect.spec.config are just passed into the Kafka Connect configuration file. Maybe you need to prefix max.poll.records with consumer.? I know it was needed for some options. You also have 100. there, which makes me wonder whether that is in the actual YAML or just a copy-paste error.

saranyaeu2987 commented 4 years ago

@scholzj 100. is just a copy-paste error.

I see a lot of "Attempt to heartbeat failed since group is rebalancing" messages. I feel that's causing the delay (maybe?).

max.poll.records is a Kafka property (https://kafka.apache.org/documentation/#max.poll.records), so it should be usable as is. I checked /tmp/strimzi-connect.properties in the pod and the value is set to max.poll.records=100, so I am unsure where the log is pulling max.poll.records=500 from.

scholzj commented 4 years ago

So, have you tried to prefix it with consumer. as I suggested?

saranyaeu2987 commented 4 years ago

@scholzj

  1. Setting consumer.max.poll.records: 100 worked, but it didn't speed up the process :(

  2. Also, I see many duplicated messages. How can I guarantee exactly-once processing?

  3. Can one kafkaconnect have multiple kafkaconnectors (1 for each topic) ?
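For reference, the change from point 1 can be sketched as the fragment below. Only the config entry differs from the KafkaConnect resource posted earlier; all other fields are omitted here. The unprefixed key ended up in /tmp/strimzi-connect.properties but the consumer still logged 500, while the consumer.-prefixed key was picked up.

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: emd-kafka-cluster
spec:
  # image, bootstrapServers, authentication, tls etc. unchanged and omitted here
  config:
    # Not applied to the sink tasks' consumers (log still showed 500):
    # max.poll.records: 100
    # Applied to the consumers created by the worker:
    consumer.max.poll.records: 100
```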

scholzj commented 4 years ago

Re 1) Well, setting that to a lower value should IMHO in general improve latency but worsen throughput. So I would not expect it to help.

Re 2) I have no idea how that works in Connect. But among other things it would depend how you store the data into S3, whether S3 supports something like that or not etc. If you just store the message to some bucket with file name based on some header etc., it should be just idempotent.

Re 3) I do not think there is any limit on number of connectors Strimzi would impose.
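As a rough sketch of Re 3), several KafkaConnector resources can point at the same KafkaConnect cluster through the strimzi.io/cluster label, provided the KafkaConnect resource carries the strimzi.io/use-connector-resources: "true" annotation (as it does in this thread). The second connector's name and topic below are hypothetical, and the converter and camel.sink.url settings are left out.

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-emd-locc-connector
  labels:
    strimzi.io/cluster: emd-kafka-cluster   # name of the KafkaConnect resource
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  tasksMax: 10
  config:
    topics: DB.EMD.LOCATION.0
    # converters and camel.sink.url as in the original connector
---
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-second-topic-connector           # hypothetical name
  labels:
    strimzi.io/cluster: emd-kafka-cluster   # same Connect cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSinkConnector
  tasksMax: 10
  config:
    topics: SECOND.TOPIC                    # hypothetical topic
    # converters and a camel.sink.url for this topic's destination
```

All connectors defined this way share the same Connect workers, so their tasks compete for the resources of the same pods.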

saranyaeu2987 commented 4 years ago

2) I thought the topics below in KafkaConnect are mainly used for tracking the status of the actual topic(s) and provide idempotent / process-once behavior. Is that not true? config.storage.topic, status.storage.topic

scholzj commented 4 years ago

As I said, I have no idea how Kafka Connect implements exactly once. I do not expect these topics to have anything to do with it, but I don't know. The consumer in Connect is basically the regular Consumer API - so that will do whatever the regular consumer does in this respect. And as I said, the other side will depend on the connector and where it connects.

scholzj commented 4 years ago

Is there still something we can help with here? Or can we close the issue? Thanks

saranyaeu2987 commented 4 years ago

Can we close this
