wondertrader / wtpy

wtpy是基于wondertrader为底层的针对python的子框架
MIT License
830 stars 233 forks source link

Not receiving data from custom parser #134

Closed linonetwo closed 5 months ago

linonetwo commented 7 months ago

This is the code in demos/test_extmodules that is modified:

import sys
from wtpy import BaseExtParser, BaseExtExecuter
from wtpy import WTSTickStruct
from ctypes import byref
import threading
import time
import random
import datetime

from wtpy import WtEngine,EngineType
sys.path.append('../Strategies')
from DualThrust import StraDualThrust
from EmptyStra import EmptyStra

class MyExecuter(BaseExtExecuter):
    def __init__(self, id: str, scale: float):
        super().__init__(id, scale)

    def init(self):
        print("inited")

    def set_position(self, stdCode: str, targetPos: float):
        print("position confirmed: %s -> %f " % (stdCode, targetPos))

class YourParser(BaseExtParser):
    def __init__(self, id: str):
        super().__init__(id)
        self.__worker__ = None

    def init(self, engine:WtEngine):
        '''
        初始化
        '''
        print(f"Initializing YourParser with ID: {self.__id__}")
        super().init(engine)

    def random_sim(self):
        while True:
            curTick = WTSTickStruct()
            curTick.code = bytes("CFFEX.IF.HOT", encoding="UTF8")
            curTick.exchg = bytes("CFFEX", encoding="UTF8")

            # Simulate tick data
            curTick.price = random.uniform(3000, 4000)  # Random price between 3000 and 4000
            curTick.open = curTick.price
            curTick.high = curTick.price
            curTick.low = curTick.price
            curTick.settle = curTick.price
            curTick.upper_limit = curTick.price + 100
            curTick.lower_limit = curTick.price - 100
            curTick.pre_close = curTick.price
            curTick.pre_settle = curTick.price
            curTick.pre_interest = random.randint(1000, 2000)
            curTick.interest = random.randint(1000, 2000)
            curTick.volume = random.randint(100, 500)  # Random volume
            curTick.amount = curTick.volume * curTick.price
            now = datetime.datetime.now()
            curTick.date = int(now.strftime("%Y%m%d"))
            curTick.time = int(now.strftime("%H%M%S"))
            curTick.millisec = now.microsecond // 1000

            time.sleep(1)
            # Log the simulated data
            print(f"Simulated Tick: Code={curTick.code.decode('UTF8')}, Price={curTick.price}, Time={curTick.date}{curTick.time}")

            # Push the data
            result = self.__engine__.push_quote_from_extended_parser(self.__id__, byref(curTick), True)
            print(f"Push result: {result}")  # Log the result of the push operation

    def connect(self):
        '''
        开始连接
        '''
        print("Connecting YourParser")
        if self.__worker__ is None:
            self.__worker__ = threading.Thread(target=self.random_sim, daemon=True)
            self.__worker__.start()
        return

    def disconnect(self):
        '''
        断开连接
        '''
        print("disconnect")
        return

    def release(self):
        '''
        释放,一般是进程退出时调用
        '''
        print("release")
        return

    def subscribe(self, fullCode:str):
        '''
        订阅实时行情\n
        @fullCode   合约代码,格式如CFFEX.IF2106
        '''
        print("subscribe: " + fullCode)
        return

    def unsubscribe(self, fullCode:str):
        '''
        退订实时行情\n
        @fullCode   合约代码,格式如CFFEX.IF2106
        '''
        print("unsubscribe: " + fullCode)
        return

if __name__ == "__main__":
    #创建一个运行环境,并加入策略
    engine = WtEngine(EngineType.ET_CTA)
    engine.init('../common/', "config.yaml")

    straInfo = EmptyStra(name='test', code="CFFEX.IF.HOT", barCnt=1, period="m5", isForStk=False)
    engine.add_cta_strategy(straInfo)

    yourParser = YourParser("yourParser")
    yourParser.init(engine)
    myExecuter = MyExecuter('exec', 1)
    engine.commitConfig()
    engine.add_exetended_parser(yourParser)
    engine.add_exetended_executer(myExecuter)

    engine.run()

    print('press ctrl-c to exit')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt as e:
        exit(0)

And this tdparser.yaml config. I understand tdparsers.yaml is about loading parser that provided by .so file? So the parser in the python code don't need to be registered using this yaml file. (I did so, and get Same name of parsers: myParser error)

parsers:
-   active: true
    bport: 9001
    filter: ''
    host: 127.0.0.1
    id: parser1
    module: ParserUDP
    sport: 3997
# -   active: true
#     bport: 9001
#     filter: ''
#     host: 127.0.0.1
#     id: yourParser
#     module: YourParser
#     sport: 3998

I get this log

