tastyware / tastytrade

An unofficial Python SDK for Tastytrade!
MIT License
98 stars 33 forks source link

Best Practices: Best Way to Keep the Session Valid? #134

Closed UpsidePotential closed 3 months ago

UpsidePotential commented 3 months ago

First, amazing job with this repo. The official Tasty API has a ton of issues with futures, and this one worked out of the gate.

Currently I'm mostly just using this to get "real time" prices of various futures and index values. I have this running in a container on a webserver, and I have a REST endpoint to fetch the current values.

What is the best way to keep the session open after 24 hours?

Does the rememberme actually keep the session open forever?


import asyncio
import datetime
import os
from threading import Thread
from tastytrade import ProductionSession
from tastytrade.instruments import Future
from tastytrade import DXLinkStreamer
from tastytrade.dxfeed import EventType

from queue import Queue

def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Drive ``loop`` on the calling thread until it is stopped.

    Meant to be used as a ``Thread`` target: the loop is first installed as
    the calling thread's current event loop and then run with
    ``run_forever()``, so this call only returns after ``loop.stop()``.

    Args:
        loop: The event loop to install and run.
    """
    # Install first so code running inside the loop's thread that asks for
    # the current event loop gets this one.
    asyncio.set_event_loop(loop)
    loop.run_forever()

class TastyClient:
    """Stream futures quotes and index candles on a background event loop.

    Construction logs in, starts a daemon thread running a private asyncio
    loop, and schedules :meth:`prices` on it.  Incoming events update
    ``self.quotes`` (event symbol -> ``{'close': ..., 'time': ...}``) and the
    whole mapping is pushed onto ``self.queue`` after every update.

    Attributes:
        session: The current ``ProductionSession``.
        loop: The event loop that runs in the background thread ``t``.
        quotes: Latest value per event symbol.
        queue: Unbounded ``Queue`` receiving ``quotes`` after each update.
        streamer: The active ``DXLinkStreamer``, or ``None`` until
            :meth:`prices` has finished subscribing.
    """

    def __init__(self):
        self.session = self._login()
        self.loop = asyncio.new_event_loop()
        self.t = Thread(target=start_background_loop, args=(self.loop,), daemon=True)
        self.t.start()
        self.quotes = {}
        self.queue = Queue(maxsize=0)
        # Defined before prices() is scheduled so the attribute always exists
        # and is never overwritten after the coroutine has set it.
        self.streamer = None

        # Keep the future: without it, an exception raised inside prices()
        # would vanish without any trace.
        self._stream_future = asyncio.run_coroutine_threadsafe(self.prices(), self.loop)
        self._stream_future.add_done_callback(self._report_stream_exit)

    @staticmethod
    def _login():
        """Open a new session from the TWUSER / TWPASS environment variables."""
        return ProductionSession(os.environ.get('TWUSER'), os.environ.get('TWPASS'))

    @staticmethod
    def _report_stream_exit(future):
        """Print the exception, if any, that ended the streaming coroutine."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            print(f'price stream stopped: {error!r}')

    @staticmethod
    def _quote_symbols(futures):
        """Return the streamer symbols to subscribe to for quotes.

        Args:
            futures: Iterable of objects exposing ``product_code``,
                ``active_month`` and ``streamer_symbol``.

        Returns:
            Every VX symbol in input order, followed by the first ES contract
            whose ``active_month`` is truthy.  The ES entry is simply omitted
            when no such contract exists (previously this raised
            ``AttributeError`` on ``None``).
        """
        symbols = []
        active_es = None
        for future in futures:
            if future.product_code == 'VX':
                symbols.append(future.streamer_symbol)
            elif active_es is None and future.product_code == 'ES' and future.active_month:
                active_es = future
        if active_es is not None:
            symbols.append(active_es.streamer_symbol)
        return symbols

    def getSession(self):
        """Return a valid session, logging in again if the current one is not.

        Note that a streamer already started by :meth:`prices` was built from
        the session that was current at that time; replacing ``self.session``
        here does not touch it.
        """
        if not self.session.validate():
            self.session = self._login()
        return self.session

    async def prices(self):
        """Subscribe to quotes and candles, then consume events until failure.

        Runs until one of the listener tasks raises or this coroutine is
        cancelled; in both cases the remaining listeners are cancelled before
        the streamer context is left.
        """
        async with DXLinkStreamer(self.session) as streamer:
            futures = Future.get_futures(self.session, product_codes=['VX', 'ES'])
            subs_list = self._quote_symbols(futures)

            await streamer.subscribe(EventType.QUOTE, subs_list)
            await streamer.subscribe(EventType.CANDLE, ['VIX', 'VIX3M', 'SPX', 'VVIX'])
            self.streamer = streamer

            listeners = [
                asyncio.create_task(self._update_candle()),
                asyncio.create_task(self._update_quotes()),
            ]
            try:
                await asyncio.gather(*listeners)
            finally:
                # If one listener fails, gather() returns while the other is
                # still running; stop it before the streamer goes away.
                for task in listeners:
                    task.cancel()

    async def _update_candle(self):
        """Record the close and time of every candle event."""
        async for e in self.streamer.listen(EventType.CANDLE):
            print(f'{e.eventSymbol} {e.close} {e.time}')
            self.quotes[e.eventSymbol] = {'close': e.close, 'time': e.time}
            # The same dict object is enqueued each time (not a snapshot).
            self.queue.put(self.quotes)

    async def _update_quotes(self):
        """Record the ask price of every quote event with a local timestamp."""
        async for e in self.streamer.listen(EventType.QUOTE):
            print(f'{e.eventSymbol} {e.bidPrice} {e.askPrice}')
            # 'close' holds the ask price here; 'time' is the local receive time.
            self.quotes[e.eventSymbol] = {'close': e.askPrice, 'time': datetime.datetime.now().timestamp()}
            # The same dict object is enqueued each time (not a snapshot).
            self.queue.put(self.quotes)
