BritishGeologicalSurvey / etlhelper

ETL Helper is a Python ETL library to simplify data transfer into and out of databases.
https://britishgeologicalsurvey.github.io/etlhelper/
GNU Lesser General Public License v3.0

Add abort event for threaded operation #124

Closed: volcan01010 closed this issue 1 year ago

volcan01010 commented 2 years ago

Summary

As an ETLHelper user I want a way to abort ETL processes when they are running in threads so that I can use ETLHelper within GUI applications.

Details

We recently had a case where an ETL Helper script was used in a Qt-based GUI application. The ETL work was done by a worker in a separate thread to prevent it from freezing the GUI. However, this meant there was no easy way to abort a running job; for a normal script you can just press Ctrl-C.

Abort Event setup

Using a threading Event could allow users to send a signal to abort a running job. It could work as follows:

# etlhelper/abort.py
import threading

from etlhelper.exceptions import EtlHelperAbort

abort_event = threading.Event()

def abort():
    """Set an abort event."""
    abort_event.set()

def raise_for_abort(message="ETL process aborted"):
    """Raise EtlHelperAbort exception with message if abort_event is set."""
    if abort_event.is_set():
        raise EtlHelperAbort(message)
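
EtlHelperAbort doesn't exist yet; a minimal sketch of the new exception (whether it subclasses the package's existing base exception or plain Exception is an open choice):

# etlhelper/exceptions.py (addition) - minimal sketch
class EtlHelperAbort(Exception):
    """Raised when the abort event is set while an ETL job is running."""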

Implementation

In etl.py, we just need to import the raise_for_abort function and call it at the start of the loops in iter_chunks and executemany. In the GUI application, it would be necessary to import abort from etlhelper.abort and call abort() wherever the job should be cancelled.
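
To show how the pieces fit together, here is a minimal self-contained sketch that assumes the etlhelper.abort module and EtlHelperAbort exception proposed above; the loop stands in for copy_rows running in a worker thread:

# The worker loop below is a stand-in for the real ETL call; in the GUI
# application it would be copy_rows / executemany doing the work.
import threading
import time

from etlhelper.abort import abort, raise_for_abort
from etlhelper.exceptions import EtlHelperAbort

def run_etl():
    """Pretend ETL job: each iteration represents one chunk."""
    try:
        for _ in range(100):
            raise_for_abort()
            time.sleep(0.1)  # pretend to process a chunk
    except EtlHelperAbort:
        print("ETL job cancelled cleanly")

worker = threading.Thread(target=run_etl)
worker.start()

time.sleep(0.5)
abort()  # e.g. wired to a "Cancel" button in the GUI
worker.join()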

In iter_chunks, the row creation step could be wrapped with an abort check by replacing this line: https://github.com/BritishGeologicalSurvey/etlhelper/blob/6cd64fd0dfbf5a40d80437992fae8b69fcbbe05e/etlhelper/etl.py#L66

with:

create_row = row_factory(cursor)
def create_row_or_abort(row):
    """Check for abort event before creating row."""
    raise_for_abort()
    return create_row(row)

Then replace create_row further down with create_row_or_abort.
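
Roughly, the bottom of the fetch loop then becomes (a simplified sketch, not the exact etl.py code):

# Simplified sketch of the fetch loop in iter_chunks after the change.
while True:
    rows = cursor.fetchmany(chunk_size)
    if not rows:
        break
    # Each row now passes through the abort check before being yielded
    yield [create_row_or_abort(row) for row in rows]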

For executemany, the check could happen when rows are read from the chunker by replacing this: https://github.com/BritishGeologicalSurvey/etlhelper/blob/6cd64fd0dfbf5a40d80437992fae8b69fcbbe05e/etlhelper/etl.py#L288

with:

chunk = []
for row in dirty_chunk:
    raise_for_abort()
    if row is not None:
        chunk.append(row)

after renaming the raw output from the chunker as dirty_chunk.
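
For context, the None filtering is needed because a grouper-style chunker pads the final chunk to a fixed length; a sketch of that behaviour, assuming an itertools.zip_longest-based implementation:

# Classic grouper recipe: collect data into fixed-length chunks, padding
# the last chunk with None.
from itertools import zip_longest

def chunker(iterable, chunk_size):
    args = [iter(iterable)] * chunk_size
    return zip_longest(*args, fillvalue=None)

list(chunker(range(5), 2))  # [(0, 1), (2, 3), (4, None)]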

Things to consider

Acceptance criteria

References

Note: these could make a nice recipe one day, based on BGS' Sigmalite importer code.

volcan01010 commented 2 years ago

Note that the implementation described above would create a universal abort that cancels all ETLHelper jobs across all threads. There may be situations where someone is running multiple jobs across multiple threads and wants to abort just one of them.

I think that scenario is out of scope here - it is better to implement something simple that solves our immediate problem first. This implementation can act as a template for more complicated scenarios, e.g. a recipe where the user defines their own transform function containing a raise_for_abort()-style check against a custom Event dedicated to their thread, as sketched below.
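
A rough sketch of that recipe, assuming the transform argument to copy_rows is used; names such as my_abort_event and abort_aware_transform, and the connection/SQL objects, are illustrative:

# Sketch: per-job abort via a dedicated Event checked inside a transform
# function passed to copy_rows.
import threading

my_abort_event = threading.Event()  # one Event per job/thread

def abort_aware_transform(chunk):
    """Pass rows through unchanged, but stop if this job's event is set."""
    if my_abort_event.is_set():
        raise RuntimeError("This job was aborted")  # or a custom exception
    return chunk

# In the worker thread:
# copy_rows(select_sql, src_conn, insert_sql, dest_conn,
#           transform=abort_aware_transform)

# From any other thread, to cancel just this job:
# my_abort_event.set()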

volcan01010 commented 2 years ago

I'm not convinced by the changes to executemany described above, as I think the bottleneck is the cursor.executemany call itself. The data loading doesn't use generators in the same way as iter_chunks, so it is probably only worth calling raise_for_abort() once at the start of each chunk; for a more responsive abort, users would just have to choose a smaller chunk_size.
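
In other words, something like this (a sketch of the chunk loop only; the real executemany also handles commits and errors, and the chunker signature shown is assumed):

# One abort check per chunk, at the top of the loop over the chunker output.
for dirty_chunk in chunker(rows, chunk_size):
    raise_for_abort()
    chunk = [row for row in dirty_chunk if row is not None]
    cursor.executemany(query, chunk)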

volcan01010 commented 2 years ago

You'd also need to reset the abort flag at the start of each run, e.g.

# in etlhelper/abort.py

def clear_abort():
    """Clear abort_event."""
    abort_event.clear()
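
For example (sketch only), the worker's entry point could reset the flag before doing any work:

# Reset the flag at the start of each run so an abort from a previous
# job doesn't immediately cancel the new one.
from etlhelper.abort import clear_abort

def run_etl():
    clear_abort()
    # ... copy_rows(...) etc. ...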