[12.13 10:26:47 - info ] Callbacks of Extented Parser registration done
[12.13 10:26:47 - info ] Callbacks of Extented Executer registration done
[12.13 10:26:47 - info ] WonderTrader CTA production framework initialzied, version: UNIX v0.9.9 Build@Dec  4 2023 09:51:19
Initializing YourParser with ID: yourParser
[12.13 10:26:47 - info ] Trading sessions loaded
[12.13 10:26:47 - warning] No session configured for CZCE.PX
[12.13 10:26:47 - warning] No session configured for CZCE.SH
[12.13 10:26:47 - warning] No session configured for INE.ec
[12.13 10:26:47 - warning] No session configured for SHFE.br
[12.13 10:26:47 - info ] Commodities configuration file ../common/commodities.json loaded
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX405 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX406 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX407 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX408 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX409 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX410 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX411 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH405 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH406 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH407 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH408 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH409 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH410 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH411 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2404 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2406 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2408 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2410 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2412 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2401 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2402 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2403 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2404 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2405 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2406 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2407 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2408 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2409 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2410 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2411 skipped
[12.13 10:26:47 - info ] Contracts configuration file ../common/contracts.json loaded, 6 exchanges
[12.13 10:26:47 - info ] Holidays loaded
[12.13 10:26:47 - info ] Hot rules loaded
[12.13 10:26:47 - info ] Trading environment initialized, engine name: CTA
[12.13 10:26:47 - info ] Running mode: Production
[12.13 10:26:47 - info ] Strategy filter Q3LS00_if0 loaded
[12.13 10:26:47 - info ] Code filter CFFEX.IF0 loaded
[12.13 10:26:47 - info ] 0 fee templates loaded
[12.13 10:26:47 - debug] 0 position info of portfolio loaded
[12.13 10:26:47 - info ] [RiskControl] Params inited, Checking frequency: 5 s, MaxIDD: ON(20.00%), MaxMDD: OFF(60.00%), Capital: 5000000.0, Profit Boudary: 101.00%, Calc Span: 30 mins, Risk Scale: 0.30
[12.13 10:26:47 - info ] Engine task poolsize is 0
[12.13 10:26:47 - info ] Resampled bars will be aligned by section:  no
[12.13 10:26:47 - info ] Force to cache bars: yes
[12.13 10:26:47 - info ] WtDataReader initialized, rt dir is ../storage/rt/, hist dir is ../storage/his/, adjust_flag is 0
[12.13 10:26:47 - info ] No adjusting factor file configured, loading skipped
[12.13 10:26:47 - info ] Data manager initialized
[12.13 10:26:47 - info ] Action policies initialized
[12.13 10:26:47 - info ] Reading parser config from tdparsers.yaml...
[12.13 10:26:47 - info ] [parser1] Parser module /wtpy/wtpy/wrapper/linux/parsers/libParserUDP.so loaded
[12.13 10:26:47 - info ] [parser1] Parser initialzied, check_time: false
[12.13 10:26:47 - info ] 1 parsers loaded
[12.13 10:26:47 - info ] Reading trader config from tdtraders.yaml...
[12.13 10:26:47 - info ] [simnow] Risk control rule default of trading channel loaded
[12.13 10:26:47 - info ] [simnow] Trader module /wtpy/wtpy/wrapper/linux/traders/libTraderCTP.so loaded
[12.13 10:26:47 - info ] 1 traders loaded
[12.13 10:26:47 - info ] Executer factory WtExeFact loaded
[12.13 10:26:47 - info ] 0 executers loaded
[12.13 10:26:47 - info ] Extended parser created
[12.13 10:26:47 - info ] Extended Executer created
Connecting YourParser
[12.13 10:26:47 - info ] 2 parsers started
[12.13 10:26:47 - info ] registerFront: tcp://180.168.146.187:10201
[12.13 10:26:47 - info ] 1 trading channels started
[12.13 10:26:47 - info ] CtaTicker will drive engine with session TRADING
[12.13 10:26:47 - info ] Trading date confirmed: 20231213
[12.13 10:26:47 - debug] Main KBars confirmed: CFFEX.IF.HOT#m5
[12.13 10:26:47 - info ] Reading final bars of CFFEX.IF.HOT via extended loader...
[12.13 10:26:47 - info ] 0 items of back min5 data of CFFEX.IF.HOT cached
[12.13 10:26:47 - info ] HOT contract on 20231213 confirmed: CFFEX.IF.HOT -> IF2312
[12.13 10:26:47 - debug] His min5 bars of CFFEX.IF.HOT loaded, 0 from history, 0 from realtime
[12.13 10:26:47 - info ] Market data subscribed: CFFEX.IF.HOT
[12.13 10:26:47 - info ] EmptyStra inited
[12.13 10:26:47 - info ] Trading day 20231213 begun
press ctrl-c to exit
[12.13 10:26:47 - info ] [RiskControl] Current Balance Ratio: 100.00%
Simulated Tick: Code=CFFEX.IF.HOT, Price=3321.4201982447967, Time=20231213102647
Push result: None
Simulated Tick: Code=CFFEX.IF.HOT, Price=3478.500870923655, Time=20231213102648
Push result: None
Simulated Tick: Code=CFFEX.IF.HOT, Price=3799.4673982465883, Time=20231213102649
Push result: None
Simulated Tick: Code=CFFEX.IF.HOT, Price=3928.3792774291314, Time=20231213102650
Push result: None
Simulated Tick: Code=CFFEX.IF.HOT, Price=3734.984036522967, Time=20231213102651
Push result: None
[12.13 10:26:52 - info ] [RiskControl] Current Balance Ratio: 100.00%

