dandi / dandisets

730 Dandisets, 807.1 TB total. DataLad super-dataset of all Dandisets from https://github.com/dandisets

problem with locking on git rm in 000026 #342

Closed yarikoptic closed 1 year ago

yarikoptic commented 1 year ago

We had a similar issue before (https://github.com/dandi/dandisets/issues/290), which was addressed by #292 invoking .repo.precommit to stop all background batch processes before invoking git rm. In the past day we had a similar crash, but on different files. Here is the output from a run I did manually after hard-resetting and git-cleaning 000026:

(base) dandi@drogon:/mnt/backup/dandi/dandisets$ flock -E 0 -e -n /home/dandi/.run/backup2datalad-cron.lock bash -c '/mnt/backup/dandi/dandisets/tools/backups2datalad-update-cron'
2023-04-06T18:53:07-0400 [WARNING ] dandi: A newer version (0.52.0) of dandi/dandi-cli is available. You are using 0.48.1
add dandiset.yaml (non-large file; adding content to git repository) ok
(recording state in git...)
fatal: Unable to create '/mnt/backup/dandi/dandisets/000026/.git/index.lock': File exists.

Another git process seems to be running in this repository, e.g.
an editor opened by 'git commit'. Please make sure all processes
are terminated then try again. If it still fails, a git process
may have crashed in this repository earlier:
remove the file manually to continue.
2023-04-06T19:09:30-0400 [ERROR   ] backups2datalad: /mnt/backup/dandi/dandisets/000026: Unable to remove sub-I41/ses-MRI/anat/sub-I41_ses-MRI_echo-4_flip-3_VFA.json due to lockfile; `fuser -v` output on lockfile (return code 1):

2023-04-06T19:10:59-0400 [ERROR   ] backups2datalad: Job failed on input <Dandiset 000026/draft>:
Traceback (most recent call last):
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/aioutil.py", line 171, in dowork
    outp = await func(inp)
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/datasetter.py", line 141, in update_dandiset
    changed = await self.sync_dataset(dandiset, ds, dmanager)
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/datasetter.py", line 186, in sync_dataset
    await syncer.sync_assets(error_on_change)
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/syncer.py", line 36, in sync_assets
    self.report = await async_assets(
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/asyncer.py", line 471, in async_assets
    async with await open_git_annex(
  File "/home/dandi/miniconda3/envs/dandisets-2/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/asyncer.py", line 263, in process_asset
    await self.ds.remove(asset.path)
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/adataset.py", line 240, in remove
    await self.call_git(
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/adataset.py", line 126, in call_git
    await aruncmd(
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/aioutil.py", line 205, in aruncmd
    return await anyio.run_process(argstrs, **kwargs)
  File "/home/dandi/miniconda3/envs/dandisets-2/lib/python3.10/site-packages/anyio/_core/_subprocesses.py", line 90, in run_process
    raise CalledProcessError(cast(int, process.returncode), command, output, errors)
subprocess.CalledProcessError: Command '['git', '-c', 'receive.autogc=0', '-c', 'gc.auto=0', 'rm', '-f', '--ignore-unmatch', '--', 'sub-I41/ses-MRI/anat/sub-I41_ses-MRI_echo-4_flip-3_VFA.json']' returned non-zero exit status 128.
2023-04-06T19:11:00-0400 [ERROR   ] backups2datalad: An error occurred:
Traceback (most recent call last):
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/__main__.py", line 111, in wrapped
    await f(datasetter, *args, **kwargs)
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/__main__.py", line 213, in update_from_backup
    await datasetter.update_from_backup(dandisets, exclude=exclude)
  File "/mnt/backup/dandi/dandisets/tools/backups2datalad/datasetter.py", line 95, in update_from_backup
    raise RuntimeError(
RuntimeError: Backups for 1 Dandiset failed
Logs saved to /mnt/backup/dandi/dandisets/.git/dandi/backups2datalad/2023.04.06.22.53.06Z.log

The end of that log file reads:

2023-04-06T19:09:30-0400 [DEBUG   ] backups2datalad: Running: git -c receive.autogc=0 -c gc.auto=0 rm -f --ignore-unmatch -- sub-I41/ses-MRI/anat/sub-I41_ses-MRI_echo-1_flip-2_VFA.json [cwd=/mnt/backup/dandi/dandisets/000026]
2023-04-06T19:09:30-0400 [INFO    ] backups2datalad: Dandiset 000026: sub-I59/ses-MRI/anat/sub-I59_ses-MRI_echo-3_flip-1_VFA.json: Downloading from https://dandiarchive.s3.amazonaws.com/blobs/9e2/a27/9e2a27b8-2a65-472c-a92f-f6ce57c30e1e?versionId=l492J8uRGgywa2EnzZCWV5oe6.yHmx5P
2023-04-06T19:09:30-0400 [INFO    ] backups2datalad: Dandiset 000026: sub-I41/ses-MRI/anat/sub-I41_ses-MRI_echo-1_flip-2_VFA.json: File is text; sending off for download from https://dandiarchive.s3.amazonaws.com/blobs/4af/5aa/4af5aaf4-ac03-4b29-8dfc-a16ec3004681?versionId=V6O2Z2lvTwOehnx7XZym2raxA1k3LIHh
2023-04-06T19:09:30-0400 [DEBUG   ] backups2datalad: Running: git -c receive.autogc=0 -c gc.auto=0 rm -f --ignore-unmatch -- sub-I41/ses-MRI/anat/sub-I41_ses-MRI_echo-2_flip-2_VFA.json [cwd=/mnt/backup/dandi/dandisets/000026]
2023-04-06T19:09:30-0400 [INFO    ] backups2datalad: Dandiset 000026: sub-I41/ses-MRI/anat/sub-I41_ses-MRI_echo-1_flip-2_VFA.json: Downloading from https://dandiarchive.s3.amazonaws.com/blobs/4af/5aa/4af5aaf4-ac03-4b29-8dfc-a16ec3004681?versionId=V6O2Z2lvTwOehnx7XZym2raxA1k3LIHh
2023-04-06T19:09:30-0400 [ERROR   ] backups2datalad: /mnt/backup/dandi/dandisets/000026: Unable to remove sub-I41/ses-MRI/anat/sub-I41_ses-MRI_echo-4_flip-3_VFA.json due to lockfile; `fuser -v` output on lockfile (return code 1):

2023-04-06T19:09:30-0400 [DEBUG   ] backups2datalad: Waiting for `git -c receive.autogc=0 -c gc.auto=0 annex addurl -c annex.alwayscompact=false --batch --with-files --jobs 5 --json --json-error-messages --json-progress --raw` [cwd=/mnt/backup/dandi/dandisets/000026] to terminate
2023-04-06T19:09:30-0400 [DEBUG   ] backups2datalad: Waiting for `git -c receive.autogc=0 -c gc.auto=0 annex addurl -c annex.alwayscompact=false --batch --with-files --jobs 5 --json --json-error-messages --json-progress --raw` [cwd=/mnt/backup/dandi/dandisets/000026] to terminate
2023-04-06T19:09:30-0400 [DEBUG   ] backups2datalad: Command `git -c receive.autogc=0 -c gc.auto=0 annex addurl -c annex.alwayscompact=false --batch --with-files --jobs 5 --json --json-error-messages --json-progress --raw` [cwd=/mnt/backup/dandi/dandisets/000026] exited with return code 0
2023-04-06T19:10:59-0400 [ERROR   ] backups2datalad: Job failed on input <Dandiset 000026/draft>:

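suggesting that we

  • initiated multiple git rm in parallel? may be we need thread locking there to prevent that?
  • initiated rm before precommit assured addurl --batch to terminate

The situation seems to be quite reproducible and should be troubleshot and addressed. Meanwhile, I will exclude 000026 from the cron job.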

jwodder commented 1 year ago

@yarikoptic

> initiated multiple git rm in parallel? may be we need thread locking there to prevent that?

We already acquire a lock on the AsyncDataset object before calling git rm. If you're referring to the fact that there are multiple lines of the form "Running: git ... rm ...", those are simply the messages logged before executing the commands; no message is logged after the commands return, so you cannot infer from the logs whether their runtimes overlapped.

> initiated rm before precommit assured addurl --batch to terminate

ds.repo.precommit() isn't supposed to terminate the git-annex addurl process. addurl was only terminated because the backup process was shutting down after an error occurred.

yarikoptic commented 1 year ago

> no message is logged after the commands return, so you cannot infer from the logs whether their runtimes overlapped.

Oh, OK. Please add logging for when each command finishes; it could help us identify such overlaps in the future.
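A minimal sketch of such logging, assuming a hypothetical `run_logged()` wrapper. It uses the standard library's asyncio in place of the anyio.run_process call seen in the traceback, and the logger name and message format are illustrative only, not what backups2datalad actually emits:

```python
import asyncio
import logging
import shlex
import time
from typing import Optional

log = logging.getLogger("backups2datalad-sketch")  # hypothetical logger name


async def run_logged(*argv: str, cwd: Optional[str] = None) -> int:
    """Run a command, logging both when it starts and when it finishes."""
    cmd = shlex.join(argv)
    log.debug("Running: %s [cwd=%s]", cmd, cwd)
    start = time.monotonic()
    proc = await asyncio.create_subprocess_exec(*argv, cwd=cwd)
    returncode = await proc.wait()
    # The second message makes it possible to tell whether two
    # commands overlapped in time.
    log.debug(
        "Finished: %s [cwd=%s] rc=%d after %.3fs",
        cmd,
        cwd,
        returncode,
        time.monotonic() - start,
    )
    return returncode
```

With paired "Running"/"Finished" lines, two git rm runs overlap exactly when the second "Running" appears before the first "Finished".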

> ds.repo.precommit() isn't supposed to terminate the git-annex addurl process. addurl was only terminated because the backup process was shutting down after an error occurred.

ah! that must be it then

because there are a number of --batch processes which are started directly and not through the datalad.support.annexrepo helpers:

```shell
❯ git grep -e --batch -B3
tools/backups2datalad/annex.py- self.pfromkey = await open_git_annex(
tools/backups2datalad/annex.py- "fromkey",
tools/backups2datalad/annex.py- "--force",
tools/backups2datalad/annex.py: "--batch",
--
tools/backups2datalad/annex.py- if self.pexaminekey is None:
tools/backups2datalad/annex.py- self.pexaminekey = await open_git_annex(
tools/backups2datalad/annex.py- "examinekey",
tools/backups2datalad/annex.py: "--batch",
--
tools/backups2datalad/annex.py- if self.pwhereis is None:
tools/backups2datalad/annex.py- self.pwhereis = await open_git_annex(
tools/backups2datalad/annex.py- "whereis",
tools/backups2datalad/annex.py: "--batch-keys",
--
tools/backups2datalad/annex.py- "registerurl",
tools/backups2datalad/annex.py- "-c",
tools/backups2datalad/annex.py- "annex.alwayscompact=false",
tools/backups2datalad/annex.py: "--batch",
--
tools/backups2datalad/asyncer.py- "addurl",
tools/backups2datalad/asyncer.py- "-c",
tools/backups2datalad/asyncer.py- "annex.alwayscompact=false",
tools/backups2datalad/asyncer.py: "--batch",
```

To be safe, we need to terminate the non-read-only registerurl and addurl processes there, alongside the ds.repo.precommit() call.

jwodder commented 1 year ago

@yarikoptic Are you sure it's necessary to terminate registerurl and addurl before removing a file? I can only see that being a giant hassle.

yarikoptic commented 1 year ago

> @yarikoptic Are you sure it's necessary to terminate registerurl and addurl before removing a file? I can only see that being a giant hassle.

Unless you see what else could hold the lock there, it seems to be needed: we previously found that running precommit, which stops all batch processes, resolved a similar problem. We also do that in DataLad itself specifically for rm: https://github.com/datalad/datalad/blob/HEAD/datalad/support/gitrepo.py#L1338 . So, unless we can better identify what is holding the lock and how to mitigate it, I think this is the only way ATM.

jwodder commented 1 year ago

@yarikoptic I've posted a question about this on the git-annex forum: https://git-annex.branchable.com/forum/Batch_process__44___git_rm__44___and_locking/

yarikoptic commented 1 year ago

Maybe we could retry rm a few times if we hit the lock?
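A rough sketch of that retry idea, assuming a hypothetical `retry_on_index_lock()` helper and assuming the failed command's stderr is available on the raised CalledProcessError (the names, the attempt count, and the linear backoff are all illustrative):

```python
import asyncio
import subprocess
from typing import Awaitable, Callable

LOCK_MARKER = "index.lock"  # substring of git's "Unable to create ... index.lock" message


async def retry_on_index_lock(
    run: Callable[[], Awaitable[None]],
    attempts: int = 3,
    delay: float = 1.0,
) -> None:
    """Call run(); retry only when it fails because of git's index lock."""
    for attempt in range(1, attempts + 1):
        try:
            await run()
            return
        except subprocess.CalledProcessError as e:
            stderr = e.stderr or b""
            if isinstance(stderr, bytes):
                stderr = stderr.decode("utf-8", "replace")
            # Re-raise unrelated failures, and the lock failure on the last attempt.
            if LOCK_MARKER not in stderr or attempt == attempts:
                raise
            await asyncio.sleep(delay * attempt)  # linear backoff between attempts
```

Retrying only masks the contention rather than explaining it, so it would be a stopgap until whatever holds the lock is identified.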

yarikoptic commented 1 year ago

Given the response from Joey, the only other possibility I see (besides interrupting addurl) is to add internal (thread?) locking to not run rm while addurling a file. (I asked there about registerurl just in case; from its description it shouldn't have the same locking need.)

jwodder commented 1 year ago

@yarikoptic

> add internal (thread?) locking to not run rm while addurling a file.

That's not how the program works. addurl isn't called one invocation per file like rm; it's a long-running batch process that is run while assets are being processed/downloaded. Moreover, the routine for processing an individual asset basically looks like this:

Since we're processing multiple assets at a time, calls to git rm are necessarily happening while addurl is running.

I see two ways to separate the git rm and addurl calls:

yarikoptic commented 1 year ago

> @yarikoptic
>
> > add internal (thread?) locking to not run rm while addurling a file.
>
> That's not how the program works. addurl isn't called one invocation per file like rm; it's a long-running batch process that is run while assets are being processed/downloaded.

Correct, addurl is a long-running batch process. What I meant follows Joey's description:

> With --batch, git-annex reads a line, handles it, outputs a response, and waits for the next line. It should not keep any lock held while it's waiting for the line.

So what I was suggesting is to lock internally for the duration of "handles it", which is (I guess) from the moment in feed_addurl to the moment in read_addurl, or something like that; or maybe to make feed_addurl/read_addurl less async and turn them into a single function to feed (lock) / read progress / finish (unlock). I do not think we want to de-batch addurl, since that would indeed slow things down considerably: git-annex startup time is notable, etc.

> Instead of immediately feeding text assets to addurl, store them in a list and run addurl after all other asset processing is complete
>
>   • This might increase the runtime a bit. Other than that, I'm unable to see any complications or drawbacks to this idea, but I wouldn't be surprised if there's something I'm missing.

I think something along these lines would work too! I.e., "stage" the operations: first figure out what is to be removed or added, then perform removal and then addition as two separate stages. I think the performance hit should be minimal.
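The staging idea could be sketched roughly as follows. Everything here is hypothetical (`StagedPlan`, `plan_stages()`, `run_stages()`, and the path-to-URL dictionaries are not part of backups2datalad); it only illustrates deciding everything first and then running all removals before any addition:

```python
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class StagedPlan:
    """Paths grouped by operation so that each stage can run on its own."""

    to_remove: list[str] = field(default_factory=list)
    to_add: list[tuple[str, str]] = field(default_factory=list)  # (url, path)


def plan_stages(remote: dict[str, str], local: dict[str, str]) -> StagedPlan:
    """Both arguments map path -> URL (assumed shape, for illustration).

    New paths are only added; paths whose URL changed are removed, then re-added.
    """
    plan = StagedPlan()
    for path, url in sorted(remote.items()):
        if path not in local:
            plan.to_add.append((url, path))
        elif local[path] != url:
            plan.to_remove.append(path)
            plan.to_add.append((url, path))
    return plan


async def run_stages(
    plan: StagedPlan,
    remove: Callable[[str], Awaitable[None]],
    addurl: Callable[[str, str], Awaitable[None]],
) -> None:
    # Stage 1: every removal finishes before any download is fed to addurl.
    for path in plan.to_remove:
        await remove(path)
    # Stage 2: only now are URLs handed to the (batch) addurl process.
    for url, path in plan.to_add:
        await addurl(url, path)
```

Because no addition starts until the last removal has returned, git rm and addurl never touch the index at the same time, at the cost of downloads starting later.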

jwodder commented 1 year ago

@yarikoptic

> so what I was suggesting is to lock internally for the duration of "handles it", which is (I guess) from moment in feed_addurl to the moment in read_addurl, or something like that, or maybe making feed_addurl/read_addurl less async and making into a single function to feed(lock)/read progress/finish(unlock). I do not think we want to de-batch addurl since indeed it would considerably slow things down since git annex startup time is notable etc.

I don't see a way to make either of these approaches work without eliminating the use of addurl's parallelism, which would slow things down. Currently, addurl runs a configurable number of download jobs in parallel (backups2datalad.cfg.yaml currently sets this to 5 jobs). Adding locking so that either git rm is running or up to 5 download jobs are running seems to be beyond the capabilities of the synchronization primitives provided by anyio.

yarikoptic commented 1 year ago

> Adding locking so that either git rm is running or up to 5 download jobs are running seems to be beyond the capabilities of the synchronization primitives provided by anyio.

I think we might be able to use a need_index_change lock, which we would need to acquire for the entire rm call or when manipulating (adding to/removing from) self.in_progress, so that the lock is acquired only when in_progress gets its first entry and released when the last entry is popped from in_progress. We might need an auxiliary changing_in_progress lock so that only one thread at a time analyzes in_progress to decide whether to acquire or release the need_index_change lock. WDYT (or do I need to expand more on the idea)?

jwodder commented 1 year ago

@yarikoptic Your "need_index_change lock" link does not mention anything named need_index_change, nor does that term appear anywhere in the anyio documentation. What are you referring to there?

yarikoptic commented 1 year ago

BTW (I didn't check the code), hitting this lock and Joey's response made me think: are we certain that rm and addurl never collide on the same file? I.e., do we actually need that clean separation of stages to begin with, or might we end up with a race condition between rm and addurl on the same path?

yarikoptic commented 1 year ago

> need_index_change

I just gave a suggestive name that we would use for such a lock to describe its purpose: to lock for operations requiring a git index change (which both rm and addurl do).

jwodder commented 1 year ago

@yarikoptic The code for processing an asset calls git rm and addurl on a given path in two separate steps, and it waits for the git rm to complete before moving on. git rm will never be called on a different asset path than the one being operated on.

jwodder commented 1 year ago

@yarikoptic

> I think we might be able to use need_index_change lock which we need to acquire for entire rm call or when manipulating (adding/removing) self.in_progress so that lock is acquired only if in_progress gets first entry, and released when the last entry pop'ed from in_progress.

We would have to use a semaphore instead of a lock, as adding entries to in_progress happens in a different task than removing entries, and, as your link says, "only the task that acquired the lock is allowed to release it."

> Might need an auxiliary changing_in_progress lock so only one thread does analysis over in_progress ATM on either to acquire or release need_index_change lock).

I have to point this out to you at some point: asynchronous programming in Python does not use threads*; it uses asynchronous tasks, all in a single thread, that are managed by an event loop that switches between them at await points.

Either way, having a single task for managing in_progress and need_index_change would mean the tasks that currently add & remove items to/from in_progress would need to communicate with this new task over a stream (or, if we were using threads, a synchronized queue; not sure where you're expecting a changing_in_progress lock to fit in), and that would end up being more complicated than just using a semaphore.

* Technically, the async system has the ability to wait on blocking synchronous operations as though they were async by running them in a separate thread, and this is used to implement things like async file I/O, but the primary design of async is completely thread-free.
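A small demonstration of the point about threads, using the standard library's asyncio rather than anyio (the `worker()` and `main()` names are illustrative): every task reports the same thread, while work explicitly handed off with asyncio.to_thread runs in a different one.

```python
import asyncio
import threading


async def worker(seen: list[int]) -> None:
    await asyncio.sleep(0)  # an await point: the event loop may switch tasks here
    seen.append(threading.get_ident())


async def main() -> tuple[set[int], int]:
    seen: list[int] = []
    # Five concurrent tasks, all scheduled by one event loop.
    await asyncio.gather(*(worker(seen) for _ in range(5)))
    # A synchronous call run in a worker thread, as in the footnote above.
    offloaded = await asyncio.to_thread(threading.get_ident)
    return set(seen), offloaded


task_threads, offloaded_thread = asyncio.run(main())
print(len(task_threads), offloaded_thread in task_threads)
```

Since tasks only switch at await points, code between two awaits cannot be interrupted by another task, which is why plain dictionary updates need no lock of their own.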

yarikoptic commented 1 year ago

I thought about Semaphore but I don't think we need to count anything...

> > Might need an auxiliary changing_in_progress lock so only one thread does analysis over in_progress ATM on either to acquire or release need_index_change lock).
>
> I have to point this out to you at some point: asynchronous programming in Python does not use threads*; it uses asynchronous tasks, all in a single thread, that are managed by an event loop that switches between them at await points.

Thanks (possibly again) for describing how async works; eventually I will remember it by heart ;) If not threads, then I do not quite grasp why we need the "communication" you describe. And since we already use anyio.Lock for the "rm" invocation, why would something like this not be sufficient?

```diff
diff --git a/tools/backups2datalad/asyncer.py b/tools/backups2datalad/asyncer.py
index b8d6ab3..841fb60 100644
--- a/tools/backups2datalad/asyncer.py
+++ b/tools/backups2datalad/asyncer.py
@@ -125,6 +125,7 @@ class Downloader:
     download_receiver: MemoryObjectReceiveStream[ToDownload] = field(init=False)
     zarrs: dict[str, ZarrLink] = field(init=False, default_factory=dict)
     need_add: list[str] = field(init=False, default_factory=list)
+    in_progress_lock: anyio.Lock = field(init=False, default_factory=anyio.Lock)

     def __post_init__(self) -> None:
         (
@@ -378,11 +379,21 @@ class Downloader:
         async with self.addurl.p.stdin:
             async with self.download_receiver:
                 async for td in self.download_receiver:
-                    self.in_progress[td.path] = td
+                    with self.in_progress_lock:
+                        if not self.in_progress:  # first "active" addurl - need to acquire dataset wide lock
+                            self.ds.lock.acquire()
+                        self.in_progress[td.path] = td
                     self.log.info("%s: Downloading from %s", td.path, td.url)
                     await self.addurl.send(f"{td.url} {td.path}\n")
                 self.log.debug("Done feeding URLs to addurl")

+    def in_progress_pop(self, path):
+        with self.in_progress_lock:
+            val = self.in_progress.pop(path)
+            if self.in_progress:  # if was last active - release acquire dataset wide lock
+                self.ds.lock.release()
+            return val
+
     async def read_addurl(self) -> None:
         async with aclosing(self.addurl) as lineiter:
             async for line in lineiter:
@@ -399,7 +410,7 @@ class Downloader:
                 elif not data["success"]:
                     msg = format_errors(data["error-messages"])
                     self.log.error("%s: download failed:%s", data["file"], msg)
-                    self.in_progress.pop(data["file"])
+                    self.in_progress_pop(data["file"])
                     if "exited 123" in msg:
                         self.log.info(
                             "Will try `git add`ing %s manually later", data["file"]
@@ -412,7 +423,7 @@ class Downloader:
                     key = data.get("key")
                     self.log.info("%s: Finished downloading (key = %s)", path, key)
                     self.report.downloaded += 1
-                    dl = self.in_progress.pop(path)
+                    dl = self.in_progress_pop(path)
                     self.tracker.finish_asset(dl.path)
                     self.nursery.start_soon(
                         self.check_unannexed_hash,
```

jwodder commented 1 year ago

@yarikoptic

yarikoptic commented 1 year ago

> @yarikoptic
>
>   • I think you want that if self.in_progress in in_progress_pop() to be if not self.in_progress, don't you?

correct! ;)

>   • feed_addurl() and read_addurl() run in two separate tasks, started here; hence, self.ds.lock cannot be acquired by one of them and released by the other, as only the task that acquired the lock is allowed to release it. The way around this is to use a semaphore with a limit of 1, which is just like a lock but without the "only the task that acquired the lock is allowed to release it" restriction.

ah, cool, so could be done!

> We could either change the type of AsyncDataset.lock to a semaphore that is also acquired & released by {feed,read}_addurl() as you describe, or we could add a separate semaphore on the Downloader instance that is acquired & released by {feed,read}_addurl() and also acquired & released by process_asset() when calling self.ds.remove().

I think keeping the lock (changed to a semaphore) within AsyncDataset makes the most sense, since it is not about Downloader per se but rather about locking the underlying git operations.

>   • I believe in_progress_lock is unnecessary in your suggested patch. ...

Thanks for the analysis! Given my better understanding (no threads!), it indeed should not be needed.

Could you please craft a complete PR based on this diff/discussion?
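For reference, the semaphore approach discussed above could look roughly like this. It is a sketch only: `IndexGate` and its methods are hypothetical names, it uses asyncio.Semaphore from the standard library instead of anyio, and it assumes a single feeder task calls `start()`.

```python
import asyncio


class IndexGate:
    """Hypothetical helper: a one-slot semaphore held while any download is in progress."""

    def __init__(self) -> None:
        self.sem = asyncio.Semaphore(1)
        self.in_progress: dict[str, str] = {}

    async def start(self, path: str, url: str) -> None:
        # The first active download takes the slot (waiting for a running rm, if any).
        if not self.in_progress:
            await self.sem.acquire()
        self.in_progress[path] = url

    def finish(self, path: str) -> str:
        url = self.in_progress.pop(path)
        # The last active download frees the slot; a semaphore, unlike a lock,
        # may be released by a task other than the one that acquired it.
        if not self.in_progress:
            self.sem.release()
        return url

    async def remove(self, path: str, log: list[str]) -> None:
        async with self.sem:  # rm runs only while no download is active
            log.append(f"rm {path}")


async def demo() -> list[str]:
    gate = IndexGate()
    log: list[str] = []
    # The "feeder" runs as its own task and acquires the slot.
    await asyncio.create_task(gate.start("a.json", "https://example.invalid/a"))
    rm = asyncio.create_task(gate.remove("b.json", log))
    await asyncio.sleep(0)  # let rm start; it blocks on the semaphore
    log.append("download a.json done")
    gate.finish("a.json")  # the "reader" side releases from a different task
    await rm
    return log
```

One caveat of this shape: a removal waits until in_progress is completely empty, so under a steady stream of downloads git rm could be delayed for a long time.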