bmoscon / cryptofeed

Cryptocurrency Exchange Websocket Data Feed Handler
Other
2.13k stars 664 forks source link

Questdb backend does not work with multiprocessing option #1041

Open mirageAlchemy opened 3 weeks ago

mirageAlchemy commented 3 weeks ago

Describe the bug When config={"backend_multiprocessing": True} in feed definition, questdb backend will crash by raising an exception

To Reproduce Steps to reproduce the behavior:

Please see the LICENSE file for the terms and conditions associated with this software. """

from cryptofeed import FeedHandler from cryptofeed.defines import L2_BOOK, TICKER, TRADES, OPEN_INTEREST, L3_BOOK from cryptofeed.exchanges import Coinbase, BinanceFutures, Binance, Kraken, KrakenFutures from cryptofeed.raw_data_collection import AsyncFileCallback

from cryptofeed.backends.quest import BookQuest, OpenInterestQuest, TradeQuest

QUEST_HOST = "127.0.0.1" QUEST_PORT = 9009

def main(): config = {"log": {"filename": "redis-demo.log", "level": "INFO"}, "backend_multiprocessing": True} f = FeedHandler(config=config) f.add_feed( Binance( max_depth=10, symbols=["BTC-USDT"], channels=[L2_BOOK, TRADES], callbacks={ L2_BOOK: BookQuest(host=QUEST_HOST, port=QUEST_PORT), TRADES: TradeQuest(host=QUEST_HOST, port=QUEST_PORT), }, config={"backend_multiprocessing": True}, ) )


Traceback:

File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/connection_handler.py", line 69, in _create_connection await self._handler(connection, self.handler) File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/connection_handler.py", line 119, in _handler await handler(message, connection, self.conn.last_message) File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/exchanges/binance.py", line 518, in message_handler await self._trade(msg, timestamp) File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/exchanges/binance.py", line 195, in _trade await self.callback(TRADES, t, timestamp) File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/feed.py", line 258, in callback await cb(obj, receipt_timestamp) File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/backends/backend.py", line 97, in call await self.write(data) File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/backends/quest.py", line 63, in write await self.queue.put(update) AttributeError: 'tuple' object has no attribute 'put'



**Expected behavior**
it should not raise the exception

**Screenshots**
If applicable, add screenshots to help explain your problem.

**Operating System:**
 - macOS, linux, etc

**Cryptofeed Version**
- 2.4.0
mirageAlchemy commented 3 weeks ago

after replace all the await self.queue.put(update) in quest.py with

        if self.multiprocess:
            self.queue[1].send(update)
        else:
            await self.queue.put(update)

the problem is gone. I noticed that for generally other backend handler, this branch condition is written in the base class, but BookQuest has its own __call__ and write, and did not add the branch.