quantopian / zipline

Zipline, a Pythonic Algorithmic Trading Library
https://www.zipline.io
Apache License 2.0
17.61k stars 4.72k forks source link

Retrieve trading data from Mysql #2184

Open mrahmadt opened 6 years ago

mrahmadt commented 6 years ago

Hello Everyone,

I store all market data on MySQL so I can use it from many applications, can I somehow configure zipline to retrieve the stock data from MySQL instead of using CSV file?

Thanks

AndreasClenow commented 6 years ago

I do the same. Below is a simple bundle I made for demo purposes. I'll write up an article soon on how it works, and where the pitfalls are.

Note the db connection string, where you have to fill in your own details. You would also need to adapt queries to your structure of course. My data in this example is assumed to be corrected for splits and corp actions, but not for dividends. Dividends are provided separately, as you see.

This code is adapted from the csvdir sample, replacing csv logic with MySql logic.

It's important to remember that Zipline seem to require that data provided exactly matches the selected trading calendar. There are many real life exceptions to that rule, but Zipline will crash your ingest process if the data doesn't match. So you need to make sure that you check and fix this first, padding any missing dates.

""" Imports daily equity data from MySql db """ import sys

from logbook import Logger, StreamHandler from numpy import empty from pandas import DataFrame, read_sql_query, Index, Timedelta, NaT

from zipline.utils.calendars import register_calendar_alias from zipline.utils.cli import maybe_show_progress

from sqlalchemy import create_engine

from . import core as bundles

handler = StreamHandler(sys.stdout, format_string=" | {record.message}") logger = Logger(name) logger.handlers.append(handler)

engine = create_engine('mysql+mysqlconnector://db_user_name:db_password@db_server/db_name')

def ac_equities_db(tframes=None, csvdir=None):

return AcDbBundle(tframes, csvdir).ingest

class AcDbBundle: """ Wrapper class """

def __init__(self, tframes=None, csvdir=None):
    self.tframes = tframes
    self.csvdir = csvdir

def ingest(self,
           environ,
           asset_db_writer,
           minute_bar_writer,
           daily_bar_writer,
           adjustment_writer,
           calendar,
           start_session,
           end_session,
           cache,
           show_progress,
           output_dir):

    ac_db_bundle(environ,
                  asset_db_writer,
                  minute_bar_writer,
                  daily_bar_writer,
                  adjustment_writer,
                  calendar,
                  start_session,
                  end_session,
                  cache,
                  show_progress,
                  output_dir,
                  self.tframes,
                  self.csvdir)

@bundles.register("ac_db_bundle") def ac_db_bundle(environ, asset_db_writer, minute_bar_writer, daily_bar_writer, adjustment_writer, calendar, start_session, end_session, cache, show_progress, output_dir, tframes=None, csvdir=None): """ Build a zipline data bundle. """

if not tframes:
    tframes = ['daily']

divs_splits = {'divs': DataFrame(columns=['sid', 'amount',
                                          'ex_date', 'record_date',
                                          'declared_date', 'pay_date']),
               'splits': DataFrame(columns=['sid', 'ratio',
                                            'effective_date'])}

for tframe in tframes: # only daily implemented for now.

    # get available stocks from db, minus indexes
    symbol_query = "select distinct ticker from equity_history where ticker not like '$%' order by ticker"# limit 50"

    symbol_df = read_sql_query(symbol_query, engine)
    symbols = sorted(symbol_df.ticker.unique())

    if not symbols:
        raise ValueError("No symbols found in db.")

    dtype = [('start_date', 'datetime64[ns]'),
             ('end_date', 'datetime64[ns]'),
             ('auto_close_date', 'datetime64[ns]'),
             ('symbol', 'object')]
    metadata = DataFrame(empty(len(symbols), dtype=dtype))

    if tframe == 'minute':          # not implemented yet
        writer = minute_bar_writer
    else:
        writer = daily_bar_writer

    writer.write(_pricing_iter(symbols, metadata,
                 divs_splits, show_progress),
                 show_progress=show_progress)

    metadata['exchange'] = "NYSE"

    asset_db_writer.write(equities=metadata)

    divs_splits['divs']['sid'] = divs_splits['divs']['sid'].astype(int)
    divs_splits['splits']['sid'] = divs_splits['splits']['sid'].astype(int)

    adjustment_writer.write(splits=divs_splits['splits'],
                            dividends=divs_splits['divs'])

def _pricing_iter(symbols, metadata, divs_splits, show_progress): with maybe_show_progress(symbols, show_progress, label='Loading db pricing data: ') as it: for sid, symbol in enumerate(it): logger.debug('%s: sid %s' % (symbol, sid))

        query = "select trade_date as date, open, high, low, close, volume, dividend from equity_history where ticker='%s' order by trade_date;" % symbol

        dfr = read_sql_query(query, engine, index_col='date', parse_dates=['date'])

        start_date = dfr.index[0]
        end_date = dfr.index[-1]

        # The auto_close date is the day after the last trade.
        ac_date = end_date + Timedelta(days=1)
        metadata.iloc[sid] = start_date, end_date, ac_date, symbol

        if 'split' in dfr.columns:
            tmp = 1. / dfr[dfr['split'] != 1.0]['split']
            split = DataFrame(data=tmp.index.tolist(),
                              columns=['effective_date'])
            split['ratio'] = tmp.tolist()
            split['sid'] = sid

            splits = divs_splits['splits']
            index = Index(range(splits.shape[0],
                                splits.shape[0] + split.shape[0]))
            split.set_index(index, inplace=True)
            divs_splits['splits'] = splits.append(split)

        if 'dividend' in dfr.columns:
            # ex_date   amount  sid record_date declared_date pay_date
            tmp = dfr[dfr['dividend'] != 0.0]['dividend']
            div = DataFrame(data=tmp.index.tolist(), columns=['ex_date'])
            div['record_date'] = NaT
            div['declared_date'] = NaT
            div['pay_date'] = NaT
            div['amount'] = tmp.tolist()
            div['sid'] = sid

            divs = divs_splits['divs']
            ind = Index(range(divs.shape[0], divs.shape[0] + div.shape[0]))
            div.set_index(ind, inplace=True)
            divs_splits['divs'] = divs.append(div)

        yield sid, dfr

register_calendar_alias("AcDbBundle", "NYSE")