zipline-live / zipline

Zipline-Live, a Pythonic Algorithmic Trading Library
http://www.zipline-live.io/
Apache License 2.0

Continuous ingestion of realtime data from live brokers #26

Open tibkiss opened 7 years ago

tibkiss commented 7 years ago

Zipline-Live will support a live data feed from brokers. To match Quantopian's original design, algorithms will access the live feed through data.current().

As reproducible runs are important for the algorithms, it would be beneficial to store (or continuously ingest) the live feed. Data is precious.

Live feed storage should be transparent to the algo and broker agnostic.
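One way to keep the storage transparent to the algo and broker agnostic is to hide it behind a small interface that every broker implementation writes into. A minimal sketch (all names here, such as `BarStore` and `record_bar`, are hypothetical, not part of zipline-live's actual API):

```python
from abc import ABC, abstractmethod
from datetime import datetime, timezone


class BarStore(ABC):
    """Hypothetical broker-agnostic sink for live minute bars."""

    @abstractmethod
    def record_bar(self, sid, dt, ohlcv):
        """Persist one minute bar for one asset."""


class InMemoryBarStore(BarStore):
    """Trivial implementation keeping bars in a dict, e.g. for tests."""

    def __init__(self):
        self.bars = {}

    def record_bar(self, sid, dt, ohlcv):
        # Key on (sid, floored minute) so repeated writes within the
        # same minute overwrite rather than duplicate.
        minute = dt.replace(second=0, microsecond=0)
        self.bars[(sid, minute)] = ohlcv


store = InMemoryBarStore()
store.record_bar(
    sid=1,
    dt=datetime(2017, 6, 1, 14, 30, 12, tzinfo=timezone.utc),
    ohlcv={'open': 10.0, 'high': 10.2, 'low': 9.9,
           'close': 10.1, 'volume': 1200},
)
```

A Bcolz-backed implementation of the same interface could then wrap the writer/reader pair shown in the next comment, without the algo ever seeing the difference.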

fredfortier commented 7 years ago

Here is how I attempted to implement this in a separate project.

During the live algorithm initialization, I created minutes writer and reader objects and assigned them to the broker:

    def _create_minute_writer(self):
        root = get_broker_minute_writer_root(self.broker.name)
        filename = os.path.join(root, 'metadata.json')

        if os.path.isfile(filename):
            writer = BcolzMinuteBarWriter.open(
                root, self.sim_params.end_session)
        else:
            writer = BcolzMinuteBarWriter(
                rootdir=root,
                calendar=self.trading_calendar,
                minutes_per_day=NYSE,  # presumably 390, for US equities
                start_session=self.sim_params.start_session,
                end_session=self.sim_params.end_session,
                write_metadata=True
            )

        self.broker.minute_writer = writer
        self.broker.minute_reader = BcolzMinuteBarReader(root)

Then in get_spot_value, I do something like this:

        # Should I not use a timezone?
        dt = pd.Timestamp.utcnow().floor('1 min')
        value = None
        if self.minute_reader is not None:
            try:
                # Slight delay to minimize the chances that multiple algos
                # might try to hit the cache at the exact same time.
                sleep_time = random.uniform(0.5, 0.8)
                sleep(sleep_time)
                # TODO: This does not always return a value! Why is that?
                value = self.minute_reader.get_value(
                    sid=asset.sid,
                    dt=dt,
                    field=field
                )
            except Exception as e:
                log.warn('minute data not found: {}'.format(e))

        if value is None or np.isnan(value):
            # This does not directly apply to the current IB broker
            # implementation. I have a separate get_candles method which
            # returns an OHLCV dict; I use it to get spot value and history.
            ohlc = self.get_candles(data_frequency, asset)
            if field not in ohlc:
                raise KeyError('Invalid column: %s' % field)

            if self.minute_writer is not None:
                df = pd.DataFrame(
                    [ohlc],
                    index=pd.DatetimeIndex([dt]),
                    columns=['open', 'high', 'low', 'close', 'volume']
                )

                try:
                    self.minute_writer.write_sid(
                        sid=asset.sid,
                        df=df
                    )
                    log.debug('wrote minute data: {}'.format(dt))
                except Exception as e:
                    # Since the reader comes up empty, this exception is
                    # hit when getting the spot value for a single asset
                    # more than once in the same minute bar
                    log.warn(
                        'unable to write minute data: {} {}'.format(dt, e))

Here is the issue: the data seems to be written correctly, but self.minute_reader.get_value comes out empty. I did not have time to troubleshoot this extensively, but I believe it has something to do with the reader's internal calculation of a minute index from the date. I'm not sure I'm doing this optimally.
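The minute-index suspicion can be illustrated with a toy model. A minute reader essentially maps the requested timestamp to an offset into a contiguous array of session minutes; real zipline derives this from the trading calendar, but the fixed session below (13:30 UTC open, 390 minutes) is an assumption for illustration only:

```python
from datetime import datetime, timedelta, timezone

# Toy reconstruction of the lookup a minute reader performs internally.
SESSION_START = datetime(2017, 6, 1, 13, 30, tzinfo=timezone.utc)
MINUTES_PER_DAY = 390


def minute_index(dt):
    offset = int((dt - SESSION_START).total_seconds() // 60)
    if not 0 <= offset < MINUTES_PER_DAY:
        # A pre-open, post-close, or off-calendar timestamp has no slot
        # in the array, which would surface as get_value() coming back
        # empty/NaN even though the bar was written moments earlier.
        return None
    return offset
```

Under this model, a wall-clock minute that falls outside the calendar the writer was created with (e.g. a 24-hour market written against an NYSE calendar) would always miss, which is consistent with the calendar fix described below.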

tibkiss commented 7 years ago

@fredfortier : Thanks for sharing! For some reason I missed this post. Glad to see that you made progress. Do you have any update on this?

tibkiss commented 7 years ago

I'm not exactly sure about this, but I suspect that you need to update the asset_db to advance the symbol's end_date after minutely ingestion.
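Advancing an equity's end_date could look roughly like the sketch below. This is illustrative only: zipline's SQLite asset db has more columns than shown, and the exact schema and date encoding (assumed here to be an integer epoch value) must be verified against your zipline version rather than taken from this example:

```python
import sqlite3

# Hypothetical, simplified stand-in for the asset db's equities table.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE equities (sid INTEGER PRIMARY KEY, '
             'symbol TEXT, end_date INTEGER)')
conn.execute('INSERT INTO equities VALUES (1, "AAPL", ?)',
             (1496275200000000000,))  # 2017-06-01 UTC as ns since epoch


def advance_end_date(conn, sid, new_end_ns):
    # Only ever move end_date forward; scalar max() keeps an older
    # ingestion run from shrinking the tradable range.
    conn.execute('UPDATE equities SET end_date = MAX(end_date, ?) '
                 'WHERE sid = ?', (new_end_ns, sid))


advance_end_date(conn, 1, 1496361600000000000)  # 2017-06-02 UTC
```

Running this after each ingestion would keep the symbol tradable through the latest ingested session.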

fredfortier commented 7 years ago

Yes, the code above seems to be working. If I recall correctly, the issue was with the calendar parameter: I'm trading an asset with a 24-hour schedule, so I had to change it to OPEN. I don't personally use the asset_db, but it may need updates when trading equities, as you suggest.

tibkiss commented 7 years ago

Continuous ingestion (i.e. loading data directly into the bundles) is not easily achievable: zipline has a bundle reader and a writer, but no read-write bundle handler is implemented yet. Moreover, the reader uses caching which cannot be invalidated from the outside.

Therefore I implemented a much simpler solution: collect the bars and dump them to CSV at the end of the session. The collected realtime bars are merged with the historical bars to provide data continuity. All that's needed from the user's side is to ingest the dumped CSV files at end of day. The logic is published in zipline-live v1.1.0.3.
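The collect-then-dump approach can be sketched as follows. The column layout and class names are assumptions for illustration, not the exact format zipline-live v1.1.0.3 emits:

```python
import csv
import io
from datetime import datetime, timezone

FIELDS = ['dt', 'open', 'high', 'low', 'close', 'volume']


class BarCollector:
    """Accumulate live bars in memory; dump one CSV at session end."""

    def __init__(self):
        self.bars = []

    def on_bar(self, dt, open_, high, low, close, volume):
        self.bars.append({'dt': dt.isoformat(), 'open': open_,
                          'high': high, 'low': low, 'close': close,
                          'volume': volume})

    def dump(self, fileobj):
        # One header row plus one row per collected bar; the resulting
        # file is what the EOD ingestion step would pick up.
        writer = csv.DictWriter(fileobj, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(self.bars)


collector = BarCollector()
collector.on_bar(datetime(2017, 6, 1, 14, 30, tzinfo=timezone.utc),
                 10.0, 10.2, 9.9, 10.1, 1200)
buf = io.StringIO()
collector.dump(buf)
```

In a live run, `dump` would write to a per-asset file on disk instead of a buffer, and the EOD ingest would merge those files with the historical bundle.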

I'll keep this issue open until we provide an 'official' way of EOD ingestion. For that we could use @bartosh 's work on the CSV bundle, which is under review in zipline: https://github.com/quantopian/zipline/pull/1860