tastyware / tastytrade

An unofficial, sync/async Python SDK for Tastytrade!
https://tastyworks-api.rtfd.io
MIT License
126 stars 43 forks source link

Getting CancelledError while fetching greeks #160

Closed abhi1010 closed 3 months ago

abhi1010 commented 4 months ago

I am getting CancelledError with this line of code:

self.greeks[e.eventSymbol] = e

It goes away when I disable this line. I will provide my sample code below (retrieved from Advanced Usage.

future: <_GatheringFuture finished exception=CancelledError()>
Traceback (most recent call last):
  File "~/trading/brokers/tasty_feed.py", line 144, in _update_greeks
    async for e in self.streamer.listen(EventType.GREEKS):
  File "ve/lib/python3.12/site-packages/tastytrade/streamer.py", line 455, in listen
    yield await self._queues[event_type].get()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/queues.py", line 158, in get
    await getter
asyncio.exceptions.CancelledError
2024-08-03 11:04:45,615, ERROR : [base_events.py:1821 - default_exception_handler() ] - _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
Traceback (most recent call last):
  File "~/trading/brokers/tasty_feed.py", line 144, in _update_greeks
    async for e in self.streamer.listen(EventType.GREEKS):
  File "ve/lib/python3.12/site-packages/tastytrade/streamer.py", line 455, in listen
    yield await self._queues[event_type].get()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/queues.py", line 158, in get
    await getter
asyncio.exceptions.CancelledError

Code:


async def main(session, tickers):
  tasty_monthly = get_tasty_monthly()
  logger.info(f'tasty_monthly = {tasty_monthly}')

  live_prices_list = await asyncio.gather(
      *
      [LivePrices.create(session, ticker, tasty_monthly) for ticker in tickers])

  logger.info(f'live_prices_list = {live_prices_list}')

@dataclass
class LivePrices:
  quotes: dict[str, Quote]
  greeks: dict[str, Greeks]
  streamer: DXLinkStreamer
  puts: list[Option]
  calls: list[Option]

  @classmethod
  async def create(
      cls,
      session: Session,
      symbol: str,
      expiration: date = today_in_new_york()):
    chain = get_option_chain(session, symbol)
    options = [o for o in chain[expiration]]
    streamer_symbols = [o.streamer_symbol for o in options]

    streamer = await DXLinkStreamer.create(session)
    logger.info(f'[{symbol}]:: About to subscribe to: {symbol} for greeks')
    await streamer.subscribe(EventType.GREEKS, streamer_symbols)
    logger.info(f'[{symbol}]:: Finished subscribing to: {symbol}')

    puts = [o for o in options if o.option_type == OptionType.PUT]
    calls = [o for o in options if o.option_type == OptionType.CALL]
    self = cls({}, {}, streamer, puts, calls)

    t_listen_greeks = asyncio.create_task(self._update_greeks())
    asyncio.gather(t_listen_greeks)

    while len(self.greeks) < len(options):
      len_options = len(options)
      logger.info(f'[{symbol}]:: waiting for greeks...')
      logger.info(
          f'[{symbol}]:: len(greeks) = {len(self.greeks)} / {len_options}')
      await asyncio.sleep(3)
    return self

  async def _update_greeks(self):
    async for e in self.streamer.listen(EventType.GREEKS):
      logger.info(f'Received greek: {e}; \n    keys={list(self.greeks.keys())}')
      self.greeks[e.eventSymbol] = e

if __name__ == "__main__":
  session = ProductionSession(env_username, env_password, remember_token=True)
  logger.info(f'logged in...')
  asyncio.run(main(session, ['AAPL', 'AMZN']))
Graeme22 commented 4 months ago

Hello! It looks to me like everything is running fine. But since you don't do anything after creating the two LivePrices objects (by the way, you could definitely combine into one for better performance), the running tasks get killed as the program exits. Try doing this in your main function:

live_apple = await LivePrices.create(session, 'AAPL', tasty_monthly)
live_amazon = await LivePrices.create(session, 'AMZN', tasty_monthly)
print(live_apple.greeks)
abhi1010 commented 3 months ago

Thanks, I got what you mean.