confluentinc / librdkafka

The Apache Kafka C/C++ library

Constantly facing "Timed out LeaveGroupRequest in flight" Error in kafka consumer #3226

Open Udayaprasad opened 3 years ago

Udayaprasad commented 3 years ago

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

We are running Kafka in dockerized containers with 3 ZooKeeper and 3 Kafka broker instances. Below are the images used for each service: zookeeper:3.4.9 and confluentinc/cp-kafka:5.5.3.

In the AI code we have 8 producers, each taking a streaming video as input (one producer per camera) and sending it to a consumer (one consumer per producer; message sizes are large since we are sending image frames).

We are facing a consumer timeout error after 3-4 hours of running the AI models. The timeout error is shown below, and I have also pasted the Kafka container configuration; please verify.

It fails constantly. We have tried different configurations, increasing the timeouts and reducing the message bytes, but no luck.

@edenhill Could you please suggest any alternatives to avoid these timeout errors?

Error Details

AI_Models | %3|1611286416.032|FAIL|rdkafka#consumer-12| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#127.0.0.1:4702 failed: Connection refused (after 0ms in state CONNECT, 4 identical error(s) suppressed)
AI_Models | %3|1611286416.449|FAIL|rdkafka#consumer-15| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#127.0.0.1:4702 failed: Connection refused (after 0ms in state CONNECT, 4 identical error(s) suppressed)

How to reproduce

Create 8 producers, consumers, and topics with the following Kafka configuration.

KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19096,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:4701
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_LOG_SEGMENT_BYTES: 16777216
KAFKA_LOG_RETENTION_BYTES: 134217728
KAFKA_MESSAGE_MAX_BYTES: 20000000
KAFKA_REQUEST_TIMEOUT_MS: 800000
KAFKA_REPLICA_SOCKET_TIMEOUT_MS: 800000
KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 1200000
KAFKA_NUM_IO_THREADS: 16
KAFKA_NUM_NETWORK_THREADS: 6
KAFKA_NUM_PARTITIONS: 3
KAFKA_NUM_REPLICA_FETCHERS: 3
KAFKA_REPLICA_FETCH_MAX_BYTES: 40000000
KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES: 120000000
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"

Also below is the consumer configuration

"group.id": "kafka-multi-video-stream",
"enable.auto.commit": false,
"linger.ms": 100,
"session.timeout.ms": 800000,
"request.timeout.ms": 800000,
"heartbeat.interval.ms": 30000,
"max.poll.interval.ms": 1200000,
"auto.offset.reset": "earliest"

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

chrisunderscorek commented 3 years ago

Connection refused (after 0ms in state CONNECT

This message indicates a TCP/network problem.

So probably some firewall/packet filter/TCP stack etc. lost its state because the connection expired from a table, or the TCP connection could not be re-established after the old connection was closed, or the Kafka server's listening socket died.

edenhill commented 3 years ago

Yeah, what @chrisunderscorek says, the client is unable to connect to the broker, for whatever reason.

But you also have a problem with your config: there is typically no reason to have a large session.timeout.ms; it will only delay consumer group fault detection. Keep it at its default value. I'm also questioning the high request.timeout.ms and heartbeat.interval.ms; it looks like the config is optimized for very high-latency links.
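
(For illustration only, a consumer configuration following that advice might look like the sketch below, using confluent-kafka-python; the broker addresses, group name, and topic are placeholders, not values from this issue.)

# Sketch only: leave session.timeout.ms, heartbeat.interval.ms and
# request.timeout.ms at their librdkafka defaults instead of overriding them.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",  # placeholder
    "group.id": "video-stream-consumer",                         # placeholder
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    # no session.timeout.ms / heartbeat.interval.ms / request.timeout.ms overrides
})
consumer.subscribe(["camera-topic"])  # placeholder topic name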

Udayaprasad commented 3 years ago

@edenhill @chrisunderscorek AI_Models and Kafka docker services are on the same network (In a single host machine).

Is there any reason the consumer client would be unable to connect to the broker, or any reason why it fails after 3-4 hours?

Please suggest any optimal configuration settings for the consumer. Ours is a video streaming application.

Below are the additional ERROR details.

AI_Models | %5|1611306602.704|REQTMOUT|rdkafka#consumer-62| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5486ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998119ms
AI_Models | %4|1611306602.704|REQTMOUT|rdkafka#consumer-62| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
AI_Models | %3|1611306602.704|FAIL|rdkafka#consumer-62| [thrd:GroupCoordinator]: GroupCoordinator: 1 request(s) timed out: disconnect (after 205174ms in state UP)

Network configuration info

networks:
  commonnetwork:
    external:
      name: commonnetwork
edenhill commented 3 years ago

Use default settings for session.timeout.ms, heartbeat.interval.ms, and request.timeout.ms.

Udayaprasad commented 3 years ago

Before the current configuration I used the default settings only (and got the same error). Only later did I start configuring these parameters:

session.timeout.ms, heartbeat.interval.ms, request.timeout.ms,
KAFKA_MESSAGE_MAX_BYTES,
KAFKA_REPLICA_FETCH_MAX_BYTES, 
KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES.
edenhill commented 3 years ago

"Connection refused" is a TCP layer issue, it is outside the control of the client. Debug your environment. As for the timeouts; just use the defaults.

Udayaprasad commented 3 years ago

Okay, @edenhill. I will take a look. Also, I will keep the default configuration for timeouts. Any suggestion on the message.bytes configuration for better throughput?

edenhill commented 3 years ago

https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#high-throughput
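
(As a rough, illustrative sketch of the kind of producer batching settings typically tuned for throughput; the values below are assumptions for illustration, not recommendations from this thread.)

# Sketch only: producer batching/compression knobs commonly tuned for throughput.
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092",  # placeholder
    "linger.ms": 100,             # wait briefly so messages batch together
    "batch.num.messages": 10000,  # upper bound on messages per batch
    "compression.codec": "lz4",   # trade some CPU for smaller requests
})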

Udayaprasad commented 3 years ago

@edenhill This time I went with the default configuration, but I am still facing a timeout error after running for 3-4 hours (no Connection refused errors this time).

"producer_config": {
        "bootstrap.servers": "kafkaservice_kafka1_1:4701,kafkaservice_kafka2_1:4702,kafkaservice_kafka3_1:4703",
        "enable.idempotence": true,
        "acks": "all",
        "retries": 5,
        "message.timeout.ms":  214748360,
        "linger.ms": 1000,
        "batch.num.messages": 32
      },
      "consumer_config": {
        "bootstrap.servers": "kafkaservice_kafka1_1:4701,kafkaservice_kafka2_1:4702,kafkaservice_kafka3_1:4703",
        "group.id": "kafka-multi-video-stream",
        "enable.auto.commit": false,
        "linger.ms": 1000,
        "max.poll.interval.ms": 1200000,
        "auto.offset.reset": "earliest"
      }

Error Details:-

AI_Models | %5|1611332028.032|REQTMOUT|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5982ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 1008460ms
AI_Models | %5|1611332029.590|REQTMOUT|rdkafka#consumer-4| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5013ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 1006012ms
AI_Models | %5|1611332030.589|REQTMOUT|rdkafka#consumer-6| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5454ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 1004950ms
AI_Models | %5|1611332031.001|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5004ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 1005500ms
AI_Models | %5|1611332035.609|REQTMOUT|rdkafka#consumer-9| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5530ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998103ms
AI_Models | %5|1611332299.959|REQTMOUT|rdkafka#consumer-10| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5349ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998424ms
AI_Models | %5|1611332299.968|REQTMOUT|rdkafka#consumer-11| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5356ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998416ms
AI_Models | %5|1611332299.969|REQTMOUT|rdkafka#consumer-12| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5356ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998122ms
AI_Models | %5|1611332299.988|REQTMOUT|rdkafka#consumer-13| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5374ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998116ms
AI_Models | %5|1611332299.989|REQTMOUT|rdkafka#consumer-14| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5375ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998123ms
AI_Models | %5|1611332299.991|REQTMOUT|rdkafka#consumer-15| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5377ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998128ms
AI_Models | %5|1611332299.993|REQTMOUT|rdkafka#consumer-16| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5374ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998135ms
AI_Models | %5|1611332300.017|REQTMOUT|rdkafka#consumer-17| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5398ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998121ms
AI_Models | %5|1611332513.783|REQTMOUT|rdkafka#consumer-18| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5475ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998137ms
AI_Models | %5|1611332513.788|REQTMOUT|rdkafka#consumer-20| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5478ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998138ms
AI_Models | %5|1611332513.789|REQTMOUT|rdkafka#consumer-23| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5474ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998145ms
AI_Models | %5|1611332513.791|REQTMOUT|rdkafka#consumer-21| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5480ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998141ms
AI_Models | %5|1611332513.799|REQTMOUT|rdkafka#consumer-24| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5484ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998135ms
AI_Models | %5|1611332513.799|REQTMOUT|rdkafka#consumer-19| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5490ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998123ms
AI_Models | %5|1611332513.801|REQTMOUT|rdkafka#consumer-22| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5486ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998505ms
AI_Models | %5|1611332513.810|REQTMOUT|rdkafka#consumer-25| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5495ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998129ms
AI_Models | %5|1611332729.476|REQTMOUT|rdkafka#consumer-29| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out LeaveGroupRequest in flight (after 5500ms, timeout #0): possibly held back by preceeding blocking JoinGroupRequest with timeout in 998126ms
AI_Models | %5|1611332729.479|R

For higher throughput, I used a higher value for the linger.ms property.

edenhill commented 3 years ago

One of your group's consumers is not calling poll() and thus blocking the Join of this consumer.
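
(One common way to keep poll() being called while a slow processing stage runs is to hand messages to a worker queue; the sketch below is illustrative and not taken from this issue's code, and the queue size and worker structure are assumptions.)

# Sketch only: keep consumer.poll() frequent by offloading slow AI work
# to a background thread instead of doing it inside the poll loop.
import queue
import threading

frames = queue.Queue(maxsize=100)   # bounded hand-off queue (illustrative)

def ai_worker():
    while True:
        frame_bytes = frames.get()
        # ... run the AI model on frame_bytes here ...
        frames.task_done()

threading.Thread(target=ai_worker, daemon=True).start()

def consume_loop(consumer):
    while True:
        msg = consumer.poll(timeout=1.0)   # short timeout, poll often
        if msg is None:
            continue
        if msg.error() is None:
            frames.put(msg.value())        # hand off; do not block the loop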

Udayaprasad commented 3 years ago

I use one consumer per topic. Each consumer runs in a separate thread, so we have 8 different threads for the 8 consumers.

## Each consumer will call this function in an independent thread
def consume_topic_data(self, args):
    try:
        return self.read_data(args)
    except Exception:
        error_message = str(traceback.format_exc())
        args["isException"] = 1
        args["exceptionMessage"] = error_message
        return args

def read_data(self, args):
    consumer = Consumer(self.config)
    consumer.subscribe([args["topic"]])
    return self.run(consumer, 0, args)

def run(self, consumer, msg_count, args):
    while True:
        msg = consumer.poll(timeout=200.0)
        if msg is None:
            break
        elif msg.error() is None:
            # convert image bytes data to numpy array of dtype uint8
            nparr = np.frombuffer(msg.value(), np.uint8)

            ###################################
            ####AI Code will be called here####
            ###################################

            ## Re-trigger new consumers if added (for new cameras)
            args = get_unique_topic_names(db)
            if len(args) > 0:
                consumer_thread = ConsumerThread(
                    consumer_config, producer_config["batch.num.messages"])
                consumer_thread.start(args, num_threads=3)

            msg_count += 1
            if msg_count % self.batch_size == 0:
                # commit synchronously
                consumer.commit(asynchronous=False)
                # reset the parameters
                msg_count = 0

        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
        else:
            print('Error occurred: {0}'.format(msg.error().str()))

with concurrent.futures.ThreadPoolExecutor() as executor:
    future_notes = []
    for func_params in args:
        future_notes.append(
            executor.submit(
                self.consume_topic_data,
                args=func_params))
edenhill commented 3 years ago

If each consumer subscribes to a unique topic, you don't need to have them in the same group, since there will be no balancing of partitions across those topics; use a unique group.id for each topic's consumer to avoid the rebalancing delays.
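
(A minimal sketch of that suggestion, deriving a stable group.id from the topic name; the helper name and prefix below are hypothetical.)

# Sketch only: one consumer group per topic, derived from the topic name
# so the group.id stays stable across restarts (unlike a random UUID).
from copy import deepcopy

def consumer_config_for(topic, base_config):
    cfg = deepcopy(base_config)
    cfg["group.id"] = "video-stream-" + topic   # hypothetical naming scheme
    return cfg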

Udayaprasad commented 3 years ago

Sure, I will give it a try.

Since this is a video streaming application we need to maintain the frame sequence; for this reason we put one topic per camera with a single partition (not scalable).

Can you suggest any approach to parallelize the producer and consumer services, at the same time maintain the video streaming in sequence?
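
(Not an answer given in this thread, but a general Kafka pattern for this question: use one topic with several partitions and key each message by camera id, since Kafka preserves ordering within a partition. A hedged sketch with illustrative names follows.)

# Sketch only: per-camera ordering via message keys. Messages with the same
# key go to the same partition, so each camera's frames stay in order while
# different cameras are processed in parallel. Names are placeholders.
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka1:9092"})  # placeholder

def send_frame(camera_id, frame_bytes):
    producer.produce(
        topic="video-frames",   # illustrative single topic with N partitions
        key=camera_id,          # same camera -> same partition -> in order
        value=frame_bytes,
    )
    producer.poll(0)            # serve delivery callbacks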

Udayaprasad commented 3 years ago

Looks like setting different group.id for each consumer works fine. I will observe the behavior for some more time.

Thank you, @edenhill

Udayaprasad commented 3 years ago

@edenhill It looks like the error comes up again even after setting a unique group.id for each consumer thread. Below is the code.

Updated the group.id: con_config["group.id"] = str(uuid.uuid4())+"-"+str(int(round(time.time() * 1000)))

## Each consumer will call this function in an independent thread
def consume_topic_data(self, args):
    try:
        return self.read_data(args)
    except Exception:
        error_message = str(traceback.format_exc())
        args["isException"] = 1
        args["exceptionMessage"] = error_message
        return args

def read_data(self, args):
    logger.info("@"*50, " Previous group.id: ", "@"*50, self.config["group.id"])
    con_config = deepcopy(self.config)
    con_config["group.id"] = str(uuid.uuid4())+"-"+str(int(round(time.time() * 1000)))
    logger.info("@" * 50, " Later group.id: ", "@" * 50, con_config["group.id"])
    # print("################ consumer config ################: ", con_config)
    consumer = Consumer(con_config)
    consumer.subscribe([args["topic"]])
    return self.run(consumer, 0, args)

def run(self, consumer, msg_count, args):
    while True:
        msg = consumer.poll(timeout=200.0)
        if msg is None:
            break
        elif msg.error() is None:
            # convert image bytes data to numpy array of dtype uint8
            nparr = np.frombuffer(msg.value(), np.uint8)

            ###################################
            ####AI Code will be called here####
            ###################################

            ## Re-trigger new consumers if added (for new cameras)
            args = get_unique_topic_names(db)
            if len(args) > 0:
                consumer_thread = ConsumerThread(
                    consumer_config, producer_config["batch.num.messages"])
                consumer_thread.start(args, num_threads=3)

            msg_count += 1
            if msg_count % self.batch_size == 0:
                # commit synchronously
                consumer.commit(asynchronous=False)
                # reset the parameters
                msg_count = 0

        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
        else:
            print('Error occurred: {0}'.format(msg.error().str()))

with concurrent.futures.ThreadPoolExecutor() as executor:
    future_notes = []
    for func_params in args:
        future_notes.append(
            executor.submit(
                self.consume_topic_data,
                args=func_params))

Error Details:-

Read from consumer time: %5|1613628370.021|REQTMOUT|rdkafka#consumer-23| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out HeartbeatRequest in flight (after 10372ms, timeout #0): possibly held back by preceeding OffsetCommitRequest with timeout in 49193ms
%4|1613628370.021|REQTMOUT|rdkafka#consumer-23| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1613628370.021|FAIL|rdkafka#consumer-23| [thrd:GroupCoordinator]: GroupCoordinator: 1 request(s) timed out: disconnect (after 4023571ms in state UP)
'Traceback (most recent call last):\n File "kafka/consumer/consumer_app.py", line 55, in consume_topic_data\n return self.read_data(args)\n File "kafka/consumer/consumer_app.py", line 51, in read_data\n return self.run(consumer, 0, args)\n File "kafka/consumer/consumer_app.py", line 276, in run\n consumer.commit(asynchronous=False)\ncimpl.KafkaException: KafkaError{code=UNKNOWN_MEMBER_ID,val=25,str="Commit failed: Broker: Unknown member"}\n'}

Consumer Config Details:-

"consumer_config": {
    "bootstrap.servers": "localhost:4701,localhost:4702,localhost:4703",
    "group.id": "kafka-multi-video-stream",
    "enable.auto.commit": false,
    "max.poll.interval.ms": 9200000,
    "auto.offset.reset": "latest"
}
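
(The UNKNOWN_MEMBER_ID in the traceback above means the broker no longer considers this consumer a live group member when the commit arrives, typically because it was kicked out of the group, e.g. after not polling within max.poll.interval.ms, so the commit is rejected. A minimal, illustrative way to surface that instead of letting the thread crash:)

# Sketch only: catch the commit failure so the thread can log it and let the
# consumer rejoin the group on its next poll(); duplicates must be tolerated.
from confluent_kafka import KafkaException

def safe_commit(consumer):
    try:
        consumer.commit(asynchronous=False)
    except KafkaException as e:
        print("Commit failed (consumer likely dropped from group):", e)
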
wincent1 commented 1 year ago

Hi, have you solved the problem? I encountered the same confusion, @Udayaprasad. Is it really because of the network?

everythings-gonna-be-alright commented 1 year ago

The same, but only when throughput is too high.


php:8.1.21-fpm-alpine3.18 + librdkafka 2.1.1-r0 + rdkafka 6.0.3 + php-enqueue/rdkafka 0.10.19

%5|1689613166.126|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out OffsetCommitRequest in flight (after 48960ms, timeout #3)
%5|1689613166.126|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out OffsetCommitRequest in flight (after 48960ms, timeout #4)
%4|1689613166.129|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/2: Timed out 388 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1689613166.129|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: 10.200.1.243:9094: 855 request(s) timed out: disconnect (after 405384ms in state UP)
%3|1689613166.129|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: GroupCoordinator: 10.200.1.243:9094: 855 request(s) timed out: disconnect (after 405384ms in state UP)
%4|1689613216.149|COMMITFAIL|rdkafka#consumer-1| [thrd:main]: Offset commit (manual) failed for 1/1 partition(s) in join-state wait-unassign-to-complete: Broker: Unknown member: events[21]@25351093(Broker: Unknown member)

I tried many different parameters, but it didn't help.

group.id: '%env(KAFKA_EVENTS_CONSUMER_GROUP)%'
metadata.broker.list: '%env(KAFKA_BROKER_LIST)%'
enable.auto.commit: 'false'
auto.commit.interval.ms: '10000'
socket.timeout.ms: '12000'
socket.max.fails: '5000'
heartbeat.interval.ms: '100'
session.timeout.ms: '60000'
fetch.wait.max.ms: '300'
fetch.min.bytes: '5000000'
fetch.max.bytes: '204857600'
max.partition.fetch.bytes: '16291456'
max.poll.interval.ms: '600000'
reconnect.backoff.ms: '50'