dbrattli / aioreactive

Async/await reactive tools for Python 3.11+
MIT License
350 stars 24 forks source link

AttributeError: 'NoneType' object has no attribute 'subscribe_async' in aioreactive/combine.py", line 99, in update #34

Closed bueltan closed 1 year ago

bueltan commented 1 year ago

The complete code is extended , but this is a summary.

This exception does not represent a real problem because my application finishes executing all the functions correctly, and it only happens with the first "asend" of the subject incoming_data.


type_data = Dict[str, Any]
incoming_data: rx.AsyncObservable['type_data'] = rx.AsyncSubject()

async def webhook(request):
    payload = await request.json()
    await incoming_data.asend(payload)
    return web.Response(status=200)

async def on_startup(app):

    rs: RequestSender = RequestSender()
    mb: MessageBuilder = MessageBuilder()

    obs_request_sender = pipe(incoming_data,
                              rx.map(lambda x_d: mb.adapt_message(ticket=x_d[1]["ticket"],
                                                                  model_message=x_d[1]["model_message"],
                                                                  destiny='guazuapp',
                                                                  data=x_d[1])),

                              rx.flat_map_async(lambda x_d: rs.producer(messages_to_send=x_d[1]['messages_to_send'])))

    await obs_request_sender.subscribe_async(rx.AsyncAnonymousObserver(on_next_obs_request_sender, a_throw ))

async def on_next_obs_request_sender(payload):
    print(payload)

async def a_throw(ex: Exception) -> None:
    print(ex)

async def aio_app():
    app = web.Application()
    app.on_startup.append(on_startup)
    app.add_routes([web.post('/middleware_receptor', webhook)])
    return app

def main():
    port = os.environ.get("PORT", 5055)
    web.run_app(aio_app(), host="localhost", port=int(port))

if __name__ == "__main__":
    main()


class RequestSender:

    n_workers:int = 3
    queue = asyncio.Queue()

    @log_function_runtime
    async def producer(self, messages_to_send: List[Dict[str, str]]):
        for dict_message in messages_to_send:
            self.queue.put_nowait(dict_message)
        # Create (n_workers:int) worker tasks to process the queue concurrently.
        tasks = []

        session  = aiohttp.ClientSession()
        for i in range(self.n_workers):
            task = asyncio.create_task(self.request_worker(session))
            tasks.append(task)
            # Wait until the queue is fully processed.
        await self.queue.join()

        # Cancel our worker tasks.
        for task in tasks:
            task.cancel()
        # Wait until all worker tasks are cancelled.
        await asyncio.gather(*tasks, return_exceptions=True)
        await session.close()

    @log_function_runtime
    async def request_worker(self, session):
        while True:
            dict_message:Dict[str, str] = await self.queue.get()
            destiny:str = dict_message.get("destiny")
            body_message = dict_message.pop(destiny)
            function_name= destiny
            await self.request_post(function_name, body_message, session, dict_message)
            self.queue.task_done()

    async def request_post(self, function_name: str, body_message: str, session, dict_message:Dict[str, Any]):
        async with session.post(url=url, headers=headers, data=body_message) as response_post:
            response = await response_post.text()
            print(f"response: {response}")
            return rx.single(response)

This is all the trace I have

Task exception was never retrieved future: <Task finished name='Task-4' coro=<start_immediate..runner() done, defined at /usr/local/lib/python3.10/site-packages/expression/core/aiotools.py:86> exception=AttributeError("'NoneType' object has no attribute 'subscribe_async'")> Traceback (most recent call last): File "/usr/local/lib/python3.10/site-packages/expression/core/aiotools.py", line 87, in runner return await computation File "/usr/local/lib/python3.10/site-packages/aioreactive/combine.py", line 146, in worker await message_loop(initial_model) File "/usr/local/lib/python3.10/site-packages/aioreactive/combine.py", line 141, in message_loop model = await update(msg, model) File "/usr/local/lib/python3.10/site-packages/aioreactive/combine.py", line 99, in update inner = await xs.subscribe_async(obv(model.key)) AttributeError: 'NoneType' object has no attribute 'subscribe_async'

Package Version


aiohttp 3.8.1 aiohttp-jinja2 1.5 aioreactive 0.16.0 aiosignal 1.2.0 async-timeout 4.0.2 attrs 22.1.0 autopep8 1.6.0 certifi 2022.6.15 charset-normalizer 2.1.0 expression 2.0.1 frozenlist 1.3.0 greenlet 1.1.2 idna 3.3 Jinja2 3.1.2 MarkupSafe 2.1.1 multidict 6.0.2 mysql-connector-python 8.0.29 pip 22.0.4 protobuf 4.21.4 pycodestyle 2.9.1 requests 2.28.1 ring 0.9.1 Rx 3.2.0 setuptools 58.1.0 six 1.16.0 SQLAlchemy 1.4.39 toml 0.10.2 typing_extensions 4.1.1 urllib3 1.26.11 watchdog 2.1.9 wheel 0.37.1 wirerope 0.4.5 yarl 1.7.2

bueltan commented 1 year ago

This issue was generated for incorrect use the queue.task_done(), moving the queue.task_done() to the method producer the problem was fixed , activate all breakpoint in vscode was the great help.