Graeme22 commented 3 months ago

Hey! Glad the SDK has served your purposes well :)

What is the best way to keep the session open after 24 hours?

  • Do I need to poll the session to see if it's still active?

Does the rememberme actually keep the session open forever?

From the tastytrade docs: "A valid remember token can be used in place of password to create a new session. Remember tokens are one-time use, meaning they are invalidated when redeemed. If not redeemed, they will expire after 28 days."

So, as far as I can tell, there's no way to keep a session open indefinitely. Your best bet is to restart/create a new session once a day before market opens.

UpsidePotential commented 3 months ago

After some quick testing, this seemed to work pretty well.

import asyncio
import datetime
import os
from threading import Thread
from tastytrade import ProductionSession
from tastytrade.instruments import Future
from tastytrade import DXLinkStreamer
from tastytrade.dxfeed import EventType

from queue import Queue

def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Make ``loop`` the calling thread's event loop and run it forever.

    Used as the target of the background ``Thread``; the call blocks until
    the loop is stopped with ``loop.stop()``.

    Args:
        loop: The event loop this thread should own and drive.
    """
    # Installation must precede run_forever(), which blocks.
    asyncio.set_event_loop(loop)
    loop.run_forever()

class TastyClient:
    """Stream futures quotes and index candles, re-logging in when needed.

    Construction logs in, starts a daemon thread running a private asyncio
    loop, and schedules :meth:`prices` on it.  Incoming events update
    ``self.quotes`` (event symbol -> ``{'close': ..., 'time': ...}``) and the
    whole mapping is pushed onto ``self.queue`` after every update.  A watcher
    task validates the session every 300 seconds and restarts the stream with
    a fresh session when validation fails.

    Attributes:
        session: The current ``ProductionSession``.
        loop: The event loop that runs in the background thread ``t``.
        quotes: Latest value per event symbol.
        queue: Unbounded ``Queue`` receiving ``quotes`` after each update.
        streamer: The most recently subscribed ``DXLinkStreamer``, or ``None``
            until :meth:`prices` has finished subscribing.
    """

    def __init__(self):
        self.session = self._login()
        self.loop = asyncio.new_event_loop()
        self.t = Thread(target=start_background_loop, args=(self.loop,), daemon=True)
        self.t.start()
        self.quotes = {}
        self.queue = Queue(maxsize=0)
        # Defined up front so no hasattr() checks are needed later.
        self.streamer = None
        self._stream_future = None

        self.start_streamer()

    @staticmethod
    def _login():
        """Open a new session from the TWUSER / TWPASS environment variables."""
        return ProductionSession(os.environ.get('TWUSER'), os.environ.get('TWPASS'))

    @staticmethod
    def _report_stream_exit(future):
        """Print the exception, if any, that ended the streaming coroutine."""
        if future.cancelled():
            # Cancellation is how restart_streamer() retires an old run.
            return
        error = future.exception()
        if error is not None:
            print(f'price stream stopped: {error!r}')

    @staticmethod
    def _quote_symbols(futures):
        """Return the streamer symbols to subscribe to for quotes.

        Args:
            futures: Iterable of objects exposing ``product_code``,
                ``active_month`` and ``streamer_symbol``.

        Returns:
            Every VX symbol in input order, followed by the first ES contract
            whose ``active_month`` is truthy.  The ES entry is simply omitted
            when no such contract exists (previously this raised
            ``AttributeError`` on ``None``).
        """
        symbols = []
        active_es = None
        for future in futures:
            if future.product_code == 'VX':
                symbols.append(future.streamer_symbol)
            elif active_es is None and future.product_code == 'ES' and future.active_month:
                active_es = future
        if active_es is not None:
            symbols.append(active_es.streamer_symbol)
        return symbols

    def restart_streamer(self):
        """Retire the running stream, if any, and start a new one.

        The old run is stopped by cancelling its future, which cancels the
        ``prices()`` task on the loop; that task then leaves its
        ``async with`` block, so the old streamer is released there rather
        than by a second, explicit ``close()`` call.
        """
        running = getattr(self, '_stream_future', None)
        if running is not None:
            running.cancel()
        self.start_streamer()

    def start_streamer(self):
        """Schedule :meth:`prices` on the background loop."""
        # Keep the future: it is the handle used to cancel this run, and its
        # callback surfaces exceptions that would otherwise be lost.
        self._stream_future = asyncio.run_coroutine_threadsafe(self.prices(), self.loop)
        self._stream_future.add_done_callback(self._report_stream_exit)

    async def prices(self):
        """Subscribe to quotes and candles, then consume events.

        Runs until a child task raises or this coroutine is cancelled (see
        :meth:`restart_streamer`); in both cases every child task is
        cancelled before the streamer context is left.
        """
        async with DXLinkStreamer(self.session) as streamer:
            futures = Future.get_futures(self.session, product_codes=['VX', 'ES'])
            subs_list = self._quote_symbols(futures)

            await streamer.subscribe(EventType.QUOTE, subs_list)
            await streamer.subscribe(EventType.CANDLE, ['VIX', 'VIX3M', 'SPX', 'VVIX'])
            self.streamer = streamer

            children = [
                asyncio.create_task(self._update_candle()),
                asyncio.create_task(self._update_quotes()),
                asyncio.create_task(self._check_session()),
            ]
            try:
                await asyncio.gather(*children)
            finally:
                # Without this, listeners and the watcher from a previous run
                # would outlive it and pile up with every restart.
                for task in children:
                    task.cancel()

    async def _update_candle(self):
        """Record the close and time of every candle event."""
        async for e in self.streamer.listen(EventType.CANDLE):
            print(f'{e.eventSymbol} {e.close} {e.time}')
            self.quotes[e.eventSymbol] = {'close': e.close, 'time': e.time}
            # The same dict object is enqueued each time (not a snapshot).
            self.queue.put(self.quotes)

    async def _update_quotes(self):
        """Record the ask price of every quote event with a local timestamp."""
        async for e in self.streamer.listen(EventType.QUOTE):
            print(f'{e.eventSymbol} {e.bidPrice} {e.askPrice}')
            # 'close' holds the ask price here; 'time' is the local receive time.
            self.quotes[e.eventSymbol] = {'close': e.askPrice, 'time': datetime.datetime.now().timestamp()}
            # The same dict object is enqueued each time (not a snapshot).
            self.queue.put(self.quotes)

    async def _check_session(self):
        """Validate the session every 300 seconds; restart the stream if invalid.

        Returns after triggering a restart: the replacement ``prices()`` run
        creates its own watcher, so this one must not keep looping.
        """
        while True:
            await asyncio.sleep(300)
            if self.session.validate():
                continue
            print('session invalid, recreating and restarted streamer')
            self.session = self._login()
            self.restart_streamer()
            return