alphaville76 / sharadar_db_bundle

10 stars 8 forks source link

incorrect data pipeline in live trading #42

Open vaa1234 opened 2 years ago

vaa1234 commented 2 years ago

In live trading, the pipeline data is shifted 1 day earlier.

Example:

from exchange_calendars import get_calendar
import os
import datetime
import time
import numpy as np
import pandas as pd
import zipline.api as algo
from sharadar.pipeline.engine import symbols, symbol
from sharadar.util.run_algo import run_algorithm
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from sharadar.util.logger import StrategyLogger
from zipline.pipeline.factors import SimpleMovingAverage, ExponentialWeightedMovingStdDev
from sharadar.pipeline.factors import Exchange, AverageDollarVolume, CustomFactor, IsDomesticCommonStock
from zipline.pipeline.filters import AllPresent, All, StaticAssets, StaticSids
from zipline.finance.execution import MarketOrder, LimitOrder
from zipline.finance.commission import PerShare
from zipline.finance import slippage, commission

from sharadar.live.brokers.ib_broker import IBBroker

from ib_insync import IB, Stock, util as ib_insync_util
import nest_asyncio
log = BacktestLogger(__file__)

def make_pipeline():
    pipeline = Pipeline(
        columns = {
            'close': USEquityPricing.close.latest,
        },
        screen=( StaticAssets([symbol('BRBR')]) )
    )

    return pipeline

def initialize(context):
    algo.attach_pipeline(make_pipeline(), 'pipeline')

def handle_data(context, data):
    pass

def before_trading_start(context, data):
    context.candidates = algo.pipeline_output('pipeline')
    log.info('context.candidates: {}'.format(context.candidates))

if __name__ == "__main__":

    xnys = get_calendar("XNYS")

    run_algorithm(
        handle_data=handle_data,
        initialize=initialize,
        before_trading_start=before_trading_start,
        data_frequency = 'minute',
        broker=IBBroker(tws_uri='localhost:7496:1239'),
        state_filename='./statefile',
        trading_calendar = xnys
        # bundle=bundle_name,
    )

Result:

[2022-05-18 14:04:01] INFO: Connecting: localhost:7496:1239
[2022-05-18 14:04:01] INFO: [2104] Market data farm connection is OK:usfarm.nj (-1)
[2022-05-18 14:04:01] INFO: [2104] Market data farm connection is OK:usfarm (-1)
[2022-05-18 14:04:01] INFO: [2106] HMDS data farm connection is OK:ushmds (-1)
[2022-05-18 14:04:01] INFO: [2158] Sec-def data farm connection is OK:secdefnj (-1)
[2022-05-18 14:04:01] INFO: Managed accounts: ['DU12345']
[2022-05-18 14:04:01] INFO: Local-Broker Time Skew: 0 days 00:00:00.598403
[2022-05-18 14:04:01] INFO: Using bundle 'sharadar'.
[2022-05-18 14:04:01] INFO: Live Trading on 2022-05-18.
[2022-05-18 14:04:01] INFO: Initialized blotter_live
[2022-05-18 14:04:01] INFO: Loading state from ./statefile
[2022-05-18 14:04:01] INFO: initialization done
[2022-05-18 14:04:01] INFO: (self.get_datetime() : 2022-05-18 00:00:00+00:00
[2022-05-18 14:04:01] INFO: today in _pipeline_output : 2022-05-17 00:00:00+00:00
[2022-05-18 14:04:01] INFO: KeyError self.run_pipeline
Pipeline from 2022-05-17 to 2022-05-17  [####################################]  100%[2022-05-18 14:04:02] INFO: Pipeline from 2022-05-17 to 2022-05-17 completed in 0:00:00.
[2022-05-18 14:04:02] INFO: (self.get_datetime() : 2022-05-18 00:00:00+00:00
[2022-05-18 14:04:02] INFO: today in _pipeline_output : 2022-05-17 00:00:00+00:00
[2022-05-18 14:04:02] INFO: self._pipeline_cache.get
[2022-05-18 14:04:02] INFO: context.candidates:                        close
Equity(110458 [BRBR])  24.39
from sharadar.pipeline.engine import prices as get_pricing, symbols
get_pricing(symbols(['BRBR']), '2022-05-17', '2022-05-17', 'close', 2)

2022-05-13 00:00:00+00:00    24.72
2022-05-16 00:00:00+00:00    24.39
2022-05-17 00:00:00+00:00    25.82
Freq: C, Name: Equity(110458 [BRBR]), dtype: float64

Close on 2022-05-17 it should be 25.82, but pipeline gives the close price on 2022-05-16.

In my opinion, this is a error and should not be.