Closed cjw123ztt closed 3 years ago
模拟行情sf 是否可以用来接收vn.py的btc行情,这样可以减少开发成本 vnpy-master\vnpy\gateway\okex https://github.com/vnpy/vnpy https://bihu.com/article/1231419010 如何用vnpy下载全市场数据(期货CTP接口) https://zhuanlan.zhihu.com/p/31291510 | item | 行情 | comments |
---|---|---|---|
sf | vn.py btc | rpc on zmq | |
sf | vn.py bar | vn.py and kungfu都占用太大内存和硬盘 | |
sf | kungfu bar | 可能需要自己实现一个基于python的bts | |
sf | 掘金量化50只个股 |
bool sf_source::setUDPSockect(const char * pBroadcastIP, int nBroadcastPort) { if (m_zmq_receiver == nullptr) return false; //to do ...ip+port ////zmq_publisher_rep=tcp://192.168.1.26:6665 char buffer[256]; memset(buffer, 0, sizeof(buffer)); sprintf(buffer, "%s:%d", pBroadcastIP, nBroadcastPort); return m_zmq_receiver->init(buffer); }
lass FeedMsg { public:
FeedMsg(const FeedMsg&); FeedMsg& operator=(const FeedMsg&); FeedMsg() : Code(), MarketTime(), Last(0), DailyVolume(0), TurnOver(0), OpenInterest(0), Close(0), Settlement(0), UpperLimit(0), LowerLimit(0), ServiceDay() { }
virtual ~FeedMsg() throw();
std::string Code;
std::string MarketTime;
double Last;
std::vector
item | type | comments |
---|---|---|
code | ||
tradingday | ||
marktime | 0 | atr,bar |
lastprice | 0 | atr,bar |
exchange | ||
TurnOver | 0 | avg price |
DailyVolume | 0 | avg price |
open | 0 | pankou |
preclose | 0 | pankou |
对于通讯的问题,vnpy.rpc使用ZMQ作为底层通讯库 C:\Users\dfc\Downloads\vnpy-master\examples\client_server\server http://www.uquant.org/group/1/thread/13 https://blog.csdn.net/ocean1171597779/article/details/88659099
C:\Users\dfc\Downloads\vnpy-master\examples\client_server\server def _init_strategy(self, strategy_name: str): """ Init strategies in queue. """ strategy = self.strategies[strategy_name]
if strategy.inited:
self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
return
self.write_log(f"{strategy_name}开始执行初始化")
# Call on_init function of strategy
self.call_strategy_func(strategy, strategy.on_init)
# Restore strategy data(variables)
data = self.strategy_data.get(strategy_name, None)
if data:
for name in strategy.variables:
value = data.get(name, None)
if value:
setattr(strategy, name, value)
# Subscribe market data
contract = self.main_engine.get_contract(strategy.vt_symbol)
if contract:
req = SubscribeRequest(
symbol=contract.symbol, exchange=contract.exchange)
self.main_engine.subscribe(req, contract.gateway_name)
else:
self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)
# Put event to update init completed status.
strategy.inited = True
self.put_strategy_event(strategy)
self.write_log(f"{strategy_name}初始化完成")
C:\Users\dfc\Downloads\vnpy-master\docs\script_trader.md
subscribe()函数用于订阅行情信息,若需要订阅一篮子合约的行情,可以使用列表格式。
engine.subscribe(vt_symbols = ["rb1909.SHFE","rb1910.SHFE"])
def subscribe(self, vt_symbols):
""""""
for vt_symbol in vt_symbols:
contract = self.main_engine.get_contract(vt_symbol)
if contract:
req = SubscribeRequest(
symbol=contract.symbol,
exchange=contract.exchange
)
self.main_engine.subscribe(req, contract.gateway_name)
C:\Users\dfc\Downloads\vnpy-master\docs\data_recoder.md 此时行情记录模块的启动状态为True,会启动while循环,可以添加任务实现实时行情记录。
def start(self):
""""""
self.active = True
self.thread.start()
def run(self):
""""""
while self.active:
try:
task = self.queue.get(timeout=1)
task_type, data = task
if task_type == "tick":
database_manager.save_tick_data([data])
elif task_type == "bar":
database_manager.save_bar_data([data])
except Empty:
continue
def put_event(self):
""""""
tick_symbols = list(self.tick_recordings.keys())
tick_symbols.sort()
bar_symbols = list(self.bar_recordings.keys())
bar_symbols.sort()
data = {
"tick": tick_symbols,
"bar": bar_symbols
}
event = Event(
EVENT_RECORDER_UPDATE,
data
)
self.event_engine.put(event)
参考py | comments |
---|---|
run_server.py | example\client_server |
app\rpc_service\engine.cpp |
def register_event(self): """""" self.event_engine.register(EVENT_TICK, self.process_tick_event) self.event_engine.register(EVENT_CONTRACT, self.process_contract_event) self.event_engine.register( EVENT_SPREAD_DATA, self.process_spread_event)
def update_tick(self, tick: TickData):
""""""
if tick.vt_symbol in self.tick_recordings:
self.record_tick(tick)
if tick.vt_symbol in self.bar_recordings:
bg = self.get_bar_generator(tick.vt_symbol)
bg.update_tick(tick)
def process_tick_event(self, event: Event):
""""""
tick = event.data
self.update_tick(tick)
def process_contract_event(self, event: Event):
""""""
contract = event.data
vt_symbol = contract.vt_symbol
if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):
self.subscribe(contract)
1.CtpMdApi::onRtnDepthMarketData=>self.gateway.on_tick(tick) 2.BaseGateway::on_tick-->self.on_event(EVENT_TICK, tick) 3.OmsEngine::register_event 4.MainEngine::get_tick call the =>OmsEngine::get_tick
按照vn.py的设计理念,有了功能框架,自然就必须有解决实际交易需求的应用。v2.0.5版本中提供了一套包含客户端和服务端的RPC应用,包括:
RpcServiceApp:将VN Trader进程转化为RPC服务器,对外提供交易路由、行情数据推送、持仓资金查询等功能 RpcGateway:将RPC服务器视作类似于CTP的服务端系统,通过标准Gateway来连接并进行交易,对上层应用完全透明 https://zhuanlan.zhihu.com/p/72378989