devcartel / pyrfa

Open sourced Python API for Refinitiv (Thomson Reuters) Enterprise Platform.
http://devcartel.com/pyrfa
MIT License
50 stars 15 forks source link

Sometimes, why the session halts? #41

Open hzadonis opened 6 years ago

hzadonis commented 6 years ago

Hi, Masters: These days, I met a issue. Sometimes, the pyrfa session halts to receive data. But this is not always happens, yesterday it worked smoothly without problem. But today, it had been halt 2 times. At the same time, I check data at TREP with the same RIC, the data is always refreshes. Do you have any suggestion? Thanks!

Regards Zheng

wiwat-tharateeraparb commented 6 years ago

Hi, make sure you call dispatchEventQueue() fast enough.

hzadonis commented 6 years ago

dispatchEventQueue(100) Does it mean the interval is 0.1 second? Is it fast enough? Actually, I subscribed 10 RICs, and every one is a thread. If it passes test, I'll ask more than 3000 RICs.

wiwat-tharateeraparb commented 6 years ago

100 means it will block for 100ms and if there is data it will dispatch from queue if no data during 100ms it will then return.

PyRFA is thread-safe but not sure what happned as I don’t see your program. Anything in the log file?

hzadonis commented 6 years ago

import pyrfa import logging import logging.handlers import datetime import numpy import threading import sys import datetime import time import csv

from qpython import qconnection from qpython.qcollection import qlist from qpython.qtype import QException, QTIMESTAMP_LIST, QTIMESPAN_LIST, QSYMBOL_LIST, QFLOAT_LIST, QINT_LIST

class PublisherThread(threading.Thread): #PublisherThread类为从threading继承的子类

def __init__(self, pubtrdq, pubtrdp, subric):
    super(PublisherThread, self).__init__()
    self.q = pubtrdq
    self.p = pubtrdp
    self.ric = subric
    self._stopper = threading.Event()
    # 在这里还可以定义类中其他的属性,比如self.name、self.price等,也是全局变量可供类中的其他方法直接使用。

def stop(self):
    self._stopper.set()

def stopped(self):
    return self._stopper.isSet()

def run(self):
    while not self.stopped():
        try:
            # publish data to TickerPlant, 将数据发往TickerPlant
            print(self.ric + ' subscribing to TREP...')
            self.get_marketPrice_from_trep()    # 通过该方法获取实时成交的交易数据,历史原因不要被marketPrice这个名字骗了哦!
            #self.get_marketByOrder_from_trep()  # 通过该方法获取实时报价数据。
        except QException as e:
            print('>*< excepted')
            print(e)
            logger.info(e)
        except:
            self.stop()

def get_marketPrice_from_trep(self):
    # 定义所需查看的字段:
    self.p.setView('DSPLY_NAME, PRCTCK_1, TRADE_DATE, SALTIM_MS, TRDPRC_1, TRDVOL_1')
    # 若要查看所有的字段:
    # self.p.setView()
    self.p.marketPriceRequest(self.ric) # 所订阅的RIC以参数的形式传入
    end = False
    while not end:
        try:
            updates = self.p.dispatchEventQueue(100)
        except KeyboardInterrupt:
            end = True
        if updates:
            print("")
            for u in updates:
                ric = [u['RIC']]
                if u['MTYPE'] == 'REFRESH':
                    # 测试过程中发现,如果MTYPE是REFRESH时,相应信息只有ServiceName和RIC字段,除此之外并无其他有用的数据,故而不需要Insert到kdb+中
                    update_able = False
                else: # 当MTYPE不为REFRESH时,才需要Insert到kdb+中
                    update_able = True
                for k, v in u.items():  # 可以将k理解为FieldName,v为其值
                    fid = self.p.getFieldID(k)  # 通过FieldName获取其FID
                    if fid == 14:   # PRCTCK_1域,可以用该域来判断买卖方向
                        if v == '⇩':
                            direction = ['SELL']
                        elif v == '⇧':
                            direction = ['BUY']
                        else:
                            direction = ['BUY']
                    elif fid == 16:      # TRADE_DATE域,该域为字符串格式,str类型,例如:'31 MAY 2018'。要将其转换为时间戳格式
                        quotedate = datetime.datetime.fromtimestamp(int(time.mktime(time.strptime(v, "%d %b %Y")))).strftime("%Y-%m-%d")
                    elif fid == 6:    # TRDPRC_1域
                        price = [numpy.float32(v)]
                    elif fid == 178:  # TRDVOL_1域
                        volume = [numpy.int(v)]
                    elif fid == 3854:    # SALTIM_MS域,该域为时间戳格式,int类型,例如:20526000
                        quotetime = datetime.datetime.utcfromtimestamp(v / 1000).strftime("%H:%M:%S.%f")    # 将时间戳格式的QUOTIM_MS值转为GMT时间
                    else:
                        pass

                    '''
                    # 不需要时可以屏蔽以下日志输出段:
                    if update_able:
                        if type(v) is float:  # 如果v的值为float类型
                            print("%20s %g" % (k + ' (' + str(fid) + ')', v))  # %20s:将k格式化为20位字符长度;%g:浮点数字(根据值的大小采用%e或%f)
                            logger.info("%20s %g" % (k + ' (' + str(fid) + ')', v))  # 将k, v值写到日志中
                        else:  # 否则:
                            print("%20s %s" % (k + ' (' + str(fid) + ')', v))  # %20s:将k格式化为20位字符长度;%s:字符串
                            logger.info("%20s %s" % (k + ' (' + str(fid) + ')', v))  # 将k, v值写到日志中
                    else:
                        pass
                    '''

                if update_able:
                    quotestamp = [numpy.datetime64(quotedate + 'T' + quotetime, 'ns')]
                    pkgdata = [qlist(quotestamp, qtype=QTIMESTAMP_LIST), qlist(ric, qtype=QSYMBOL_LIST),
                               qlist(direction, qtype=QSYMBOL_LIST),
                               qlist(price, qtype=QFLOAT_LIST), qlist(volume, qtype=QINT_LIST)]
                    print(pkgdata)
                    self.q.sync('updtrade', numpy.string_('trade'), pkgdata)   # 将pkgdata插入到trade表中
                    # insert数据库后马上清空price, volume的值,避免后续数据在没有更新的情况下与RIC结合产生紊乱的情况。
                    price = [numpy.float32()]
                    volume = [numpy.int()]
                else:
                    pass
                    # logger.info('There is REFRESH tag, maybe no active realtime data.')

