aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

Trying to consume with OAUTHBEARER but having issues, maybe with AbstractTokenProvider #691

Open davidmontgom opened 3 years ago

davidmontgom commented 3 years ago

Hi,

python-kafka simply works with OAUTHBEARER. With aiokafka there are zero examples, so I am flying blind.

Did I implement CustomTokenProvider correctly? The create_ssl_context() function is what I use in python-kafka.

Below is the error I get:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 376, in _on_read_task_error
    read_task.result()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 512, in _read
    resp = await reader.readexactly(4)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py", line 677, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 4 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Documents/workspace/hedge-project/nasdaqstreamer/ai.py", line 109, in <module>
    loop.run_until_complete(consume())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users//Documents/workspace/hedge-project/nasdaqstreamer/ai.py", line 102, in consume
    await consumer.start()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 341, in start
    await self._client.bootstrap()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/client.py", line 215, in bootstrap
    version_hint=version_hint)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 97, in create_conn
    await conn.connect()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 235, in connect
    await self._do_sasl_handshake()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 316, in _do_sasl_handshake
    payload, expect_response
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
kafka.errors.KafkaConnectionError: KafkaConnectionError: Connection at test.com:9094 closed
{"access_token":"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJtNDV5a1RZRXhXZzdfZmUybHdBcVRDSGV2bnQtMWMwNjRiTTl2UlN3NVY0In0.eyJleHAiOjE2MDcxMjQzNTYsImlhdCI6MTYwNjUxOTU1NiwianRpIjoiN2M3OGU0ZmItNmQ0ZC00YTAyLWI1YzYtZGVmOTA1OGY0MmY0IiwiaXNzIjoiaHR0cHM6Ly9jbG91ZGRhdGFzZXJ2aWNlLmF1dGgubmFzZGFxLmNvbS9hdXRoL3JlYWxtcy9wcm8tcmVhbG0iLCJzdWIiOiI0YzIzOTNhOC0yM2FmLTQ1MDc
0NDU5ZDJhM2E5IiwiaXNzIjoiaHR0cHM6Ly9jbG91ZGRhdGFzZXJ2aWNlLmF1dGgubmFzZGFxLmNvbS9hdXRoL3JlYWxtcy9wcm8tcmVhbG0iLCJhdWQiOiJodHRwczovL2Nsb3VkZGF0YXNlcnZpY2UuYXV0aC5uYXNkYXEuY29tL2F1dGgvcmVhbG1zL3Byby1yZWFsbSIsInN1YiI6IjRjMjM5M2E4LTIzYWYtNDUwNy1iYmI2LTFlZTkwODhmZDBiMiIsInR5cCI6IlJlZnJlc2giLCJhenAiOiJzeW5lcmdpc2NhcGl0YWwtZGF2aWQtc2Nob29sZXkiLCJzZXNzaW9uX3N0YXRlIjoiZmQ2NDkyZTMtMzRmMi00ZjEzLWI5MDgtM2M1NmU1OWQwYWYzIiwic2NvcGUiOiJlbWFpbCBwcm9maWxlIn0.5zZw0OZCuDUKzjWVZ-23s-DagXhOtzsvefJ9q8Q7vXA","token_type":"bearer","not-before-policy":0,"session_state":"fd6492e3-34f2-4f13-b908-3c56e59d0af3","scope":"email profile"}
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f9c183e7c88>

Below is my code:

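import asyncio
import ssl

from aiokafka import AIOKafkaConsumer
from aiokafka.abc import AbstractTokenProvider
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session

def create_ssl_context():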

    _ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    _ssl_context.options |= ssl.OP_NO_SSLv2
    _ssl_context.options |= ssl.OP_NO_SSLv3
    _ssl_context.verify_mode = ssl.CERT_NONE
    _ssl_context.check_hostname = False
    _ssl_context.load_default_certs()

    return _ssl_context

class CustomTokenProvider(AbstractTokenProvider):
        async def token(self):
            return asyncio.get_running_loop().run_in_executor(
                None, self._token)
        def _token(self):
            token_url = 'https://test.com/auth/realms/pro-realm/protocol/openid-connect/token'
            client = BackendApplicationClient(client_id='adfafd')
            oauth = OAuth2Session(client=client)
            token_json = oauth.fetch_token(token_url=token_url, client_id='adfasdf', client_secret='adfadf')
            token = token_json['access_token']

            return token

loop = asyncio.get_event_loop()

async def consume():
    consumer = AIOKafkaConsumer(
        "test-4.stream", 
        loop=loop, 
        sasl_oauth_token_provider=CustomTokenProvider(),
        security_protocol="SASL_SSL", 
        sasl_mechanism="OAUTHBEARER",
        enable_auto_commit=False,
        ssl_context=create_ssl_context(),
        bootstrap_servers='test.com:9094')
    # Get cluster layout and topic/partition allocation
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.value)
    finally:
        await consumer.stop()

loop.run_until_complete(consume())

krlx commented 3 years ago

Any updates on this? Having the same issue

chrisbarker commented 3 years ago

@davidmontgom & @krlx - I had a similar problem and resolved it by awaiting the future in CustomTokenProvider. Try this change:

        async def token(self):
            return await asyncio.get_running_loop().run_in_executor(
                None, self._token)
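
To illustrate why the `await` matters, here is a minimal sketch that runs without aiokafka or a broker. `TokenProviderSketch`, `token_without_await`, and the dummy token string are hypothetical names used only for illustration; the real class would subclass `AbstractTokenProvider` and `_token` would do the blocking OAuth2 request. Without the `await`, the coroutine hands back the executor future itself rather than the token string, which is presumably what then breaks the SASL handshake.

```python
import asyncio


class TokenProviderSketch:
    """Hypothetical stand-in for an AbstractTokenProvider subclass."""

    def _token(self):
        # Placeholder for the blocking OAuth2 fetch_token() call.
        return "dummy-access-token"

    async def token_without_await(self):
        # Original version: returns the executor Future, not the token.
        return asyncio.get_running_loop().run_in_executor(None, self._token)

    async def token(self):
        # Fixed version: awaits the Future and returns the token string.
        return await asyncio.get_running_loop().run_in_executor(
            None, self._token)


async def demo():
    provider = TokenProviderSketch()
    broken = await provider.token_without_await()
    fixed = await provider.token()
    is_future = asyncio.isfuture(broken)
    await broken  # let the pending executor job finish cleanly
    return is_future, fixed


is_future, fixed = asyncio.run(demo())
print(is_future, fixed)
```

Here `is_future` is `True` for the version without `await`, while the fixed `token()` yields a plain `str`.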

benschumacher commented 6 days ago

It seems that the documentation is inconsistent with the fixes provided by @mtomilov.

https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.abc.AbstractTokenProvider.token

I'd be happy to submit a PR for this. Should this issue be closed?

ods commented 5 days ago

Hi @benschumacher,

The docs build was broken at the time of the latest release, which is why the stable docs weren't updated. But you can see these changes in the latest docs: https://aiokafka.readthedocs.io/en/latest/api.html#aiokafka.abc.AbstractTokenProvider.token

If you still think something has to be fixed, contributions are welcome.

benschumacher commented 4 days ago

Thanks for the reply. We changed our token provider to await the executor call, and it is working as expected. I was confused by the mismatch between the published docs and the code comments, but the broken doc build explains that. Cheers.