aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

Consumer/channel does not reauthenticate based on SaslAuthenticateResponse #1080

Open moh-incom opened 21 hours ago

moh-incom commented 21 hours ago

Describe the bug

When a consumer's connection outlives its SASL authentication (in this case an OAuth token), the consumer fails to fetch new records because the broker no longer accepts any requests other than SaslHandshakeRequest and SaslAuthenticateRequest. See KIP-368 for more details.

For instance, if tokens are valid for 5 minutes, the consumer runs fine for 5 minutes and then begins failing when it sends new requests. This HeartbeatRequest to the group coordinator, for example, fails because the channel's session has expired:

ERROR:aiokafka.consumer.group_coordinator:Error sending HeartbeatRequest_v1 to node 2 [KafkaConnectionError: Connection at my.redacted-kafka.broker closed] -- marking coordinator dead

I suspect this is also an issue for admin and producer clients.

Expected behaviour

I expect the consumer to reauthenticate before its session expires.
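KIP-368 has the broker report a session lifetime in its SaslAuthenticateResponse (the `session_lifetime_ms` field), so a client can schedule reauthentication ahead of expiry. A minimal sketch of that scheduling decision, assuming a jitter window of 85% to 95% of the lifetime (to my understanding this is what the Java client does; the exact window is an assumption here). `reauth_delay_s` is a hypothetical helper and not part of aiokafka:

```python
import random
from typing import Optional


def reauth_delay_s(
    session_lifetime_ms: int, rng: Optional[random.Random] = None
) -> Optional[float]:
    """Return how many seconds to wait before reauthenticating.

    Returns None when the broker reports no session lifetime
    (0 means reauthentication is not enabled on the broker).
    """
    if session_lifetime_ms <= 0:
        return None
    rng = rng or random.Random()
    # Assumed window: a random point between 85% and 95% of the lifetime.
    # The margin leaves headroom for latency and clock drift, and the
    # jitter keeps many connections from reauthenticating at once.
    fraction = rng.uniform(0.85, 0.95)
    return session_lifetime_ms * fraction / 1000.0
```

With 5-minute tokens (`session_lifetime_ms=300_000`), this would schedule the reauthentication somewhere between roughly 255 and 285 seconds after the last successful authentication.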


Reproducible example

The problem can be reproduced with a simple consumer such as the one below, together with a Kafka broker configured to use secured OAuth SASL authentication, for example using Strimzi. If it would be of value, I will try to create a Dockerfile to set up such a broker.

import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.abc import AbstractTokenProvider
from azure.identity import DefaultAzureCredential


class KafkaOAuthBearerTokenProvider(AbstractTokenProvider):
    async def token(self):
        # logic for fetching token
        ...


async def consume():
    # Kafka consumer configuration with SASL authentication
    consumer = AIOKafkaConsumer(
        'sometopic',
        bootstrap_servers='my.redacted-kafka.broker,my.redacted-kafka.broker2,my.redacted-kafka.broker3',
        auto_offset_reset='earliest',
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=KafkaOAuthBearerTokenProvider(),
        group_id='someconsumergroup',
    )
    # Start the consumer
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print(f"Consumed message: {msg.value.decode('utf-8')} from partition: {msg.partition} and offset: {msg.offset} and header: {msg.headers}")
    finally:
        # Ensure the consumer is stopped gracefully
        await consumer.stop()


# Run the consumer until it is interrupted
asyncio.run(consume())

moh-incom commented 21 hours ago

I was able to hack reauthentication together by accessing the consumer's internal AIOKafkaConnections and running _do_sasl_handshake() periodically. This required some rather tricky commit locking, stopping and restarting the heartbeat, and an additional lock to ensure that the client does not fetch more records while the handshake is ongoing. If any requests are made during the handshake, the broker terminates the connection.
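The locking requirement can be illustrated in isolation. This is only a sketch of the idea, not my actual workaround and not aiokafka code: `ReauthGate` is a hypothetical helper that lets normal requests run concurrently but gives the handshake exclusive use of the connection, so nothing else is sent while it is in progress.

```python
import asyncio


class ReauthGate:
    """Hypothetical helper: requests share the connection, while
    reauthentication gets exclusive access to it."""

    def __init__(self) -> None:
        self._cond = asyncio.Condition()
        self._in_flight = 0
        self._reauthenticating = False

    async def request(self, send):
        """Run `send()` (any coroutine function) once no handshake is ongoing."""
        async with self._cond:
            await self._cond.wait_for(lambda: not self._reauthenticating)
            self._in_flight += 1
        try:
            return await send()
        finally:
            async with self._cond:
                self._in_flight -= 1
                self._cond.notify_all()

    async def reauthenticate(self, handshake):
        """Block new requests, wait for in-flight ones, then run `handshake()`."""
        async with self._cond:
            await self._cond.wait_for(lambda: not self._reauthenticating)
            self._reauthenticating = True
            await self._cond.wait_for(lambda: self._in_flight == 0)
        try:
            return await handshake()
        finally:
            async with self._cond:
                self._reauthenticating = False
                self._cond.notify_all()
```

In a workaround like mine, `handshake` would wrap the connection's `_do_sasl_handshake()` and every fetch, commit, and heartbeat would have to pass through `request`. aiokafka does not route its requests through such a gate, which is why the hack needed the extra commit and heartbeat handling.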