radiocosmology / alpenhorn

Alpenhorn is a service for managing an archive of scientific data.
MIT License

Rewrite 12/14: Auto Import #155

Closed ketiltrout closed 1 year ago

ketiltrout commented 1 year ago

This PR updates auto_import.py to tie it into the task and I/O framework.

Changes to previous rewrite PRs

I've made changes to two methods of StorageNode introduced in #147:

Both of these changes are needed to prevent auto_import from trying to import pre-existing files whose has_file state is "X" or "M", both of which indicate that alpenhornd already knows about the file, so an auto-import is not needed.
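A minimal sketch of that check, with illustrative names (`needs_import` and the state codes' exact semantics are assumptions, not the actual alpenhorn API):

```python
# Hypothetical states on an ArchiveFileCopy row; per the PR, "X" and "M"
# (like "Y") mean alpenhornd already knows about this file.
KNOWN_STATES = {"Y", "M", "X"}

def needs_import(has_file) -> bool:
    """Return True only if there's no copy record, or its state is 'N'."""
    if has_file is None:          # no ArchiveFileCopy record at all
        return True
    return has_file not in KNOWN_STATES

# A suspect copy ("M") must not be re-imported:
assert needs_import("M") is False
assert needs_import(None) is True
```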

Task-ification of _import_file

The function import_file (called by the file-event handler that the filesystem observer triggers) now makes use of the task queue: the inner function, _import_file, is now a task inner loop, and the outer function, import_file, submits it to the task queue for asynchronous execution (after doing some cheap early checks).

The benefit here is that the import doesn't run in the observer thread, leaving that thread free to just observe. The primary downside is that the inner function now has to tolerate being passed a file path that has already been imported.

(Also, although the Nearline stuff isn't around yet, the _import_file task now has the machinery (a call to ready_path along with some yielding) to recall a file from tape, if necessary, before importing it.)

Change to ArchiveAcq creation

ArchiveAcq creation has been pushed down to after filetype detection, to a point where we're certain we want to import the path we've been given. Before this rewrite, the acq was created in the database as early as possible (as soon as AcqType.detect succeeded), meaning stray acqs could be left behind if AcqType.detect succeeded but FileType.detect failed. The entire DB update now occurs in a single transaction, ensuring DB consistency.
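The all-or-nothing property can be illustrated with a stdlib sqlite3 transaction standing in for the real ORM models (the tables and function here are invented for the example, not alpenhorn's schema):

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
  CREATE TABLE acq  (name TEXT PRIMARY KEY);
  CREATE TABLE file (name TEXT, acq TEXT);
""")

def import_path(acq_name, file_name, file_type_ok):
    """All-or-nothing: the acq row only appears if the file row does too."""
    try:
        with db:   # one transaction; rolls back if the block raises
            db.execute("INSERT INTO acq VALUES (?)", (acq_name,))
            if not file_type_ok:       # i.e. FileType.detect failed
                raise ValueError("unrecognised file type")
            db.execute("INSERT INTO file VALUES (?, ?)",
                       (file_name, acq_name))
    except ValueError:
        pass   # rolled back: no stray acq row is left behind

import_path("acqA", "f1", file_type_ok=False)
assert db.execute("SELECT COUNT(*) FROM acq").fetchone()[0] == 0
```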

Observers and I/O classes

Separately from the import_file changes, I've changed the way the observer threads are managed. Each I/O class can now specify which observer to use for auto-importing. The DefaultIO observer is just the system-default observer (watchdog.observers.Observer), which on Linux will be the InotifyObserver. Because that doesn't work on NFS mounts, I've created a separate I/O class (alpenhorn.io.Polling.py) which is identical to DefaultIO except that it explicitly uses the PollingObserver for auto import.
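A sketch of the class-attribute convention this describes; the class names and the `observer` attribute are assumptions, not the actual alpenhorn I/O-class API:

```python
# The watchdog imports are real; the guard keeps the sketch runnable
# even where watchdog isn't installed.
try:
    from watchdog.observers import Observer            # system default
    from watchdog.observers.polling import PollingObserver
except ImportError:
    Observer = PollingObserver = None

class DefaultIO:
    # inotify-based on Linux; does not see events on NFS mounts
    observer = Observer

class PollingIO(DefaultIO):
    # stat()-based polling works everywhere, including NFS
    observer = PollingObserver
```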

Instead of one observer thread per auto-imported node, there's now only one per I/O class. Nodes just add new watches to an existing observer if one already exists for their I/O class. This reduces the number of threads in alpenhornd, though I don't think it will have a huge practical effect.

An I/O class may have no observer specified (i.e. set node.io.observer to None), in which case auto-import is not possible for nodes with that I/O class.

Changes to observer setup (and tear-down)

I've renamed setup_observer to update_observers and enhanced it. It's now passed only a single node, not all of them, and will both start watching a node newly configured for auto-import and stop watching a node that no longer needs it, meaning observation can now be updated on the fly.

The setup of the observers has been removed from alpenhorn start-up in service.py and added to the node-update loop in update.py, where update_observers is called once per update loop.

The catchup function (which scans a node to find files that aren't in the database) is now called from update_observers whenever that function has to start watching a new node. Like update_observers, it now accepts only a single node to run on, rather than the whole list.

These changes close #15

Miscellaneous code clean-up

ljgray commented 1 year ago

Approved with a couple of super minor comments