confluentinc / confluent-kafka-go-dev

[EXPERIMENTAL] Development / WIP / exploratory / test fork of confluent-kafka-go
Apache License 2.0

producer stalls after `queue.buffering.max.messages` messages #2

Open lizthegrey opened 5 years ago

lizthegrey commented 5 years ago

Description

We're seeing hundreds of thousands of events queue up on our producers because they aren't being written to Kafka fast enough, which eventually OOMs the producers. Yes, I know this is vague; happy to try to triangulate/debug this with you.

How to reproduce

Stream hundreds of thousands of protobuf-marshalled events into ProduceChannel(), then run /debug/pprof to look at the heap, or wait for the OOM.

Checklist

Please provide the following information:

lizthegrey commented 5 years ago

Going to try setting go.batch.producer to true, and also going to tune down go.{events,produce}.channel.size so that we refuse events rather than stall the box and OOM.

Also going to report Len() through our telemetry periodically to keep an eye on how fast things are draining.
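
For concreteness, a rough sketch of what that tuning looks like with this library (not our actual service code; the broker address, channel sizes, and reporting interval are placeholders):

package main

import (
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go-dev/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"metadata.broker.list":    "broker:9092", // placeholder
		"go.batch.producer":       true,          // use the batching produce path
		"go.events.channel.size":  1000,          // tuned well below the default
		"go.produce.channel.size": 1000,          // prefer refusing events to buffering toward an OOM
	})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	// Report the producer's internal queue length periodically so telemetry
	// can show how fast (or whether) the queue is draining.
	go func() {
		for range time.Tick(10 * time.Second) {
			log.Printf("kafka producer Len(): %d", p.Len())
		}
	}()

	// ... application code writes messages to p.ProduceChannel() ...
}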

lizthegrey commented 5 years ago

Tuning those settings, unfortunately, did not help.

lizthegrey commented 5 years ago

Going to need a CPU profile to figure out why it isn't keeping up, but in the meantime I changed our code to fail noisily if the producer queue starts blocking, rather than silently buffering up more messages until it OOMs.
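
Roughly, the change amounts to a non-blocking send on the produce channel; this is a sketch rather than the actual code, and the send helper is made up for illustration:

package producer

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go-dev/kafka"
)

// send attempts a non-blocking write to the producer's channel. If the channel
// is already full it returns an error immediately, instead of blocking and
// letting messages pile up in memory.
func send(p *kafka.Producer, msg *kafka.Message) error {
	select {
	case p.ProduceChannel() <- msg:
		return nil
	default:
		return fmt.Errorf("producer queue full (Len=%d), refusing message", p.Len())
	}
}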

lizthegrey commented 5 years ago

Curiouser and curiouser: whatever is stalling appears to be in the C library (librdkafka), as a pprof CPU profile turns up no hotspots within the Go code or the cgo shims.

lizthegrey commented 5 years ago

@ianwilkes found that the batching code was stalling after writing approximately 300 payloads into it. Tagging him in here.

lizthegrey commented 5 years ago

Here are the relevant threads:

goroutine 43 [runnable]:
github.com/confluentinc/confluent-kafka-go-dev/kafka._Cfunc_do_produce(0x7f8c3c0211e0, 0x7f8c30000dd0, 0x600000002, 0x0, 0xc08d291680, 0x23e, 0xc000000001, 0xc0c6e6a391, 0x0, 0x0, ...)
        _cgo_gotypes.go:573 +0x50
github.com/confluentinc/confluent-kafka-go-dev/kafka.(*Producer).produce.func2(0xc00023c080, 0x7f8c30000dd0, 0xc0994dde00, 0x4, 0xc000000000, 0xc000564ea8, 0x23e, 0x1, 0xc000564ec0, 0x0, ...)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:262 +0x30c
github.com/confluentinc/confluent-kafka-go-dev/kafka.(*Producer).produce(0xc00023c080, 0xc0994dde00, 0x4, 0x0, 0x0, 0x0)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:262 +0x4a9
github.com/confluentinc/confluent-kafka-go-dev/kafka.channelProducer(0xc00023c080)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:474 +0x95
created by github.com/confluentinc/confluent-kafka-go-dev/kafka.NewProducer
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:464 +0x5c0

goroutine 42 [syscall]:
github.com/confluentinc/confluent-kafka-go-dev/kafka._Cfunc__rk_queue_poll(0x7f8c3c000da0, 0x64, 0xc0c7c12180, 0xc0c7be5170, 0x0, 0x0)
        _cgo_gotypes.go:556 +0x4e
github.com/confluentinc/confluent-kafka-go-dev/kafka.(*handle).eventPoll.func1(0xc00023c090, 0xc000284f90, 0xc0c7c12180, 0xc0c7be5170, 0xc000284d68, 0x0)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/event.go:168 +0x141
github.com/confluentinc/confluent-kafka-go-dev/kafka.(*handle).eventPoll(0xc00023c090, 0xc00026a180, 0x64, 0x3e8, 0xc000086180, 0x0, 0x0, 0x0)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/event.go:168 +0x122
github.com/confluentinc/confluent-kafka-go-dev/kafka.poller(0xc00023c080, 0xc000086180)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:548 +0x75
created by github.com/confluentinc/confluent-kafka-go-dev/kafka.NewProducer
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:458 +0x53c

goroutine 45 [chan receive, 3 minutes]:
github.com/honeycombio/hound/kafka.(*DefaultProducer).init.func1(0xc0000af020, 0xc00026a240)
        /home/circleci/project/kafka/producer.go:121 +0x8f
created by github.com/honeycombio/hound/kafka.(*DefaultProducer).init
        /home/circleci/project/kafka/producer.go:119 +0x88

goroutine 46 [chan receive]:
github.com/honeycombio/hound/kafka.(*DefaultProducer).init.func2(0xc0000af020, 0xc00026a240, 0x169ab40, 0xc00023c080)
        /home/circleci/project/kafka/producer.go:166 +0xd5
created by github.com/honeycombio/hound/kafka.(*DefaultProducer).init
        /home/circleci/project/kafka/producer.go:163 +0xc8

goroutine 67 [runnable]:
github.com/confluentinc/confluent-kafka-go-dev/kafka._Cfunc__rk_queue_poll(0x7f8c3c023f00, 0x64, 0xc0c6e6a0fc, 0xc0c6e48f90, 0x0, 0x0)
        _cgo_gotypes.go:556 +0x4e
github.com/confluentinc/confluent-kafka-go-dev/kafka.(*handle).eventPoll.func1(0xc00023c110, 0xc000282f90, 0xc0c6e6a0fc, 0xc0c6e48f90, 0xc000282d68, 0x0)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/event.go:168 +0x141
github.com/confluentinc/confluent-kafka-go-dev/kafka.(*handle).eventPoll(0xc00023c110, 0xc00026a420, 0x64, 0x3e8, 0xc0000861e0, 0x0, 0x0, 0x0)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/event.go:168 +0x122
github.com/confluentinc/confluent-kafka-go-dev/kafka.poller(0xc00023c100, 0xc0000861e0)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:548 +0x75
created by github.com/confluentinc/confluent-kafka-go-dev/kafka.NewProducer
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:458 +0x53c

goroutine 68 [syscall]:
github.com/confluentinc/confluent-kafka-go-dev/kafka._Cfunc_do_produce(0x7f8c3c0244e0, 0x3ee5e70, 0x600000001, 0x0, 0xc0047bb800, 0x3e6, 0xc000000001, 0xc0c6e6a102, 0x0, 0x0, ...)
        _cgo_gotypes.go:573 +0x50
github.com/confluentinc/confluent-kafka-go-dev/kafka.(*Producer).produce.func2(0xc00023c100, 0x3ee5e70, 0xc00809bf40, 0x4, 0xc000000000, 0xc000565ea8, 0x3e6, 0x1, 0xc000565ec0, 0x0, ...)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:262 +0x30c
github.com/confluentinc/confluent-kafka-go-dev/kafka.(*Producer).produce(0xc00023c100, 0xc00809bf40, 0x4, 0x0, 0x0, 0x0)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:262 +0x4a9
github.com/confluentinc/confluent-kafka-go-dev/kafka.channelProducer(0xc00023c100)
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:474 +0x95
created by github.com/confluentinc/confluent-kafka-go-dev/kafka.NewProducer
        /home/circleci/project/vendor/github.com/confluentinc/confluent-kafka-go-dev/kafka/producer.go:464 +0x5c0

goroutine 70 [chan receive, 3 minutes]:
github.com/honeycombio/hound/kafka.(*DefaultProducer).init.func1(0xc0000af080, 0xc00007c180)
        /home/circleci/project/kafka/producer.go:121 +0x8f
created by github.com/honeycombio/hound/kafka.(*DefaultProducer).init
        /home/circleci/project/kafka/producer.go:119 +0x88

goroutine 71 [chan receive]:
github.com/honeycombio/hound/kafka.(*DefaultProducer).init.func2(0xc0000af080, 0xc00007c180, 0x169ab40, 0xc00023c100)
        /home/circleci/project/kafka/producer.go:166 +0xd5
created by github.com/honeycombio/hound/kafka.(*DefaultProducer).init
        /home/circleci/project/kafka/producer.go:163 +0xc8

lizthegrey commented 5 years ago

(I think this is now in a state where we know enough about what's going on that I'll tag in @edenhill)

lizthegrey commented 5 years ago

Confirmed: my initial benchmarking issues were PEBKAC; nothing gets produced if metadata.broker.list isn't set (like it wasn't in our benchmark harness...)

lizthegrey commented 5 years ago

Okay, yup, confirmed: if queue.buffering.max.messages is set to 100 on the producer, exactly 100 messages are received at the consumer end, and then the producer stalls out and never sends another message!

If I change queue.buffering.max.messages to 10, then I only get 10 messages, etc.

It doesn't matter whether go.batch.producer is set or not, and likewise whether go.delivery.reports is set or not.
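
For reference, a minimal sketch of the setup that shows this (broker and topic are placeholders): produce far more messages than queue.buffering.max.messages allows and count delivery reports; delivery stops at the configured limit and never resumes.

package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go-dev/kafka"
)

func main() {
	topic := "test-topic" // placeholder
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"metadata.broker.list":         "localhost:9092", // placeholder
		"queue.buffering.max.messages": 10,
		"go.delivery.reports":          true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Count successful delivery reports to see where delivery stops.
	go func() {
		delivered := 0
		for ev := range p.Events() {
			if m, ok := ev.(*kafka.Message); ok && m.TopicPartition.Error == nil {
				delivered++
				fmt.Println("delivered:", delivered)
			}
		}
	}()

	// Stream far more messages than queue.buffering.max.messages allows.
	for i := 0; i < 1000; i++ {
		p.ProduceChannel() <- &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(fmt.Sprintf("message %d", i)),
		}
	}
	p.Flush(15 * 1000)
}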

GDB:

Thread 20 (Thread 0x7fffcbfff700 (LWP 11517)):
#0  0x00007ffff7d38819 in __GI___poll (fds=0x7fffc4002328, nfds=2, timeout=1000) at ../sysdeps/unix/sysv/linux/poll.c:29
#1  0x0000000000b1f6fe in rd_kafka_transport_io_serve ()
#2  0x0000000000ae16e8 in ?? ()
#3  0x0000000000adeb99 in ?? ()
#4  0x0000000000adaec3 in ?? ()
#5  0x0000000000a9e446 in ?? ()
#6  0x00007ffff7e1cfa3 in start_thread (arg=<optimized out>) at pthread_create.c:486
#7  0x00007ffff7d434cf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 19 (Thread 0x7fffecfb9700 (LWP 11516)):
#0  0x00007ffff7d38819 in __GI___poll (fds=0x7fffd0000f18, nfds=2, timeout=1000) at ../sysdeps/unix/sysv/linux/poll.c:29
#1  0x0000000000b1f6fe in rd_kafka_transport_io_serve ()
#2  0x0000000000ae16e8 in ?? ()
#3  0x0000000000adeb99 in ?? ()
#4  0x0000000000adaec3 in ?? ()
#5  0x0000000000a9e446 in ?? ()
#6  0x00007ffff7e1cfa3 in start_thread (arg=<optimized out>) at pthread_create.c:486
#7  0x00007ffff7d434cf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 3 (Thread 0x7ffff51e1700 (LWP 11500)):
#0  futex_wait_cancelable (private=0, expected=0, futex_word=0x7fffdc002848) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x7fffdc0027f8, cond=0x7fffdc002820) at pthread_cond_wait.c:502
#2  __pthread_cond_wait (cond=0x7fffdc002820, mutex=0x7fffdc0027f8) at pthread_cond_wait.c:655
#3  0x0000000000a9e376 in cnd_wait ()
#4  0x0000000000ad2ab6 in ?? ()
#5  0x0000000000ad382a in rd_kafka_produce_batch ()
#6  0x00000000009fa875 in _cgo_b361ee20cc93_Cfunc_rd_kafka_produce_batch ()
#7  0x00000000004b0b90 in ?? ()
#8  0x00000000000001fe in ?? ()
#9  0x0000000000000200 in ?? ()
#10 0x00007ffff51df028 in ?? ()
#11 0x00000000004ad87b in ?? ()
#12 0x000000c000036a00 in ?? ()
#13 0x0000000000000438 in ?? ()
#14 0x000000c00009b980 in ?? ()
#15 0x00000000004871c0 in ?? ()
#16 0x000000c000000d80 in ?? ()
#17 0x0000000000800000 in ?? ()
#18 0x0000000000000000 in ?? ()

lizthegrey commented 5 years ago

My further analysis says we're blocking inside the inlined function rd_kafka_curr_msgs_add, either because the total message count isn't being decremented after messages are written successfully, or because we're asking to write more messages in a single call than queue.buffering.max.messages allows. In that case rk->rk_curr_msgs.cnt + cnt > rk->rk_curr_msgs.max_cnt is likely always true, so we end up waiting on rk_curr_msgs.cnd to be signalled; but the messages have already been sent, so there's nothing left to wake us up by decrementing the count in rd_kafka_curr_msgs_sub and signalling rk_curr_msgs.cnd.

rd_kafka_curr_msgs_add()
 * @brief Add \p cnt messages and of total size \p size bytes to the
 *        internal bookkeeping of current message counts.
 *        If the total message count or size after add would exceed the
 *        configured limits \c queue.buffering.max.messages and
 *        \c queue.buffering.max.kbytes then depending on the value of
 *        \p block the function either blocks until enough space is available
 *        if \p block is 1, else immediately returns
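
To make the hypothesis concrete, here is a toy Go analogue of that bookkeeping (not librdkafka's actual code): if a single add() asks for more slots than the maximum, the wait condition can never become true, so the caller sleeps forever even once the queue has fully drained.

package curmsgs

import "sync"

// msgCounter mimics the hypothesised rk_curr_msgs bookkeeping.
type msgCounter struct {
	mu   sync.Mutex
	cond *sync.Cond
	cnt  int // messages currently buffered
	max  int // analogue of queue.buffering.max.messages
}

func newMsgCounter(max int) *msgCounter {
	c := &msgCounter{max: max}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// add blocks until n more messages fit under max. If n > max, the loop
// condition holds even at cnt == 0, so add waits forever: once everything
// already buffered has been delivered, nothing is left to signal the cond.
func (c *msgCounter) add(n int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.cnt+n > c.max {
		c.cond.Wait()
	}
	c.cnt += n
}

// sub is called as deliveries complete; it decrements the count and wakes
// any blocked add, mirroring rd_kafka_curr_msgs_sub.
func (c *msgCounter) sub(n int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cnt -= n
	c.cond.Broadcast()
}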

edenhill commented 5 years ago

Hey @lizthegrey, do you have some example code that showcases this problem?

lizthegrey commented 4 years ago

Unfortunately, a minimal test case here would be too challenging to produce :/ We wound up getting the feature we needed in Sarama, so we abandoned our efforts to switch to rdkafka :/

DoubtMail commented 1 year ago

Description

We're seeing hundreds of thousands of events queue up on our producers because they aren't being written to Kafka fast enough, which eventually OOMs the producers. Yes, I know this is vague, happy to try to triangulate/debug this with you.

How to reproduce

Stream hundreds of thousands of protobuf-marshalled events into ProduceChannel(), then run /debug/pprof to look at the heap, or wait for the OOM.

Checklist

Please provide the following information:

  • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): 4315eca
  • [x] Apache Kafka broker version: 2.3 (Confluent 5.3)
  • [x] Client configuration: ConfigMap{...}
  "metadata.broker.list": "kafka-bootstrap-nlb.{...}.honeycomb.io"
  "go.delivery.reports": true
  "group.id": hostname,
  "compression.codec": "snappy",
  "queue.buffering.max.messages": 10,
  "queue.buffering.max.ms": 100,
  "retry.backoff.ms":         250,
  "message.send.max.retries": 3,
  "enable.auto.commit":       false,
  "enable.auto.offset.store": false,
  • [x] Operating system: Ubuntu 18.04 LTS
  • [x] Provide client logs (with "debug": ".." as necessary): see gdb backtrace
  • [x] Provide broker log excerpts: none relevant
  • [x] Critical issue: library fails to work.

Can you explain if this could be resolved?