faust-streaming / mode

Python AsyncIO Services
https://faust-streaming.github.io/mode/

Fix imports from __init__ so mode-streaming can be used by other libraries #28

Closed: wbarnha closed this 1 year ago

wbarnha commented 1 year ago

Closes #27.

wbarnha commented 1 year ago

This definitely fixes #27, but running Faust in my own tests surfaces another issue:

[2022-11-21 15:53:54,611] [1] [ERROR] [^---AIOKafkaConsumerThread]: Crashed reason=TypeError("Service.itertimer() got an unexpected keyword argument 'loop'") 
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/mode/services.py", line 843, in _execute_task
    await task
  File "/usr/local/lib/python3.10/dist-packages/mode/threads.py", line 266, in _thread_keepalive
    async for sleep_time in self.itertimer(
TypeError: Service.itertimer() got an unexpected keyword argument 'loop'
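
The log above only shows that a call site passed `loop=` to a method whose signature does not accept it; it does not show why the two are out of sync. A minimal sketch of that failure shape, using a hypothetical stand-in `Service` class (not mode's real implementation; `max_ticks` is an invented parameter that keeps the example finite):

```python
import asyncio
from typing import AsyncIterator, Optional, Tuple


class Service:
    """Hypothetical stand-in for a service class, for illustration only."""

    async def itertimer(
        self, interval: float, *, max_ticks: int = 3
    ) -> AsyncIterator[float]:
        # The signature has no `loop` parameter, so any caller that
        # still passes one fails with a TypeError at call time.
        for _ in range(max_ticks):
            await asyncio.sleep(interval)
            yield interval


async def stale_caller(service: Service) -> None:
    # Outdated call site: still forwards the event loop explicitly.
    async for _ in service.itertimer(0.01, loop=asyncio.get_running_loop()):
        pass


async def fixed_caller(service: Service) -> int:
    # Updated call site: the running loop is picked up implicitly.
    ticks = 0
    async for _ in service.itertimer(0.01):
        ticks += 1
    return ticks


async def main() -> Tuple[Optional[str], int]:
    service = Service()
    try:
        await stale_caller(service)
    except TypeError as exc:
        error = str(exc)
    else:
        error = None
    return error, await fixed_caller(service)
```

If the cause here is the same, the fix is either to drop `loop=` at the call site or to make sure the caller and callee come from the same release of the package.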

wbarnha commented 1 year ago

And another one:

[2022-11-21 15:59:24,490] [1] [ERROR] [^---Conductor]: Crashed reason=TypeError('An asyncio.Future, a coroutine or an awaitable is required') 
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/mode/services.py", line 843, in _execute_task
    await task
  File "/usr/local/lib/python3.10/dist-packages/faust/transport/conductor.py", line 288, in _subscriber
    await self.app.agents.wait_until_agents_started()
  File "/usr/local/lib/python3.10/dist-packages/faust/agents/manager.py", line 80, in wait_until_agents_started
    await self.wait_for_stopped(self._agents_started)
  File "/usr/local/lib/python3.10/dist-packages/mode/services.py", line 704, in wait_for_stopped
    return (await self.wait(*coros, timeout=timeout)).stopped
  File "/usr/local/lib/python3.10/dist-packages/mode/services.py", line 711, in wait
    return await self._wait_one(coros[0], timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/mode/services.py", line 779, in _wait_one
    results = await self.wait_first(coro, timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/mode/services.py", line 740, in wait_first
    futures = {
  File "/usr/local/lib/python3.10/dist-packages/mode/services.py", line 741, in <dictcomp>
    coro: asyncio.ensure_future(
  File "/usr/lib/python3.10/asyncio/tasks.py", line 615, in ensure_future
    return _ensure_future(coro_or_future, loop=loop)
  File "/usr/lib/python3.10/asyncio/tasks.py", line 630, in _ensure_future
    raise TypeError('An asyncio.Future, a coroutine or an awaitable '
TypeError: An asyncio.Future, a coroutine or an awaitable is required
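
The traceback does not show what object reached `asyncio.ensure_future`, only that it was not a future, coroutine, or awaitable. As a sketch of how this error arises in general, and assuming for illustration that the object is an `asyncio.Event`, the following reproduces the rejection and shows that the event's `wait()` coroutine is accepted:

```python
import asyncio
from typing import Tuple


async def main() -> Tuple[bool, bool]:
    event = asyncio.Event()

    # An Event object is not awaitable itself, so ensure_future rejects it.
    try:
        asyncio.ensure_future(event)
    except TypeError:
        rejected = True
    else:
        rejected = False

    # Event.wait() returns a coroutine, which ensure_future accepts.
    waiter = asyncio.ensure_future(event.wait())
    event.set()
    return rejected, await waiter
```

Whether an `Event` is what was actually passed in the Faust run above is an assumption; the same `TypeError` is raised for any non-awaitable argument.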

wbarnha commented 1 year ago

LGTM for now; the other bugs can be addressed in separate PRs.

wbarnha commented 1 year ago

For future reference: upgrading my local instance of aiohttp to the latest version (3.8.3) seems to have fixed the Conductor crash quoted above (TypeError: An asyncio.Future, a coroutine or an awaitable is required).