mailgun / kafka-pixy

gRPC/REST proxy for Kafka
Apache License 2.0

ConsumeNAck drops the first message #182

Closed drjuarez closed 4 years ago

drjuarez commented 4 years ago

Hi, thank you for your work on this project. I am trying to run a simple test and have found that the KafkaPixyClient.ConsumeNAck function consistently drops the first message. My code follows the Go getting-started example in the repo, and I generate a few test messages using kafkacat.

Reproduce:

  1. Start Kafka Cluster
  2. Generate 5 test Messages
  3. Run ConsumeNAck loop

Results: The first message is dropped. NOTE: no message is dropped if the gRPC server is up while the messages are created.

Have any of you run into this issue before? I apologize if it's an error on my part; I look forward to hearing back.

horkhe commented 4 years ago

The first time kafka-pixy receives a consume request for a topic, it initialises the consumer group partition offsets to the head, so whatever is already in the partitions won't be read by the following consume requests. Try making one consume request first; it will return the long-polling timeout error, but its purpose is to initialise the consumer group offsets. Then insert messages and then run the loop.
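
For reference, here is a minimal sketch of that priming request, based on the Go getting-started example. The import path, the default gRPC address, the ConsNAckRq field names, and the topic/group names are assumptions, not verified against the current proto:

```go
package main

import (
	"context"
	"log"
	"time"

	pb "github.com/mailgun/kafka-pixy/gen/golang" // assumed generated-client path
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// Assumed default gRPC address of a locally running kafka-pixy.
	conn, err := grpc.Dial("localhost:19091", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := pb.NewKafkaPixyClient(conn)

	// Priming request: expected to come back with gRPC NotFound (the
	// long-polling timeout), but it makes kafka-pixy join the group and
	// initialise the consumer group offsets at the partition heads.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	_, err = client.ConsumeNAck(ctx, &pb.ConsNAckRq{
		Topic: "test-topic", // placeholder topic
		Group: "test-group", // placeholder consumer group
		NoAck: true,         // nothing to acknowledge yet
	})
	if status.Code(err) == codes.NotFound {
		log.Print("offsets initialised; produce messages and start the consume loop")
	} else if err != nil {
		log.Printf("unexpected error: %v", err)
	}
}
```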

drjuarez commented 4 years ago

Thanks for your response @horkhe. So does your statement imply that if we want to use this in a system that generates messages before the proxy is running, we will always drop the first message upon consumption?

horkhe commented 4 years ago

One more time: when kafka-pixy gets a request to consume from a topic on behalf of a consumer group for the first time, it initialises the offsets to the head of all topic partitions. So whatever is already in the partitions won't be consumed; all messages produced to the topic after that will be. However, if you stop consuming for a period greater than the retention configured for the __consumer_offsets system topic, the consumer group offsets stored in Kafka will be expired and removed by Kafka, and the following consume request will trigger reinitialisation of the topic offsets to the head of all partitions.

drjuarez commented 4 years ago

Thanks for helping me understand

drjuarez commented 4 years ago

The behaviour I am seeing on my end doesn't really line up with what you are saying, though. Starting from a freshly created topic, if I generate 5 messages and then start the ConsumeLoop, the consumer group partition offset is initialised (inconsistently) at either 0 or 1.

After initialisation and catch up to head, the consumer group offset works as expected.

If I understand you correctly, consumer group initialisation will set the offset to latest, meaning I should not read anything at all, I suppose.

For reference, I am using Sarama and relying on automatic topic creation when the first message is produced. I have confirmed, using kafkacat, that all of my messages arrive in the freshly created topic.

horkhe commented 4 years ago

kafka-pixy expects a consumed topic to already be present; I am not sure how creating topics on the fly affects the result. The behaviour is probably a race between Kafka creating the topic and kafka-pixy initialising offsets for it. Just create the topic explicitly beforehand to get predictable behaviour.
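
If it helps, here is a rough sketch of creating the topic up front with Sarama's cluster admin; the broker address, topic name, and partition/replication settings are placeholders:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0 // the admin API needs a broker version set

	// Placeholder broker address; point this at your local cluster.
	admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalf("cluster admin: %v", err)
	}
	defer admin.Close()

	// Create the topic explicitly instead of relying on auto-creation
	// triggered by the first produced message.
	err = admin.CreateTopic("test-topic", &sarama.TopicDetail{
		NumPartitions:     1,
		ReplicationFactor: 1,
	}, false)
	if err != nil {
		log.Fatalf("create topic: %v", err)
	}
	log.Print("topic created")
}
```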

timbunce commented 4 years ago

We've encountered an issue that I suspect is the same as this one, and we have some more details to share.

Summary of our test:

What we're seeing is that some of the messages produced don't get consumed. They seem to be 'lost'. Sometime later, some but not all of those 'lost' messages reappear. Later still the remaining messages will appear.

Here's an annotated log showing the effect: pixy-lost-msgs-issue.log

Here's the (fairly generic?) config file: krproxyconfig.yaml.txt. Kafka and ZooKeeper are running locally in a container in this test setup.

It seems clear that there's some unexpected buffering happening. I'm not very familiar with Kafka or Pixy, so this could easily be pilot error, correctable by tuning the config or by someone pointing out where we're being silly.

Side note, in case it's relevant: this test sends writes (produces) through the proxy. The system we're developing wouldn't do that; the pixy proxy would only be used for reads. Perhaps that would avoid the problem altogether?

timbunce commented 4 years ago

This is looking more like a pixy bug related to buffering on the reading (not writing) side.

Here's a sequence of test runs showing a common behaviour where the first of several messages doesn't get returned; the output now includes the message offsets:

09:58 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:71: put "1@09:58:47" - offset 186
    krproxy_test.go:71: put "2@09:58:47" - offset 187
    krproxy_test.go:32: got "2@09:58:47" - offset 187
    krproxy_test.go:71: put "3@09:58:47" - offset 188
    krproxy_test.go:32: got "3@09:58:47" - offset 188
    krproxy_test.go:57: expected [1@09:58:47 2@09:58:47 3@09:58:47] got [2@09:58:47 3@09:58:47]

09:59 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:71: put "1@09:59:18" - offset 189
    krproxy_test.go:71: put "2@09:59:18" - offset 190
    krproxy_test.go:32: got "2@09:59:18" - offset 190
    krproxy_test.go:71: put "3@09:59:18" - offset 191
    krproxy_test.go:32: got "3@09:59:18" - offset 191
    krproxy_test.go:57: expected [1@09:59:18 2@09:59:18 3@09:59:18] got [2@09:59:18 3@09:59:18]

10:01 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:73: put "1@10:01:22" - offset 192
    krproxy_test.go:32: got "2@10:01:22" - offset 193
    krproxy_test.go:73: put "2@10:01:22" - offset 193
    krproxy_test.go:73: put "3@10:01:22" - offset 194
    krproxy_test.go:32: got "3@10:01:22" - offset 194
    krproxy_test.go:57: expected [1@10:01:22 2@10:01:22 3@10:01:22] got [2@10:01:22 3@10:01:22]

The pixy log had no new entries during those runs. At this point I waited a while. Then the pixy log showed:

2020-03-27 10:02:09.505467 Z warning </default.0/cons.0/ussjc-bx-001.ts.example.com.2/cachectl-purge-bx.p0.0> "Retrying: retryNo=1, offset=180, key=dummy" kafka.group=ussjc-bx-001.ts.example.com kafka.partition=0 kafka.topic=cachectl-purge-bx
2020-03-27 10:02:26.391817 Z info </default.0/cons.0/ussjc-bx-001.ts.example.com.2/cachectl-purge-bx.0> "Topic subscription expired" kafka.group=ussjc-bx-001.ts.example.com kafka.topic=cachectl-purge-bx

Then I reran the test. This time it read a message that had been written at a much earlier offset:

10:02 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:32: got "1@09:57:28" - offset 183   <== message delayed for minutes
    krproxy_test.go:42: drained 1 old messages
    krproxy_test.go:73: put "1@10:02:32" - offset 195
    krproxy_test.go:32: got "1@10:02:32" - offset 195
    krproxy_test.go:73: put "2@10:02:32" - offset 196
    krproxy_test.go:73: put "3@10:02:32" - offset 197
    krproxy_test.go:32: got "2@10:02:32" - offset 196
    krproxy_test.go:32: got "3@10:02:32" - offset 197

The pixy log had a couple of extra lines:

2020-03-27 10:02:31.505569 Z warning </default.0/cons.0/ussjc-bx-001.ts.example.com.2/cachectl-purge-bx.p0.0> "Retrying: retryNo=1, offset=183, key=dummy" kafka.group=ussjc-bx-001.ts.example.com kafka.partition=0 kafka.topic=cachectl-purge-bx
2020-03-27 10:02:32.772159 Z info </default.0/cons.0/ussjc-bx-001.ts.example.com.2/cachectl-purge-bx.0> "Resume request handling" kafka.group=ussjc-bx-001.ts.example.com kafka.topic=cachectl-purge-bx

Several minutes later, after writing all this up, I reran the test and all the old messages arrived:

10:23 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:32: got "1@09:57:06" - offset 180
    krproxy_test.go:32: got "1@09:58:47" - offset 186
    krproxy_test.go:32: got "1@09:59:18" - offset 189
    krproxy_test.go:32: got "1@10:01:22" - offset 192
    krproxy_test.go:42: drained 4 old messages
    krproxy_test.go:32: got "1@10:23:25" - offset 198
    krproxy_test.go:73: put "1@10:23:25" - offset 198
    krproxy_test.go:73: put "2@10:23:25" - offset 199
    krproxy_test.go:73: put "3@10:23:25" - offset 200
    krproxy_test.go:32: got "2@10:23:25" - offset 199
    krproxy_test.go:32: got "3@10:23:25" - offset 200
    krproxy_test.go:59: TestRoundtrip done
PASS

So it seems that some part of pixy reads some messages and holds on to them for some time, while other messages flow freely, and only releases them later.

timbunce commented 4 years ago

Random observations...

horkhe commented 4 years ago

You have long_polling_timeout: 72h; that is way too much. You should set it to a few seconds. Your consumer will then periodically get gRPC status 5 or HTTP 404 Not Found (depending on the client you are using) and should handle it. Otherwise your application will likely time out on its own internal request timeout, probably resetting the connection, and consumed messages can be lost during those resets.
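
For illustration, here is a sketch of a consume loop that treats gRPC NotFound (status 5) as the normal long-polling timeout. It reuses the assumed generated client from the earlier sketch; the AutoAck field and the topic/group names are likewise assumptions:

```go
package main

import (
	"context"
	"log"
	"time"

	pb "github.com/mailgun/kafka-pixy/gen/golang" // assumed generated-client path
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// consumeLoop treats gRPC NotFound as an ordinary long-poll timeout rather
// than an error, so the loop simply asks again.
func consumeLoop(client pb.KafkaPixyClient) {
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		rs, err := client.ConsumeNAck(ctx, &pb.ConsNAckRq{
			Topic:   "test-topic", // placeholder topic
			Group:   "test-group", // placeholder consumer group
			AutoAck: true,         // assumed field: ack each message on delivery
		})
		cancel()
		if status.Code(err) == codes.NotFound {
			continue // long-polling timeout: nothing to consume right now
		}
		if err != nil {
			log.Printf("consume error: %v", err)
			continue
		}
		log.Printf("got %q at partition %d offset %d", rs.Message, rs.Partition, rs.Offset)
	}
}
```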

timbunce commented 4 years ago

You have long_polling_timeout: 72h; that is way too much. You should set it to a few seconds.

Interesting, thanks. A different developer did the initial work. I'd presumed the config file was generic.

But that can't be the cause of the behaviour, as my recent testing has used no config file at all. The test starts the proxy itself, passing just the needed -*Addr and -*Peers flags.

I've noticed something I think is relevant... it seems the lost messages correlate with there being more than one "Fetched subscriptions" reported in the log:

2020-03-27 17:19:30.997636 Z info </_.0/cons.0/ussjc-bx-001.ts.example.com.0/member.0> "Fetched subscriptions: {
    kp_TimBunceA2.local_89648: [cachectl-purge-bx]
    kp_TimBunceA2.local_89679: [cachectl-purge-bx]
}" kafka.group=ussjc-bx-001.ts.example.com

So I'd guess that the 'delayed' messages were read by a previous subscription, so they aren't read by the new subscription. Then, when the previous subscription times out, the old messages are 'released'. Does that sound about right?

Would a short consumer.subscription_timeout setting be the right approach?

horkhe commented 4 years ago

So you have two kafka-pixy instances running in your test, kp_TimBunceA2.local_89648 and kp_TimBunceA2.local_89679, and they compete for the single partition.

timbunce commented 4 years ago

That makes sense. We don't have two running, but we have been starting and stopping kafka-pixy on each test run. We found it doesn't terminate, so it's been killed; see #183. (Note that the initial symptoms were encountered with one long-running instance of kafka-pixy, so I'm doubtful this is the whole story.) I'll change the test to leave kafka-pixy running and get back to you. Thanks.

horkhe commented 4 years ago

I am 100% confident that there is no problem with kafka-pixy; it has been in production at very large scale for a long time. The problem most likely lies with your configuration or your client. Kafka-pixy comes with testconsumer and testproducer scripts. I have just modified them to add a verbose flag so they can produce results similar to your test app. Could you please use them in your tests?

EDIT: clone the repo and run go install ./...; that will add both scripts to your GOPATH/bin.

timbunce commented 4 years ago

Thanks for the updates @horkhe.

I can't reproduce the problem using the testconsumer and testproducer scripts. I also can't reproduce it with our test script, using our current client code and a long-lived kafka-pixy process.

I'm happy to put our issues down to pilot error. Feel free to close the case, unless @drjuarez has any further concerns.

Thank you for your patience.