Open prjemian opened 9 months ago
Not sure that it does now. I'd apply the run_in_thread() function from apstools.utils to change this. Copy that function rather than require the entire package.
[9:45 AM] Wolfman, Mark One thing that might get a little weird with run_in_thread is that the tiled_client has to be used in the same thread as it's created (if you want to use the caching feature at least).
The vanilla qt solution is to create a worker in a QThread and have that handle all the blocking I/O stuff. The downside is that they can only communicate via signals/slots. In Firefly's run_browser, this is making for a lot of spaghettification.
An alternate idea is to make an asynchronous wrapper around the tiled client and then use qasync to do the work. Then you have to deal with async/await issues though.
Not sure which approach is better. Maybe QThread but my spaghetti code means I need a refactor.
(While you were messaging, I was typing this.)
So gemviz would need a locking mechanism to prevent repeated tiled server requests until the first one finishes? Also guard against accessing an incomplete in-memory cache of tiled run data?
The issue I ran into with run_in_thread is that the tiled Cache() saves the thread ID during __init__, then when writing to the cache (presumably inside run_in_thread), it makes sure it's in the same thread, since sqlite isn't thread safe.
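One way to live with that same-thread restriction is to funnel every client call through a single dedicated worker thread. The sketch below is only an illustration: ThreadBoundClient is a made-up stand-in that mimics the thread check described above, not the real tiled client or cache.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadBoundClient:
    """Hypothetical stand-in that, like the tiled cache, remembers its creating thread."""

    def __init__(self):
        self._owner = threading.get_ident()

    def fetch(self, key):
        # Refuse calls from any thread other than the one that built the object
        if threading.get_ident() != self._owner:
            raise RuntimeError("client used outside its owning thread")
        return f"data for {key}"


# A single worker means every submitted job runs on the same thread
executor = ThreadPoolExecutor(max_workers=1)

# Create the client on the worker thread, then use it there too
client = executor.submit(ThreadBoundClient).result()
result = executor.submit(client.fetch, "run-1").result()

# Calling from the main thread trips the guard
try:
    client.fetch("run-2")
    main_thread_error = None
except RuntimeError as exc:
    main_thread_error = str(exc)

executor.shutdown()
```

The calls are serialized on the worker, so this keeps the GUI thread free but does not run several requests in parallel.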
Firefly disables the widgets that could result in new tiled requests until the current one is done. This is kind of annoying though, so I'm planning to refactor so that new tiled requests will cancel the previous ones. Otherwise it's very slow selecting which runs you want to plot since you have to wait for the request to complete each time you pick one.
But yeah, some sort of synchronization.
I'm wondering (and believe it would help) if a refactor to use signals and slots would be the best approach here. Signals/slots is already in use throughout the GUI handling.
Yes, but I think anything that remotely touches the tiled API should be behind a signal/slot, which is what I'm trying to fix in Firefly. Here is an example of how I get the list of distinct values for putting in comboboxes for filtering the list of results. With QThread and signals/slots, it looks like:
from qtpy.QtCore import QObject, Signal  # Assumption: qtpy; substitute your Qt binding
import tiled.client


class DatabaseWorker(QObject):
    _client = None

    # Another signal just for calling functions
    # (dict rather than list: the receiving slot looks fields up by name)
    distinct_fields_changed = Signal(dict)
    ...

    @property
    def client(self):
        # Need to make sure the client is created in the same thread
        if self._client is None:
            self._client = tiled.client.from_uri(...)
        return self._client

    def load_distinct_fields(self):
        """Use the tiled_client to get a list of distinct fields."""
        fields = [
            "sample_name",
            "proposal_number",
            ...
        ]
        response = self.client.distinct(*fields)
        ...
        # Signals are emitted, not called
        self.distinct_fields_changed.emit(new_fields)
class RunBrowser(MainWindow):
    # Signals just needed for communicating with the DB worker
    load_distinct_fields = Signal()
    ...

    def __init__(self):
        super().__init__()
        # Load the DB worker
        self._thread = QThread(parent=self)
        self._db_worker = DatabaseWorker(root_node=root_node)
        self._db_worker.moveToThread(self._thread)
        self._thread.start()
        # Make the signal-slot connections
        self.load_distinct_fields.connect(self._db_worker.load_distinct_fields)
        self._db_worker.distinct_fields_changed.connect(self.update_combobox_items)
        self.load_distinct_fields.emit()

    def update_combobox_items(self, fields):
        """Update the filter comboboxes with a list of possible values from the DB."""
        for field_name, cb in [
            ("proposal_users", self.ui.filter_proposal_combobox),
            ("proposal_id", self.ui.filter_user_combobox),
            ("esaf_id", self.ui.filter_esaf_combobox),
            ("sample_name", self.ui.filter_sample_combobox),
            ("plan_name", self.ui.filter_plan_combobox),
            ("edge", self.ui.filter_edge_combobox),
        ]:
            if field_name in fields.keys():
                old_text = cb.currentText()
                cb.clear()
                cb.addItems(fields[field_name])
                cb.setCurrentText(old_text)
With async/await, it could be simpler:
class DatabaseWorker():
    ...

    def __init__(self, root_node=None):
        # No need for threading safety since there's no threading
        self.client = tiled.client.from_uri(...)

    async def load_distinct_fields(self):
        """Use the tiled_client to get a list of distinct fields."""
        fields = [
            "sample_name",
            "proposal_number",
            ...
        ]
        # Assumes an awaitable client; the stock tiled client is synchronous
        response = await self.client.distinct(*fields)
        ...
        return new_fields
class RunBrowser(MainWindow):
    ...

    def __init__(self):
        super().__init__()
        # Load the DB worker
        self._db_worker = DatabaseWorker(root_node=root_node)

    async def load_filters(self):
        # __init__ cannot be a coroutine, so the awaiting moves to its own
        # method (load_filters is a hypothetical name) run on the event loop.
        # No signals needed: await the result and use it right here.
        fields = await self._db_worker.load_distinct_fields()
        self.update_combobox_items(fields)

    def update_combobox_items(self, fields):
        """Update the filter comboboxes with a list of possible values from the DB."""
        for field_name, cb in [
            ("proposal_users", self.ui.filter_proposal_combobox),
            ("proposal_id", self.ui.filter_user_combobox),
            ("esaf_id", self.ui.filter_esaf_combobox),
            ("sample_name", self.ui.filter_sample_combobox),
            ("plan_name", self.ui.filter_plan_combobox),
            ("edge", self.ui.filter_edge_combobox),
        ]:
            if field_name in fields.keys():
                old_text = cb.currentText()
                cb.clear()
                cb.addItems(fields[field_name])
                cb.setCurrentText(old_text)
Using signals-slots to call functions in a thread means that for every function you need 2 signals (one for calling arguments and 1 for the return value), plus it decouples the place where the function is called from the place where the return value is used.
I think signals-slots are great for responding to UI events, but not so great for communication between threads.
Just thinking out loud at this point. In the example above, we still need a way to make the tiled client awaitable, so threading still becomes an issue but at least it could all be wrapped in one place with something like qasync.
An update on how I'm doing this in the haven run browser for reference.
I have made a basic asynchronous tiled client and set up the QApplication to use qasync: https://github.com/spc-group/haven/commit/41caf20323348c4d869a864234725b3f065f1117
When using qasync, some changes are needed to the launcher for the application, but otherwise the rest of the Qt code is unchanged. The big new feature is the asyncSlot decorator. This way you can await I/O-bound operations using the same event loop as Qt does, so there's no blocking anymore.
@asyncSlot()
async def load_data(...):
    ...
Right now I'm going through and making async versions of the slots that need to talk to tiled. So far it's been a very satisfying experience of getting rid of sections of spaghetti code. I'll update again once that's done.
Well I got it done. The tiled operations are all done in coroutines using qasync so the GUI doesn't block. I got rid of a lot of signals and slots, which reduced the complexity of the project quite a bit and makes it easier to debug.
It was a lot of work, mostly because I had already set it up to use QThread, so I had to re-factor it back into a simpler structure.
I put a toy example of how it works into my run browser. self.db_task(...) runs the coroutine as a task so it can be cancelled if the button is clicked again. @cancellable catches the subsequent CancelledError exception and prints a warning instead.
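For readers who have not opened the run browser, here is a rough sketch of that pattern using plain asyncio. The names db_task and cancellable come from the description above, but the bodies are guesses for illustration, not the Haven implementation, and Browser and load_run are hypothetical.

```python
import asyncio
import functools


def cancellable(func):
    """Swallow CancelledError so a superseded request only prints a warning."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            print(f"warning: {func.__name__} was cancelled")
            return None
    return wrapper


class Browser:
    def __init__(self):
        self._tasks = {}
        self.loaded = []

    def db_task(self, coro, name="default"):
        """Run coro as a task, cancelling any unfinished task of the same name."""
        previous = self._tasks.get(name)
        if previous is not None and not previous.done():
            previous.cancel()
        task = asyncio.ensure_future(coro)
        self._tasks[name] = task
        return task

    @cancellable
    async def load_run(self, uid):
        await asyncio.sleep(0.05)  # stands in for a slow tiled request
        self.loaded.append(uid)
        return uid


async def main():
    browser = Browser()
    first = browser.db_task(browser.load_run("run-1"))
    await asyncio.sleep(0)  # let the first request start before replacing it
    second = browser.db_task(browser.load_run("run-2"))
    return await asyncio.gather(first, second), browser.loaded


results, loaded = asyncio.run(main())
```

Only the second request finishes; the first one ends early with None after the warning is printed.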
async and await sprinkled in.
Task functionality to cancel partial database operations if a new one is started.
Writing tests was a chore. I think this was due to a couple of reasons:
1) I had to make sure there were no running tasks after the tests ended, otherwise it got mad.
2) The Bluesky run engine also uses an event loop, so qasync and bluesky were fighting at first.
3) pytest-qt and qasync both have ideas about how to create a QApplication. I'm using a custom subclass, so I had to make them both use my subclass rather than the default QApplication, which was tricky.
Testing with QThread was also difficult, so I'm not sure one is better than the other here.
I'm done with my immediate need. If you ever have questions about either approach (QThread vs qasync) for gemviz, let me know.
The tapi module has the interface to tiled here. Do you have any suggestions for it?
https://github.com/BCDA-APS/gemviz/blob/7e7254f16dafca7a78344417c71d7961f7af8cd7/gemviz/tapi.py#L2
I can think of a few options. It's a trade-off between convenience and the effort to re-write it.
The simplest is to use a decorator that will make the top-level synchronous functions in tapi.py awaitable. No need to re-write the code, so get_run() becomes:
@awaitable
def get_run(uri=None, catalog="training", reference=None):
    ...  # Function definition doesn't change.
Here's a jupyter notebook with the decorator: https://gist.github.com/canismarko/c02165ef10e9caa5717cddbf38c586e7
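For a quick look without opening the notebook, a decorator along these lines would do the job. This is a minimal sketch that assumes the blocking call can run on asyncio's default thread pool; it is not necessarily what the gist does, and the get_run body here is a placeholder.

```python
import asyncio
import functools
import time


def awaitable(func):
    """Wrap a blocking function so it runs in the default thread pool when awaited."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional args, so bind everything first
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)
    return wrapper


@awaitable
def get_run(uri=None, catalog="training", reference=None):
    time.sleep(0.01)  # placeholder for the blocking tiled request
    return {"uri": uri, "catalog": catalog, "reference": reference}


run = asyncio.run(get_run(uri="http://localhost:8020", reference=-1))
```

The default pool has several threads, so the earlier caveat about using the tiled cache from the thread that created it still applies.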
The downside is that you can't nest async functions with this approach. You couldn't use this decorator with both get_run() and get_tiled_runs() because get_run() calls get_tiled_runs(), and the inner function of get_run() is still synchronous and could not call the now-asynchronous get_tiled_runs().
You could also use the same decorator but give the decorated function a new name:
def get_run(uri=None, catalog="training", reference=None):
    ...  # This function doesn't change

aio_get_run = awaitable(get_run)
Then you can call aio_get_run() from the asynchronous code and get_run() from other synchronous code. Not too bad, but you have to keep track of which one to use where.
The third option is what I did in my run browser. The idea is to make something similar to the tiled client but asynchronous, so that the top-level functions in tapi.py are all just async. If/when an async tiled client is released, it's an easy swap.
import asyncio


class AioClient():
    def __init__(self, node):
        self._node = node

    async def __getitem__(self, key):
        # Run the blocking lookup in the default executor, then re-wrap the result
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, self._node.__getitem__, key)
        return AioClient(result)


async def connect_tiled_server(uri):
    """Make asynchronous connection with the tiled server URI. Return a client object."""
    from tiled.client import from_uri

    # leave out "dask" and get numpy by default
    # https://examples.dask.org/array.html
    # > Call .compute() when you want your result as a NumPy array.
    loop = asyncio.get_running_loop()
    client = await loop.run_in_executor(None, from_uri, uri, "dask")
    return AioClient(client)


async def get_run(uri=None, catalog="training", reference=None):
    """Get referenced run from tiled server catalog."""
    # from gemviz.tapi import connect_tiled_server
    # from gemviz.tapi import get_tiled_runs

    uri = uri or "http://localhost:8020"
    client = await connect_tiled_server(uri)
    ...
    run = await client[uid]
I mostly did this because I also want to use an async client in other places too, and it will be easy to swap if/when an async tiled client is available. You need to refactor the module to work this way, though, and implement async versions of all the tiled client features that you need. Probably not the best choice for gemviz.
Thanks! I wonder if the decorator could implement a lock it can check to prevent recursion.
Probably, though maybe that'd be better as its own decorator? Seems unrelated to making synchronous code awaitable.
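As a rough idea of what such a separate decorator might look like, the sketch below rejects a call while an earlier call to the same function is still running. Both no_reentry and fetch_runs are hypothetical names, and raising an error is just one possible policy (returning a cached value or silently skipping the call would be others).

```python
import functools
import threading


def no_reentry(func):
    """Reject a call while a previous call to the same function is still running."""
    lock = threading.Lock()

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Non-blocking acquire: fail fast instead of waiting (or deadlocking on recursion)
        if not lock.acquire(blocking=False):
            raise RuntimeError(f"{func.__name__} is already running")
        try:
            return func(*args, **kwargs)
        finally:
            lock.release()
    return wrapper


@no_reentry
def fetch_runs(depth=0):
    if depth == 0:
        return fetch_runs(depth + 1)  # the nested call hits the guard
    return "runs"


try:
    fetch_runs()
    outcome = "completed"
except RuntimeError as exc:
    outcome = str(exc)
```

Because the lock is released in the finally clause, a later non-nested call goes through normally.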
@canismarko: Thanks for the question