Closed Quenos closed 1 year ago
See #61, you're probably using validate_response
with incorrectly formatted custom requests.
No, there are no custom requests. The exact same code works on the production server, but not on the certification server.
Can you post your code please?
import concurrent.futures
import decimal
from concurrent.futures import ThreadPoolExecutor, wait
from decimal import Decimal, getcontext
from datetime import date, datetime, timedelta
from dataclasses import dataclass
from math import floor
from time import sleep
from tastytrade.account import Account
from tastytrade.instruments import get_option_chain, OptionType, Option
from tastytrade.order import OrderAction, NewOrder, OrderTimeInForce, OrderType, PriceEffect, OrderStatus
from tastytrade.session import Session
from option_data_streamer import OptionDataStreamer
from stock_data_streamer import StockDataStreamer
@dataclass
class IronFly:
    """State and setup logic for building an iron fly on one symbol.

    Constructing an instance logs in (certification or production,
    depending on ``is_cert``), selects the first account returned for the
    session, and immediately starts building the position via
    ``build_iron_fly``.
    """

    symbol: str
    spread: int
    stock_data_streamer: StockDataStreamer = None
    option_data_streamer: OptionDataStreamer = None
    # NOTE(review): -1 looks like a "not set" sentinel for both percentages
    # -- confirm against the code that consumes them.
    tp_perc: Decimal = Decimal(-1)
    sl_perc: Decimal = Decimal(-1)
    atm: int = 0
    is_cert: bool = True
    is_dry_run: bool = True
    # session and account are overwritten unconditionally in __post_init__.
    session: Session = None
    account: Account = None
    wing_put_leg: Option = None
    wing_call_leg: Option = None
    atm_put_leg: Option = None
    atm_call_leg: Option = None
    chain: dict[date, list] = None
    order_id: str = None

    def __post_init__(self):
        """Open the session, pick the first account, and build the fly."""
        # The two environments use separate (redacted) credentials.
        if self.is_cert:
            credentials, certification = ('xxxx', "xxxxx"), True
        else:
            credentials, certification = ('xxxx', "xxxxxx"), False
        self.session = Session(*credentials, is_certification=certification)

        accounts = Account.get_accounts(self.session)
        self.account = accounts[0]
        self.build_iron_fly()

    def build_iron_fly(self):
        """Fetch the latest underlying quote and build the position.

        The remainder of this method was elided in the original snippet and
        is kept as the ``...`` placeholder below.
        """
        # XSP is quoted through SPX with a divider of 10; every other symbol
        # is used as-is. NOTE(review): presumably the divider scales the SPX
        # price down to XSP -- confirm against the elided code.
        is_xsp = self.symbol == 'XSP'
        symbol = 'SPX' if is_xsp else self.symbol
        divider = 10 if is_xsp else 1

        self.stock_data_streamer = StockDataStreamer(self.session, streamer_symbol=[symbol])
        quote = self.stock_data_streamer.get_last_quote()
        ...
import asyncio
from dataclasses import dataclass
from tastytrade.session import Session
from tastytrade.streamer import DataStreamer, EventType
@dataclass
class StockDataStreamer:
    """Fetch a single streaming quote event for the configured symbols.

    ``get_last_quote`` is the synchronous entry point: it runs the async
    fetch to completion and returns the first event received, or ``None``
    when nothing was collected.
    """

    session: Session
    # Events collected by the most recent fetch; always reset to an empty
    # list in __post_init__, whatever was passed to the constructor.
    stock_price: list[dict] = None
    # Symbols handed to the streamer subscription.
    streamer_symbol: list[str] = None

    def __post_init__(self):
        """Start with an empty result list."""
        self.stock_price = []

    async def gather(self):
        """Await the streaming fetch (wrapped in ``asyncio.gather``)."""
        await asyncio.gather(self.gather_streaming_data())

    async def gather_streaming_data(self):
        """Subscribe to quotes and store the first event received."""
        streamer = await DataStreamer.create(self.session)
        await streamer.subscribe(EventType.QUOTE, self.streamer_symbol)
        # Only the first event is needed, so stop listening right after it.
        # NOTE(review): the streamer is not explicitly closed here -- confirm
        # whether the SDK requires a close call.
        async for quote in streamer.listen():
            self.stock_price.append(quote)
            break

    def get_last_quote(self):
        """Run one fetch and return the first quote event, or ``None``.

        Uses ``asyncio.run``, so it must not be called from code that is
        already running inside an event loop.
        """
        self.stock_price = []
        asyncio.run(self.gather())
        # The list was just reset, so it is never None; test for emptiness
        # instead to avoid an IndexError when no quote was received.
        if self.stock_price:
            return self.stock_price[0]
        return None
This code crashes in `StockDataStreamer.gather_streaming_data` at `streamer = await DataStreamer.create(self.session)`, but only on the certification server, not on production.
It looks like Tastytrade has removed the ability to create a streamer using a certification account :( I tried code that used to work with an older version that now breaks, so the API has changed.
I'll update this in the next release, for now just use the streamer with a production account only.
In utils.py, `validate_response` raises a `KeyError`.
On line 45 it expects `error['code']` and `error['message']`; however, the correct fields would be `error['domain']` and `error['reason']`.