nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0

'Queue' object has no attribute 'task_done' after asyncio loop getters problem #29

Closed mffrench closed 7 years ago

mffrench commented 7 years ago

Hi,

I'm currently facing an issue with the asyncio loop after running for more than 48 hours (so it is not easy to reproduce).

This asyncio problem (some loop getters problem that I need to troubleshoot further) impacts the asyncio-nats code as shown below:

  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 584, in _process_op_err
    self._flush_queue.task_done()
AttributeError: 'Queue' object has no attribute 'task_done'

Maybe there is some protection code to add here? If you have any tips to help me troubleshoot further, let me know anyway...
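For illustration, a guard of the kind asked about could look like the sketch below. It is not the fix that went into the library: `safe_task_done` and `LegacyQueue` are hypothetical names used only here. The background is that `asyncio.Queue` gained `task_done()` and `join()` in Python 3.4.4; earlier 3.4 releases only had them on `JoinableQueue`, which would explain the `AttributeError` above.

```python
import asyncio


def safe_task_done(queue):
    """Call queue.task_done() only when it exists and is valid to call.

    Returns True if task_done() was called successfully, False otherwise.
    Hypothetical helper, not part of asyncio-nats.
    """
    task_done = getattr(queue, "task_done", None)
    if task_done is None:
        # Older asyncio.Queue (before Python 3.4.4) has no task_done().
        return False
    try:
        task_done()
    except ValueError:
        # Raised when task_done() is called more often than items were taken.
        return False
    return True


class LegacyQueue:
    """Hypothetical stand-in for a queue type that lacks task_done()."""


async def demo():
    queue = asyncio.Queue()
    await queue.put(None)
    await queue.get()
    first = safe_task_done(queue)    # one unfinished item, so this succeeds
    second = safe_task_done(queue)   # nothing left to mark as done
    legacy = safe_task_done(LegacyQueue())
    return first, second, legacy
```

A guard like this only hides the symptom; the `AssertionError` raised earlier in `put()` would still need its own investigation.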

Here's the code where I'm using asyncio-nats : https://github.com/echinopsii/net.echinopsii.ariane.community.cli.python3/blob/master/ariane_clip3/natsd/driver.py

NOTE: I'm using the actor pattern (pykka) with asyncio-nats, which may not be a great fit... Should I perhaps use lower-level functions (i.e. functions implementing only the NATS protocol) in asyncio-nats to avoid the asyncio logic?

Full stack trace :

Traceback (most recent call last):
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 491, in _flush_pending
    yield from self._flush_queue.put(None)
  File "/usr/lib64/python3.4/asyncio/queues.py", line 126, in put
    'queue non-empty, why are getters waiting?')
AssertionError: queue non-empty, why are getters waiting?

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/site-packages/ariane_docker/components.py", line 105, in sniff
    next_action=InjectorCachedComponent.action_update, data_blob=self.data_blob())
  File "/usr/lib/python3.4/site-packages/ariane_clip3/injector.py", line 829, in cache
    json_last_refresh=json_last_refresh, data_blob=data_blob).get()
  File "/usr/lib64/python3.4/site-packages/pykka/threading.py", line 52, in get
    compat.reraise(*self._data['exc_info'])
  File "/usr/lib64/python3.4/site-packages/pykka/compat.py", line 24, in reraise
    raise value
  File "/usr/lib64/python3.4/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/usr/lib64/python3.4/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "/usr/lib/python3.4/site-packages/ariane_clip3/injector.py", line 696, in save
    result = InjectorCachedComponentService.requester.call(args).get()
  File "/usr/lib64/python3.4/site-packages/pykka/threading.py", line 52, in get
    compat.reraise(*self._data['exc_info'])
  File "/usr/lib64/python3.4/site-packages/pykka/compat.py", line 24, in reraise
    raise value
  File "/usr/lib64/python3.4/site-packages/pykka/actor.py", line 201, in _actor_loop
    response = self._handle_receive(message)
  File "/usr/lib64/python3.4/site-packages/pykka/actor.py", line 295, in _handle_receive
    return callee(*message['args'], **message['kwargs'])
  File "/usr/lib/python3.4/site-packages/ariane_clip3/natsd/driver.py", line 507, in call
    next(self.nc.flush(1))
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 433, in flush
    yield from self._send_ping(future)
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 788, in _send_ping
    yield from self._flush_pending()
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 496, in _flush_pending
    NatsError("nats: error kicking the flusher"))
  File "/usr/lib/python3.4/site-packages/nats/aio/client.py", line 584, in _process_op_err
    self._flush_queue.task_done()
AttributeError: 'Queue' object has no attribute 'task_done'

Thank you for your help :)

wallyqs commented 7 years ago

Hi, apologies for the delay on this one. This would have happened when a client running on Python 3.4 attempted to reconnect, where there were some bugs with task cancellation.

I've added a couple of fixes which might help avoid reaching this condition and released them as version v0.4.0.

As you mention, I'm not sure how much mileage we will get with asyncio in Python 3.4 and thread safety, so I would recommend possibly upgrading to 3.5 and using the run_coroutine_threadsafe API, though a native pykka driver without asyncio could be interesting as well...
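As a rough sketch of that suggestion: the traceback shows `driver.py` driving the coroutine by hand with `next(self.nc.flush(1))` from an actor thread. With `asyncio.run_coroutine_threadsafe`, the actor thread would instead hand the coroutine to a loop running in its own thread and block on the returned future. Everything below except the asyncio calls is an assumption for illustration: `fake_flush` stands in for the client's `flush()`, and the helper names are hypothetical.

```python
import asyncio
import threading


def start_loop_in_thread():
    """Run a dedicated event loop in a background thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


async def fake_flush(delay=0.01):
    # Hypothetical stand-in for the NATS client's flush() coroutine.
    await asyncio.sleep(delay)
    return "flushed"


def call_from_actor_thread(loop, timeout=1.0):
    """Submit a coroutine to the loop from another thread and wait for it."""
    future = asyncio.run_coroutine_threadsafe(fake_flush(), loop)
    # concurrent.futures.Future: result() blocks the calling thread only.
    return future.result(timeout)


def stop_loop(loop, thread):
    """Stop the loop from outside its thread, then clean up."""
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```

The point of the design is that only the loop thread ever touches asyncio objects such as the flush queue; actor threads interact with it solely through the thread-safe submission call.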