KxSystems / kafka

kdb+ to Apache Kafka adapter, for pub/sub
https://code.kx.com/q/interfaces
Apache License 2.0

libkfk fails to reconnect automatically to broker on Azure cloud after a period of inactivity. #119

Open sarritesh opened 6 days ago

sarritesh commented 6 days ago

Describe the bug We make a Kafka connection to brokers on Azure cloud. After a period of no activity the connection goes stale and the client disconnects from the broker. It does retry to connect, but fails with an error that all brokers are down.

To Reproduce Connect to a broker on the cloud and perform no activity on the topic you are connected to. The connection goes stale or breaks after some time.

Expected behavior It should be able to reconnect if the connection goes down, but the retries appear to fail.


Additional context Which parameters can be used to keep the connection alive and prevent it from timing out?

sshanks-kx commented 6 days ago

Is it using a consumer or producer client type? Thanks

sarritesh commented 6 days ago

@sshanks-kx It is a producer. I also tried setting socket.keepalive.enable to true and connections.max.idle.ms to 0, but even after this it did not work. Wondering if a firewall etc. could be the issue?

sshanks-kx commented 5 days ago

@sarritesh .kfk.Poll calls the underlying kafka lib rd_kafka_poll which has a comment

"The timeout_ms argument specifies the maximum amount of time (in milliseconds) that the call will block waiting for events. For non-blocking calls, provide 0 as timeout_ms. To wait indefinitely for an event, provide -1.....An application should make sure to call poll() at regular intervals to serve any queued callbacks waiting to be called."

Slightly confusingly, the consumer (non-producer) version rd_kafka_consumer_poll has

"...must call poll at least every max.poll.interval.ms to remain a member of the consumer group..."

where max.poll.interval.ms is for consumers only.

If you can experiment, can you call .kfk.Poll (passing 0 as the timeout value) from the kdb+ timer (.z.ts), as in the sketch below? If you have any success/failure with that, can you let me know. NOTE: be careful that you're not already using .z.ts for something else, which changing the timer frequency may affect.
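For reference, a minimal sketch of that experiment in q, assuming a producer client id named producer returned by .kfk.Producer, and that .z.ts is otherwise unused:

/ assumes: producer is a client id from .kfk.Producer; .z.ts is not in use elsewhere
.z.ts:{.kfk.Poll[producer;0;0]}   / non-blocking poll: 0ms timeout, 0 = no message cap
/ fire the timer every 100ms
\t 100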

Also, do you have statistics.interval.ms set in your producer config?

I'll try to replicate here when I get a moment.

sshanks-kx commented 5 days ago

There is a connections.max.idle.ms that can be set on both brokers and clients. In the kafka lib (librdkafka) connections.max.idle.ms ( https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md ) has

Close broker connections after the specified time of inactivity. Disable with 0. If this property is left at its default value some heuristics are performed to determine a suitable default value, this is currently limited to identifying brokers on Azure (see librdkafka issue #3109 for more info).

Links to librdkafka issue #3109: https://github.com/confluentinc/librdkafka/issues/3109

Other ref https://stackoverflow.com/questions/78033060/keep-kafka-connection-alive-even-if-it-is-idle-for-long-time
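Putting the settings discussed in this thread together, a hedged sketch of a client-side producer config (broker address and values are placeholders, not a recommendation):

kfk_cfg:(!) . flip(
  (`metadata.broker.list;`localhost:9092);   / placeholder broker
  (`socket.keepalive.enable;`true);          / enable TCP keepalive
  (`connections.max.idle.ms;`0);             / 0 disables client-side idle disconnects
  (`statistics.interval.ms;`10000));         / emit stats callbacks every 10s
producer:.kfk.Producer kfk_cfg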

sarritesh commented 5 days ago

Thanks @sshanks-kx. I had tried socket.keepalive.enable as true and connections.max.idle.ms as 0 on the producer side, but that did not work. I will also ask the relevant broker-side team to set connections.max.idle.ms to 0 on their side, then come back and update you if this works. Thanks for coming back on this.

sarritesh commented 5 days ago

By the way, one more question @sshanks-kx: while this would help keep the connection alive, should it not also be able to reconnect once disconnected? Or is that failing because the connection has gone stale? I do see the same error in the logs multiple times about not being able to connect to the broker, which means it does try but is unable to reconnect automatically.

sshanks-kx commented 4 days ago

I tried a docker-compose instance to test a disconnect:

---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_RETENTION_MINUTES: 1

Running docker compose up, then q test_producer.q from our examples once the env had started and connected. I Ctrl-C'd the docker env and ran docker compose stop, and could see the producer disconnect. I restarted the env with docker compose up and, once it was up, could see the producer connect again. I could then run test_consumer.q and start publishing, with the consumer receiving.
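For anyone reproducing this, the producer side of that test is roughly the repo's test_producer.q example, condensed (broker address and topic name are placeholders):

\l kfk.q
kfk_cfg:(!) . flip(
  (`metadata.broker.list;`localhost:9092);
  (`statistics.interval.ms;`10000));
producer:.kfk.Producer kfk_cfg
topic:.kfk.Topic[producer;`test;()!()]
/ publish one message; watch the client logs across the broker stop/start
.kfk.Pub[topic;.kfk.PARTITION_UA;string .z.p;""]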

Would need more info on your connection/logs/etc. to dig further.