I also made an empty strategy for log. It should log if receive simulated data

from wtpy import BaseCtaStrategy
from wtpy import CtaContext
import numpy as np

class EmptyStra(BaseCtaStrategy):

    def __init__(self, name:str, code:str, barCnt:int, period:str, isForStk:bool = False):
        BaseCtaStrategy.__init__(self, name)

        self.__period__ = period
        self.__bar_cnt__ = barCnt
        self.__code__ = code

        self.__is_stk__ = isForStk

    def on_init(self, context:CtaContext):
        code = self.__code__    #品种代码
        if self.__is_stk__:
            code = code + "-"   # 如果是股票代码,后面加上一个+/-,+表示后复权,-表示前复权

        context.stra_prepare_bars(code, self.__period__, self.__bar_cnt__, isMain = True)
        # 获取K线和tick数据的时候会自动订阅, 这里只需要订阅额外要检测的品种即可, 不知道对不对
        context.stra_sub_ticks(code)
        context.stra_log_text("EmptyStra inited")

        #读取存储的数据
        # self.xxx = context.user_load_data('xxx',1)

    def on_tick(self, context: CtaContext, stdCode: str, newTick: dict):
        '''
        逐笔数据进来时调用
        生产环境中,每笔行情进来就直接调用
        回测环境中,是模拟的逐笔数据

        @context    策略运行上下文
        @stdCode    合约代码
        @newTick    最新逐笔
        '''
        print('get new tick')
        print('stdCode', stdCode, 'newTick', newTick)
        pass

    def on_bar(self, context:CtaContext, stdCode:str, period:str, newBar:dict):
        '''
        K线闭合时回调

        @context    策略上下文
        @stdCode    合约代码
        @period     K线周期
        @newBar     最新闭合的K线
        '''

        print('get new bar')
        print('stdCode', stdCode, 'period', period, '\n', 'newBar', newBar)
        pass

        return

    def on_calculate(self, context:CtaContext):
        '''
        K线闭合时调用,一般作为策略的核心计算模块

        @context    策略运行上下文
        '''

        code = self.__code__    #品种代码

        trdUnit = 1
        if self.__is_stk__:
            trdUnit = 100

        #读取最近50条1分钟线(dataframe对象)
        theCode = code
        if self.__is_stk__:
            theCode = theCode + "-" # 如果是股票代码,后面加上一个+/-,+表示后复权,-表示前复权
        np_bars = context.stra_get_bars(theCode, self.__period__, self.__bar_cnt__, isMain = True)

        print('on_calculate data',np_bars.ndarray)
        # print('bars',bars.get_bar)
        short_ma = np_bars.closes[-5:].mean()
        long_ma = np_bars.closes[-60:]
        # print('short ma', short_ma, len(short_ma))
        print('short ma', short_ma)
        print('long ma', long_ma,len(long_ma))

But I didn't receive any simulated data from my parser.

linonetwo commented 7 months ago

是配置用法问题(但不确定是哪里配错了),因为 C++ 写的 parser 也收不到信息。但是 ParserCTP 用同样的接口在发消息就能收到。

ParserRandom 生成随机测试行情的代码

void ParserRandom::subscribe(const CodeSet &vecSymbols)
{
    m_running = true;
    std::thread(&ParserRandom::generateRandomData, this, vecSymbols).detach();
}

void ParserRandom::unsubscribe(const CodeSet &vecSymbols)
{
    m_running = false;
}

