Closed webbuilderhero closed 11 months ago
vbt supports CCXT to fetch data, so the changes for the fetching are relatively easy. In orders/ib, there is def retrieve_data_YF, and by replacing the YFData through the CCXTData + a bit of configuration (vbt.CCXTData.set_custom_settings before), the fetch part should work.
Concerning other calls, like perform trade, it is also in orders/ib. You can create a kind of copy of class IBData(RemoteData) for CCXT. I suppose, it would be even easier as probably with Rest call, the connection to IB is really a pain in t.. Additionally, you surely don't have to worry in CCXT about which data packet you bought, and which not...
Normally 95% of the change should be in orders/ib.py. The 5% remaining concern the use_IB variable in reporting/models which should be changed to something else to route to CCXT.
Mt5 is not supported by vbt... so you would start from a white sheet. As written in the other topic, I am working on refactoring the code. It can only help, but it is only vaguely related to this topic. I don't plan to implement other things than IB presently (it is already complicated enough :D).
Ok I will work on converting your IB.py into mt5.py and post it here soon. I trade on rythmic and mt5 for props. But ccxt is just for personal stuff. I guess I can use IB but they are so expensive with everything. TradeStation I have had better luck in the past. My main goal is to get all my strats running in one web-based platform. for personal and prop. Django is my main framework right now. I can help with allot of the code base.
`import MetaTrader5 as mt5 import pandas as pd import numpy as np import logging import vectorbt as vbt
class mt5Data: def init(self, host, port): self.host = host self.port = port self.client = None
def connect(self):
mt5.initialize()
connected = mt5.terminal_info().update_type != 2
if not connected:
connected = mt5.wait_terminal(60)
if not connected:
raise ConnectionError("Failed to connect to MetaTrader 5 terminal.")
return True
def resolve_client(self, client=None, **client_config):
if client is None:
client = mt5.TerminalInfo()
client.update_type = 2
client.host = self.host
client.port = self.port
client.name = "MetaTrader 5"
client.config = client_config
return client
def get_contract_mt5(self, symbol, exchange, index):
contract = mt5.symbol_info(symbol)
if not contract.visible:
raise ValueError(f"Symbol {symbol} is not available.")
return contract
def fetch_symbol(self, symbol, client=None, client_config=None, period=None, start=None, end=None, timeframe=None,
indexes=None, exchanges=None):
if client is None:
client = self.resolve_client(**client_config)
if period is None:
period = mt5.TIMEFRAME_D1
if start is not None and end is not None:
rates = mt5.copy_rates_range(symbol, period, start, end)
else:
rates = mt5.copy_rates_from_pos(symbol, period, 0, 1000)
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')
df.set_index('time', inplace=True)
return df
def get_last_price(self, contract):
tick = mt5.symbol_info_tick(contract.name)
return tick.bid if tick.bid != 0 else tick.last
def connect_mt5(func):
def wrapper(self, *args, **kwargs):
self.connect()
return func(self, *args, **kwargs)
return wrapper
@connect_mt5
def place(self, buy, action, short, **kwargs):
symbol = kwargs.get('symbol')
# Place market order using MT5 API
if buy:
# Place buy order
if short:
# Place short buy order
request = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": 0.01, # Specify the desired volume for the trade
"type": mt5.ORDER_TYPE_SELL,
"deviation": 10, # Specify the deviation value
"magic": 12345, # Specify the magic number for the order
"comment": "Short Buy Order" # Specify a comment for the order
}
else:
# Place long buy order
request = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": 0.01, # Specify the desired volume for the trade
"type": mt5.ORDER_TYPE_BUY,
"deviation": 10, # Specify the deviation value
"magic": 12345, # Specify the magic number for the order
"comment": "Long Buy Order" # Specify a comment for the order
}
else:
# Place sell order
if short:
# Place short sell order
request = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": 0.01, # Specify the desired volume for the trade
"type": mt5.ORDER_TYPE_BUY,
"deviation": 10, # Specify the deviation value
"magic": 12345, # Specify the magic number for the order
"comment": "Short Sell Order" # Specify a comment for the order
}
else:
# Place long sell order
request = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": 0.01, # Specify the desired volume for the trade
"type": mt5.ORDER_TYPE_SELL,
"deviation": 10, # Specify the deviation value
"magic": 12345, # Specify the magic number for the order
"comment": "Long Sell Order" # Specify a comment for the order
}
result = mt5.order_send(request)
if result.retcode != mt5.TRADE_RETCODE_DONE:
raise ValueError(f"Failed to send trade order: {result.comment}")
return result.order
def get_tradable_contract_mt5(self, action, short, **kwargs):
symbol = kwargs.get('symbol')
# Assuming the symbol and other required parameters are provided
if action == 'buy':
trade_action = mt5.TRADE_ACTION_DEAL
elif action == 'sell':
trade_action = mt5.TRADE_ACTION_DEAL
else:
raise ValueError(f"Invalid action: {action}")
if short:
trade_type = mt5.ORDER_TYPE_SELL
else:
trade_type = mt5.ORDER_TYPE_BUY
request = {
"action": trade_action,
"symbol": symbol,
"volume": 0.01, # Specify the desired volume for the trade
"type": trade_type,
"deviation": 10, # Specify the deviation value
"magic": 12345, # Specify the magic number for the order
"comment": "Trade order" # Specify a comment for the order
}
result = mt5.order_send(request)
if result.retcode != mt5.TRADE_RETCODE_DONE:
raise ValueError(f"Failed to send trade order: {result.comment}")
return result.order
def retrieve_quantity_mt5(self, in_action, **kwargs):
symbol = kwargs.get('symbol')
# Assuming the symbol and other required parameters are provided
if in_action == 'buy':
position_type = mt5.POSITION_TYPE_BUY
elif in_action == 'sell':
position_type = mt5.POSITION_TYPE_SELL
else:
raise ValueError(f"Invalid action: {in_action}")
# Get the positions for the specified symbol
positions = mt5.positions_get(symbol=symbol)
if not positions:
return 0
# Filter positions based on the specified action
filtered_positions = [p for p in positions if p.type == position_type]
# Calculate the total quantity for the filtered positions
total_quantity = sum(p.volume for p in filtered_positions)
return total_quantity
def retrieve_mt5_pf(self, **kwargs):
# Get the current positions from the MetaTrader 5 API
positions = mt5.positions_get()
long_positions = []
short_positions = []
for position in positions:
if position.volume > 0:
# Long position
long_positions.append({
'symbol': position.symbol,
'volume': position.volume,
'entry_price': position.price_open,
'current_price': position.price_current
})
elif position.volume < 0:
# Short position
short_positions.append({
'symbol': position.symbol,
'volume': abs(position.volume),
'entry_price': position.price_open,
'current_price': position.price_current
})
return long_positions, short_positions
def check_enough_cash(self, order_size, **kwargs):
# Get the account information from the MetaTrader 5 API
account_info = mt5.account_info()
# Check if the account has enough cash balance to cover the order size
if account_info.balance >= order_size:
return True
else:
return False
def cash_balance(self, currency=None, **kwargs):
# Get the account information from the MetaTrader 5 API
account_info = mt5.account_info()
# If currency is not specified, return the account balance in the account's base currency
if currency is None:
return account_info.balance
# Get the balance for the specified currency
currency_balance = None
for balance in account_info.balances:
if balance.currency == currency:
currency_balance = balance.amount
break
return currency_balance
def get_ratio(self, action, symbol, exchange=None, index=None):
# Get the current price using the mt5.symbol_info_tick() function
tick = mt5.symbol_info_tick(symbol)
if tick is None:
return None
# Calculate the ratio based on the action (Buy or Sell)
if action == 'Buy':
current_price = tick.ask
elif action == 'Sell':
current_price = tick.bid
else:
return None
# Get the reference price based on the provided exchange and index
reference_price = None
if exchange and index:
# Retrieve the reference price using the mt5.copy_rates_from() function
rates = mt5.copy_rates_from(symbol, mt5.TIMEFRAME_D1, self.start_date, self.end_date)
if rates is None or len(rates) == 0:
return None
# Find the reference price based on the index
for rate in rates:
if rate.time == index:
reference_price = rate.close
break
# Calculate the ratio as a percentage
if reference_price is not None:
ratio = (current_price / reference_price) * 100.0
return ratio
return None
def exit_order_sub(self, symbol, strategy, exchange, short, auto=True, **kwargs):
# Retrieve the current position for the symbol using mt5.positions_get()
positions = mt5.positions_get(symbol=symbol)
if positions is None or len(positions) == 0:
return False
# Find the position that matches the strategy and short status
for position in positions:
if position.comment == strategy and position.type == mt5.ORDER_TYPE_BUY if not short else mt5.ORDER_TYPE_SELL:
# Create an exit order using mt5.order_send() with the opposite action of the current position
action = 'Sell' if not short else 'Buy'
order = self.place(action, symbol, exchange, auto=auto, **kwargs)
# Check if the order placement was successful
if order is not None:
# Update the position comment to indicate the exit order
comment = f"{strategy}_Exit"
mt5.position_set_comment(position.ticket, comment)
# Return True to indicate a successful exit order placement
return True
# Return False if no matching position was found or if the exit order placement failed
return False
def entry_order_sub(self, symbol, strategy, exchange, short, auto=True, **kwargs):
# Retrieve the current position for the symbol using mt5.positions_get()
positions = mt5.positions_get(symbol=symbol)
if positions is not None and len(positions) > 0:
# Check if there is already an open position for the symbol and strategy
for position in positions:
if position.comment == strategy:
return False
# Check if there is enough cash balance to cover the order size using the `check_enough_cash` method
if not self.check_enough_cash(symbol, exchange, short, **kwargs):
return False
# Determine the action based on the short status
action = 'Buy' if not short else 'Sell'
# Place the entry order using the `place` method
order = self.place(action, symbol, exchange, auto=auto, **kwargs)
# Check if the order placement was successful
if order is not None:
# Update the position comment to indicate the entry order
comment = strategy
mt5.position_set_comment(order.ticket, comment)
# Return True to indicate a successful entry order placement
return True
# Return False if the entry order placement failed
return False
def check_hold_duration(self, symbol, strategy, exchange, short, **kwargs):
# Retrieve the current positions for the symbol using mt5.positions_get()
positions = mt5.positions_get(symbol=symbol)
if positions is not None and len(positions) > 0:
# Check if there is an open position for the symbol and strategy
for position in positions:
if position.comment == strategy:
# Calculate the holding duration in minutes
holding_duration = (mt5.datetime.datetime.now() - position.time).total_seconds() / 60
# Check if the holding duration exceeds the specified threshold
if holding_duration >= self.hold_duration_threshold:
# If the holding duration is longer than the threshold, exit the position
self.exit_order(symbol, strategy, exchange, short, **kwargs)
# Return the holding duration
return holding_duration
# Return None if there is no open position for the symbol and strategy
return None
def check_auto_manual(self, func, symbol, strategy, exchange, short, auto, **kwargs):
if auto:
# If auto mode is enabled, execute the function automatically
func(symbol, strategy, exchange, short, **kwargs)
else:
# If auto mode is disabled, prompt the user to confirm the order execution
confirm = input("Do you want to manually execute the order? (y/n): ")
if confirm.lower() == "y":
func(symbol, strategy, exchange, short, **kwargs)
else:
print("Order execution cancelled.")
def reverse_order(self, symbol, strategy, exchange, short, auto, **kwargs):
# Reverses an existing order or creates a new one for the specified symbol, strategy, exchange, and short status
positions = mt5.positions_get(symbol=symbol)
if positions is None or len(positions) == 0:
# No existing positions, place a new entry order with opposite action
action = 'Buy' if short else 'Sell'
self.place(action, symbol, exchange, auto=auto, **kwargs)
else:
# Reverse existing positions
for position in positions:
if position.comment == strategy:
action = 'Buy' if position.type == mt5.ORDER_TYPE_SELL else 'Sell'
self.place(action, symbol, exchange, auto=auto, **kwargs)
def entry_order(self, symbol, strategy, exchange, short, auto, **kwargs):
# Places a new entry order for the specified symbol, strategy, exchange, and short status
action = 'Buy' if not short else 'Sell'
self.place(action, symbol, exchange, auto=auto, **kwargs)
def exit_order(self, symbol, strategy, exchange, short, auto, **kwargs):
# Exits an existing order for the specified symbol, strategy, exchange, and short status
positions = mt5.positions_get(symbol=symbol)
if positions is not None and len(positions) > 0:
for position in positions:
if position.comment == strategy:
action = 'Sell' if position.type == mt5.ORDER_TYPE_BUY else 'Buy'
self.place(action, symbol, exchange, auto=auto, **kwargs)
def check_if_index(self, action):
# Checks if the specified action represents an index based on its category
if action == 'index':
return True
return False
def retrieve_data_mt5(self, actions, period, **kwargs):
# Retrieves historical market data for the specified actions using MT5 API
data = {}
for action in actions:
df = self.fetch_symbol(action, period=period, **kwargs)
data[action] = df
return data
def retrieve_data_YF(actions,period,**kwargs):
#add the index to the list of stocks downloaded. Useful to make calculation on the index to determine trends
#by downloading at the same time, we are sure the signals are aligned
try:
symbols=[a.symbol for a in actions]
if kwargs.get("index",False):
index_symbol=symbols[0]
all_symbols=symbols
else:
_, index_symbol=exchange_to_index_symbol(actions[0].stock_ex)
all_symbols=symbols+[index_symbol]
#res=vbt.YFData.fetch(all_symbols, period=period,missing_index='drop',**kwargs)
ok=False
first_round=True
#look for anomaly
if len(all_symbols)>2:
res=vbt.YFData.fetch(all_symbols, period=period,missing_index='drop',**kwargs)
avg=np.average(
[len(vbt.YFData.fetch(all_symbols[0], period=period,**kwargs).get('Open')),
len(vbt.YFData.fetch(all_symbols[1], period=period,**kwargs).get('Open')),
len(vbt.YFData.fetch(all_symbols[-1], period=period,**kwargs).get('Open'))]
)
if len(res.get('Open'))<avg-10:
print("Anomaly found by downloading the symbols, check that the symbol with most nan is not delisted or if its introduction date is correct")
res_nodrop=vbt.YFData.fetch(all_symbols, period=period,**kwargs)
nb_nan={}
for c in res.get('Open').columns:
nb_nan[c]=np.count_nonzero(np.isnan(res_nodrop.get('Open')[c]))
nb_nan=sorted(nb_nan.items(), key=lambda tup: tup[1],reverse=True)
print("Number of nan in each column: "+str(nb_nan))
else:
first_round=False
#test if the symbols were downloaded
while not ok and len(symbols)>=0:
if not first_round:
res=vbt.YFData.fetch(all_symbols, period=period,missing_index='drop',**kwargs)
ok=True
o=res.get('Open')
for s in symbols:
try:
o[s]
except:
logger.info("symbol not found: "+s)
ok=False
symbols.remove(s)
all_symbols.remove(s)
return res,\
symbols,\
index_symbol
except Exception as e:
print(e)
logger.error(e, stack_info=True, exc_info=True)
def retrieve_data(self, o, actions, period, use_MT5, **kwargs):
# Retrieves historical market data for the specified actions using either MT5 API or YF
if use_MT5:
return self.retrieve_data_mt5(actions, period, **kwargs)
else:
return self.retrieve_data_YF(actions, period, **kwargs)
`
here is for trade station `import requests import pandas as pd import logging import vectorbt as vbt
class TradeStationData: def init(self, api_key): self.api_key = api_key self.base_url = 'https://api.tradestation.com/v2'
def connect(self):
# Connect to TradeStation using the specified host and port
response = requests.get(f'{self.base_url}/connect', headers={'Authorization': f'Bearer {self.api_key}'})
if response.status_code == 200:
print("Connected to TradeStation")
else:
raise ConnectionError("Failed to connect to TradeStation")
def resolve_client(self, client=None, **client_config):
# Resolve the TradeStation client to use for API calls
if client is None:
# Create a new TradeStation client with the specified configuration
client = TradeStationClient(**client_config)
return client
def get_contract_ts(self, symbol, exchange, index):
# Get a TradeStation contract object for the specified symbol, exchange, and index status
response = requests.get(f'{self.base_url}/contracts/{symbol}', headers={'Authorization': f'Bearer {self.api_key}'})
if response.status_code == 200:
contract_data = response.json()
contract = TradeStationContract(contract_data['symbol'], contract_data['exchange'], index)
return contract
else:
raise ValueError(f"Failed to retrieve contract for symbol: {symbol}")
def fetch_symbol_ts(self, symbol, client=None, client_config=None, period=None, start=None, end=None, timeframe=None,
indexes=None, exchanges=None):
# Fetch historical market data for the specified symbol using TradeStation API
if client is None:
client = self.resolve_client(**client_config)
# Construct the API endpoint URL based on the specified parameters
endpoint = f"{self.base_url}/symbol/{symbol}/history"
params = {
'period': period,
'start': start,
'end': end,
'timeframe': timeframe,
'indexes': indexes,
'exchanges': exchanges
}
# Send a GET request to the TradeStation API to fetch the historical data
response = requests.get(endpoint, headers={'Authorization': f'Bearer {self.api_key}'}, params=params)
if response.status_code == 200:
data = response.json()
df = pd.DataFrame(data)
# Convert timestamp column to datetime format
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
df.set_index('timestamp', inplace=True)
return df
else:
raise ValueError(f"Failed to fetch symbol data for symbol: {symbol}")
def get_last_price_ts(self, contract):
# Retrieve the last price for the specified contract using TradeStation API
endpoint = f"{self.base_url}/symbol/{contract.symbol}/quote"
response = requests.get(endpoint, headers={'Authorization': f'Bearer {self.api_key}'})
if response.status_code == 200:
quote_data = response.json()
last_price = quote_data['lastPrice']
return last_price
else:
raise ValueError(f"Failed to retrieve last price for contract: {contract}")
def connect_ts(func):
# Decorator for connecting to TradeStation before executing a function
# Implementation specific to TradeStation API connection
# Example implementation:
def wrapper(self, *args, **kwargs):
self.connect()
return func(self, *args, **kwargs)
return wrapper
@connect_ts
def place_ts(self, buy, action, short, **kwargs):
# Place a market order (buy/sell) for the specified action and short status using TradeStation API
endpoint = f"{self.base_url}/orders"
payload = {
'action': action,
'symbol': kwargs['symbol'],
'quantity': kwargs['quantity'],
'orderType': 'market',
'timeInForce': 'day'
}
response = requests.post(endpoint, headers={'Authorization': f'Bearer {self.api_key}'}, json=payload)
if response.status_code == 200:
order_data = response.json()
order_id = order_data['orderId']
return order_id
else:
raise ValueError("Failed to place order")
def get_tradable_contract_ts(self, action, short, **kwargs):
# Retrieve a tradable contract for the specified action and short status using TradeStation API
symbol = kwargs['symbol']
exchange = kwargs['exchange']
index = kwargs['index']
# Get the contract details using the provided parameters
contract = self.get_contract_ts(symbol, exchange, index)
return contract
def retrieve_quantity_ts(self, in_action, **kwargs):
# Retrieve the quantity and sign of the specified action using TradeStation API
symbol = kwargs['symbol']
contract = self.get_contract_ts(symbol, None, None)
last_price = self.get_last_price_ts(contract)
if in_action == 'buy':
sign = 1
elif in_action == 'sell':
sign = -1
else:
raise ValueError(f"Invalid action: {in_action}")
quantity = sign * kwargs['quantity']
return quantity
def retrieve_ts_pf(self, **kwargs):
# Retrieve the current positions (portfolio) from TradeStation API and return a list of long and short positions
endpoint = f"{self.base_url}/account"
response = requests.get(endpoint, headers={'Authorization': f'Bearer {self.api_key}'})
if response.status_code == 200:
account_data = response.json()
positions = account_data['positions']
long_positions = []
short_positions = []
for position in positions:
symbol = position['symbol']
quantity = position['quantity']
entry_price = position['averageEntryPrice']
current_price = position['currentPrice']
if quantity > 0:
long_positions.append({
'symbol': symbol,
'quantity': quantity,
'entry_price': entry_price,
'current_price': current_price
})
elif quantity < 0:
short_positions.append({
'symbol': symbol,
'quantity': abs(quantity),
'entry_price': entry_price,
'current_price': current_price
})
return long_positions, short_positions
else:
raise ValueError("Failed to retrieve portfolio positions")
def check_enough_cash_ts(self, order_size, **kwargs):
# Check if there is enough cash balance to cover the specified order size using TradeStation API
cash_balance = self.cash_balance_ts(**kwargs)
return cash_balance >= order_size
def cash_balance_ts(self, currency=None, **kwargs):
# Retrieve the cash balance for the specified currency using TradeStation API
endpoint = f"{self.base_url}/account"
response = requests.get(endpoint, headers={'Authorization': f'Bearer {self.api_key}'})
if response.status_code == 200:
account_data = response.json()
cash_balance = account_data['cashBalance']
return cash_balance
else:
raise ValueError("Failed to retrieve cash balance")
def get_ratio_ts(self, action, symbol, exchange=None, index=None):
# Retrieve the ratio between the current price and the reference price for the specified action using TradeStation API or YF
if exchange is None:
exchange = ''
if index is None:
contract = self.get_contract_ts(symbol, exchange, index)
last_price = self.get_last_price_ts(contract)
else:
# Retrieve the historical data using Yahoo Finance
data = self.retrieve_data_YF([symbol], period='1d')
last_price = data[symbol].iloc[-1]
reference_price = kwargs.get('reference_price', None)
if reference_price is not None:
ratio = last_price / reference_price * 100
else:
ratio = last_price
return ratio
def exit_order_sub_ts(self, symbol, strategy, exchange, short, auto=True, **kwargs):
# Subroutine for exiting an order using TradeStation API
if auto:
# Retrieve the position and quantity information
positions = self.retrieve_ts_pf()
for position in positions:
if position['symbol'] == symbol:
quantity = position['quantity']
break
if quantity == 0:
return
# Place the exit order
self.place_ts(False, 'sell', short, symbol=symbol, quantity=abs(quantity))
def entry_order_sub_ts(self, symbol, strategy, exchange, short, auto=True, **kwargs):
# Subroutine for placing an entry order using TradeStation API
if auto:
# Retrieve the quantity information
quantity = kwargs.get('quantity', None)
if quantity is None:
quantity = self.retrieve_quantity_ts('buy', symbol=symbol, quantity=1)
# Place the entry order
self.place_ts(True, 'buy', short, symbol=symbol, quantity=quantity)
def check_hold_duration_ts(self, symbol, strategy, exchange, short, **kwargs):
# Check the holding duration of the specified symbol in the portfolio using TradeStation API
positions = self.retrieve_ts_pf()
for position in positions:
if position['symbol'] == symbol:
entry_price = position['entry_price']
current_price = position['current_price']
holding_duration = kwargs.get('holding_duration', None)
if holding_duration is not None:
if current_price > entry_price and holding_duration <= 0:
return True
elif current_price <= entry_price and holding_duration > 0:
return True
else:
return False
return False
def check_auto_manual_ts(self, func, symbol, strategy, exchange, short, auto, **kwargs):
# Check if the order execution should be performed automatically or manually based on the specified conditions
if auto:
return True
else:
return False
def reverse_order_ts(self, symbol, strategy, exchange, short, auto, **kwargs):
# Reverse an existing order or create a new one for the specified symbol, strategy, exchange, and short status using TradeStation API
if auto:
# Check if the order should be reversed
if self.check_hold_duration_ts(symbol, strategy, exchange, short, **kwargs):
self.exit_order_sub_ts(symbol, strategy, exchange, short, auto=True, **kwargs)
self.entry_order_sub_ts(symbol, strategy, exchange, short, auto=True, **kwargs)
else:
self.exit_order_sub_ts(symbol, strategy, exchange, short, auto=True, **kwargs)
else:
self.exit_order_sub_ts(symbol, strategy, exchange, short, auto=False, **kwargs)
self.entry_order_sub_ts(symbol, strategy, exchange, short, auto=False, **kwargs)
def entry_order_ts(self, symbol, strategy, exchange, short, auto, **kwargs):
# Place a new entry order for the specified symbol, strategy, exchange, and short status using TradeStation API
if auto:
self.entry_order_sub_ts(symbol, strategy, exchange, short, auto=True, **kwargs)
else:
self.entry_order_sub_ts(symbol, strategy, exchange, short, auto=False, **kwargs)
def exit_order_ts(self, symbol, strategy, exchange, short, auto, **kwargs):
# Exit an existing order for the specified symbol, strategy, exchange, and short status using TradeStation API
if auto:
self.exit_order_sub_ts(symbol, strategy, exchange, short, auto=True, **kwargs)
else:
self.exit_order_sub_ts(symbol, strategy, exchange, short, auto=False, **kwargs)
def check_if_index_ts(self, action):
# Check if the specified action represents an index based on its category
if action == 'index':
return True
else:
return False
def retrieve_data_ts(self, actions, period, **kwargs):
# Retrieve historical market data for the specified actions using TradeStation API
data = pd.DataFrame()
for action in actions:
symbol = action['symbol']
timeframe = action['timeframe']
start = action['start']
end = action['end']
df = self.fetch_symbol_ts(symbol, period=period, start=start, end=end, timeframe=timeframe)
data[symbol] = df['close']
return data
@staticmethod
def retrieve_data_YF(actions, period, **kwargs):
# Retrieve historical market data for the specified actions using Yahoo Finance (YF)
# Implementation specific to Yahoo Finance
# Example implementation:
symbols = [action['symbol'] for action in actions]
data = vbt.YFData.download(symbols, period=period)
return data['Close']
def retrieve_data(self, o, actions, period, use_TS, **kwargs):
# Retrieve historical market data for the specified actions using either TradeStation API or YF
if use_TS:
data = self.retrieve_data_ts(actions, period, **kwargs)
o.set_data(data)
return True
else:
data = self.retrieve_data_YF(actions, period, **kwargs)
o.set_data(data)
return False
trade_station = TradeStationData('YOUR_TRADESTATION_API_KEY') `
here is CCXT `import ccxt import pandas as pd import numpy as np import logging import vectorbt as vbt
class CCXTData: def init(self, exchange_name, api_key=None, api_secret=None): self.exchange_name = exchange_name self.api_key = api_key self.api_secret = api_secret self.exchange = None
def connect(self):
self.exchange = getattr(ccxt, self.exchange_name)({
'apiKey': self.api_key,
'secret': self.api_secret
})
def fetch_symbol(self, symbol, timeframe='1d', limit=1000):
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
return df
def place_order(self, symbol, side, type_, price=None, quantity=None):
order_params = {
'symbol': symbol,
'side': side,
'type': type_,
'price': price,
'quantity': quantity
}
order = self.exchange.create_order(**order_params)
return order
def connect_ccxt(func):
def wrapper(self, *args, **kwargs):
self.connect()
return func(self, *args, **kwargs)
return wrapper
@connect_ccxt
def get_tradable_contract_ccxt(self, action, short, **kwargs):
symbol = kwargs.get('symbol')
if action == 'buy':
trade_type = 'buy'
elif action == 'sell':
trade_type = 'sell'
else:
raise ValueError(f"Invalid action: {action}")
if short:
trade_side = 'sell'
else:
trade_side = 'buy'
order_params = {
'symbol': symbol,
'side': trade_side,
'type': trade_type
}
order = self.exchange.create_order(**order_params)
return order
@connect_ccxt
def retrieve_quantity_ccxt(self, in_action, **kwargs):
symbol = kwargs.get('symbol')
if in_action == 'buy':
trade_side = 'buy'
elif in_action == 'sell':
trade_side = 'sell'
else:
raise ValueError(f"Invalid action: {in_action}")
positions = self.exchange.fetch_positions()
if not positions:
return 0
filtered_positions = [p for p in positions if p['side'] == trade_side]
total_quantity = sum(p['quantity'] for p in filtered_positions)
return total_quantity
def reverse_order_ccxt(self, symbol, strategy, exchange, short, auto, **kwargs):
positions = self.retrieve_ccxt_pf()
if positions is None or len(positions) == 0:
action = 'buy' if short else 'sell'
self.place_order(symbol, action, 'market', quantity=kwargs.get('quantity'))
else:
for position in positions:
if position['symbol'] == symbol and position['strategy'] == strategy:
action = 'buy' if position['side'] == 'sell' else 'sell'
self.place_order(symbol, action, 'market', quantity=position['quantity'])
def exit_order_ccxt(self, symbol, strategy, exchange, short, auto, **kwargs):
positions = self.retrieve_ccxt_pf()
if positions is not None and len(positions) > 0:
for position in positions:
if position['symbol'] == symbol and position['strategy'] == strategy:
action = 'sell' if position['side'] == 'buy' else 'buy'
self.place_order(symbol, action, 'market', quantity=position['quantity'])
def check_if_index(self, action):
if action == 'index':
return True
return False
def retrieve_data_ccxt(self, actions, timeframe='1d', limit=1000):
data = {}
for action in actions:
df = self.fetch_symbol(action, timeframe=timeframe, limit=limit)
data[action] = df
return data
def retrieve_data_YF(self, actions, timeframe='1d', limit=1000):
try:
symbols = [a['symbol'] for a in actions]
if kwargs.get("index", False):
index_symbol = symbols[0]
all_symbols = symbols
else:
_, index_symbol = exchange_to_index_symbol(actions[0]['exchange'])
all_symbols = symbols + [index_symbol]
res = vbt.YFData.fetch(all_symbols, period=timeframe, limit=limit, missing_index='drop')
return res, symbols, index_symbol
except Exception as e:
print(e)
def retrieve_data(self, o, actions, timeframe='1d', limit=1000, use_ccxt=True, **kwargs):
if use_ccxt:
return self.retrieve_data_ccxt(actions, timeframe, limit)
else:
return self.retrieve_data_YF(actions, timeframe, limit)
# Missing helper functions:
def connect_ccxt(func):
def wrapper(self, *args, **kwargs):
self.connect()
return func(self, *args, **kwargs)
return wrapper
def retrieve_ccxt_pf(self, **kwargs):
# Get the current positions from the CCXT API
positions = self.exchange.fetch_positions()
long_positions = []
short_positions = []
for position in positions:
if position['quantity'] > 0:
# Long position
long_positions.append({
'symbol': position['symbol'],
'quantity': position['quantity'],
'entry_price': position['entry_price'],
'current_price': position['current_price']
})
elif position['quantity'] < 0:
# Short position
short_positions.append({
'symbol': position['symbol'],
'quantity': abs(position['quantity']),
'entry_price': position['entry_price'],
'current_price': position['current_price']
})
return long_positions, short_positions
def check_enough_cash(self, order_size, **kwargs):
# Get the account information from the CCXT API
account_info = self.exchange.fetch_balance()
# Check if the account has enough cash balance to cover the order size
if account_info['free']['USD'] >= order_size:
return True
else:
return False
def cash_balance(self, currency=None, **kwargs):
# Get the account information from the CCXT API
account_info = self.exchange.fetch_balance()
# If currency is not specified, return the account balance in the account's base currency
if currency is None:
return account_info['total'][account_info['base']]
# Get the balance for the specified currency
currency_balance = account_info['total'][currency]
return currency_balance
def get_ratio(self, action, symbol, **kwargs):
# Get the current price using the CCXT API
ticker = self.exchange.fetch_ticker(symbol)
if ticker is None:
return None
# Calculate the ratio based on the action (Buy or Sell)
if action == 'buy':
current_price = ticker['ask']
elif action == 'sell':
current_price = ticker['bid']
else:
return None
# Get the reference price based on the provided data source
reference_price = None
if kwargs.get('use_IB'):
# Retrieve the reference price using the IBData class or other data source
reference_price = self.get_reference_price_from_IB(symbol, kwargs.get('exchange'), kwargs.get('index'))
# Calculate the ratio as a percentage
if reference_price is not None:
ratio = (current_price / reference_price) * 100.0
return ratio
return None
def place(self, buy, action, short, **kwargs):
# Get the order parameters
symbol = kwargs.get('symbol')
order_type = 'market' # You can customize this based on your needs
quantity = kwargs.get('quantity')
# Determine the side of the order
side = 'buy' if buy else 'sell'
if short:
side = 'sell' if buy else 'buy'
# Place the order using the CCXT API
order_params = {
'symbol': symbol,
'side': side,
'type': order_type,
'quantity': quantity
}
order = self.exchange.create_order(**order_params)
return order
def reverse_order_sub(self, symbol, strategy, exchange, short, use_CCXT, **kwargs):
if use_CCXT:
# Place a new order with opposite action and quantity based on the existing position
positions = self.retrieve_ccxt_pf()
if positions is None or len(positions) == 0:
# No existing positions, nothing to reverse
return
# Find the position to reverse
position_to_reverse = None
for position in positions:
if position['symbol'] == symbol and position['strategy'] == strategy:
position_to_reverse = position
break
if position_to_reverse is None:
# No matching position found, nothing to reverse
return
# Determine the action and quantity for the reverse order
reverse_action = 'sell' if position_to_reverse['side'] == 'buy' else 'buy'
reverse_quantity = position_to_reverse['quantity']
# Place the reverse order using the CCXT API
order_params = {
'symbol': symbol,
'side': reverse_action,
'type': 'market',
'quantity': reverse_quantity
}
order = self.exchange.create_order(**order_params)
return order
else:
# TODO: Implement logic for reversing an existing order using a different data source
pass
def exit_order_sub(self, symbol, strategy, exchange, short, use_CCXT, **kwargs):
if use_CCXT:
# Place a new order to exit the existing position
positions = self.retrieve_ccxt_pf()
if positions is None or len(positions) == 0:
# No existing positions, nothing to exit
return
# Find the position to exit
position_to_exit = None
for position in positions:
if position['symbol'] == symbol and position['strategy'] == strategy:
position_to_exit = position
break
if position_to_exit is None:
# No matching position found, nothing to exit
return
# Determine the action and quantity for the exit order
exit_action = 'sell' if position_to_exit['side'] == 'buy' else 'buy'
exit_quantity = position_to_exit['quantity']
# Place the exit order using the CCXT API
order_params = {
'symbol': symbol,
'side': exit_action,
'type': 'market',
'quantity': exit_quantity
}
order = self.exchange.create_order(**order_params)
return order
else:
# TODO: Implement logic for exiting an existing order using a different data source
pass
def entry_order_sub(self, symbol, strategy, exchange, short, use_CCXT, **kwargs):
if use_CCXT:
# Place a new entry order based on the specified symbol, strategy, exchange, and short status
action = 'buy' if not short else 'sell'
quantity = kwargs.get('quantity')
# Place the entry order using the CCXT API
order_params = {
'symbol': symbol,
'side': action,
'type': 'market',
'quantity': quantity
}
order = self.exchange.create_order(**order_params)
return order
else:
# TODO: Implement logic for placing an entry order using a different data source
pass
def check_hold_duration(self, symbol, strategy, exchange, short, **kwargs):
# Get the current positions
long_positions, short_positions = self.retrieve_ccxt_pf()
# Iterate over the positions to find the specified symbol and strategy
positions = long_positions + short_positions
for position in positions:
if position['symbol'] == symbol and position['strategy'] == strategy:
# Calculate the hold duration
entry_timestamp = position['entry_timestamp']
current_timestamp = pd.Timestamp.now()
hold_duration = current_timestamp - entry_timestamp
# Print or return the hold duration
print(f"Hold duration for {symbol} ({strategy}): {hold_duration}")
return hold_duration
# If no matching position is found, return None or raise an exception
return None
def check_auto_manual(self, func, symbol, strategy, exchange, short, auto, **kwargs):
# Perform checks based on auto/manual mode
if auto:
# Execute the function automatically
func(symbol, strategy, exchange, short, **kwargs)
else:
# Manual mode, prompt user for action
choice = input("Enter 'A' to execute automatically or 'M' to perform manual action: ")
if choice.upper() == 'A':
# Execute the function automatically
func(symbol, strategy, exchange, short, **kwargs)
elif choice.upper() == 'M':
# Perform manual action
# TODO: Implement logic for manual action
pass
else:
print("Invalid choice. Please try again.")
def reverse_order(self, symbol, strategy, exchange, short, auto, **kwargs):
if auto:
self.reverse_order_sub(symbol, strategy, exchange, short, use_CCXT=True, **kwargs)
else:
# Manual mode, prompt user for action
choice = input("Enter 'A' to execute automatically or 'M' to perform manual action: ")
if choice.upper() == 'A':
self.reverse_order_sub(symbol, strategy, exchange, short, use_CCXT=True, **kwargs)
elif choice.upper() == 'M':
# Perform manual action
# TODO: Implement logic for manual action
pass
else:
print("Invalid choice. Please try again.")
def entry_order(self, symbol, strategy, exchange, short, auto, **kwargs):
if auto:
self.entry_order_sub(symbol, strategy, exchange, short, use_CCXT=True, **kwargs)
else:
# Manual mode, prompt user for action
choice = input("Enter 'A' to execute automatically or 'M' to perform manual action: ")
if choice.upper() == 'A':
self.entry_order_sub(symbol, strategy, exchange, short, use_CCXT=True, **kwargs)
elif choice.upper() == 'M':
# Perform manual action
# TODO: Implement your manual action logic here
print("Performing manual action for entry order")
else:
print("Invalid choice. Please try again.")
def exit_order(self, symbol, strategy, exchange, short, auto, **kwargs):
if auto:
self.exit_order_sub(symbol, strategy, exchange, short, use_CCXT=True, **kwargs)
else:
# Manual mode, prompt user for action
choice = input("Enter 'A' to execute automatically or 'M' to perform manual action: ")
if choice.upper() == 'A':
self.exit_order_sub(symbol, strategy, exchange, short, use_CCXT=True, **kwargs)
elif choice.upper() == 'M':
# Perform manual action
# TODO: Implement your manual action logic here
print("Performing manual action for exit order")
else:
print("Invalid choice. Please try again.")
def check_if_index(self, action):
# TODO: Implement this method
if action == 'index':
return True
return False
def retrieve_data_ccxt(self, actions, period, **kwargs):
data = {}
for action in actions:
df = self.fetch_symbol(action, timeframe=period, **kwargs)
data[action] = df
return data
def retrieve_data_YF(actions,period,**kwargs):
#add the index to the list of stocks downloaded. Useful to make calculation on the index to determine trends
#by downloading at the same time, we are sure the signals are aligned
try:
symbols=[a.symbol for a in actions]
if kwargs.get("index",False):
index_symbol=symbols[0]
all_symbols=symbols
else:
_, index_symbol=exchange_to_index_symbol(actions[0].stock_ex)
all_symbols=symbols+[index_symbol]
#res=vbt.YFData.fetch(all_symbols, period=period,missing_index='drop',**kwargs)
ok=False
first_round=True
#look for anomaly
if len(all_symbols)>2:
res=vbt.YFData.fetch(all_symbols, period=period,missing_index='drop',**kwargs)
avg=np.average(
[len(vbt.YFData.fetch(all_symbols[0], period=period,**kwargs).get('Open')),
len(vbt.YFData.fetch(all_symbols[1], period=period,**kwargs).get('Open')),
len(vbt.YFData.fetch(all_symbols[-1], period=period,**kwargs).get('Open'))]
)
if len(res.get('Open'))<avg-10:
print("Anomaly found by downloading the symbols, check that the symbol with most nan is not delisted or if its introduction date is correct")
res_nodrop=vbt.YFData.fetch(all_symbols, period=period,**kwargs)
nb_nan={}
for c in res.get('Open').columns:
nb_nan[c]=np.count_nonzero(np.isnan(res_nodrop.get('Open')[c]))
nb_nan=sorted(nb_nan.items(), key=lambda tup: tup[1],reverse=True)
print("Number of nan in each column: "+str(nb_nan))
else:
first_round=False
#test if the symbols were downloaded
while not ok and len(symbols)>=0:
if not first_round:
res=vbt.YFData.fetch(all_symbols, period=period,missing_index='drop',**kwargs)
ok=True
o=res.get('Open')
for s in symbols:
try:
o[s]
except:
logger.info("symbol not found: "+s)
ok=False
symbols.remove(s)
all_symbols.remove(s)
return res,\
symbols,\
index_symbol
except Exception as e:
print(e)
logger.error(e, stack_info=True, exc_info=True)
def retrieve_data(self, o, actions, period, use_CCXT, **kwargs):
if use_CCXT:
return self.retrieve_data_ccxt(actions, period, **kwargs)
else:
return self.retrieve_data_YF(actions, period, **kwargs)
`
now that I did all the heavy lifting should be easy :) to add do you think we can add these?
Sure, I will have a look and put it in my dev branch (which in the middle of works)
I pushed the first part of the changes in my dev branch. I have however no way to test. I am rewritting the entry/exit mechanism presently (actually it is already written, but I need to test). I will put the mt5 changes in the final version. Don't want to do the work twice.
I can test it let me know when its ready. Ill make a copy of the repo and debug it. Please note I have 3 different connection scripts there ccxt, mt5 and trade station I can test them all.
Ok, I make as fast as I can, but I don't want to commit code that does not work. I hope this week.
So I start with sorting out this whole code. It won't be done in one hour, I don't want to have the same code with 5 duplicates. Besides, I want to respect vbt.RemoteData structure. Where do you use the
trade_station = TradeStationData('YOUR_TRADESTATION_API_KEY')
?
Besides CCXTData exists already in vbt. Let's not reinvent the wheel.
Those are just prototypes I was waiting to see what you wanted to do with it. Once that's done I can refactor it all and fixed all the shared code no problem. There is a trade station package to install. The key can probably make a constant in settings.py or I can make a model and admin.py to put in all the broker info not a big deal . I did those blind in one morning so will have to work out bugs but that's not a issue I can take care of debugging it. Basically it's a primer to give to you to see what needs to be done in integration. I can get to it next week got two projects I'm working on right now.
Those are just prototypes I was waiting to see what you wanted to do with it. Once that's done I can refactor it all and fixed all the shared code no problem. There is a trade station package to install. The key can probably make a constant in settings.py or I can make a model and admin.py to put in all the broker info not a big deal . I did those blind in one morning so will have to work out bugs but that's not a issue I can take care of debugging it. Basically it's a primer to give to you to see what needs to be done in integration. I can get to it next week got two projects I'm working on right now.
First, many thx for the hard work and the proposal! I pushed my present state, still lot of work.
a) It is bad luck that you proposed your changes when I am refactoring the whole code. It makes some work double. b) My "it is easy, just..." was very optimistic. The bot was designed to handle action with IB or without IB. Not to dispatch actions between different APIs. c) The bot uses the word "Action" in a misleading way. It is actually the French word for Stock, and I forgot that it makes no sense at all in English. I need to change that, but I fear to cause 1000 bugs and lose my DB... d) IB is special. It connects with a socket connection, no REST api. It means that if I try to connect and the bot is already connected, it returns an error. Therefore there is this whole @connect_ib and so on. We need to keep in memory somewhere what is the present state. It is complicated. Additionnaly, IB has the concept of contract. Let's compare with YF. By YF, the ticker is "MC.PA". MC is the stock, PA is the exchange. In IB, we need to define each separately + the currency. Afterwards, IB populates this contract and check that is unique. Other brokers don't have this kind of thing. Object should work perfectly.
Because of c) and d), I think you were mislead. There is not need for a decorator (I think) for REST Api, we just make requests completely indepently the one from the others, that's it. And there is no contract.
Some remarks: 1) TradeStationClient is not imported. Where is it? TradeStationData('YOUR_TRADESTATION_API_KEY') no idea... 2) By CCXT, I looked in the doc, there is no fetch_position but fetchPosition. Same with fetchBalance (need to fix that). 3) I still need to complete actualize_ss() for the different API. For that I need to be able to make a for in the positions. 4) I did not see the utility to filter on retrieve_quantity depending on the direction, maybe I missed something. 5) In all API we use "symbol". Presently it is the YF ticker. Most probably it won't work. We will need a function ccxt_ticker() (how to derive the ccxt ticker from YF ticker) or save them separately in the DB, a bit like for IB. 6) Fetch_symbol requires the columns to be "Date", "Open"... I am not convinced that it is the case right now for the new APIs.
Besides, I wanted to put my get_last_price_ib and so on in IBData, but I had some difficulties. I will try later again.
No worries man im going to be busy for a little bit anyway. But this is definitely on my list of stuff to get done I have a project that needs to run algos in Django to sell signals as a service. Trade station doesn't have a socket connection but both ccxt and mt5 do have it. so maybe I can start with mt5 first since the signals are forex then move to ccxt. When do you think you will have the refactoring done? Hopefully, it will have the strats more abstracted because there seems to be allot of code in there that repeats itself for different things. Maybe you can get the backtest stuff to extend a base class. Not sure whats going on with the blank data maybe you did that to defeat bias. Like I said I did that in a morning I can probably get them all in and working in a couple of days. Would be nice though if the broker was not tightly coupled to the app. But instead have a core abstraction I can make an interface for.
here is a sample of getting data from TS api using sockets so maybe can add that also
import socket
import json
# Define the TradeStation API endpoint.
TS_API_ENDPOINT = "api.tradestation.com"
# Define the TradeStation API request.
TS_API_REQUEST = {
"requests": [
{
"service": "QUOTE",
"requestid": "1",
"command": "SUBS",
"symbol": "@CL"
},
{
"service": "QUOTE",
"requestid": "2",
"command": "SUBS",
"symbol": "AMZN"
},
{
"service": "BAR",
"requestid": "3",
"command": "SUBS",
"symbol": "AMZN",
"interval": 5,
"unit": "Minute",
"startdate": "02-25-2020",
"session": "USEQPreAndPost"
}
]
}
# Create a socket connection to the TradeStation API endpoint.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((TS_API_ENDPOINT, 443))
sock = ssl.wrap_socket(sock)
# Send the TradeStation API request to the socket.
sock.sendall(json.dumps(TS_API_REQUEST).encode())
# Receive the TradeStation API response from the socket.
while True:
response = sock.recv(1024)
if not response:
break
# Print the TradeStation API response.
print(response.decode())
What do you mean with "Not sure whats going on with the blank data maybe you did that to defeat bias." ? Concerning the refactoring, I now have the strat and presel as object. I also removed the complex "Pf and ocap". Only point really remaining (should be fast) is to really use the strat and presel objects in models.py and avoid code there...
No so sure to understand the code for TradingStation. Feel free to put that directly in the code. My changes will be somewhere else, no risk of collision.
Can we add in mt5 python api and ccxt also. im going to work on this and help if we can. I just need more direction for it. Is the only thing affected with IB the IB.py?