knative-extensions / eventing-kafka

Kafka integrations with Knative Eventing.
Apache License 2.0

After kafka-ch-dispatcher is restarted, undelivered events in the Kafka topic are not delivered. #420

Closed ch0312 closed 3 years ago

ch0312 commented 3 years ago

Describe the bug: After kafka-ch-dispatcher is restarted, undelivered events in the Kafka topic are never delivered.

Expected behavior: After kafka-ch-dispatcher is restarted, it delivers the undelivered events in the Kafka topic.

To Reproduce

ch0312 commented 3 years ago

Here are the reproduction steps I use. The manifests are attached: isssue#420_yaml.zip

```shell
kubectl -n knativetutorial apply -f channel_svc.yaml
kubectl -n knativetutorial apply -f event-display-sub-retry_svc.yaml
kubectl -n knativetutorial apply -f event-dead_svc.yaml
kubectl -n knativetutorial apply -f curler.yaml
```

- Send an event to the channel. Since the event-display service does not exist, redelivery should be attempted for a few tens of seconds, after which the event should be delivered to the event-dead service.
```shell
# kubectl -n knativetutorial exec -it curler -- /bin/bash

[root@curler /]# curl -v "http://event-display-ch-kn-channel.knativetutorial.svc.cluster.local" -X POST -H "Ce-Id: say-hello" -H "Ce-Specversion: 1.0" -H "Ce-Type: greeting" -H "Ce-Source: mycurl" -H "Content-Type: application/json" -d '{"key":"test"}'
```

- Before the event is delivered, kill the kafka-ch-dispatcher container and confirm that it restarts.
```shell
docker kill f81f00d0bd63

f81f00d0bd63

docker ps | grep k8s_dispatcher_kafka-ch-dispatcher

fdcd16fc1bf0 d10eec61701c "/ko-app/dispatcher" 1 second ago Up Less than a second k8s_dispatcher_kafka-ch-dispatcher-fc5c9b94c-jgm66_knative-eventing_639ac795-82dc-41e9-97f3-d82edfeef98f_33
```

- Check the log of the event-dead Pod. No event is delivered, no matter how long you wait.
```shell
# stern -n knativetutorial event-dead
```
ch0312 commented 3 years ago

I'll describe a tentative workaround I found, since it may help find the cause of the problem.

  1. I created event-display-ch, sent an event to the channel, and killed the kafka-ch-dispatcher Pod before the event was delivered. As mentioned earlier, this event is never delivered. The result is the same no matter how many times I repeat this.
  2. Then I sent an event to the channel again and did nothing to the kafka-ch-dispatcher Pod. As expected, the event was delivered.
  3. Next, I sent another event to the channel and killed the kafka-ch-dispatcher Pod before the event was delivered. This time, the event was delivered by the restarted kafka-ch-dispatcher Pod.

This means that once the kafka-ch-dispatcher Pod has successfully delivered an event at least once, the problem no longer occurs. The problem comes back after deleting and recreating the channel.

slinkydeveloper commented 3 years ago

On the first try, did the dispatcher reply with 202 to your request to event-display-ch?

This means that once the kafka-ch-dispatcher Pod has successfully delivered an event at least once

So, this happens only with the first message ever in your channel (aka in your topic), right?

ch0312 commented 3 years ago

That's right. Whether or not it is the first time, when you send a message to the channel, the channel returns 202. The actual output is as follows:

```shell
# curl -v "http://event-display-ch-kn-channel.knativetutorial.svc.cluster.local" -X POST -H "Ce-Id: say-hello" -H "Ce-Specversion: 1.0" -H "Ce-Type: greeting" -H "Ce-Source: mycurl" -H "Content-Type: application/json" -d '{"key":"test1"}'
Note: Unnecessary use of -X or --request, POST is already inferred.
* Rebuilt URL to: http://event-display-ch-kn-channel.knativetutorial.svc.cluster.local/
*   Trying 10.97.46.107...
* TCP_NODELAY set
* Connected to event-display-ch-kn-channel.knativetutorial.svc.cluster.local (10.97.46.107) port 80 (#0)
> POST / HTTP/1.1
> Host: event-display-ch-kn-channel.knativetutorial.svc.cluster.local
> User-Agent: curl/7.61.1
> Accept: */*
> Ce-Id: say-hello
> Ce-Specversion: 1.0
> Ce-Type: greeting
> Ce-Source: mycurl
> Content-Type: application/json
> Content-Length: 15
>
* upload completely sent off: 15 out of 15 bytes
< HTTP/1.1 202 Accepted
< Date: Tue, 02 Mar 2021 03:09:11 GMT
< Content-Length: 0
<
* Connection #0 to host event-display-ch-kn-channel.knativetutorial.svc.cluster.local left intact
```

So, this happens only with the first message ever in your channel (aka in your topic), right?

It's not just the first message. I'm sorry if my explanation was confusing.

  1. Send an event to the channel and kill the kafka-ch-dispatcher Pod before the event is delivered. Even after the kafka-ch-dispatcher Pod recovers, this event is not delivered.
  2. Send the event to the channel and do nothing to the kafka-ch-dispatcher Pod. The event will be delivered normally.

The problem in step 1 reproduces every time until step 2 has been performed. In other words, it affects not only the first message but every message sent before the first successful delivery.

slinkydeveloper commented 3 years ago

OK, my point is: when you kill the Kafka dispatcher pod (like in step 1), does the channel reply with 202, or doesn't it reply at all?

Because the delivery guarantee of the Kafka channel is: if the channel replies with 202, it should deliver the message, even after a crash.

slinkydeveloper commented 3 years ago

Can you try to execute these steps?

  1. Create a channel with just one topic partition
  2. Send a bunch of messages and let the channel deliver them
  3. Try to reproduce your issue
  4. Without recreating the channel (just kill the pod), check that, after recovery, the messages are delivered

Also, could you upgrade to 0.21 if possible?

ch0312 commented 3 years ago

OK, my point is: when you kill the Kafka dispatcher pod (like in step 1), does the channel reply with 202, or doesn't it reply at all?

I killed the kafka-ch-dispatcher Pod after confirming that the channel replied with 202. In other words, I took the kafka-ch-dispatcher Pod down between the time the channel received the message and the time it was delivered.

Can you try to execute these steps?

  1. Create a channel with just one topic partition
  2. Send a bunch of messages and let the channel deliver them
  3. Try to reproduce your issue
  4. Without recreating the channel (just kill the pod), check that, after recovery, the messages are delivered

I'll upgrade to 0.21. However, this issue only occurs after I create the channel and before the first successful delivery. Therefore, if I run step 2, I won't be able to reproduce the issue. What is this step meant to check?

slinkydeveloper commented 3 years ago

Therefore, if I run step 2, I won't be able to reproduce the issue. What is this step meant to check?

I see the problem now. It might be (and I'm guessing here) that, because no message has ever been delivered successfully, no offset is ever committed, so when the consumer group restarts, its offset is set to the last record + 1. This might be fixed by setting the initial offset of the consumer group to the oldest offset. cc @aliok, where should I put this config? Should I just hardcode it in the dispatcher?
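The hypothesis can be checked against a small model. This is a stdlib-only sketch, not Sarama or the dispatcher: `startOffset` and `pendingAfterRestart` are hypothetical helpers, and the two constants only mirror the values of `sarama.OffsetNewest` (-1) and `sarama.OffsetOldest` (-2). It assumes a single partition and Kafka's convention that the committed offset is the next record to read.

```go
package main

import "fmt"

// These mirror the values of sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// startOffset picks where a consumer group begins reading a partition.
// A committed offset always wins; the initial setting applies only when
// the group has never committed. oldest is the first offset still in the
// topic, newest is the offset the next record will get (last record + 1).
func startOffset(committed int64, hasCommit bool, initial, oldest, newest int64) int64 {
	if hasCommit {
		return committed
	}
	if initial == offsetOldest {
		return oldest
	}
	return newest
}

// pendingAfterRestart returns how many of the n records in a single-partition
// topic a restarted dispatcher would still deliver. committed == 1 means
// record 0 was delivered and committed.
func pendingAfterRestart(n, committed int64, hasCommit bool, initial int64) int64 {
	return n - startOffset(committed, hasCommit, initial, 0, n)
}

func main() {
	// Event accepted (202), dispatcher killed before the first delivery, no commit.
	fmt.Println(pendingAfterRestart(1, 0, false, offsetNewest)) // 0: the event is skipped
	fmt.Println(pendingAfterRestart(1, 0, false, offsetOldest)) // 1: the event is delivered
	// One event already delivered and committed, then a second one is pending.
	fmt.Println(pendingAfterRestart(2, 1, true, offsetNewest)) // 1: resumed from the commit
}
```

The third case matches the observation earlier in the thread: once a single delivery has been committed, a crash no longer loses events, even with the newest-offset default.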

aliok commented 3 years ago

@slinkydeveloper We don't have anything in the channel CRD for that. It might be possible (not 100% sure) to specify it in the Sarama config in config-kafka, but that means the channel will re-read it when it starts after a crash, and it will always start from the specified offset.

aliok commented 3 years ago

Yeah, something like this should work (beware the issue I wrote above though AND I haven't really tested it):

```yaml
# Excerpt of the config-kafka ConfigMap; other keys omitted.
apiVersion: v1
data:
  sarama: |
    Consumer:
      Offsets:
        Initial: XYZ
```

The value can be OffsetNewest (-1) or OffsetOldest (-2); see https://pkg.go.dev/github.com/Shopify/sarama.

slinkydeveloper commented 3 years ago

OK, so let me open a PR that hardcodes the value for the channel only. I believe channel users don't really need to configure this value.

travis-minke-sap commented 3 years ago

OK, so let me open a PR that hardcodes the value for the channel only. I believe channel users don't really need to configure this value.

I disagree: I don't want to assume that the offset is always/only the "oldest".

slinkydeveloper commented 3 years ago

@ch0312 After discussing this issue in #428, I think the viable fix for you right now is to do what ali explained in this comment: https://github.com/knative-sandbox/eventing-kafka/issues/420#issuecomment-789561223

ch0312 commented 3 years ago

Thank you. I'd like to try the Sarama config. What should I put in place of "XYZ"?

slinkydeveloper commented 3 years ago

Try -2, like I did in #428.

ch0312 commented 3 years ago

@slinkydeveloper Thank you. With 'Initial: -2', the problem no longer occurs. Why should I set it to -2?

slinkydeveloper commented 3 years ago

Setting it to -2 (aka OffsetOldest) makes sure that, when consuming the channel topic starts, you always start from the beginning of the topic.

ch0312 commented 3 years ago

Setting it to -2 (aka OffsetOldest) makes sure that, when consuming the channel topic starts, you always start from the beginning of the topic.

Thanks. Does this mean that when the channel comes back from a crash, there is a possibility that previously delivered events will be delivered again? As far as I have tried, this has not happened.

slinkydeveloper commented 3 years ago

Does this mean that when the channel comes back from a crash, there is a possibility that previously delivered events will be delivered again?

No. When it restarts, it resumes from the committed offset, because it is the same consumer group. Starting from the beginning only happens with new subscriptions.

ch0312 commented 3 years ago

No. When it restarts, it resumes from the committed offset, because it is the same consumer group. Starting from the beginning only happens with new subscriptions.

When OffsetInitial is set to -2, does it mean that a new subscription will re-deliver the previously delivered messages that are still in the Kafka topic? (I'm sorry for asking so many questions. I am trying to figure out the caveats of setting OffsetInitial to -2.)

slinkydeveloper commented 3 years ago

does it mean that a new subscription will re-deliver the previously delivered messages that are still in the Kafka topic?

Yes

(I'm sorry for asking so many questions. I am trying to figure out the caveats of setting OffsetInitial to -2.)

NP, you're welcome and we're here to help!

slinkydeveloper commented 3 years ago

subscriptionA and subscriptionB should be in the same consumer group...

travis-minke-sap commented 3 years ago

I apologize for the deleted comment. I was trying to remove my response, but GitHub is acting up on me. I'll stop making a mess here ;)

ch0312 commented 3 years ago

Adding back the missing content.

To verify this, I did the following.

  1. I created a KafkaChannel with OffsetInitial set to -2 and a subscription, subscriptionA, and delivered some events.
  2. I then created a new subscription, subscriptionB, but the messages already delivered in step 1 were not re-delivered.

I thought that subscriptionA and subscriptionB might belong to the same consumer group. Is there a setting I need in order to create subscriptions in different consumer groups?

Don't mean to intrude - just trying to help in case slinky is offline...

If SubscriptionB did NOT have the OffsetInitial set to -2, then the default value is to start from the current/latest offset which would explain why SubscriptionB did not receive the older events. Each Subscription should have its own ConsumerGroup with independent offset tracking.

subscriptionA and subscriptionB should be in the same consumer group...

ch0312 commented 3 years ago

If SubscriptionB did NOT have the OffsetInitial set to -2, then the default value is to start from the current/latest offset which would explain why SubscriptionB did not receive the older events. Each Subscription should have its own ConsumerGroup with independent offset tracking.

The OffsetInitial setting is set in config-kafka, so both subscriptionA and subscriptionB have OffsetInitial set to -2.

slinkydeveloper commented 3 years ago

subscriptionA and subscriptionB have the OffsetInitial set to -2.

Yes, but after the very first message is dispatched successfully, they both start tracking their own offset (separately, because they're two separate consumer groups).

ch0312 commented 3 years ago

To verify this, I did the following.

I created a KafkaChannel with OffsetInitial set to -2 and a subscription, subscriptionA, and delivered some events. I then created a new subscription, subscriptionB, but the messages already delivered in step 1 were not re-delivered.

When I tried again, subscriptionB delivered all the messages in the topic, including the ones already delivered. This is the expected behavior when OffsetInitial=-2.

@slinkydeveloper @travis-minke-sap Thanks for answering my question. I will close this issue.