Open nickvertucci opened 3 years ago
I've resolved the historic agg error but updating the variables for time space and from..to but now receive this error when running the script:
def get_1000m_history_data(symbols): print('Getting historical data...') minute_history = {} c = 0 for symbol in symbols: minute_history[symbol] = api.polygon.historic_agg_v2( timespan="minute", symbol=symbol, limit=1000, multiplier=1, _from='2021-01-01', to='2021-01-26' ).df c += 1 print('{}/{}'.format(c, len(symbols))) print('Success.') return minute_history
Getting current ticker data... Success. Tracking 105 symbols. Getting historical data... 1/105 2/105 3/105 4/105 5/105 6/105 7/105 8/105 9/105 10/105 11/105 12/105 13/105 ...... 100/105 101/105 102/105 103/105 104/105 105/105 Success. Watching 105 symbols. ERROR:root:error while consuming ws messages: unknown channel A.UONE (you may need to specify the right data_stream )
Hi, what is the code you are using for this?
`import alpaca_trade_api as tradeapi from alpaca_trade_api.stream2 import StreamConn import requests import time from ta.trend import macd import numpy as np from datetime import datetime, timedelta from pytz import timezone import asyncio import websocket
base_url = 'https://paper-api.alpaca.markets' api_key = 'removed' api_secret = 'removed'
api = tradeapi.REST(api_key, api_secret, base_url, api_version='v2')
session = requests.session()
min_share_price = 2.0 max_share_price = 13.0
min_last_dv = 500000
default_stop = .95
risk = 0.001
def get_1000m_history_data(symbols): print('Getting historical data...') minute_history = {} c = 0 for symbol in symbols: minute_history[symbol] = api.polygon.historic_agg_v2( timespan="minute", symbol=symbol, limit=1000, _from='2019-01-01', to='2021-01-26', multiplier=1 ).df c += 1 print('{}/{}'.format(c, len(symbols))) print('Success.') return minute_history
def get_tickers(): print('Getting current ticker data...') tickers = api.polygon.all_tickers() print('Success.') assets = api.list_assets() symbols = [asset.symbol for asset in assets if asset.tradable] return [ticker for ticker in tickers if ( ticker.ticker in symbols and ticker.lastTrade['p'] >= min_share_price and ticker.lastTrade['p'] <= max_share_price and ticker.prevDay['v'] * ticker.lastTrade['p'] > min_last_dv and ticker.todaysChangePerc >= 3.5 )]
def find_stop(current_value, minute_history, now): series = minute_history['low'][-100:] \ .dropna().resample('5min').min() series = series[now.floor('1D'):] diff = np.diff(series.values) low_index = np.where((diff[:-1] <= 0) & (diff[1:] > 0))[0] + 1 if len(low_index) > 0: return series[low_index[-1]] - 0.01 return current_value * default_stop
def run(tickers, market_open_dt, market_close_dt):
conn = tradeapi.stream2.StreamConn(api_key, api_secret, base_url)
# Update initial state with information from tickers
volume_today = {}
prev_closes = {}
for ticker in tickers:
symbol = ticker.ticker
prev_closes[symbol] = ticker.prevDay['c']
volume_today[symbol] = ticker.day['v']
symbols = [ticker.ticker for ticker in tickers]
print('Tracking {} symbols.'.format(len(symbols)))
minute_history = get_1000m_history_data(symbols)
portfolio_value = float(api.get_account().portfolio_value)
open_orders = {}
positions = {}
# Cancel any existing open orders on watched symbols
existing_orders = api.list_orders(limit=500)
for order in existing_orders:
if order.symbol in symbols:
api.cancel_order(order.id)
stop_prices = {}
latest_cost_basis = {}
# Track any positions bought during previous executions
existing_positions = api.list_positions()
for position in existing_positions:
if position.symbol in symbols:
positions[position.symbol] = float(position.qty)
# Recalculate cost basis and stop price
latest_cost_basis[position.symbol] = float(position.cost_basis)
stop_prices[position.symbol] = (
float(position.cost_basis) * default_stop
)
# Keep track of what we're buying/selling
target_prices = {}
partial_fills = {}
# Use trade updates to keep track of our portfolio
@conn.on(r'trade_update')
async def handle_trade_update(conn, channel, data):
symbol = data.order['symbol']
last_order = open_orders.get(symbol)
if last_order is not None:
event = data.event
if event == 'partial_fill':
qty = int(data.order['filled_qty'])
if data.order['side'] == 'sell':
qty = qty * -1
positions[symbol] = (
positions.get(symbol, 0) - partial_fills.get(symbol, 0)
)
partial_fills[symbol] = qty
positions[symbol] += qty
open_orders[symbol] = data.order
elif event == 'fill':
qty = int(data.order['filled_qty'])
if data.order['side'] == 'sell':
qty = qty * -1
positions[symbol] = (
positions.get(symbol, 0) - partial_fills.get(symbol, 0)
)
partial_fills[symbol] = 0
positions[symbol] += qty
open_orders[symbol] = None
elif event == 'canceled' or event == 'rejected':
partial_fills[symbol] = 0
open_orders[symbol] = None
@conn.on(r'A$')
async def handle_second_bar(conn, channel, data):
symbol = data.symbol
# First, aggregate 1s bars for up-to-date MACD calculations
ts = data.start
ts -= timedelta(seconds=ts.second, microseconds=ts.microsecond)
try:
current = minute_history[data.symbol].loc[ts]
except KeyError:
current = None
new_data = []
if current is None:
new_data = [
data.open,
data.high,
data.low,
data.close,
data.volume
]
else:
new_data = [
current.open,
data.high if data.high > current.high else current.high,
data.low if data.low < current.low else current.low,
data.close,
current.volume + data.volume
]
minute_history[symbol].loc[ts] = new_data
# Next, check for existing orders for the stock
existing_order = open_orders.get(symbol)
if existing_order is not None:
# Make sure the order's not too old
submission_ts = existing_order.submitted_at.astimezone(
timezone('America/New_York')
)
order_lifetime = ts - submission_ts
if order_lifetime.seconds // 60 > 1:
# Cancel it so we can try again for a fill
api.cancel_order(existing_order.id)
return
# Now we check to see if it might be time to buy or sell
since_market_open = ts - market_open_dt
until_market_close = market_close_dt - ts
if (
since_market_open.seconds // 60 > 15 and
since_market_open.seconds // 60 < 60
):
# Check for buy signals
# See if we've already bought in first
position = positions.get(symbol, 0)
if position > 0:
return
# See how high the price went during the first 15 minutes
lbound = market_open_dt
ubound = lbound + timedelta(minutes=15)
high_15m = 0
try:
high_15m = minute_history[symbol][lbound:ubound]['high'].max()
except Exception as e:
# Because we're aggregating on the fly, sometimes the datetime
# index can get messy until it's healed by the minute bars
return
# Get the change since yesterday's market close
daily_pct_change = (
(data.close - prev_closes[symbol]) / prev_closes[symbol]
)
if (
daily_pct_change > .04 and
data.close > high_15m and
volume_today[symbol] > 30000
):
# check for a positive, increasing MACD
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=12,
n_slow=26
)
if (
hist[-1] < 0 or
not (hist[-3] < hist[-2] < hist[-1])
):
return
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=40,
n_slow=60
)
if hist[-1] < 0 or np.diff(hist)[-1] < 0:
return
# Stock has passed all checks; figure out how much to buy
stop_price = find_stop(
data.close, minute_history[symbol], ts
)
stop_prices[symbol] = stop_price
target_prices[symbol] = data.close + (
(data.close - stop_price) * 3
)
shares_to_buy = portfolio_value * risk // (
data.close - stop_price
)
if shares_to_buy == 0:
shares_to_buy = 1
shares_to_buy -= positions.get(symbol, 0)
if shares_to_buy <= 0:
return
print('Submitting buy for {} shares of {} at {}'.format(
shares_to_buy, symbol, data.close
))
try:
o = api.submit_order(
symbol=symbol, qty=str(shares_to_buy), side='buy',
type='limit', time_in_force='day',
limit_price=str(data.close)
)
open_orders[symbol] = o
latest_cost_basis[symbol] = data.close
except Exception as e:
print(e)
return
if(
since_market_open.seconds // 60 >= 24 and
until_market_close.seconds // 60 > 15
):
# Check for liquidation signals
# We can't liquidate if there's no position
position = positions.get(symbol, 0)
if position == 0:
return
# Sell for a loss if it's fallen below our stop price
# Sell for a loss if it's below our cost basis and MACD < 0
# Sell for a profit if it's above our target price
hist = macd(
minute_history[symbol]['close'].dropna(),
n_fast=13,
n_slow=21
)
if (
data.close <= stop_prices[symbol] or
(data.close >= target_prices[symbol] and hist[-1] <= 0) or
(data.close <= latest_cost_basis[symbol] and hist[-1] <= 0)
):
print('Submitting sell for {} shares of {} at {}'.format(
position, symbol, data.close
))
try:
o = api.submit_order(
symbol=symbol, qty=str(position), side='sell',
type='limit', time_in_force='day',
limit_price=str(data.close)
)
open_orders[symbol] = o
latest_cost_basis[symbol] = data.close
except Exception as e:
print(e)
return
elif (
until_market_close.seconds // 60 <= 15
):
# Liquidate remaining positions on watched symbols at market
try:
position = api.get_position(symbol)
except Exception as e:
# Exception here indicates that we have no position
return
print('Trading over, liquidating remaining position in {}'.format(
symbol)
)
api.submit_order(
symbol=symbol, qty=position.qty, side='sell',
type='market', time_in_force='day'
)
symbols.remove(symbol)
if len(symbols) <= 0:
conn.close()
conn.deregister([
'A.{}'.format(symbol),
'AM.{}'.format(symbol)
])
# Replace aggregated 1s bars with incoming 1m bars
@conn.on(r'AM$')
async def handle_minute_bar(conn, channel, data):
ts = data.start
ts -= timedelta(microseconds=ts.microsecond)
minute_history[data.symbol].loc[ts] = [
data.open,
data.high,
data.low,
data.close,
data.volume
]
volume_today[data.symbol] += data.volume
channels = ['trade_updates']
for symbol in symbols:
symbol_channels = ['A.{}'.format(symbol), 'AM.{}'.format(symbol)]
channels += symbol_channels
print('Watching {} symbols.'.format(len(symbols)))
run_ws(conn, channels)
def run_ws(conn, channels): try: conn.run(channels) except Exception as e: print(e) conn.close() run_ws(conn, channels)
if name == "main":
nyc = timezone('America/New_York')
today = datetime.today().astimezone(nyc)
today_str = datetime.today().astimezone(nyc).strftime('%Y-%m-%d')
calendar = api.get_calendar(start=today_str, end=today_str)[0]
market_open = today.replace(
hour=calendar.open.hour,
minute=calendar.open.minute,
second=0
)
market_open = market_open.astimezone(nyc)
market_close = today.replace(
hour=calendar.close.hour,
minute=calendar.close.minute,
second=0
)
market_close = market_close.astimezone(nyc)
# Wait until just before we might want to trade
current_dt = datetime.today().astimezone(nyc)
since_market_open = current_dt - market_open
while since_market_open.seconds // 60 <= 14:
time.sleep(1)
since_market_open = current_dt - market_open
run(get_tickers(), market_open, market_close)
`
File "algo.py", line 396, in
run(get_tickers(), market_open, market_close)
File "algo.py", line 87, in run
minute_history = get_1000m_history_data(symbols)
File "algo.py", line 39, in get_1000m_history_data
size="minute", symbol=symbol, limit=1000
TypeError: historic_agg_v2() got an unexpected keyword argument 'size'