if name == 'main': global logger # 将logger定义为全局变量,便于在整个程序架构内进行数据操作写日志时调用 LOG_FILE = '../log/RTFeed.log'

# handler = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=1024*1024, backupCount=5, encoding='utf-8') # 实例化handler,考虑到特殊字符需要设置encoding为utf-8
handler = logging.handlers.RotatingFileHandler(LOG_FILE, encoding='utf-8')  # 实例化handler,考虑到特殊字符需要设置encoding为utf-8
fmt = '%(asctime)s - %(filename)s:%(lineno)s - %(name)s - %(message)s'

formatter = logging.Formatter(fmt)  # 实例化formatter
handler.setFormatter(formatter)  # 为handler添加formatter

logger = logging.getLogger('RTFeed')  # 获取名为RTFeed的logger
logger.addHandler(handler)  # 为logger添加handler
logger.setLevel(logging.INFO)

logger.info('RTFeed is about to run and write log file.')

try:
    # PyRFA连接TREP:
    p = pyrfa.Pyrfa()
    p.createConfigDb("../control/RTFeed.cfg")  # 指定配置文件
    p.acquireSession("Session1")  # 指定读配置文件哪个Session节的配置
    p.createOMMConsumer()
    p.login()
    p.directoryRequest()
    p.dictionaryRequest()

    # 连接TP,在owntick示例中TP运行在本机的8099端口上
    with qconnection.QConnection(host='localhost', port=8099) as q:
        print('^oo^ is running...')
        #print(q)   # 将打印“:localhost:8099”
        print('IPC Version: %s. Is connected: %s' % (q.protocol_version, q.is_connected()))
        print('Press <ENTER> to close application')
        print('TickerPlant Server port 8099 connected.')
        logger.info('TickerPlant Server port 8099 connected.')

        # 读取所需订阅数据的RIC列表然后将其作为所调用线程的参数:
        csv_file = csv.reader(open('../control/AShareList.csv', 'r'))
        for ric in csv_file:
            t = PublisherThread(q, p, ric[0])   # 参数说明:q为qconnection对象、p为pyrfa对象、ric[0]为RIClist.csv文件中定义的各RIC名。
            t.start()

        sys.stdin.readline()

        t.stop()
        t.join()
except Exception:
    logger.info('TickerPlant Server port 8099 NOT connected! Exit RTFeed.')
finally:
    # Terminate the connection to TP:
    q.close()
    # Close subscribe to TREP:
    p.marketPriceCloseAllRequest()
    p.marketByOrderCloseAllRequest()
    p.marketByPriceCloseAllRequest()
hzadonis commented 6 years ago

Once the problem occurred, then the process seems totally halt, no output logs.

wiwat-tharateeraparb commented 6 years ago

It may have nothing to do with dispatchEventQueue after all. Can you try/catch get_marketPrice_from_trep() to see if there is any error parsing the incoming updates?

wiwat-tharateeraparb commented 6 years ago

Or try dispatchEventQueue() with empty timeout. This will not block but return immediately.

hzadonis commented 6 years ago

Thanks, Let me try that.

hzadonis commented 6 years ago

I ran the process with dispatchEventQueue() for 2 days, but no halt issue happens.