strimzi / strimzi-kafka-bridge

An HTTP bridge for Apache Kafka®
Apache License 2.0

Bridge consumer creation fails #532

Closed hylkevds closed 1 year ago

hylkevds commented 3 years ago

We have trouble getting data out of the Bridge. Here is our workflow:

  1. Create Client
  2. Set subscription
  3. Request records (empty response because it's the first request)
  4. Wait more than 10 seconds; after 10 seconds the error below appears in Kafka and the client is de-registered by Kafka
  5. Request records (empty response because it's the first request)
  6. go to 4.
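
The HTTP calls behind steps 1 to 3 can be sketched roughly as follows. This is only an illustration, not the client actually used here: the bridge address `http://localhost:8080`, the helper names, and the topic name `ENVIRONMENT` (guessed from the `ENVIRONMENT-0` partition in the bridge log) are assumptions. The group and instance names come from the logs, and the paths and content types are the ones the Strimzi bridge documents for its consumer API. The requests are only built, not sent.

```python
import json
from urllib.request import Request

BRIDGE = "http://localhost:8080"  # hypothetical bridge address
GROUP = "frost-importers"         # consumer group id, as seen in the logs
NAME = "frost-importer"           # consumer instance name, as seen in the logs


def create_consumer() -> Request:
    """Step 1: create a consumer instance inside the group."""
    body = json.dumps({"name": NAME, "format": "json"}).encode()
    return Request(
        f"{BRIDGE}/consumers/{GROUP}",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.kafka.v2+json"},
    )


def subscribe(topics: list[str]) -> Request:
    """Step 2: set the subscription for the consumer instance."""
    body = json.dumps({"topics": topics}).encode()
    return Request(
        f"{BRIDGE}/consumers/{GROUP}/instances/{NAME}/subscription",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.kafka.v2+json"},
    )


def poll_records() -> Request:
    """Steps 3 and 5: request records (the POLL request in the bridge log)."""
    return Request(
        f"{BRIDGE}/consumers/{GROUP}/instances/{NAME}/records",
        method="GET",
        headers={"Accept": "application/vnd.kafka.json.v2+json"},
    )
```

Passing each returned object to `urllib.request.urlopen` would send it to a running bridge.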

Error in the Kafka logs; lines 1 and 2 are written at step 3, line 3 is the error:

2021-07-05 07:51:06,580 INFO [GroupCoordinator 0]: Preparing to rebalance group frost-importers in state PreparingRebalance with old generation 14 (__consumer_offsets-32) (reason: Adding new member frost-importer-3e52e9f6-7255-4487-bf42-d325a4957528 with group instance id None) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-1]
2021-07-05 07:51:09,746 INFO [GroupCoordinator 0]: Stabilized group frost-importers generation 15 (__consumer_offsets-32) with 1 members (kafka.coordinator.group.GroupCoordinator) [executor-Rebalance]
2021-07-05 07:51:19,826 INFO [GroupCoordinator 0]: Member frost-importer-3e52e9f6-7255-4487-bf42-d325a4957528 in group frost-importers has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
2021-07-05 07:51:19,828 INFO [GroupCoordinator 0]: Preparing to rebalance group frost-importers in state PreparingRebalance with old generation 15 (__consumer_offsets-32) (reason: removing member frost-importer-3e52e9f6-7255-4487-bf42-d325a4957528 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
2021-07-05 07:51:19,831 INFO [GroupCoordinator 0]: Group frost-importers with generation 16 is now empty (__consumer_offsets-32) (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]

In the bridge logs, around step 3-5, the following messages appear:

2021-07-05 15:02:30 INFO  poll:85 - [1937501044] POLL Request: from 10.233.96.110:59398, method = GET, path = /consumers/frost-importers/instances/frost-importer/records
2021-07-05 15:02:30 INFO  AbstractCoordinator:596 - [Consumer clientId=frost-importer, groupId=frost-importers] Successfully joined group with generation Generation{generationId=42, memberId='frost-importer-0e40f6cc-46c5-48eb-ae87-d991e0d655e6', protocol='range'}
2021-07-05 15:02:30 INFO  ConsumerCoordinator:626 - [Consumer clientId=frost-importer, groupId=frost-importers] Finished assignment for group at generation 42: {frost-importer-0e40f6cc-46c5-48eb-ae87-d991e0d655e6=Assignment(partitions=[ENVIRONMENT-0])}
2021-07-05 15:02:30 INFO  poll:85 - [1937501044] POLL Response:  statusCode = 200, message = OK
2021-07-05 15:02:30 INFO  AbstractCoordinator:792 - [Consumer clientId=frost-importer, groupId=frost-importers] SyncGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=42, memberId='frost-importer-0e40f6cc-46c5-48eb-ae87-d991e0d655e6', protocol='range'}
2021-07-05 15:02:30 INFO  AbstractCoordinator:472 - [Consumer clientId=frost-importer, groupId=frost-importers] Rebalance failed. org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator is not aware of this member.
2021-07-05 15:02:30 INFO  AbstractCoordinator:540 - [Consumer clientId=frost-importer, groupId=frost-importers] (Re-)joining group
2021-07-05 15:02:30 INFO  AbstractCoordinator:540 - [Consumer clientId=frost-importer, groupId=frost-importers] (Re-)joining group

We've also noticed that a workaround is to request records twice within 10 seconds. After doing this once, the subscription in Kafka is set correctly, and data flows as expected.

Workaround:

  1. Create Client
  2. Set subscription
  3. Request records (empty response because it's the first request)
  4. Within 10 seconds, request records again (this time it has data)
  5. Wait more than 10 seconds (no errors in Kafka)
  6. Request records (more data)
  7. go to 5
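
As a rough sketch, the workaround amounts to a poll schedule with a short gap between the first two polls and a longer gap afterwards. Everything named here is hypothetical: `fetch` stands for whatever function performs the records request, and the 1-second and 60-second gaps are placeholder values, chosen only so that the first gap is under 10 seconds and the later ones are over it.

```python
import time
from typing import Callable, Iterator


def poll_with_warm_up(
    fetch: Callable[[], list],
    warm_up_polls: int = 2,      # polls that must happen close together
    warm_up_gap: float = 1.0,    # seconds between warm-up polls (under 10)
    steady_gap: float = 60.0,    # seconds between later polls (over 10)
    max_polls: int = 5,          # stop after this many polls
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[list]:
    """Yield the result of each poll, sleeping between polls.

    The gaps between the first `warm_up_polls` polls are `warm_up_gap`
    seconds; every later gap is `steady_gap` seconds.
    """
    for i in range(max_polls):
        yield fetch()
        if i + 1 < max_polls:
            # Short gap while still warming up, long gap afterwards.
            sleep(warm_up_gap if i + 1 < warm_up_polls else steady_gap)
```

With a `fetch` that returns `[]` on the first call, the first yielded value mirrors the empty response of step 3, and the second poll follows one second later as in step 4.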

For some reason, the initial registration of the client by the bridge on Kafka is not completed unless a second records request is made before Kafka throws out the registration.

srinivasev commented 1 year ago

Hi @hylkevds, is there any solution for this? I am facing the same issue; I see it when I check the Kafka Bridge logs.

srinivasev commented 1 year ago

Hi @scholzj, please guide me if you have any idea on this. The suggested workaround is not working for me.

2022-10-27 02:34:25 INFO poll:85 - [715700932] POLL Request: from 127.0.0.6:46995, method = GET, path = /consumers/bridge-srini-consumer-group/instances/bridge-srini-quickstart-consumer/records
2022-10-27 02:34:25 INFO poll:85 - [715700932] POLL Response: statusCode = 200, message = OK
2022-10-27 02:34:25 INFO AbstractCoordinator:596 - [Consumer clientId=bridge-srini-quickstart-consumer, groupId=bridge-srini-consumer-group] Successfully joined group with generation Generation{generationId=7, memberId='bridge-srini-quickstart-consumer-398f799e-4155-45b0-b7b9-7e8f733c2723', protocol='range'}
2022-10-27 02:34:25 INFO ConsumerCoordinator:626 - [Consumer clientId=bridge-srini-quickstart-consumer, groupId=bridge-srini-consumer-group] Finished assignment for group at generation 7: {bridge-srini-quickstart-consumer-398f799e-4155-45b0-b7b9-7e8f733c2723=Assignment(partitions=[dmm-topic-0, srini-0])}
2022-10-27 02:34:25 INFO AbstractCoordinator:796 - [Consumer clientId=bridge-srini-quickstart-consumer, groupId=bridge-srini-consumer-group] SyncGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=7, memberId='bridge-srini-quickstart-consumer-398f799e-4155-45b0-b7b9-7e8f733c2723', protocol='range'}
2022-10-27 02:34:25 INFO AbstractCoordinator:470 - [Consumer clientId=bridge-srini-quickstart-consumer, groupId=bridge-srini-consumer-group] Rebalance failed. org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator is not aware of this member.
2022-10-27 02:34:25 INFO AbstractCoordinator:540 - [Consumer clientId=bridge-srini-quickstart-consumer, groupId=bridge-srini-consumer-group] (Re-)joining group
2022-10-27 02:34:25 INFO AbstractCoordinator:540 - [Consumer clientId=bridge-srini-quickstart-consumer, groupId=bridge-srini-consumer-group] (Re-)joining group

hylkevds commented 1 year ago

I've not seen any better solution so far.

ppatierno commented 1 year ago

The HTTP support in the bridge just mimics the behaviour of a native Java Kafka client. Usually, with the Java client you have a loop that polls for new messages. When you subscribe to a topic, nothing happens in the Kafka client apart from saving the subscription locally; the mechanism of joining the consumer group starts only with the "poll", which, on the Kafka side, seems to need to happen more often than every 10 seconds. You should use the HTTP client just like a normal Kafka client, but over HTTP.
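
A toy model may make the subscribe/poll split easier to picture. The class below is purely illustrative and is not the Kafka client API: `ToyConsumer`, its `backlog`, and the rule that exactly one poll is spent on joining are all assumptions made for the sketch. How many polls the join really takes depends on timing.

```python
class ToyConsumer:
    """Toy stand-in for a Kafka consumer; NOT the real client API."""

    def __init__(self, backlog):
        self.backlog = list(backlog)  # records waiting on the pretend broker
        self.subscription = []        # topics remembered by subscribe()
        self.joined = False           # group membership, only set by poll()

    def subscribe(self, topics):
        # Only stores the topics locally; nothing is sent to the group yet.
        self.subscription = list(topics)

    def poll(self):
        if not self.subscription:
            raise RuntimeError("poll() called before subscribe()")
        if not self.joined:
            # First poll: spend it on joining the group, return no records.
            self.joined = True
            return []
        # Later polls: hand over whatever is waiting and empty the backlog.
        records, self.backlog = self.backlog, []
        return records
```

In this model, `subscribe(["ENVIRONMENT"])` leaves `joined` as `False`, the first `poll()` returns an empty list while joining, and only the second `poll()` returns the backlog.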

ppatierno commented 1 year ago

@hylkevds do you need any more info on this, or can it be closed?

hylkevds commented 1 year ago

Personally I think this initial poll should be made by the bridge. There is no way for the client to know whether no data was received because there is no data, or because the subscription is not in place. That is simply bad.

ppatierno commented 1 year ago

> Personally I think this initial poll should be made by the bridge. There is no way for the client to know whether no data was received because there is no data, or because the subscription is not in place. That is simply bad.

What do you do when you are not using the HTTP bridge but just the pure Java Kafka client? Do you start a loop by calling poll right after the subscribe? The first poll(s) will just return nothing, as happens with the bridge, because they are just triggering the subscription-related mechanism. So do you think the Java Kafka client should do some polls for you internally instead? The bridge is the same, as I explained: it just behaves like a Java Kafka client, but over HTTP.

hylkevds commented 1 year ago

I don't have any use-cases that use Kafka directly. For HTTP, polling every 10 seconds is just bad. One should be using something that uses a push over a persistent connection for those cases.

ppatierno commented 1 year ago

Tbh what I don't see, and am not able to reproduce on my side, is the need for 10 secs. My HTTP client subscribes, then starts calling poll every second, and after the couple of polls needed to get the subscription mechanism working it starts to get messages, still continuing to poll every second (but it could be even less). Maybe the problem is somewhere else in your case.

> I don't have any use-cases that use Kafka directly.

Which would help you to understand how the Kafka protocol works :-)

hylkevds commented 1 year ago

Every second is even worse than every 10 seconds! I need to be able to poll once every 5 minutes. That doesn't work.

ppatierno commented 1 year ago

An event streaming platform polled every 5 minutes? ... I am starting to think you are using Apache Kafka for the wrong use case tbh.

hylkevds commented 1 year ago

It was not my choice... A platform that requires polling every second over HTTP doesn't have a use case... It should be using push over websockets, not polling, for those situations.

ppatierno commented 1 year ago

Exactly, so Apache Kafka is not the right tool because its protocol is poll based, not push based. It looks like your events are not so frequent if 5 minutes is fine, so an event streaming platform could be replaced by a broker using a protocol like AMQP 1.0, which is push based. Or anyway something exposed via websockets somehow, but not Apache Kafka.

ppatierno commented 1 year ago

> A platform that requires polling every second over HTTP doesn't have a use case

Not so true imho. In the IoT space, where your little device might not support push-based protocols like AMQP or MQTT but only HTTP (maybe because of the programming language used or the knowledge of the team), you would still need to poll frequently to get status or commands to run from the central system.

hylkevds commented 1 year ago

So this Kafka bridge doesn't have a use case? Because anything that requires updates every second or faster should not be using polling! Especially not over HTTP... MQTT over Websockets works just fine in the IoT world.

ppatierno commented 1 year ago

Why do you think Kafka can be used just for receiving, that is, for polling messages? It's used for sending messages as well, and in that case the HTTP use case is valuable. You send at the frequency you want. And again, if in the IoT space you don't have support for MQTT on your device, but only HTTP, you can easily use the bridge to send data to Kafka. Again, I think you are still using the wrong tool for the job. I get that it's not your decision, but then you should talk with whoever made the decision.

scholzj commented 1 year ago

@hylkevds HTTP and Kafka are two very different protocols. Bridging them is never easy. There are different approaches to it, but each has its own pros and cons.

The Strimzi Bridge has chosen to model the Bridge HTTP API to closely mirror the Kafka APIs. That is great in some cases and some people like this approach. Especially if they are already familiar with Kafka - it makes the Bridge easy to use when you actually want to use Kafka clients but for some reason you cannot (for example because of the platform you use, because of resources you have available etc.).

There are certainly also other people who - perhaps because they approach it more from the HTTP side - would prefer the Bridge API to be designed as an HTTP API first, instead of closely mirroring the Kafka APIs. That has its own challenges but also its own advantages, of course. It would be great if we could support both approaches. Unfortunately, we do not really have the resources to do it right now. But there might be other tools which offer this approach.