Open mhasself opened 6 years ago
So I tried to be aware of this when writing the new radiometer DataNodeServer
, however I haven't been able to get it to work. Does function you want to block need to be called from the reactor thread? I guess I don't understand how to implement it when using the ApplicationRunner, as is done in the DataNodeServer
, at least not by simply looking at the example in the link you provided. Do you use it in this context in OCS at all?
The relevant section of the link is "Getting Results". Also, it's not that we want the function to block, it's that the function is going to block on an i/o call, so it cannot be run in the main reactor thread.
This is done in ocs in OCSAgent.start, for example, where the Process / Task start function is launched using "deferToThread". This is necessary in many (but not all) Agents, because hardware control stuff often involves blocking i/o.
Right, so we actually want to use deferToThread
, rather than blockingCallFromThread
, since we want the results from open
and readlines
back in the reactor thread?
I'll take a look at OCSAgent.start
, thanks for pointing to it.
Oops. Yes. Sorry. I've adjusted my opening comment. You can either wrap each "open" or "readlines" call with a deferToThread, or put both those calls into a function and call the function with deferToThread.
Alright, I maybe implemented this in commit 7323c50 (not totally clear to me, since it does still work when implemented improperly). The relevant lines:
@inlineCallbacks
def _get_data_blocking(start, end, max_points):
file_list = _build_file_list(start, end)
print('Reading data from disk from {start} to {end}.'.format(start=start, end=end))
data = yield _read_data_from_disk(file_list, max_points=max_points)
returnValue(data)
Then in get_data
I call it with:
data = _get_data_blocking(start, end, self.max_points)
I'm wondering if yield also needs to be used on the _build_file_list
call. Really, should I wrap _build_file_list
and _read_data_from_disk
into a single function and then use that function like I use _read_data_from_disk
here, with the yield
and returnValue
in an @inlineCallbacks
decorated function?
Part of why I didn't use yield
when building the file list is because _read_data_from_disk
can't handle getting a generator passed to it.
Just wanted to link to the example that illustrates these concepts in the OCS directory: https://github.com/simonsobs/ocs/commit/a9eac5782aeb3ec32dc9e6bf8b425113111ec909
4c2921f20290c52e1843340499d572b9e5c64785 implements this properly. Still needs to be fixed in the example DataNodeServers
though.
See server_example_weather.py, for example. apex_weather.get_data() uses "open" and "readlines". These are not asynchronous functions, and thus thus function must be run in a side-thread. Relevant twisted docs:
https://twisted.readthedocs.io/en/twisted-18.4.0/core/howto/threading.html
The solution will involve reactor.blockingCallFromThread [edit: I meant reactor.deferToThread!]. A cute way to accomplish this would be to allow subclasses of DataNodeServer that need to do blocking I/O to define a function "get_data_blocking" instead of get_data. Then have the default implementation of DataNodeServer check for that function and call it with the appropriate wrapper.
(In OCS development we've ended up with the opposite problem: too many instances of things running in side-threads instead of the reactor thread!)