confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Failed to retrieve OIDC token in serverless architecture #1782

Open maurorosales opened 4 months ago

maurorosales commented 4 months ago

Description

Hi Everyone!

I am encountering an error when trying to produce messages to a Kafka topic using the producer code from the getting-started tutorial, and even with the repository example `oauth_producer.py`, when the code runs in an Azure Function App or a GCP Cloud Function.

The error message is the following:

```
[3:20:14 PM] - %3|1721067614.927|OIDC|rdkafka#producer-1| [thrd:background]: Failed to retrieve OIDC token from "https://login.microsoftonline.com/mitenantID/oauth2/V2.0/token": error setting certificate file: /etc/pki/tls/certs/ca-bundle.crt (-1)
```

Creating the directory and a symbolic link solved the problem in the Azure Function, but in GCP the permissions to interact with the OS are stricter, and I can't find a way to create the link or load the certificates from a different location, such as the one given in the producer configuration via `'ssl.ca.location': '/etc/ssl/certs/ca-certificates.crt'`.
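One thing worth trying as a workaround (a sketch only; it assumes the background OIDC HTTPS client honours `ssl.ca.location`, which this error suggests it may not) is to point the configuration at certifi's bundled CA file instead of an OS path, since that file ships inside the deployed Python package and needs no filesystem changes:

```python
import certifi

# certifi bundles a CA file inside the Python package itself, so the path is
# readable even on a locked-down serverless filesystem (no symlink needed).
ca_path = certifi.where()  # e.g. .../site-packages/certifi/cacert.pem

# Hypothetical override: use this path in place of the OS bundle in the
# producer configuration dict.
conf_override = {'ssl.ca.location': ca_path}
```
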

To clarify: from my PC, using Azure Core Tools or the GCP SDK, I can execute and produce correctly. I don't know why it starts failing with that certificate error once I upload it to a function.

- AWS Lambda -> worked fine
- Azure Function App -> worked fine after creating the link; without it, failed with the same OIDC issue
- Cloud Function (GCP) -> not working

Tested locally on: WSL (Ubuntu 20.04) and Windows 11.

I also ran it in container-based services such as Fargate, EKS, and AKS, and found no problem there.

How to reproduce

Just execute this code in a Cloud Function in GCP:

```python
import functions_framework
import json
import subprocess
from confluent_kafka import Producer


def acked(err, msg):
    if err is not None:
        print(f"Failed to deliver message: {err.str()}")
    else:
        print(f"Message produced: {msg.topic()} [{msg.partition()}] @ {msg.offset()}")


def kafka_producer_eventhub(event_data):
    # This works fine in an Azure Function App but does not work
    # in a Cloud Function on GCP
    subprocess.run(
        ['sudo', 'ln', '-s', '/etc/ssl/certs/ca-certificates.crt',
         '/etc/pki/tls/certs/ca-bundle.crt'],
        check=True,
    )
    # print("Created symlink")

    kafka_topic = "purchases"

    # Kafka producer configuration with OAuth authentication
    conf = {
        'bootstrap.servers': "192.168.0.1:9092",
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        'sasl.oauthbearer.method': 'OIDC',
        'sasl.oauthbearer.extensions': 'logicalCluster=lkc-123dd,identityPoolId=pool-eded',
        'sasl.oauthbearer.client.id': "miclientid",
        'sasl.oauthbearer.client.secret': "miclientsecret",
        'sasl.oauthbearer.token.endpoint.url': "https://login.microsoftonline.com/mitenantID/oauth2/V2.0/token",
        'sasl.oauthbearer.scope': "myscope",
        'ssl.endpoint.identification.algorithm': 'none',
        'debug': 'all',
        'ssl.ca.location': '/etc/ssl/certs/ca-certificates.crt',  # This is where you specify your CA certificate location
    }

    # Initialize the Kafka producer
    producer = Producer(**conf)

    # Produce the message
    try:
        producer.produce(kafka_topic, value=json.dumps(event_data, default=str), callback=acked)
        producer.poll(1)  # Adjust as needed
    except Exception as e:
        print(f"Exception while producing record value - {event_data}: {e}")
    finally:
        producer.flush()


@functions_framework.http
def hello_http(request):
    # Extract data from the HTTP request
    event_data = {"message": "This is payload"}

    # Call the producer function and pass the payload
    kafka_producer_eventhub(event_data)

    request_json = request.get_json(silent=True)
    request_args = request.args

    if request_json and 'name' in request_json:
        name = request_json['name']
    elif request_args and 'name' in request_args:
        name = request_args['name']
    else:
        name = 'World'
    return 'Hello {}!'.format(name)
```

```
confluent_kafka.libversion() -> ('2.4.0', 33816831)
confluent_kafka.version()    -> ('2.4.0', 33816576)
```

Checklist

Please provide the following information:

tcgarvin commented 8 hours ago

I've been investigating this issue too. It looks like librdkafka uses curl (or something close to it) but perhaps doesn't respect `CURL_CA_BUNDLE`:

https://github.com/confluentinc/librdkafka/issues/4761
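If the OIDC fetch does go through libcurl, one experiment is to export the standard curl/OpenSSL CA environment variables before the producer is created (a sketch only; whether librdkafka's background token thread actually reads these is exactly what the linked issue is questioning):

```python
import os
import certifi

# libcurl reads CURL_CA_BUNDLE and OpenSSL reads SSL_CERT_FILE to locate the
# default CA bundle. Export both before constructing the Producer so the
# background OIDC token request, if it uses libcurl, can find the certs.
os.environ['CURL_CA_BUNDLE'] = certifi.where()
os.environ['SSL_CERT_FILE'] = certifi.where()
```
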