Open sungeunbae opened 1 week ago
```python
def populate(self, realisations, fault_selection: Dict[str, int]):
    """Initial population of the database with all realisations"""
    realisations.extend(fault_selection.keys())
    realisations.extend(
        [
            simulation_structure.get_realisation_name(event, i)
            for event, rel_count in fault_selection.items()
            for i in range(1, rel_count + 1)
        ]
    )
    if len(realisations) == 0:
        print("No realisations found - no entries inserted into db")
    else:
        with connect_db_ctx(self._db_file) as cur:
            procs_to_be_done = cur.execute(
                """select * from proc_type_enum"""
            ).fetchall()
        for run_name in realisations:
            for proc in procs_to_be_done:
                with connect_db_ctx(self._db_file) as cur:
                    if not self._does_task_exists(cur, run_name, proc[0]):
                        self._insert_task(cur, run_name, proc[0])
```
I feel it was a bit too much to close the DB and commit for every single realisation (the `with` context is exited each time). It was introduced to fix the DB-locked issue.
@sungeunbae I can't remember if you mentioned whether you solved this issue last week, but I do have a suggestion. As I understand it, the big problem is that you have a number of long-running parallel tasks that each eventually need to write to the DB to say "hey, I'm done!". Currently we have them all call the `populate` function to write this data from different threads. You can't hold the transaction open in every thread or it will lock the others out, and now you've learned you can't close and reopen it for every write either, because that slows everything down. Have you considered introducing a thread-safe worker queue system like Celery? Then you would have one worker whose job is to consume write requests and access the DB in the most efficient way (one transaction for all the writes). The threads doing the processing can talk to the queue however they see fit, and the tricky concurrent-write queue logic is taken care of by the Celery package.
Also, that `populate` method seems a little funky from a query-efficiency standpoint. The `_does_task_exists` and `_insert_task` methods presumably each query the DB, but you can easily combine those into one SQLite call with `INSERT OR IGNORE INTO state ... VALUES ...`
I'm not sure if the level of engineering my solution proposes is worth it, considering the upcoming Pegasus workflow changes.
Lastly, and perhaps the reason is obvious: is there any reason this is not being done with asyncio? The problems you have stem inherently from having multiple threads, but you don't need multiple threads, because all the tasks are farmed out to external processes. As far as the workflow is concerned, this is an IO-bound problem, not a CPU-bound problem. If Celery is too much, maybe try swapping the threads out for asyncio and free yourself from multithreaded writes.
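A minimal sketch of that shape, assuming the real version would await external processes (e.g. via `asyncio.create_subprocess_exec`) where this uses `asyncio.sleep` as a placeholder: everything runs on one thread, so the single DB write point needs no locking at all.

```python
import asyncio

async def run_task(name):
    # Placeholder for awaiting an external process; no CPU work happens here.
    await asyncio.sleep(0)
    return name

async def main():
    names = [f"rel_{i}" for i in range(3)]
    # All tasks run concurrently on one event loop, one thread.
    finished = await asyncio.gather(*(run_task(n) for n in names))
    # Single point where the DB write would happen - sequential by construction.
    return finished

finished = asyncio.run(main())
```

`asyncio.gather` preserves input order, so `finished` comes back as `["rel_0", "rel_1", "rel_2"]`, and the write step sees all results at once.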
OK, list_crustal.txt has 478 faults, quite a few more than we usually have. But this is not something that should take more than a few minutes. It has been 3 hours, and it is still running.
Started at around 5:40 GMT,
....