open-contracting / kingfisher-collect

Downloads OCDS data and stores it on disk
https://kingfisher-collect.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Use async publisher, instead of blocking connection #1033

Closed: jpmckinney closed this issue 9 months ago

jpmckinney commented 1 year ago

All other tools that use RabbitMQ (via pika) adopted async consumers/publishers when I released yapw 0.1.0 in July, mostly because async clients can stay connected for a long time without the broker disconnecting them (e.g. if a spider goes a long time between downloads).

In Kingfisher Collect, in this commit https://github.com/open-contracting/kingfisher-collect/commit/cfe913280c996d60a8b2b08348a0eb27175dc799, I re-opened the connection to work around the broker disconnecting from a slow consumer.
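The reconnect workaround can be pictured roughly as follows. This is a minimal sketch of one generic shape such a workaround can take, not the code in that commit: ConnectionClosed, FakeConnection and ReconnectingPublisher are hypothetical names, and the fake connection stands in for a real blocking pika connection that the broker closed while it sat idle.

```python
from typing import Callable, List


class ConnectionClosed(Exception):
    """Hypothetical stand-in for a client library's 'connection closed' error."""


class FakeConnection:
    """Hypothetical blocking connection; `closed=True` simulates a broker-side disconnect."""

    def __init__(self, closed: bool = False) -> None:
        self.is_open = not closed
        self.sent: List[bytes] = []

    def publish(self, body: bytes) -> None:
        if not self.is_open:
            raise ConnectionClosed("broker closed the idle connection")
        self.sent.append(body)


class ReconnectingPublisher:
    """Hypothetical publisher that re-opens its connection once if the broker dropped it."""

    def __init__(self, connect: Callable[[], FakeConnection]) -> None:
        self._connect = connect  # factory that opens a new connection
        self.connection = connect()
        self.reconnects = 0

    def publish(self, body: bytes) -> None:
        try:
            self.connection.publish(body)
        except ConnectionClosed:
            # Re-open the connection and retry once, instead of failing the crawl.
            self.connection = self._connect()
            self.reconnects += 1
            self.connection.publish(body)


# The first connection has been closed by the broker; the second one works.
connections = iter([FakeConnection(closed=True), FakeConnection()])
publisher = ReconnectingPublisher(lambda: next(connections))
publisher.publish(b"item-1")
print(publisher.reconnects, publisher.connection.sent)  # 1 [b'item-1']
```

The drawback, and the motivation for this issue, is that the client only notices the dropped connection when it next tries to publish; an async client can instead keep the connection alive while idle.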

yapw has an async publisher, but when I looked into this in July, it seemed tricky to use within Scrapy, which already runs within a Twisted event loop.

At the time, I had found code that runs pika in a separate thread, or via Twisted's reactor.callInThread. We would need to test these approaches.

https://docs.twisted.org/en/stable/core/howto/threading.html#running-code-in-threads
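The "run it in a thread" idea can be illustrated with the standard library alone: give the async client its own event loop in a background thread, so it never touches the loop that the host framework is already running. This is a minimal sketch, assuming an asyncio-based client; BackgroundLoop and fake_publish are hypothetical names, and fake_publish only records the message instead of talking to RabbitMQ.

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Any, Coroutine, List


class BackgroundLoop:
    """Hypothetical helper: runs a private asyncio event loop in a daemon thread."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()  # runs until stop() schedules loop.stop()

    def start(self) -> None:
        self.thread.start()

    def submit(self, coro: Coroutine[Any, Any, Any]) -> Future:
        # Thread-safe: schedule the coroutine on the background loop from any thread.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self) -> None:
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


published: List[bytes] = []


async def fake_publish(body: bytes) -> None:
    # Hypothetical stand-in for an async publish call.
    await asyncio.sleep(0)
    published.append(body)


background = BackgroundLoop()
background.start()
background.submit(fake_publish(b"item-1")).result(timeout=5)
background.stop()
print(published)  # [b'item-1']
```

Twisted's reactor.callInThread(f) plays a similar role, but it runs f in the reactor's thread pool, which only works while the reactor itself is running.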

jpmckinney commented 9 months ago

Setting TWISTED_REACTOR to twisted.internet.asyncioreactor.AsyncioSelectorReactor causes a "This event loop is already running" error when starting the yapw client, presumably because Scrapy has already started the reactor (which, with this setting, runs on the asyncio event loop), and the loop can't be started a second time for yapw. The repositories linked above use the default Twisted reactor.
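The error message itself comes from asyncio, and it can be reproduced without Scrapy or yapw. This is a minimal sketch, assuming the client tries to drive the shared event loop itself; start_second_client is a hypothetical stand-in for that client, and the running loop plays the part of the loop that Scrapy's reactor has already started.

```python
import asyncio


def start_second_client(loop: asyncio.AbstractEventLoop) -> str:
    # Hypothetical client that tries to run the (shared) event loop itself.
    try:
        loop.run_until_complete(loop.create_future())
    except RuntimeError as error:
        return str(error)
    return "started"


async def main() -> str:
    # Inside a coroutine the loop is already running, as it is once the reactor has started.
    return start_second_client(asyncio.get_running_loop())


message = asyncio.run(main())
print(message)  # This event loop is already running
```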

jpmckinney commented 9 months ago

I had used these settings in the original tests, but they weren't needed in the end:

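    {
        'CONCURRENT_REQUESTS': 1,  # one request at a time
        'LOG_ENABLED': False,  # disable Scrapy's logging
        'LOGSTATS_INTERVAL': 0,  # disable the periodic stats log messages
        'STATS_DUMP': False,  # don't dump stats when the spider closes
        'TELNETCONSOLE_ENABLED': False,  # don't start the telnet console
    }

jpmckinney commented 9 months ago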

I ended up following something closer to the last example, because testing reactor.callInThread with pytest seems impossible. The comment in the new code explains:

# threading.Thread(target=cb) is used, instead of reactor.callInThread(cb), because the latter is hard to
# test. The reactor needs to run for the callback to callInThread() to run. The reactor needs to stop, in
# order for a test to return. But, the reactor isn't restartable. So, testing seems impossible.
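Why a plain thread is testable where the reactor is not: a thread can be started, stopped and joined inside each test, and a fresh one created for the next test, with no global event loop to restart. This is a minimal sketch of that pattern, not the actual Kingfisher Collect code; ThreadedPublisher is a hypothetical name, and appending to a list stands in for a real publish.

```python
import queue
import threading
from typing import List, Optional


class ThreadedPublisher:
    """Hypothetical publisher whose I/O loop runs in its own thread."""

    def __init__(self) -> None:
        self.outbox: "queue.Queue[Optional[bytes]]" = queue.Queue()
        self.published: List[bytes] = []
        # threading.Thread rather than reactor.callInThread: a test can start and join it
        # without running (and then being unable to restart) the Twisted reactor.
        self.thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        while True:
            body = self.outbox.get()
            if body is None:  # sentinel: stop the loop
                return
            self.published.append(body)  # stand-in for a real publish

    def start(self) -> None:
        self.thread.start()

    def publish(self, body: bytes) -> None:
        self.outbox.put(body)

    def stop(self) -> None:
        self.outbox.put(None)
        self.thread.join(timeout=5)


def test_publisher() -> None:
    # Runs twice to show that each test simply gets a fresh thread.
    for _ in range(2):
        publisher = ThreadedPublisher()
        publisher.start()
        publisher.publish(b"a")
        publisher.publish(b"b")
        publisher.stop()
        assert publisher.published == [b"a", b"b"]
        assert not publisher.thread.is_alive()


test_publisher()
```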