bmoscon / cryptofeed

Cryptocurrency Exchange Websocket Data Feed Handler
Other
2.21k stars 682 forks source link

Can we Throttle a non-backend object, like a normal callback function? #975

Closed woncoh1 closed 1 year ago

woncoh1 commented 1 year ago

Hi. The recent 2.3.2 and 2.4.0 update introduced aggregates (e.g. Throttle) to backends using the following code from cryptofeed/cryptofeed/backends/aggregate.py:

class AggregateCallback:
    def __init__(self, handler):
        self.handler = handler
        if hasattr(self.handler, '__class__'):
            setattr(self, 'start', self.handler.start)
            setattr(self, 'stop', self.handler.stop)
            self.__name__ = self.handler.__class__

class Throttle(AggregateCallback):
    """
    Wraps a callback and throttles updates based on `window`. Will allow
    1 update per `window` interval; all others are dropped
    """

    def __init__(self, handler, window=60):
        super().__init__(handler)
        self.window = window
        self.last_update = 0

But what if the handler is not a backend object, but a normal callback function?:

from cryptofeed import FeedHandler
from cryptofeed.defines import L2_BOOK
from cryptofeed.exchanges import Binance
from cryptofeed.backends.aggregate import Throttle

async def book(book, receipt_timestamp):
    print(
        f'Top ask: {book.book.asks.index(0)[0]} - {book.book.asks.index(0)[1]}\n'
        f'Top bid: {book.book.bids.index(0)[0]} - {book.book.bids.index(0)[1]}'
    )

fh = FeedHandler()
fh.add_feed(Binance(
    symbols=['BTC-USDT'],
    channels=[L2_BOOK],
    callbacks={L2_BOOK: Throttle(book, window=60)},
))
fh.run()

The callback book is a function, so book.__class__ would be function, so if hasattr(self.handler, '__class__') would be True, but book does not have an attribute or method called start, so self.handler.start would not exist, throwing an error, something as the following:

[/usr/local/lib/python3.10/dist-packages/cryptofeed/backends/aggregate.py](https://localhost:8080/#) in __init__(self, handler, window)
     27 
     28     def __init__(self, handler, window=60):
---> 29         super().__init__(handler)
     30         self.window = window
     31         self.last_update = 0

[/usr/local/lib/python3.10/dist-packages/cryptofeed/backends/aggregate.py](https://localhost:8080/#) in __init__(self, handler)
     15         self.handler = handler
     16         if hasattr(self.handler, '__class__'):
---> 17             setattr(self, 'start', self.handler.start)
     18             setattr(self, 'stop', self.handler.stop)
     19             self.__name__ = self.handler.__class__

AttributeError: 'function' object has no attribute 'start'

Do we need something like if hasattr(self.handler, '__class__') and (self.handler.__class__ != (lambda x: x).__class__): to let the non-backend callback functions pass? Are there more elegant ways to construct or use aggregates with functions without backends? Thanks.

bmoscon commented 1 year ago

this should be fixed by the latest PR (just merged)