devcartel / pyrfa

Open sourced Python API for Refinitiv (Thomson Reuters) Enterprise Platform.
http://devcartel.com/pyrfa
MIT License
50 stars 15 forks source link

Strange Behavior in dispatchEventQueue() #46

Open Billy10mm opened 5 years ago

Billy10mm commented 5 years ago

dispatchEventQueue() can behave badly if I receive data but don't do anything with it. I'm not sure how else to describe this, but consider the following example code:

from pyrfa import Pyrfa
from threading import Thread
import time

class Reuters(Thread):
    def __init__(self):
        self.rfa = Pyrfa()
        self.running = True
        self.sessions = ['Session1']
        Thread.__init__(self, name='rfa')

    def connect(self):
        self.rfa.createConfigDb('reuters.cfg')
        for session in self.sessions:
            self.rfa.acquireSession(session)
        self.rfa.createOMMConsumer()
        self.rfa.login()
        self.rfa.directoryRequest()
        self.rfa.dictionaryRequest()
        self.subscribe('ESZ8')

    def run(self):
        self.connect()
        while self.running:
            data = self.rfa.dispatchEventQueue(100)

    def subscribe(self, ric):
        self.rfa.marketPriceRequest(ric)

r = Reuters()
r.start()
for x in range(0,5):
    print '{:.6f}'.format(time.time())
    time.sleep(1)
r.running = False

When I run the above on Python 2.7.15 (with optimizations enabled), the process hangs seemingly forever (it eventually does end, but it can take over 20 minutes). But if you simply add a print data immediately following the call to dispatchEventQueue(), then the software does not hang and ends in approximately 5 seconds as it should. If I change the call to dispatchEventQueue() and remove the blocking timeout as so, data = self.rfa.dispatchEventQueue(), then I have no issues at all.

wiwat-tharateeraparb commented 5 years ago

For Python 2.7, time.sleep() which has some strange implementation underneath. Your code does not hang with Python 3 however as threading in Python 3 is implemented with pure C. If you are bound to Python 2.7, dispatchEventQueue() with empty argument will not block main thread:

...
    def run(self):
        self.connect()
        while self.running:
            time.sleep(0.1)
            data = self.rfa.dispatchEventQueue()
...
wiwat-tharateeraparb commented 5 years ago

UPDATE: for Python2.7, adding Lock in both Pyrfa thread and main thread will also help.

from pyrfa import Pyrfa
from threading import Lock, Thread
import time
lock = Lock()

class Reuters(Thread):
    def __init__(self):
        self.rfa = Pyrfa()
        self.running = True
        self.sessions = ['Session1']
        Thread.__init__(self, name='rfa')

    def connect(self):
        self.rfa.createConfigDb('pyrfa.cfg')
        for session in self.sessions:
            self.rfa.acquireSession(session)
        self.rfa.createOMMConsumer()
        self.rfa.login()
        self.rfa.directoryRequest()
        self.rfa.dictionaryRequest()
        self.subscribe('EUR=')

    def run(self):
        self.connect()
        while self.running:
            lock.acquire()
            data = self.rfa.dispatchEventQueue(100)
            lock.release()

    def subscribe(self, ric):
        self.rfa.marketPriceRequest(ric)

r = Reuters()
r.start()
for x in range(0,5):
    lock.acquire()
    print('{:.6f}'.format(time.time()))
    time.sleep(1)
    lock.release()
r.running = False