taoluo opened 8 years ago
@mickeydonkey the best place to look for examples of how the pipeline machinery works is the pipeline test suite, which lives in tests/pipeline.
There isn't a great short answer to the question of "how do I use Pipeline with real data", because the Pipeline API exists primarily for simplifying computations on large point-in-time datasets, and there aren't many such datasets freely available for public use. The most promising one that I'm aware of is Quandl's WIKI dataset, which contains a couple thousand assets and includes dividends and splits. I have a branch lying around somewhere that started building machinery for creating a zipline-compatible asset database from there.
The long answer to your question is that, to run an algorithm using the Pipeline machinery, you need to write a function, get_loader, that takes a pipeline dataset column (e.g. USEquityPricing.close) and returns an object implementing a method named load_adjusted_array whose signature is

def load_adjusted_array(self, columns, dates, sids, mask):

load_adjusted_array should return a dictionary mapping the entries in columns to instances of AdjustedArray containing data for the requested dates and sids (sids is the zipline term for asset_ids, for historical reasons).
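In sketch form, get_loader might look something like this (the my_loaders registry here is hypothetical; it stands in for whatever loader objects you build, such as the DataFrameLoader discussed below):

# Hypothetical registry mapping BoundColumns to loader objects, each of
# which implements load_adjusted_array(columns, dates, sids, mask).
my_loaders = {}

def get_loader(column):
    # `column` is a BoundColumn like USEquityPricing.close. The returned
    # loader's load_adjusted_array must return a dict mapping each
    # requested column to an AdjustedArray of data for the requested
    # dates and sids.
    try:
        return my_loaders[column]
    except KeyError:
        raise ValueError("No loader registered for column %s." % column)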
If the dataset you want to use is small enough to hold in memory all at once, then you can use the built-in DataFrameLoader class from zipline.pipeline.loaders.frame for your loaders. The docstring for that class describes its functionality fairly well:
"""
A PipelineLoader that reads its input from DataFrames.
Mostly useful for testing, but can also be used for real work if your data
fits in memory.
Parameters
----------
column : zipline.pipeline.data.BoundColumn
The column whose data is loadable by this loader.
baseline : pandas.DataFrame
A DataFrame with index of type DatetimeIndex and columns of type
Int64Index. Dates should be labelled with the first date on which a
value would be **available** to an algorithm. This means that OHLCV
data should generally be shifted back by a trading day before being
supplied to this class.
adjustments : pandas.DataFrame, default=None
A DataFrame with the following columns:
sid : int
value : any
kind : int (zipline.pipeline.loaders.frame.ADJUSTMENT_TYPES)
start_date : datetime64 (can be NaT)
end_date : datetime64 (must be set)
apply_date : datetime64 (must be set)
The default of None is interpreted as "no adjustments to the baseline".
"""
The adjustments frame is used to represent events that retroactively change our view of history. Most commonly, these are splits and dividends, which apply a backward-looking multiplier to the baseline array. If your dataset already uses "adjusted" prices/volumes, then you probably just want to pass None here.
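For concreteness, here is a sketch of an adjustments frame encoding a hypothetical 2-for-1 split for sid 1. It assumes the MULTIPLY kind constant from zipline.lib.adjustment (the constants the pipeline test suite uses for the kind field); the dates and values are made up:

import pandas as pd
from zipline.lib.adjustment import MULTIPLY  # adjustment "kind" constant

# Hypothetical 2-for-1 split for sid 1, taking effect on 2016-06-15: any
# pipeline window extending past the apply_date sees baseline values
# through the end_date multiplied by 0.5.
adjustments = pd.DataFrame.from_records([dict(
    sid=1,
    value=0.5,                               # backward-looking multiplier
    kind=MULTIPLY,
    start_date=pd.NaT,                       # apply to all prior history...
    end_date=pd.Timestamp('2016-06-14'),     # ...through the day before
    apply_date=pd.Timestamp('2016-06-15'),   # date the split becomes known
)])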
@ssanderson Thanks for the pointer, I am on my way to inspecting and running the tests. I think it will take me some time to fully understand your long answer.
Given this recommendation ("The most promising one that I'm aware of is Quandl's WIKI dataset, which contains a couple thousand assets and includes dividends and splits") and its implementation as a data bundle in 1.0, was the intent to enable Pipeline computations in zipline? If so, my naive attempt at running a Quantopian-tested algorithm did not seem to initialize the Pipeline.
I got a bit further. Guided by the zipline source code, I instantiated TradingAlgorithm() directly instead of calling run_algorithm(), passing get_pipeline_loader=lambda column: pipeline_loader to its constructor, where pipeline_loader comes from USEquityPricingLoader.from_files(path, path).

However, this seems to only factor in securities which I explicitly referenced using symbols(), not the broader universe. It would be great if I could download complete data bundles by sector and use that in the pipeline.
run_algorithm should set up the pipeline loader; not doing so is a bug which we will try to fix soon in a 1.0.1 release. When you say that the loader can only reference things seen in a symbols call, what do you mean? What is the shape of the result of pipeline_output?
Great! I hesitate to add much more info to this issue as I might have loaded the data incorrectly. I do not want to add noise to your issues. As soon as 1.0.1 is out, or this particular fix is committed to the master, I will try again using the natural method and give a more detailed account should I continue to experience the behavior.
Is it possible to extend DataFrameLoader to accept multiple columns or a full dataset as an input?
See: https://github.com/quantopian/zipline/blob/master/tests/pipeline/test_frameload.py#L51-L53
What's the relationship between a data bundle and pipeline.data?
I've cobbled together a minimal example of running a pipeline combining pricing and a custom data source, as follows:
from zipline.data import bundles
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.data import Column
from zipline.pipeline.data import DataSet
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.filters import StaticAssets
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.pipeline.loaders.frame import DataFrameLoader
from zipline.utils.calendars import get_calendar
import numpy as np
import pandas as pd

trading_calendar = get_calendar('NYSE')
bundle_data = bundles.load('quantopian-quandl')

# Set up a custom data source for two sids for DataFrameLoader
class MyDataSet(DataSet):
    column_A = Column(dtype=float)
    column_B = Column(dtype=bool)

dates = pd.date_range('2014-01-01', '2017-01-01')
assets = bundle_data.asset_finder.lookup_symbols(['A', 'AAL'], as_of_date=None)
sids = pd.Int64Index([asset.sid for asset in assets])

# The values for column_A will just be a 2D array of numbers ranging from 1 -> N.
column_A_frame = pd.DataFrame(
    data=np.arange(len(dates) * len(assets), dtype=float).reshape(len(dates), len(assets)),
    index=dates,
    columns=sids,
)

# column_B will always provide True for sids[0] and False for sids[1].
column_B_frame = pd.DataFrame(data={sids[0]: True, sids[1]: False}, index=dates)

loaders = {
    MyDataSet.column_A: DataFrameLoader(MyDataSet.column_A, column_A_frame),
    MyDataSet.column_B: DataFrameLoader(MyDataSet.column_B, column_B_frame),
}

def my_dispatcher(column):
    return loaders[column]

# Set up the pipeline engine.
# Loader for pricing:
pipeline_loader = USEquityPricingLoader(
    bundle_data.equity_daily_bar_reader,
    bundle_data.adjustment_reader,
)

def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    return my_dispatcher(column)

engine = SimplePipelineEngine(
    get_loader=choose_loader,
    calendar=trading_calendar.all_sessions,
    asset_finder=bundle_data.asset_finder,
)

p = Pipeline(
    columns={
        'price': USEquityPricing.close.latest,
        'col_A': MyDataSet.column_A.latest,
        'col_B': MyDataSet.column_B.latest,
    },
    screen=StaticAssets(assets),
)

df = engine.run_pipeline(
    p,
    pd.Timestamp('2016-01-05', tz='utc'),
    pd.Timestamp('2016-01-07', tz='utc'),
)

print(df)
The pipeline recognizes MyDataSet.column_A and MyDataSet.column_B as valid; however, it does not find the actual data and returns the default values. The returned DataFrame is:
col_A col_B price
2016-01-05 00:00:00+00:00 Equity(0 [A]) NaN False 40.69
Equity(2 [AAL]) NaN False 40.91
2016-01-06 00:00:00+00:00 Equity(0 [A]) NaN False 40.55
Equity(2 [AAL]) NaN False 40.52
2016-01-07 00:00:00+00:00 Equity(0 [A]) NaN False 40.73
Equity(2 [AAL]) NaN False 41.23
I can see that the DataFrameLoader is working properly:
loader = my_dispatcher(MyDataSet.column_A)
adj_array = loader.load_adjusted_array(
    [MyDataSet.column_A],
    dates,
    sids,
    np.ones((len(dates), len(sids)), dtype=bool),
)
print(list(adj_array.values())[0].inspect())
which returns
Adjusted Array (float64):
Data:
array([[ 0.00000000e+00, 1.00000000e+00],
[ 2.00000000e+00, 3.00000000e+00],
[ 4.00000000e+00, 5.00000000e+00],
...,
[ 2.18800000e+03, 2.18900000e+03],
[ 2.19000000e+03, 2.19100000e+03],
[ 2.19200000e+03, 2.19300000e+03]])
Adjustments:
{}
Can you give some guidance on where I am going wrong in pointing the SimplePipelineEngine to this DataFrame?
hey @marketneutral! I think the issue you're running into here is that dates in your example contains timezone-naive dates, but your run_pipeline call is using tz-aware dates. If I drop a breakpoint in DataFrameLoader.load_adjusted_array, I see date_indexer coming back as all -1, which means we're failing to align the dates requested by the pipeline engine with the dates held by the loader.

In general, zipline represents dates as midnight of the date in question, localized to UTC. If I could wave a magic wand, I would remove the timezone localization (or, even better, use a pandas Period instead of a Timestamp), but doing so would be a major backwards-compatibility headache. Given the current state of things, the right fix is to change the construction of dates to:
dates = pd.date_range('2014-01-01', '2017-01-01', tz='UTC')
which appears to print the expected result:
$ python scratch2.py
col_A col_B price
2016-01-05 00:00:00+00:00 Equity(111 [AAL]) 1469.0 False 40.91
Equity(2730 [A]) 1468.0 True 40.69
2016-01-06 00:00:00+00:00 Equity(111 [AAL]) 1471.0 False 40.52
Equity(2730 [A]) 1470.0 True 40.55
2016-01-07 00:00:00+00:00 Equity(111 [AAL]) 1473.0 False 41.23
Equity(2730 [A]) 1472.0 True 40.73
Hi @marketneutral and @ssanderson, I have been using the code in this thread as a starting point to include fundamental data in a backtest algorithm. Hopefully you can help me understand the errors coming from pipeline_output() during backtesting. I have loaded data into a pipeline, and printed it out, using the following code in an IPython notebook:
engine = SimplePipelineEngine(
    get_loader=choose_loader,
    calendar=trading_calendar.all_sessions,
    asset_finder=bundle_data.asset_finder,
)

revenue_factor = SimpleMovingAverage(  # arbitrary fundamental data... I used quarterly revenue
    inputs=[MyDataSet.column_A],
    window_length=3,
)

def make_pipeline():
    return Pipeline(
        columns={
            'price': USEquityPricing.close.latest,
            'Revenue': MyDataSet.column_A.latest,
        },
        screen=StaticAssets(assets) & revenue_factor.top(10),
    )

pipeline_output = engine.run_pipeline(  # run the pipeline every day, between these dates
    make_pipeline(),
    pd.Timestamp('2016-01-05', tz='utc'),
    pd.Timestamp('2017-01-12', tz='utc'),
)

print(pipeline_output.head(10))
Revenue price
2016-01-05 00:00:00+00:00 Equity(3 [AAPL]) 5.150100e+10 105.35
Equity(10 [ABC]) 3.547038e+10 32.03
Equity(546 [CVS]) 3.864400e+10 96.46
Equity(550 [CVX]) 3.431500e+10 88.85
Equity(747 [F]) 3.814400e+10 17.76
Equity(1938 [T]) 3.909100e+10 12.38
Equity(2081 [UNH]) 4.148900e+10 116.46
Equity(2152 [VZ]) 3.315800e+10 33.19
Equity(2197 [WMT]) 1.174080e+11 61.46
Equity(2235 [XOM]) 6.734400e+10 77.46
However, the following code (in an IPython cell below the previous code) produces the following errors:
def initialize(context):
    attach_pipeline(
        make_pipeline(),
        'data_pipe',
    )

def before_trading_start(context, data):
    print(pipeline_output('data_pipe'))

def handle_data(context, data):
    order(symbol('AAPL'), 10)
    # record(AAPL=data[symbol('AAPL')].price)

start = pd.Timestamp('2016-01-05', tz='utc')
end = pd.Timestamp('2016-01-09', tz='utc')

run_algorithm(
    before_trading_start=before_trading_start,
    start=start,
    end=end,
    initialize=initialize,
    capital_base=10000,
    handle_data=handle_data,
)
Error 1
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
~/.pyenv/versions/3.5.4/lib/python3.5/site-packages/zipline/algorithm.py in _pipeline_output(self, pipeline, chunks, name)
2486 try:
-> 2487 data = self._pipeline_cache.get(name, today)
2488 except KeyError:
~/.pyenv/versions/3.5.4/lib/python3.5/site-packages/zipline/utils/cache.py in get(self, key, dt)
152 try:
--> 153 return self._cache[key].unwrap(dt)
154 except Expired:
KeyError: 'data_pipe'
Error 2
During handling of the above exception, another exception occurred:
# lots of error tracing code... Maybe I should post it, maybe not?
~/.pyenv/versions/3.5.4/lib/python3.5/site-packages/zipline/utils/run_algo.py in choose_loader(column)
170 return pipeline_loader
171 raise ValueError(
--> 172 "No PipelineLoader registered for column %s." % column
173 )
174 else:
ValueError: No PipelineLoader registered for column MyDataSet.column_A::float64.
It seems that when you use the run_pipeline() command you can specify loaders, but when using pipeline_output() in before_trading_start(), you need to do something differently.
Can you please help me understand how to overcome these errors?
@calmitchell617 the error you're getting there is happening because your algorithm doesn't know anything about your MyDataSet class.

Internally, run_algorithm constructs and runs an instance of TradingAlgorithm, which in turn constructs an instance of the SimplePipelineEngine class that you're building in your first example. For historical reasons, TradingAlgorithm doesn't take the full engine as a parameter; it only takes the loader-dispatching function (choose_loader, in your example above). But currently, run_algorithm always supplies a hard-coded dispatcher that only knows about USEquityPricing:
pipeline_loader = USEquityPricingLoader(
    bundle_data.equity_daily_bar_reader,
    bundle_data.adjustment_reader,
)

def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    raise ValueError(
        "No PipelineLoader registered for column %s." % column
    )
(source)
The easiest short-term fix for this is probably to add a new optional get_pipeline_loader parameter to run_algorithm, which would be used to supply your own loader dispatcher instead of the default one.
In the medium term, it'd be nice to allow people to register their own pipeline dispatch functions as part of the Zipline extension machinery. @llllllllll might have thoughts about what an API for that would look like.
Thank you Scott, that worked.
For anyone wondering what I did, here is a brief summary of the changes I made to load external data, using the run_algorithm() command, in an IPython notebook:

In zipline/utils/run_algo.py, I changed run_algorithm() to include an additional parameter, my_loader:
def run_algorithm(start,
                  end,
                  initialize,
                  capital_base,
                  handle_data=None,
                  before_trading_start=None,
                  analyze=None,
                  data_frequency='daily',
                  data=None,
                  bundle=None,
                  bundle_timestamp=None,
                  trading_calendar=None,
                  metrics_set='default',
                  default_extension=True,
                  extensions=(),
                  strict_extensions=True,
                  environ=os.environ,
                  my_loader=None):
I also changed the return _run() statement at the end of the run_algorithm() function to pass that same parameter:
return _run(
    my_loader,
    handle_data=handle_data,
    initialize=initialize,
    before_trading_start=before_trading_start,
    analyze=analyze,
    algofile=None,
    algotext=None,
    defines=(),
    data_frequency=data_frequency,
    capital_base=capital_base,
    data=data,
    bundle=bundle,
    bundle_timestamp=bundle_timestamp,
    start=start,
    end=end,
    output=os.devnull,
    trading_calendar=trading_calendar,
    print_algo=False,
    metrics_set=metrics_set,
    local_namespace=False,
    environ=environ,
)
I then changed the _run() function to accept the parameter:
def _run(my_loader,
         handle_data,
         initialize,
         before_trading_start,
         analyze,
         algofile,
         algotext,
         defines,
         data_frequency,
         capital_base,
         data,
         bundle,
         bundle_timestamp,
         start,
         end,
         output,
         trading_calendar,
         print_algo,
         metrics_set,
         local_namespace,
         environ):
Lastly, I changed the nested choose_loader() function like so:
def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    return my_loader
As @ssanderson said, this is a short-term solution that only allows you to load one column of external data, but it could definitely be expanded upon; one possible expansion is sketched below.
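For example, the single my_loader parameter could become a dict keyed by column, so that the nested dispatcher handles arbitrarily many external columns (a sketch, not tested; my_loaders here is a hypothetical {BoundColumn: PipelineLoader} dict passed through run_algorithm in place of the single my_loader):

def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    try:
        # Dispatch any non-pricing column to its registered loader.
        return my_loaders[column]
    except KeyError:
        raise ValueError(
            "No PipelineLoader registered for column %s." % column
        )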
Here is the part of the IPython notebook where I call run_algorithm(), and the subsequent output:
run_algorithm(
    before_trading_start=before_trading_start,
    start=start,
    end=end,
    initialize=initialize,
    capital_base=10000,
    handle_data=handle_data,
    my_loader=DataFrameLoader(MyDataSet.column_A, column_A_frame),
)
Revenue price
Equity(3 [AAMC]) 5.150100e+10 20.01
Equity(10 [AAWW]) 3.547038e+10 40.22
Equity(333 [BDE]) 2.805500e+10 4.37
Equity(546 [CFX]) 3.864400e+10 23.74
Equity(550 [CHD]) 3.431500e+10 83.51
Equity(747 [CUTR]) 3.814400e+10 12.25
Equity(1938 [NAT]) 3.909100e+10 15.14
Equity(2152 [PATK]) 3.315800e+10 41.65
Equity(2197 [PERY]) 1.174080e+11 18.18
Equity(2235 [PKOH]) 6.734400e+10 34.67
@calmitchell617 glad that worked for you. Would you be interested in putting together a PR to update run_algorithm to support custom pipeline dispatchers?
I would be happy to give it a shot, will follow up early next week.
That would be great, Cal. Your code helped me as well. If you need to generate earnings data, I made a script that does that and creates files for every bundle you have registered. Not production-ready, but useful if you want to test with dates. https://github.com/peterfabakker/zipline-utils/blob/master/getearnings.py
Hey @ssanderson, I've been trying to create a minimal pipeline example that gets data via the blaze loader, similar to the minimal DataFrameLoader example above. I've gotten this far:
import blaze as bz
import numpy as np
import pandas as pd
import sqlite3
import itertools

from zipline.data import bundles
from zipline.utils.calendars import get_calendar
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.data import DataSet
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.filters import StaticAssets
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.pipeline.loaders.blaze import BlazeLoader, from_blaze

trading_calendar = get_calendar('NYSE')
bundle_data = bundles.load('quandl')

# Spoof some data ---------------------------
np.random.seed(100)

start = trading_calendar.closes.index.get_loc('2016-01-04 00:00:00+00:00')
end = trading_calendar.closes.index.get_loc('2018-08-06 00:00:00+00:00')
dates = trading_calendar.closes[start:end]
sids = bundle_data.asset_finder.sids

df = pd.DataFrame(
    data={'value': np.random.random(size=len(dates) * len(sids))},
    index=pd.MultiIndex.from_tuples(
        list(itertools.product(dates, sids)),
        names=('asof_date', 'sid'),
    ),
)

df.to_sql('my_ds_table', sqlite3.connect('temp.db'), if_exists='replace', index=False)

# Create the blaze expr and DataSet ------------------------
expr = bz.data(
    'sqlite:///temp.db::my_ds_table',
    dshape='var * {value: float64, sid: int64, asof_date: datetime}',
)

# Create an empty BlazeLoader
my_blaze_loader = BlazeLoader()

# Create the DataSet
ds = from_blaze(
    expr,
    no_deltas_rule='ignore',
    no_checkpoints_rule='ignore',
    loader=my_blaze_loader,
)
The only check I can see from your tests is issubclass(ds, DataSet), and this does evaluate to True.

my_blaze_loader looks like:
<BlazeLoader: {<DataSet: 'BlazeDataSet_0'>: ExprData(expr="Merge(_child=_1, children=(_1, label(_1.asof_date, 'timestamp')))", deltas='None', checkpoints='None', odo_kwargs={}, apply_deltas_adjustments=True)}>
And now, on to getting the engine going:
pipeline_loader = USEquityPricingLoader(
    bundle_data.equity_daily_bar_reader,
    bundle_data.adjustment_reader,
)

def choose_loader(column):
    if column in USEquityPricing.columns:
        return pipeline_loader
    else:
        return my_blaze_loader

engine = SimplePipelineEngine(
    get_loader=choose_loader,
    calendar=trading_calendar.all_sessions,
    asset_finder=bundle_data.asset_finder,
)
and on to running the pipeline:
assets = bundle_data.asset_finder.lookup_symbols(['A', 'AAL'], as_of_date=None)

p = Pipeline(
    columns={
        'price': USEquityPricing.close.latest,
        'col_A': ds.value.latest,
    },
    screen=StaticAssets(assets),
)

df = engine.run_pipeline(
    p,
    pd.Timestamp('2016-01-05', tz='utc'),
    pd.Timestamp('2016-01-07', tz='utc'),
)
This returns a KeyError looking for 'asof_date':
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-18-4175221aaafb> in <module>()
2 p,
3 pd.Timestamp('2016-01-05', tz='utc'),
----> 4 pd.Timestamp('2016-01-07', tz='utc')
5 )
/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/pipeline/engine.pyc in run_pipeline(self, pipeline, start_date, end_date)
309 dates,
310 assets,
--> 311 initial_workspace,
312 )
313
/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/pipeline/engine.pyc in compute_chunk(self, graph, dates, assets, initial_workspace)
495 loader = get_loader(term)
496 loaded = loader.load_adjusted_array(
--> 497 to_load, mask_dates, assets, mask,
498 )
499 workspace.update(loaded)
/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/pipeline/loaders/blaze/core.pyc in load_adjusted_array(self, columns, dates, assets, mask)
998 self.pool.imap_unordered(
999 partial(self._load_dataset, dates, assets, mask),
-> 1000 itervalues(groupby(getdataset, columns)),
1001 ),
1002 )
/anaconda3/envs/py27/lib/python2.7/site-packages/toolz/dicttoolz.pyc in merge(*dicts, **kwargs)
36
37 rv = factory()
---> 38 for d in dicts:
39 rv.update(d)
40 return rv
/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/pipeline/loaders/blaze/core.pyc in _load_dataset(self, dates, assets, mask, columns)
1070 (
1071 materialized_checkpoints,
-> 1072 materialized_expr.get(),
1073 ),
1074 ignore_index=True,
/anaconda3/envs/py27/lib/python2.7/site-packages/zipline/utils/pool.pyc in get(self)
30 """
31 if not self._successful:
---> 32 raise self._value
33 return self._value
34
KeyError: 'asof_date'
If you've gotten this far, thank you 😃 ... I feel like I am close here! Thanks in advance for any pointers.
hey @marketneutral. I won't have time to write up a full reply today, but take a look at the module docs for zipline.pipeline.loaders.blaze.core. It's a pretty comprehensive rundown of the data format expected by the blaze loader.
@marketneutral are you sure those are running on the same database? If I run your script and look at the schema on the sqlite CLI, I see:
sqlite> .schema
CREATE TABLE "my_ds_table" (
"value" REAL
);
which is what I would expect, given that you're calling df.to_sql with index=False.
Yes, thank you. That was the genesis of the error. Now I call df.reset_index() prior to calling df.to_sql. I will post the cleaned-up and working example tomorrow.
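In other words, the fix is just to materialize the MultiIndex levels as real columns before writing, something like:

# asof_date and sid move from the MultiIndex into real columns, so the
# blaze expression's dshape can find them; table/db names as above.
df.reset_index().to_sql(
    'my_ds_table',
    sqlite3.connect('temp.db'),
    if_exists='replace',
    index=False,
)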
Hey @ssanderson This is my complete working minimal example:
https://github.com/marketneutral/research-tools/blob/master/pipeline-blaze-minimal.ipynb
Works now!
Awesome! One thing to be mindful of if you're using the blaze loader is that the expected semantics for asof_date might not always be what you expect. In particular, we conservatively assume that a record with an asof_date of day N may contain information that wouldn't have been available until the end of that date (e.g., a daily close price). Since pipelines semantically execute at the start of the trading day, this means that data with an asof_date of day N gets labelled with day N + 1 in pipeline outputs (because that's the morning of the first day on which we would have had access to the data observation). The simplest case of this is something like USEquityPricing.close.latest, which, for day N, will contain the previous day's close.
In our experience, these semantics are generally what you want for things like pricing and fundamental data, but they can be "off by one" from what you might expect for other datasets, so it's important to be mindful of date-labelling conventions when you're setting up your data.
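A tiny illustration of that labelling convention (the dates and values here are made up):

import pandas as pd

# An observation stamped asof_date = 2016-01-05 is conservatively assumed
# to have become known only by the end of that session...
asof_date = pd.Timestamp('2016-01-05')

# ...so the first pipeline session that can see it is the next morning,
# and that is the row label it gets in run_pipeline's output.
output_label = pd.Timestamp('2016-01-06', tz='UTC')

print('%s -> visible in pipeline output on %s'
      % (asof_date.date(), output_label.date()))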
Hi @marketneutral thank you for putting together this minimal example, it is very helpful. I'm having some trouble running it though. Do you have a dependency list that you know this works with?
@RaymondMcT I've spent a little (very little) time refactoring this into a Python package with a proper setup.py, which would include dependencies. It's a WIP. Will let you know...
I hear ya. If I get it going I'll report back with a list of working dependencies.
hey @RaymondMcT, try this https://github.com/marketneutral/alphatools and please lmk.
@RaymondMcT can you file an issue here so we don't pollute this thread with non-value-added things for the fine Quantopian folks?
@marketneutral Hi, I have run your code and it works. But when I change the dates, I get the following error:
Traceback (most recent call last):
File "/Users/cemalarican/Desktop/THESIS/PIPE.py", line 82, in
ValueError: The first date of the lifetimes matrix does not match the start date of the pipeline. Did you forget to align the start_date to the trading calendar?
This means your date does not exist in the history.
This question was originally posted on zipline's Google group: there are some posts on quantopian.com introducing pipeline in the online algorithm environment, but I am wondering how to modify an algorithm to run pipeline offline.

Can anyone provide a short example? I really appreciate any help you could provide.