void ParserRandom::generateRandomData(const CodeSet &vecSymbols)
{
    std::default_random_engine generator(static_cast<long unsigned int>(time(0)));
    std::uniform_real_distribution<double> priceDist(100.0, 200.0); // Example price range
    std::uniform_int_distribution<int> volumeDist(100, 1000);               // Example volume range

    while (m_running)
    {
        for (const std::string &symbol : vecSymbols)
        {
            WTSTickData *tick = WTSTickData::create("IC2401");
            WTSTickStruct &quote = tick->getTickStruct();
            // strcpy(quote.exchg, pCommInfo->getExchg());
            strcpy(quote.exchg, "CFFEX");
            m_uTradingDate = TimeUtils::getCurDate();

            // Randomly generate the fields similar to OnRtnDepthMarketData
            quote.price = priceDist(generator);
            quote.open = priceDist(generator);
            quote.high = priceDist(generator);
            quote.low = priceDist(generator);
            quote.total_volume = volumeDist(generator);
            quote.trading_date = m_uTradingDate;
            // ... fill other fields similarly

            // Print tick data for debugging
            // std::cout << "Symbol: " << symbol << ", Price: " << quote.price << ", Volume: " << quote.total_volume << std::endl;

            if (m_sink)
                m_sink->handleQuote(tick, 1);

            tick->release();
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

可能是 session 等地方配错了。

mdparsers.yaml

parsers:    # 行情通道配置
-   active: true
    broker: '8888'
    code: ''  # 要录制的合约代码,如果为空默认contracts.json中的全部,不为空则只录制指定的合约,注意这里须与contracts中的代码一致!如'CFFEX.IF2408, CFFEX.IF2403'
    front: tcp://121.37.90.193:20004  #openctp
    id: parser
    user: 1234
    pass: 123456
    module: ParserRandom

dtcfg.yaml

basefiles:  # 基础文件
    commodity: ../common/commodities.json
    contract: ../common/contracts.json
    holiday: ../common/holidays.json
    session: ../common/sessions.json
    utf-8: true
broadcaster:                    # UDP广播器配置项,如果要在局域网转发行情,可以使用该模块
    active: true
    bport: 3997                 # UDP查询端口,主要是用于查询最新的快照
    broadcast:                  # 广播配置
    -   host: 255.255.255.255   # 广播地址,255.255.255.255会向整个局域网广播,但是受限于路由器
        port: 9001              # 广播端口,接收端口要和广播端口一致
        type: 2                 # 数据类型,固定为2
shmcaster:                      # 共享内存转发,适合本机转发
    active: true
    path: ./exchange.membin     # memmap文件路径
parsers: mdparsers.yaml
statemonitor: statemonitor.yaml
writer: # 数据落地配置
    module: WtDataStorage #数据存储模块
    async: true          #同步落地还是异步落地,期货推荐同步,股票推荐异步
    groupsize: 20         #日志分组大小,主要用于控制日志输出,当订阅合约较多时,推荐1000以上,当订阅的合约数较少时,推荐100以内
    path: ../storage_AD    #数据存储的路径
    savelog: true        #是否保存tick到csv
    disabletick: false    #不保存tick数据,默认false
    disablemin1: false    #不保存min1数据,默认false
    disablemin5: false    #不保存min5数据,默认false
    disableday: false     #不保存day数据,默认false
    disablehis: false     #收盘作业不转储历史数据,默认false

commodities.json

{
   "CFFEX": {
        "IC": {
            "covermode": 0,
            "pricemode": 0,
            "category": 1,
            "precision": 1,
            "pricetick": 0.2,
            "volscale": 200,
            "name": "中证",
            "exchg": "CFFEX",
            "session": "SD0930",
            "holiday": "CHINA"
        },

sessions.json

{
    "SD0930":{
        "name":"股票白盘0930",
        "offset": 0,
        "auction":{
            "from": 929,
            "to": 930
        },
        "sections":[
            {
                "from": 930,
                "to": 1130
            },
            {
                "from": 1300,
                "to": 1500
            }
        ]
    },
    "FD0915":{
        "name":"期货白盘0915",
        "offset": 0,
        "auction":{
            "from": 929,
            "to": 930
        },
        "sections":[
            {
                "from": 930,
                "to": 1130
            },
            {
                "from": 1300,
                "to": 1515
            }
        ]
    },

运行后没有接收到行情。(我在 c++ 部分加过 log,c++ 那边是一直在循环输出行情的)

img_v3_0265_9e4cb30b-bb0b-4aae-87c3-cc9152a059cg

而同时如果使用 ParserCTP 就能正常收到 tick

截屏2023-12-15 13 28 15
Zhaoyvke commented 7 months ago

已协助一村解决。通过parserUDP转发时候可用testudp查看是否正确转发行情,目测已转发出。需要在run中改主kline合约代码。检查订阅的合约是否有行情。 b5f37ebaeb544ddf51b5b9c7c878d20ac857aa46918918c9dad5e0596f41ab

linonetwo commented 7 months ago

感谢,我昨天写了个自动产 parser 的 CI,今天让他自己改改合约名多试试…

wondertrader commented 5 months ago

推荐检查流程: 1、检查是否正确配置自定义品种的合约信息 2、使用testUDP检查是否正确转发 3、检查策略是否正确订阅tick数据