wepay / kafka-connect-bigquery

DEPRECATED. PLEASE USE https://github.com/confluentinc/kafka-connect-bigquery. A Kafka Connect BigQuery sink connector

Resource usage limits via configuration - Tasks crashing due to high load #307

Open mkherlakian opened 2 years ago

mkherlakian commented 2 years ago

I'm trying to sink some pretty large topics from Kafka (5 topics with about 250 million events each) into BigQuery via a separate, rather large Kafka Connect cluster (3 nodes, each with 8 CPUs and 32 GB RAM). It starts up fine, but after about 2 minutes the Connect instances' CPUs are pegged at 100% and the nodes start disconnecting; ultimately the whole process restarts with little progress on getting any data into BigQuery.
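
(For scale: that's roughly 1.25 billion events in total across the five topics, and with tasks.max set to 3 each task ends up responsible for partitions from several of these topics at once.)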

I tried the same configuration in a replica of our environment with far fewer events (500,000) and it works fine.

Are there any configuration options that can throttle the processing of events to keep CPU usage in check? I tried tuning queueSize and threadPoolSize, as well as max.queue.size and max.batch.size, to no avail.
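
To be concrete about the kind of throttling I have in mind, something along the lines of the per-connector consumer overrides below (just a sketch: the values are placeholders, it assumes the worker is running with connector.client.config.override.policy=All, and I don't know whether these are even the right knobs):

  "consumer.override.max.poll.records": "200",
  "consumer.override.fetch.max.bytes": "10485760"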

Any hint/help would be very much appreciated!

Here's our config for reference:

{
  "name": "hd-sink-bq",
  "tasks.max": "3",

  "queueSize": 20000,
  "threadPoolSize": 2,

  "topics": "topic1,topic2,topic3,topic4,topic5",
  "sanitizeTopics": "true",

  "autoCreateTables": "true",

  "timestampPartitionFieldName": "created_at",

  "max.queue.size": "81290",
  "max.batch.size": "20480",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "<REGISTRY_URL>",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "<USER:PASS>",

  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "<REGISTRY_URL>",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "<USER:PASS>",

  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "defaultDataset": "data_lake",
  "allowNewBigQueryFields": "true",
  "bigQueryPartitionDecorator": "false",
  "project": "<PROJECT>",
  "keySource": "JSON",
  "keyfile": "<JSON_STRINGIFIED_KEY>",
  "timePartitioningType": "DAY",
  "upsertEnabled": true,
  "kafkaKeyFieldName": "_kid",

  "transforms": "removeEventRequestData,removeResponseData",

  "transforms.removeEventRequestData.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.removeEventRequestData.blacklist": "headers,body,path,query",
  "transforms.removeEventRequestData.predicate": "isEventRequest",

  "transforms.removeResponseData.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.removeResponseData.blacklist": "body",
  "transforms.removeResponseData.predicate": "isAttemptResponse",

  "predicates": "isEventRequest,isAttemptResponse",
  "predicates.isEventRequest.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isEventRequest.pattern": "topic1",

  "predicates.isAttemptResponse.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isAttemptResponse.pattern": "topic2",

  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.log.include.messages": "true",
  "errors.tolerance": "all",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.name": "connect.bq-sink.deadletter"
}