borgbackup / borg

Deduplicating archiver with compression and authenticated encryption.
https://www.borgbackup.org/

locking.py seems multiprocess-safe but not thread-safe #8229

Open justinsteven opened 4 months ago

justinsteven commented 4 months ago

Have you checked borgbackup docs, FAQ, and open GitHub issues?

Mostly

Is this a BUG / ISSUE report or a QUESTION?

Kind of?

System information. For client/server mode post info for both machines.

Your borg version (borg -V).

N/A

Operating system (distribution) and version.

Linux

Hardware / network configuration, and filesystems used.

N/A

How much data is handled by borg?

N/A

Full borg commandline that led to the problem (leave away excludes and passwords)

N/A

Describe the problem you're observing.

I am making a bad decision that I will own, but I think I've come across something interesting in doing so.

I have adapted and repurposed src/borg/locking.py into my own Python scripts that are responsible for, among other things, synchronising borg repos to multiple disks at once using rsync. I repurposed locking.py so that I can take non-exclusive locks (borg with-lock seems to give exclusive locks only), which lets multiple rsync processes read from a given borg repo at once while I'm sure borg won't write to it. I also prefer being able to use with Lock(): in my code rather than wrapping rsync in borg with-lock.
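
For concreteness, the usage pattern is roughly the following (the paths and rsync options are made up for illustration; the real scripts do more error handling):

```python
#!/usr/bin/env python3
# Rough, illustrative sketch of my usage pattern only -- paths and rsync
# options are hypothetical, not my real configuration.
import concurrent.futures
import subprocess
from borg_lock import Lock

REPO = "myrepo/"                         # borg repository (illustrative)
DISKS = ["/mnt/disk1", "/mnt/disk2"]     # destination disks (illustrative)

def sync_to(disk):
    # Shared (non-exclusive) lock: multiple readers may run at once,
    # while borg shouldn't be writing to the repo in the meantime.
    with Lock(REPO, exclusive=False):
        subprocess.run(["rsync", "-a", REPO, disk], check=True)

with concurrent.futures.ThreadPoolExecutor() as executor:
    list(executor.map(sync_to, DISKS))
```

Running the rsyncs from threads like this is exactly how I ran into the behaviour described below.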

In repurposing locking.py I made some changes to make it self-contained and to somewhat appease my linter. My borg_lock.py is below, but I don't think the changes I made are causing the problems I'm having; if they are responsible, I apologise. If the reproducer below also reproduces against the real locking.py (I haven't dabbled in borg development, so I don't know how to do this myself), then my borg_lock.py can be ruled out as being responsible.
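
For what it's worth, I assume testing against the real module would just mean swapping the reproducer's import over to borg's own locking module (untested on my end, and assuming the 1.x module path):

```python
# Hypothetical: use borg's own locking module instead of my borg_lock.py copy
# (requires borg to be importable in the environment running the reproducer).
from borg.locking import Lock
```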

borg_lock.py

```python
#!/usr/bin/env python3
import errno
import json
import os
import socket
import sys
import tempfile
import time
import uuid

# Taken from https://github.com/borgbackup/borg/blob/1525c72549a8e0de3e95fb991f56da54de54a1d6/src/borg/locking.py

ADD, REMOVE, REMOVE2 = "add", "remove", "remove2"
SHARED, EXCLUSIVE = "shared", "exclusive"


def logger(msg, *args):
    sys.stderr.write((msg % args) + "\n")


class TimeoutTimer:
    """
    A timer for timeout checks (can also deal with "never timeout").

    It can also compute and optionally execute a reasonable sleep time
    (e.g. to avoid polling too often or to support thread/process rescheduling).
    """

    def __init__(self, timeout=None, sleep=None):
        """
        Initialize a timer.

        :param timeout: time out interval [s] or None (never timeout, wait forever) [default]
        :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
                      or None (autocompute: use 10% of timeout [but not more than 60s],
                      or 1s for "never timeout" mode)
        """
        if timeout is not None and timeout < 0:
            raise ValueError("timeout must be >= 0")
        self.timeout_interval = timeout
        if sleep is None:
            if timeout is None:
                sleep = 1.0
            else:
                sleep = min(60.0, timeout / 10.0)
        self.sleep_interval = sleep
        self.start_time = None
        self.end_time = None

    def __repr__(self):
        return "<{}: start={!r} end={!r} timeout={!r} sleep={!r}>".format(
            self.__class__.__name__, self.start_time, self.end_time, self.timeout_interval, self.sleep_interval
        )

    def start(self):
        self.start_time = time.time()
        if self.timeout_interval is not None:
            self.end_time = self.start_time + self.timeout_interval
        return self

    def sleep(self):
        if self.sleep_interval >= 0:
            time.sleep(self.sleep_interval)

    def timed_out(self):
        return self.end_time is not None and time.time() >= self.end_time

    def timed_out_or_sleep(self):
        if self.timed_out():
            return True
        else:
            self.sleep()
            return False


class LockError(Exception):
    pass


class LockFailed(LockError):
    pass


class LockTimeout(LockError):
    pass


class NotLocked(LockError):
    pass


class NotMyLock(LockError):
    pass


# From https://github.com/borgbackup/borg/blob/1525c72549a8e0de3e95fb991f56da54de54a1d6/src/borg/platform/base.py#L302
# patched socket.getfqdn() - see https://bugs.python.org/issue5004
def getfqdn(name=""):
    """Get fully qualified domain name from name.

    An empty argument is interpreted as meaning the local host.
    """
    name = name.strip()
    if not name or name == "0.0.0.0":
        name = socket.gethostname()
    try:
        addrs = socket.getaddrinfo(name, None, 0, socket.SOCK_DGRAM, 0, socket.AI_CANONNAME)
    except OSError:
        pass
    else:
        for addr in addrs:
            if addr[3]:
                name = addr[3]
                break
    return name


# for performance reasons, only determine hostname / fqdn / hostid once.
# XXX this sometimes requires live internet access for issuing a DNS query in the background.
hostname = socket.gethostname()
fqdn = getfqdn(hostname)
# some people put the fqdn into /etc/hostname (which is wrong, should be the short hostname)
# fix this (do the same as "hostname --short" cli command does internally):
hostname = hostname.split(".")[0]

# uuid.getnode() is problematic in some environments (e.g. OpenVZ, see #3968) where the virtual MAC address
# is all-zero. uuid.getnode falls back to returning a random value in that case, which is not what we want.
# thus, we offer BORG_HOST_ID where a user can set an own, unique id for each of his hosts.
hostid = os.environ.get("BORG_HOST_ID")
if not hostid:
    hostid = f"{fqdn}@{uuid.getnode()}"


def get_process_id() -> tuple[str, int, int]:
    """
    Return identification tuple (hostname, pid, thread_id) for 'us'.

    This always returns the current pid, which might be different from before,
    e.g. if daemonize() was used.

    Note: Currently thread_id is *always* zero.
    """
    thread_id = 0
    pid = os.getpid()
    return hostid, pid, thread_id


# END taken from platform/base.py


# Taken from https://github.com/borgbackup/borg/blob/1525c72549a8e0de3e95fb991f56da54de54a1d6/src/borg/platform/posix.pyx#L37
def local_pid_alive(pid):
    """Return whether *pid* is alive."""
    try:
        # This doesn't work on Windows.
        # This does not kill anything, 0 means "see if we can send a signal to this process or not".
        # Possible errors: No such process (== stale lock) or permission denied (not a stale lock).
        # If the exception is not raised that means such a pid is valid and we can send a signal to it.
        os.kill(pid, 0)
        return True
    except OSError as err:
        if err.errno == errno.ESRCH:
            # ESRCH = no such process
            return False
        # Any other error (eg. permissions) means that the process ID refers to a live process.
        return True


def process_alive(host, pid, thread):
    """
    Check whether the (host, pid, thread_id) combination corresponds to a process potentially alive.

    If the process is local, then this will be accurate. If the process is not local, then this
    returns always True, since there is no real way to check.
    """
    assert isinstance(host, str)
    assert isinstance(hostid, str)
    assert isinstance(pid, int)
    assert isinstance(thread, int)

    if host != hostid:
        return True

    if thread != 0:
        # Currently thread is always 0, if we ever decide to set this to a non-zero value,
        # this code needs to be revisited, too, to do a sensible thing
        return True

    return local_pid_alive(pid)


# END taken from platform/posix.py


class ExclusiveLock:
    """An exclusive Lock based on mkdir fs operation being atomic.

    If possible, try to use the contextmanager here like::

        with ExclusiveLock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """

    id_: tuple[str, int, int]

    def __init__(self, path, timeout=None, sleep=None, id_=None):
        self.timeout = timeout
        self.sleep = sleep
        self.path = os.path.abspath(path)
        self.id_ = id_ or get_process_id()
        self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id_)
        self.kill_stale_locks = True
        self.stale_warning_printed = False

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.unique_name!r}>"

    def acquire(self, timeout=None, sleep=None):
        if timeout is None:
            timeout = self.timeout
        if sleep is None:
            sleep = self.sleep
        parent_path, base_name = os.path.split(self.path)
        unique_base_name = os.path.basename(self.unique_name)
        temp_path = tempfile.mkdtemp(".tmp", base_name + ".", parent_path)
        temp_unique_name = os.path.join(temp_path, unique_base_name)
        try:
            with open(temp_unique_name, "wb"):
                pass
        except OSError as err:
            raise LockFailed(f"Failed to lock {self.path}: {err}") from None
        else:
            timer = TimeoutTimer(timeout, sleep).start()
            while True:
                try:
                    os.replace(temp_path, self.path)
                except OSError:  # already locked
                    if self.by_me():
                        return self
                    self.kill_stale_lock()
                    if timer.timed_out_or_sleep():
                        raise LockTimeout(self.path) from None
                else:
                    temp_path = None  # see finally:-block below
                    return self
        finally:
            # Renaming failed for some reason, so temp_dir still exists and
            # should be cleaned up anyway. Try to clean up, but don't crash.
            try:
                os.unlink(temp_unique_name)
            except:  # noqa
                pass
            try:
                os.rmdir(temp_path)
            except:  # noqa
                pass

    def release(self):
        if not self.is_locked():
            raise NotLocked(self.path)
        if not self.by_me():
            raise NotMyLock(self.path)
        os.unlink(self.unique_name)
        for retry in range(42):
            try:
                os.rmdir(self.path)
            except OSError as err:
                if err.errno in (errno.EACCES,):
                    # windows behaving strangely? -> just try again.
                    continue
                if err.errno not in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                    # EACCES or EIO or ... = we cannot operate anyway, so re-throw
                    raise err
                # else:
                # Directory is not empty or doesn't exist any more.
                # this means we lost the race to somebody else -- which is ok.
            return

    def is_locked(self):
        return os.path.exists(self.path)

    def by_me(self):
        return os.path.exists(self.unique_name)

    def kill_stale_lock(self):
        try:
            names = os.listdir(self.path)
        except FileNotFoundError:  # another process did our job in the meantime.
            return False
        except PermissionError:  # win32 might throw this.
            return False
        else:
            for name in names:
                try:
                    host_pid, thread_str = name.rsplit("-", 1)
                    host, pid_str = host_pid.rsplit(".", 1)
                    pid = int(pid_str)
                    thread = int(thread_str, 16)
                except ValueError:
                    # Malformed lock name? Or just some new format we don't understand?
                    logger(f"Found malformed lock {name} in {self.path}. Please check/fix manually.")
                    return False

                if process_alive(host, pid, thread):
                    return False

                if not self.kill_stale_locks:
                    if not self.stale_warning_printed:
                        # Log this at warning level to hint the user at the ability
                        logger(
                            f"Found stale lock {name}, but not deleting because self.kill_stale_locks = False."
                        )
                        self.stale_warning_printed = True
                    return False

                try:
                    os.unlink(os.path.join(self.path, name))
                    logger(f"Killed stale lock {name}.")
                except OSError as err:
                    if not self.stale_warning_printed:
                        # This error will bubble up and likely result in locking failure
                        logger(f"Found stale lock {name}, but cannot delete due to {err}")
                        self.stale_warning_printed = True
                    return False

        try:
            os.rmdir(self.path)
        except OSError as err:
            if err.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                # Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok.
                return False
            # EACCES or EIO or ... = we cannot operate anyway
            logger("Failed to remove lock dir: %s", str(err))
            return False

        return True

    def break_lock(self):
        if self.is_locked():
            for name in os.listdir(self.path):
                os.unlink(os.path.join(self.path, name))
            os.rmdir(self.path)

    def migrate_lock(self, old_id, new_id):
        """migrate the lock ownership from old_id to new_id"""
        assert self.id_ == old_id
        new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
        if self.is_locked() and self.by_me():
            with open(new_unique_name, "wb"):
                pass
            os.unlink(self.unique_name)
        self.id_, self.unique_name = new_id, new_unique_name


class LockRoster:
    """
    A Lock Roster to track shared/exclusive lockers.

    Note: you usually should call the methods with an exclusive lock held, to avoid
    conflicting access by multiple threads/processes/machines.
    """

    def __init__(self, path, id_=None):
        self.path = path
        self.id_ = id_ or get_process_id()
        self.kill_stale_locks = True

    def load(self):
        try:
            with open(self.path) as f:
                data = json.load(f)

            # Just nuke the stale locks early on load
            if self.kill_stale_locks:
                for key in (SHARED, EXCLUSIVE):
                    try:
                        entries = data[key]
                    except KeyError:
                        continue
                    elements = set()
                    for host, pid, thread in entries:
                        if process_alive(host, pid, thread):
                            elements.add((host, pid, thread))
                        else:
                            logger(
                                f"Removed stale {key} roster lock for host {host} pid {pid} thread {thread}."
                            )
                    data[key] = list(elements)
        except (FileNotFoundError, ValueError):
            # no or corrupt/empty roster file?
            data = {}
        return data

    def save(self, data):
        with open(self.path, "w") as f:
            json.dump(data, f)

    def remove(self):
        try:
            os.unlink(self.path)
        except FileNotFoundError:
            pass

    def get(self, key):
        roster = self.load()
        return {tuple(e) for e in roster.get(key, [])}

    def empty(self, *keys):
        return all(not self.get(key) for key in keys)

    def modify(self, key, op):
        roster = self.load()
        try:
            elements = {tuple(e) for e in roster[key]}
        except KeyError:
            elements = set()
        if op == ADD:
            elements.add(self.id_)
        elif op == REMOVE:
            # note: we ignore it if the element is already not present anymore.
            # this has been frequently seen in teardowns involving Repository.__del__ and Repository.__exit__.
            elements.discard(self.id_)
        elif op == REMOVE2:
            # needed for callers that do not want to ignore.
            elements.remove(self.id_)
        else:
            raise ValueError("Unknown LockRoster op %r" % op)
        roster[key] = list(list(e) for e in elements)
        self.save(roster)

    def migrate_lock(self, key, old_id, new_id):
        """migrate the lock ownership from old_id to new_id"""
        assert self.id_ == old_id
        # need to switch off stale lock killing temporarily as we want to
        # migrate rather than kill them (at least the one made by old_id).
        killing, self.kill_stale_locks = self.kill_stale_locks, False
        try:
            try:
                self.modify(key, REMOVE2)
            except KeyError:
                # entry was not there, so no need to add a new one, but still update our id
                self.id_ = new_id
            else:
                # old entry removed, update our id and add a updated entry
                self.id_ = new_id
                self.modify(key, ADD)
        finally:
            self.kill_stale_locks = killing


class Lock:
    """
    A Lock for a resource that can be accessed in a shared or exclusive way.
    Typically, write access to a resource needs an exclusive lock (1 writer,
    no one is allowed reading) and read access to a resource needs a shared
    lock (multiple readers are allowed).

    If possible, try to use the contextmanager here like::

        with Lock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """

    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id_=None):
        self.path = path
        self.is_exclusive = exclusive
        self.sleep = sleep
        self.timeout = timeout
        self.id_ = id_ or get_process_id()
        # globally keeping track of shared and exclusive lockers:
        self._roster = LockRoster(path + ".roster", id_=id_)
        # an exclusive lock, used for:
        # - holding while doing roster queries / updates
        # - holding while the Lock itself is exclusive
        self._lock = ExclusiveLock(path + ".exclusive", id_=id_, timeout=timeout)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.id_!r}>"

    def acquire(self, exclusive=None, remove=None, sleep=None):
        if exclusive is None:
            exclusive = self.is_exclusive
        sleep = sleep or self.sleep or 0.2
        if exclusive:
            self._wait_for_readers_finishing(remove, sleep)
            self._roster.modify(EXCLUSIVE, ADD)
        else:
            with self._lock:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                self._roster.modify(SHARED, ADD)
        self.is_exclusive = exclusive
        return self

    def _wait_for_readers_finishing(self, remove, sleep):
        timer = TimeoutTimer(self.timeout, sleep).start()
        while True:
            self._lock.acquire()
            try:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                if len(self._roster.get(SHARED)) == 0:
                    return  # we are the only one and we keep the lock!
                # restore the roster state as before (undo the roster change):
                if remove is not None:
                    self._roster.modify(remove, ADD)
            except:  # noqa
                # avoid orphan lock when an exception happens here, e.g. Ctrl-C!
                self._lock.release()
                raise
            else:
                self._lock.release()
            if timer.timed_out_or_sleep():
                raise LockTimeout(self.path)

    def release(self):
        if self.is_exclusive:
            self._roster.modify(EXCLUSIVE, REMOVE)
            if self._roster.empty(EXCLUSIVE, SHARED):
                self._roster.remove()
            self._lock.release()
        else:
            with self._lock:
                self._roster.modify(SHARED, REMOVE)
                if self._roster.empty(EXCLUSIVE, SHARED):
                    self._roster.remove()

    def upgrade(self):
        # WARNING: if multiple read-lockers want to upgrade, it will deadlock because they
        # all will wait until the other read locks go away - and that won't happen.
        if not self.is_exclusive:
            self.acquire(exclusive=True, remove=SHARED)

    def downgrade(self):
        if self.is_exclusive:
            self.acquire(exclusive=False, remove=EXCLUSIVE)

    def got_exclusive_lock(self):
        return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()

    def break_lock(self):
        self._roster.remove()
        self._lock.break_lock()

    def migrate_lock(self, old_id, new_id):
        assert self.id_ == old_id
        self.id_ = new_id
        if self.is_exclusive:
            self._lock.migrate_lock(old_id, new_id)
            self._roster.migrate_lock(EXCLUSIVE, old_id, new_id)
        else:
            with self._lock:
                self._lock.migrate_lock(old_id, new_id)
                self._roster.migrate_lock(SHARED, old_id, new_id)
```

The problem I'm having is that when the locks are used from multiple threads, acquiring and releasing both exclusive and non-exclusive locks regularly raises exceptions (NotLocked, FileNotFoundError) from within the locking code itself.

Create a repo:

```
% borg init -e none myrepo
```

Regularly fail to get exclusive and non-exclusive locks in threads:

```python
#!/usr/bin/env python3
import concurrent.futures
from borg_lock import Lock
import os

def worker_exclusive(*args, **kwargs):
    print("Getting exclusive lock")
    with Lock("myrepo/", exclusive=True):
        print(os.getpid())

def worker_nonexclusive(*args, **kwargs):
    print("Getting non-exclusive lock")
    with Lock("myrepo/", exclusive=False):
        print(os.getpid())

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(worker_exclusive) for _ in range(8)]
    for future in concurrent.futures.as_completed(futures):
        print(future)

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(worker_nonexclusive) for _ in range(8)]
    for future in concurrent.futures.as_completed(futures):
        print(future)
```

```
% python3 thread.py
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
3048259
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
3048259
3048259
3048259
3048259
<Future at 0x7fe92dbb23a0 state=finished returned NoneType>
3048259
3048259
<Future at 0x7fe92dbcbbb0 state=finished raised NotLocked>
<Future at 0x7fe92dbd7190 state=finished raised NotLocked>
<Future at 0x7fe92dbd7af0 state=finished raised NotLocked>
<Future at 0x7fe92dbd7610 state=finished raised NotLocked>
<Future at 0x7fe92dbd7df0 state=finished raised NotLocked>
<Future at 0x7fe92dbcbd30 state=finished raised NotLocked>
3048259
<Future at 0x7fe92dbded60 state=finished returned NoneType>
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
3048259
3048259
3048259
3048259
<Future at 0x7fe92c35dbb0 state=finished raised FileNotFoundError>
<Future at 0x7fe92c3595b0 state=finished raised FileNotFoundError>
<Future at 0x7fe92dbd72b0 state=finished returned NoneType>
<Future at 0x7fe92c35d1c0 state=finished raised NotLocked>
<Future at 0x7fe92dbe45e0 state=finished returned NoneType>
<Future at 0x7fe92c35dbe0 state=finished raised NotLocked>
3048259
<Future at 0x7fe92c3590d0 state=finished raised NotLocked>
<Future at 0x7fe92c35d340 state=finished returned NoneType>
```

Reliably succeed in getting exclusive and non-exclusive locks across multiple processes:

```python
#!/usr/bin/env python3
from borg_lock import Lock
from multiprocessing import Pool
import os

def worker_exclusive(*args, **kwargs):
    print("Getting exclusive lock")
    with Lock("myrepo/", exclusive=True):
        print(os.getpid())

def worker_nonexclusive(*args, **kwargs):
    print("Getting non-exclusive lock")
    with Lock("myrepo/", exclusive=False):
        print(os.getpid())

with Pool(8) as p:
    print(p.map(worker_exclusive, range(8)))

with Pool(8) as p:
    print(p.map(worker_nonexclusive, range(8)))
```

```
% ./multiprocess.py
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
Getting exclusive lock
3049067
3049066
3049073
3049069
3049070
3049071
3049068
3049072
[None, None, None, None, None, None, None, None]
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
Getting non-exclusive lock
3049102
3049106
3049104
3049103
3049107
3049105
3049108
3049109
[None, None, None, None, None, None, None, None]
```

Suspicions and thoughts

I'm guessing that https://github.com/borgbackup/borg/blob/e3f565623dd482ea4a8211644e234a15724d5f3f/src/borg/platform/base.py#L309 could be related: because thread_id is hard-coded to zero there, the locking keys off of the hostid and PID only. Without the thread ID being part of the key, multiple parallel lock acquisitions and/or releases within the same process can interfere with each other.
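
If that guess is right, the collision is easy to see: every thread in a process computes the same lock identity and hence the same lock file name. A hypothetical snippet against my borg_lock.py above (it only inspects names, it doesn't acquire anything):

```python
# Hypothetical illustration: all threads in one process share the same
# (hostid, pid, 0) identity, so their lock file names collide.
import threading
from borg_lock import ExclusiveLock, get_process_id

def report():
    lock = ExclusiveLock("myrepo/.exclusive")
    # get_process_id() ignores the thread, so this prints the same identity
    # tuple and the same unique_name for every thread in this process.
    print(threading.get_ident(), get_process_id(), lock.unique_name)

threads = [threading.Thread(target=report) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Since ExclusiveLock.by_me() only checks for the existence of that shared file name, one thread can believe it owns a lock that a sibling thread actually took, and concurrent releases then race each other, which would explain the NotLocked and FileNotFoundError results above.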

I'm also guessing that borg itself doesn't use threading, so this might not be a problem for the project. It might only become a problem if borg adopts threading somewhere, or for folks like me who are borrowing parts of borg's internals for their own bad decisions.

And so perhaps the locking is working as intended. It just worries me to see locking code that seems to fail under multi-threading.
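
If thread-granular locking were ever wanted, I imagine the general shape of a change would be to fold the thread id into the identity tuple in borg_lock.py, something like the following (untested, and other code such as process_alive() and the lock-name parsing currently assumes thread_id == 0, so it would need revisiting too):

```python
import threading

def get_process_id() -> tuple[str, int, int]:
    # Hypothetical variant: include the thread id so that two threads in the
    # same process no longer share a lock identity. Untested sketch only;
    # note process_alive() currently treats any non-zero thread id as "alive".
    return hostid, os.getpid(), threading.get_ident()
```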

ThomasWaldmann commented 3 months ago

@justinsteven could you do a PR with a test for this (which fails with current code) and a fix for the locking code that makes that test succeed?

justinsteven commented 3 months ago

@ThomasWaldmann sorry for the slow response. I can try, but I'm not sure when I'll be able to get to it. Also, just thinking out loud, a test case would likely fail non-deterministically (though the PoC shown above demonstrates failure quite reliably on my machine).
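
Off the top of my head, such a test might look something like this (completely untested, and I'm assuming the 1.x module path borg.locking and a pytest-style tmp_path fixture):

```python
# Untested sketch of a thread-safety test; assumes pytest and that borg's
# locking module is importable as borg.locking.
import concurrent.futures

from borg.locking import Lock


def test_lock_thread_safety(tmp_path):
    lock_path = str(tmp_path / "lock")

    def worker():
        # Each thread repeatedly takes and releases an exclusive lock.
        # timeout=5 keeps a buggy run from hanging forever.
        for _ in range(10):
            with Lock(lock_path, exclusive=True, timeout=5):
                pass

    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(worker) for _ in range(8)]
        # result() re-raises anything (NotLocked, FileNotFoundError, ...)
        # that escaped from the locking code inside a worker thread.
        for future in concurrent.futures.as_completed(futures):
            future.result()
```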

ThomasWaldmann commented 3 months ago

If it randomly fails, we could skip it by default. It would be useful nevertheless, just to check it works / to find if it doesn't.