aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

[QUESTION] Unable to request metadata from node with id 0: Unable to update metadata from [0] #624

Open tnusraddinov opened 4 years ago

tnusraddinov commented 4 years ago

Hello, I have a problem with the producer. I am using aiokafka==0.6.0 and kafka-2.5.0 (Commit:66563e712b0b9f84).

First I connect and send data to Kafka on an AWS server successfully. Then I wait 10 minutes and get these logs:

[MainThread][ERROR][aiokafka       ]: Unable to request metadata from node with id 0: 
[MainThread][ERROR][aiokafka       ]: Unable to update metadata from [0]

and when I try sending the (same) data again it fails with:

[MainThread][WARNING][aiokafka.producer.sender]: Got error produce response: [Error 7] RequestTimedOutError

My setup uses FastAPI, aiokafka and gunicorn (workers=4):

...

app = FastAPI()

ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cadata=cdata)
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
    loop=loop,
    bootstrap_servers=bootstrap_servers,
    security_protocol="SSL",
    ssl_context=ssl_context,
    value_serializer=orjson.dumps,
)

@app.on_event("startup")
async def startup_event():
    logger.info('Startup')
    await producer.start()

@app.on_event("shutdown")
async def shutdown_event():
    logger.info('Shutdown')
    await producer.stop()
    logger.info('Producer Flushing ....')
    # producer.flush()

# inject producer object to each request
@app.middleware("http")
async def kafka_middleware(request: Request, call_next):
    global producer 
    request.state.producer = producer
    response = await call_next(request)
    return response

@app.get("/")
async def home(request: Request):
    data = {"Hello": "World", "time": time.time()}
    producer = request.state.producer
    f = await producer.send('my-topic', data)
    return data

I could initialize/connect an AIOKafkaProducer on each request, but I think that is not the right way.
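
For reference, this is roughly what I mean by keeping a single shared producer per worker; it is only a sketch of the variant where the producer is created inside the startup handler (so it binds to the worker's running loop) instead of at import time. bootstrap_servers, ssl_context and the orjson serializer are assumed to be configured as in the snippet above:

# Sketch only, not my exact code. Assumes bootstrap_servers and ssl_context
# are already defined as in the setup above.
import time

import orjson
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI, Request

app = FastAPI()
producer = None  # created per worker in the startup handler


@app.on_event("startup")
async def startup_event():
    global producer
    # Create the producer inside the startup handler so it is bound to the
    # event loop this worker is actually running on.
    producer = AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol="SSL",
        ssl_context=ssl_context,
        value_serializer=orjson.dumps,
    )
    await producer.start()


@app.on_event("shutdown")
async def shutdown_event():
    if producer is not None:
        await producer.stop()


@app.get("/")
async def home(request: Request):
    data = {"Hello": "World", "time": time.time()}
    await producer.send('my-topic', data)
    return data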

Kafka server.properties file:

############################# Server Basics #############################
broker.id=0

listeners=SSL_INTERNAL://:9094,SSL_EXTERNAL://:9093
advertised.listeners=SSL_INTERNAL://:9094,SSL_EXTERNAL://ec2-ip-address.eu-central-1.compute.amazonaws.com:9093

listener.security.protocol.map=SSL_INTERNAL:SSL,SSL_EXTERNAL:SSL
inter.broker.listener.name=SSL_INTERNAL

ssl.keystore.location=/path_to/keystore/kafka.keystore.jks
ssl.keystore.password=securepass
ssl.key.password=securepass
ssl.truststore.location=/path_to/truststore/kafka.truststore.jks
ssl.truststore.password=securepass
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=

############################# Log Basics #############################
log.dirs=/var/log/kafka-logs
num.partitions=4

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=4
transaction.state.log.replication.factor=4
transaction.state.log.min.isr=4

All other Kafka and ZooKeeper configs are defaults.

tnusraddinov commented 4 years ago

Note: when I set the parameter

connections_max_idle_ms=10000

in

AIOKafkaProducer(
    loop=loop,
    bootstrap_servers=bootstrap_servers,
    security_protocol="SSL",
    ssl_context=ssl_context,
    value_serializer=orjson.dumps,
    connections_max_idle_ms=10000,
)

the errors stop appearing after 10 minutes, and the producer can send data even after 30 minutes of idling. Can someone explain what the problem is?

gabriel-tincu commented 4 years ago

connections_max_idle_ms

Is your cluster undergoing changes while this is happening, i.e. nodes going down and new ones joining?

tnusraddinov commented 4 years ago

I am new to Kafka. Only one instance of Kafka is running, with the configs above. Every 10 minutes a "Removed 0 expired offsets" message appears. The Kafka logs are:

[2020-06-02 14:28:29,490] INFO [GroupCoordinator 0]: Preparing to rebalance group test-consumer-group in state PreparingRebalance with old generation 4 (__consumer_offsets-31) (reason: Adding new member consumer-test-consumer-group-1-7a9d636e-c3d6-45aa-8369-caaad27cb62e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-06-02 14:28:29,501] INFO [GroupCoordinator 0]: Stabilized group test-consumer-group generation 5 (__consumer_offsets-31) (kafka.coordinator.group.GroupCoordinator)
[2020-06-02 14:28:29,518] INFO [GroupCoordinator 0]: Assignment received from leader for group test-consumer-group for generation 5 (kafka.coordinator.group.GroupCoordinator)
[2020-06-02 14:37:23,918] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-06-02 14:47:23,913] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
.
.
.
[2020-06-02 20:47:23,913] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-06-02 20:51:16,756] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id consumer-test-consumer-group-1-7a9d636e-c3d6-45aa-8369-cbbbd27cb62e] in group test-consumer-group has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-06-02 20:51:16,757] INFO [GroupCoordinator 0]: Preparing to rebalance group test-consumer-group in state PreparingRebalance with old generation 5 (__consumer_offsets-31) (reason: removing member consumer-test-consumer-group-1-7a9d636e-c3d6-45aa-8369-caaad27cb62e on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-06-02 20:51:16,757] INFO [GroupCoordinator 0]: Group test-consumer-group with generation 6 is now empty (__consumer_offsets-31) (kafka.coordinator.group.GroupCoordinator)
[2020-06-02 20:57:23,914] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-06-02 21:57:23,913] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
.
.
.

Thanks

tvoinarovskyi commented 4 years ago

@tnusraddinov What value do you use for bootstrap_servers? It seems like the client was able to connect using bootstrap_servers, but can't reconnect using the value of advertised.listeners (which you have correctly set up to use DNS). Could you also elaborate on which server is hosting the Kafka broker and from where you connect? Are there any proxies in between?

tnusraddinov commented 4 years ago

bootstrap_servers is the public DNS of the EC2 instance on AWS.

bootstrap_servers = 'ec2-ip-eu-central-1.compute.amazonaws.com:9093'

The Kafka broker is hosted on an EC2 instance (Ubuntu 18.04, Europe Frankfurt) on Amazon Web Services. I am connecting from work in Turkey. There are no proxies; I connect directly to the public DNS of the EC2 instance. Thanks

tvoinarovskyi commented 4 years ago

As far as I can see it's the same DNS as used in the EXTERNAL listener, right? I will try to reproduce the example with my own AWS account; maybe I will see similar behaviour. Btw, did you also try to connect without SSL? Does it have similar issues?

tnusraddinov commented 4 years ago

Yes, the EXTERNAL listener and bootstrap_servers are the same. I have connected without SSL, but unfortunately I did not try waiting idle without SSL.

tvoinarovskyi commented 4 years ago

@tnusraddinov Sadly, I have not been able to reproduce this behaviour with my AWS setup. I set up a self-signed certificate as described in https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka and allowed access to my AWS instance using a security group with port 9093 exposed to all IP addresses. In my case, the producer is able to send messages even after 10 minutes of inactivity. I have also tried waiting using time.sleep() instead of asyncio.sleep(), as well as disconnecting my laptop from the network during the wait.

I feel like it's some kind of networking issue, but I can't wrap my head around the scenario that is going on here. I can only speculate at this point, but I have seen many company NATs have issues with passing on a socket connection close event. I call it a void state of the socket, where no errors are raised but a write never returns. It's quite a frequent problem with Kubernetes, btw. So you can send requests, but they time out. Right now AIOKafkaClient handles those errors by closing the connection, but it will not retry the send over a new connection (as the send has already timed out by that time).

So, @tnusraddinov, are you sure that the producer does not recover from the RequestTimedOutError? Even if it raises one such error, it should recycle that connection and the next send should succeed. If that's the scenario, then the client works as designed and the fix of lowering the idle connection timeout is good. As far as I know, the Kafka protocol has no notion of PING messages, as it's designed for higher throughput, so closing the connection serves as a way to make sure sends go over fresh connections.

Of course, the solution is not great, and maybe it makes more sense for the client to retry at least once over a new connection... To address this, the client needs to support delivery.timeout.ms (new in the Java client), the configuration that sets an upper bound on delivery and decouples it from request.timeout.ms, allowing RequestTimedOutError to still be retried until the timeout.
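
Until something like that lands, a rough application-side approximation is to put an overall deadline around the send and retry on RequestTimedOutError, since the client has already recycled the broken connection by the time the first error is raised. This is only a sketch (the function name deliver_with_deadline and the 60 second deadline are illustrative, not aiokafka API):

# Sketch: approximate a delivery deadline in application code by retrying
# RequestTimedOutError until an overall timeout expires. After the first
# timeout the client has closed the stale connection, so the retry goes
# over a fresh one.
import asyncio

from aiokafka.errors import RequestTimedOutError


async def deliver_with_deadline(producer, topic, value, deadline_s=60.0):
    loop = asyncio.get_running_loop()
    deadline = loop.time() + deadline_s
    while True:
        try:
            return await producer.send_and_wait(topic, value)
        except RequestTimedOutError:
            if loop.time() >= deadline:
                raise  # overall delivery deadline exceeded, give up
            # otherwise retry; the next attempt uses a freshly opened connection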

tnusraddinov commented 4 years ago

Hi @tvoinarovskyi, I am not sure that it recovers. But I previously noticed that after some time (a couple of send failures) it recovers the connection and then sends the next message, not the previously failed messages, which are lost. You may be right about the network connection. I did not test it between AWS instances (where the network connection is good) with the default connections_max_idle_ms.

tvoinarovskyi commented 4 years ago

@tnusraddinov Yes, sadly the timed-out messages are removed from the queue and will not be resent :( I will try to finish delivery.timeout.ms for a patch release; it should help with predictable timeouts and connection retries.

ashish-gupta1-by commented 2 years ago

I am using Azure Event Hubs as a managed service. I am also getting the same error.

AEH_PRODUCER_CONFIG = dict(
    bootstrap_servers=os.getenv('ENVIRONMENT_VARIABLE_AEH_SERVER_PORT'),
    sasl_plain_username=os.getenv('ENVIRONMENT_VARIABLE_AEH_USERNAME'),
    sasl_plain_password=get_secrets().sasl_pw,
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    client_id='python-publisher-client',
    acks=1)

AEH_CONSUMER_CONFIG = dict(
    bootstrap_servers=os.getenv('ENVIRONMENT_VARIABLE_AEH_SERVER_PORT'),
    group_id=os.getenv('ENVIRONMENT_VARIABLE_AEH_GROUP_ID'),
    sasl_plain_username=os.getenv('ENVIRONMENT_VARIABLE_AEH_USERNAME'),
    sasl_plain_password=get_secrets().sasl_pw,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    sasl_mechanism='PLAIN',
    security_protocol='SASL_SSL')

2022-01-11 05:44:59,604 ERROR -- Unable to request metadata from node with id 0: TimeoutError()
2022-01-11 05:50:39,610 ERROR -- Unable to request metadata from node with id 0: TimeoutError()
2022-01-11 05:50:39,610 ERROR -- Unable to update metadata from [0]
2022-01-11 05:50:39,610 ERROR -- Unable to update metadata from [0]

I am using FastAPI and polling the consumer every second for one message using getone().
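
The connections_max_idle_ms workaround discussed earlier in the thread may also apply here: keep the client-side idle timeout below whatever the service side uses, so polls and sends always go over reasonably fresh connections. A sketch using the config dicts above (the topic name 'my-topic' and the 180000 ms value are illustrative, not from my setup):

# Sketch: reuse the AEH_* config dicts above, but lower connections_max_idle_ms
# so aiokafka recycles connections before the service drops them as idle.
import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def run():
    producer = AIOKafkaProducer(connections_max_idle_ms=180000, **AEH_PRODUCER_CONFIG)
    consumer = AIOKafkaConsumer(
        'my-topic',  # illustrative topic name
        connections_max_idle_ms=180000,
        **AEH_CONSUMER_CONFIG,
    )
    await producer.start()
    await consumer.start()
    try:
        while True:
            # same one-message-at-a-time polling pattern as described above
            msg = await consumer.getone()
            # process msg.value here
    finally:
        await consumer.stop()
        await producer.stop()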

ashish-gupta1-by commented 2 years ago

Any update on the issue?

Huangvivi commented 1 year ago

Any update on the issue?

danigosa commented 1 year ago

Same with Azure Event Hub, any update?

TribuneX commented 10 months ago

Any update? We are also seeing this issue on a regular basis while connected to Azure Event Hub.