aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

[QUESTION] AbstractTokenProvider for non-async code #916

Closed Alsiri0n closed 11 months ago

Alsiri0n commented 1 year ago

Hello, I'm facing a problem. I implemented AbstractTokenProvider for my producer client, but I also need to create and delete topics. As far as I understand, I can do that only with the kafka-python package, which supports only non-async function calls. The documentation recommends this code:

import asyncio

from aiokafka.abc import AbstractTokenProvider

class CustomTokenProvider(AbstractTokenProvider):

    async def token(self):
        # Run the blocking callback in the default executor and await its result.
        return await asyncio.get_running_loop().run_in_executor(
            None, self._token)

    def _token(self):
        # The actual synchronous token callback.
        ...

My implementation looks like this:

import asyncio
import time

class AsyncCustomTokenProvider(AbstractTokenProvider):
    def __init__(self, **conf):
        super().__init__(**conf)
        self.tkn = ""
        self.expired = time.time()
        self.timeout = 600

    async def retrieve_token(self, first_time: bool = False) -> None:
        ...  # implementation omitted

    async def token(self):
        return await asyncio.get_running_loop().run_in_executor(
            None, self._token)
        # return self.tkn

    def _token(self):
        return self.tkn

But I get an error if I try to use my class as the sasl_oauth_token_provider for KafkaAdminClient from the kafka-python package:

kafka\conn.py:824: RuntimeWarning: coroutine 'AsyncCustomTokenProvider.token' was never awaited

So my question is: can I cover both cases with a single AbstractTokenProvider implementation, or must I implement an async class for aiokafka and a separate sync class for kafka-python? Thanks for any advice.
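
For illustration, here is a minimal sketch of one way the two-class variant could share its logic, assuming the token can be fetched with a blocking call. The names TokenStore, SyncTokenProvider, AsyncTokenProvider and the "dummy-token" value are hypothetical and not part of either library. The classes use duck typing so the snippet runs without aiokafka or kafka-python installed; in real code each would presumably subclass the AbstractTokenProvider of its own package. The last lines show why the warning above appears: calling an async method from synchronous code returns a coroutine object instead of a string.

```python
import asyncio
import inspect
import time


class TokenStore:
    """Hypothetical shared core: caches the token and refreshes it synchronously."""

    def __init__(self, timeout=600):
        self.timeout = timeout
        self._tkn = ""
        self._expires_at = 0.0

    def _fetch(self):
        # Placeholder for the real blocking request to the identity provider.
        return "dummy-token"

    def get(self):
        # Refresh only when the cached token has expired.
        if time.time() >= self._expires_at:
            self._tkn = self._fetch()
            self._expires_at = time.time() + self.timeout
        return self._tkn


class SyncTokenProvider:
    """Sync side (assumed to subclass kafka-python's AbstractTokenProvider in real code)."""

    def __init__(self, store):
        self._store = store

    def token(self):
        return self._store.get()


class AsyncTokenProvider:
    """Async side (assumed to subclass aiokafka.abc.AbstractTokenProvider in real code)."""

    def __init__(self, store):
        self._store = store

    async def token(self):
        loop = asyncio.get_running_loop()
        # Run the blocking lookup in the default executor and await the result.
        return await loop.run_in_executor(None, self._store.get)


store = TokenStore()
sync_token = SyncTokenProvider(store).token()
async_token = asyncio.run(AsyncTokenProvider(store).token())

# Without "await", a synchronous caller only gets a coroutine object back.
pending = AsyncTokenProvider(store).token()
is_coroutine = inspect.iscoroutine(pending)
pending.close()  # close it explicitly so this demo does not emit the warning
```

Both providers read from the same TokenStore, so the refresh logic exists once. The sketch does not lock the store, so real code that calls get() from several threads may need a threading.Lock around the refresh.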