Amulet-Team / Amulet-Core

A Python library for reading and writing the Minecraft save formats. See Amulet for the actual editor.
https://www.amuletmc.com/
220 stars 33 forks source link

[Feature Request] Level locking #262

Open gentlegiantJGC opened 1 year ago

gentlegiantJGC commented 1 year ago

Feature Request

The Problem

Some thought needs to be put into the history system and how threading will affect it. The history system must work by starting a transaction, making changes, and then ending the transaction. All changes made as part of that transaction belong to a single undo operation. This is very simple if only one operation is running at a time, because all changes come from that one operation even if it uses multiple threads.

The complexity is introduced if two operations are running in parallel. The solutions I see are: 1) add a lock so that only one operation can edit the level at a time; 2) group the parallel operations into one transaction; 3) support both, selected with a switch.

We also need to support having no history system, in which case the operations can happily run in parallel.

Feature Description

I suggest a global switch that allows switching between serial and parallel operations (if the operations support it) and another to enable or disable the history system.

We also need a lock to be able to stop operations if running in serial mode.

This will also need to allow multiple threads from a single operation to execute.

gentlegiantJGC commented 1 year ago

I still need to work out how this will tie into the history system but this is the implementation so far. It adds a lock that can be acquired by multiple threads that all use the same token.

from __future__ import annotations

from functools import cached_property
from weakref import proxy, WeakValueDictionary
from threading import Condition, RLock, Lock
from contextlib import contextmanager
from collections import deque
from copy import deepcopy
from contextvars import ContextVar
from typing import Optional
from uuid import uuid4

# Should level objects be editable by default.
# Used as the default value of each level's "edit_mode" context variable.
DefaultEditMode = True
# Can edit operations run in parallel.
# When True, Level.edit swaps the caller's transaction token for one shared token so edits do not block each other.
ParallelEditing = False
# Is the history system enabled.
# NOTE(review): nothing in the code visible here reads this flag yet.
HistoryEnabled = True

class Chunk:
    """Placeholder for the chunk data type. It defines no data or behaviour yet."""

class LockNotAcquired(RuntimeError):
    """Raised when a lock could not be acquired."""

class TokenLock:
    """
    A custom lock that allows all threads with the same token to run in parallel.
    This is useful to support serial operations that can have parallel threads.

    The lock is free while no token is stored. The first acquirer stores its token and every
    further acquirer presenting an equal token is let through immediately. Acquirers with a
    different token block until every holder of the current token has released.
    """

    def __init__(self):
        # Guards _token and _count. It is also the lock underlying the condition variable.
        self._lock = Lock()
        self._condition = Condition(self._lock)
        # The token currently holding the lock. None means the lock is free.
        # Note that an empty string is a valid token.
        self._token: Optional[str] = None
        # How many un-released acquisitions have been made with the current token.
        self._count: int = 0

    def _acquire_shared(self, token: str):
        """
        Block until the lock is free or held with an equal token, then register one more holder.

        :param token: The token to acquire the lock with.
        """
        with self._lock:
            while not (self._token is None or self._token == token):
                # Wait until the lock is not locked or is locked with this token.
                self._condition.wait()

            self._token = token
            self._count += 1

    def _release_shared(self, token: str):
        """
        Drop one holder registered with the given token.

        A token that does not match the current holder is ignored.

        :param token: The token the lock was acquired with.
        """
        with self._lock:
            if self._token != token:
                # Not held with this token so there is nothing to release.
                return
            self._count -= 1
            if self._count <= 0:
                # The last holder has left. Free the lock and wake the waiting threads.
                # This is keyed on the holder count rather than the truthiness of the token so that
                # an empty string token does not wake the waiters while the lock is still held.
                self._token = None
                self._count = 0
                self._condition.notify_all()

    @contextmanager
    def lock(self, token: Optional[str] = None):
        """
        Acquire the lock with a context manager.
        If the lock is not locked it will be locked with the given token.
        If the lock is locked with the same token the thread will continue.
        If the lock is locked with a different token this will block until the lock can be acquired with the given token.

        The lock is released when the with block exits, including when it exits with an exception.

        :param token: The token to use. Defaults to a random UUID.
        """
        if token is None:
            token = str(uuid4())
        self._acquire_shared(token)
        try:
            yield
        finally:
            self._release_shared(token)

class PrivateLevel:
    """Storage of private level data"""

    def __init__(self):
        # Token lock taken by Level.edit for the duration of an edit.
        self.edit_lock = TokenLock()
        # Context-local flag recording whether the level may be edited in the current context.
        self.editable_var = ContextVar("edit_mode", default=DefaultEditMode)

    @property
    def editable(self) -> bool:
        """Is the level editable in the current context."""
        edit_flag = self.editable_var
        return edit_flag.get()

class ChunkStorage:
    """Per-level chunk access with one re-entrant lock per chunk location."""

    def __init__(self, level: PrivateLevel):
        # A lock that must be acquired before touching _locks
        self._locks_lock = Lock()
        # A lock per chunk. Weakly stored so a lock disappears once nothing is using it.
        self._locks = WeakValueDictionary[tuple[str, int, int], RLock]()
        # A deque to keep recently/frequently used chunks loaded
        self._chunk_cache = deque[Chunk](maxlen=100)
        # Mapping from chunk location to chunk object. Weakly stored so that we don't need to manually unload.
        self._chunks = WeakValueDictionary[tuple[str, int, int], Chunk]()
        # Weak pointer to the level to get raw and shared data
        self._level: PrivateLevel = proxy(level)

    def __get_lock(self, key: tuple[str, int, int]) -> RLock:
        """
        Get the lock for a chunk location, creating it if one does not currently exist.

        :param key: The (dimension, cx, cz) location of the chunk.
        :return: The lock for that location.
        """
        with self._locks_lock:
            try:
                return self._locks[key]
            except KeyError:
                # No live lock for this location. Make one and register it.
                created = RLock()
                self._locks[key] = created
                return created

    @contextmanager
    def lock(self, dimension: str, cx: int, cz: int, *, blocking: bool = True, timeout: float = -1):
        """
        Lock access to the chunk.

        >>> with level.chunk.lock(dimension, cx, cz):
        >>>     # Do what you need to with the chunk
        >>>     # No other threads are able to edit or set the chunk while in this with block.

        If you want to lock, get and set the chunk data :meth:`edit` is probably a better fit.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :param blocking: Should this block until the lock is acquired.
        :param timeout: The amount of time to wait for the lock.
        :raises:
            LockNotAcquired: If the lock was not acquired.
        """
        chunk_lock = self.__get_lock((dimension, cx, cz))
        acquired = chunk_lock.acquire(blocking, timeout)
        if not acquired:
            # Thread was not acquired
            raise LockNotAcquired("Lock was not acquired.")
        try:
            yield
        finally:
            chunk_lock.release()

    @contextmanager
    def edit(self, dimension: str, cx: int, cz: int, blocking: bool = True, timeout: float = -1):
        """
        Lock and edit a chunk.

        >>> with level.chunk.edit(dimension, cx, cz) as chunk:
        >>>     # Edit the chunk data
        >>>     # No other threads are able to edit the chunk while in this with block.
        >>>     # When the with block exits the edited chunk will be automatically set if no exception occurred.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :param blocking: Should this block until the lock is acquired.
        :param timeout: The amount of time to wait for the lock.
        :raises:
            RuntimeError: If the level is not editable in this context.
            LockNotAcquired: If the lock was not acquired.
        """
        can_edit = self._level.editable
        if not can_edit:
            raise RuntimeError("The level is not editable in this context.")
        with self.lock(dimension, cx, cz, blocking=blocking, timeout=timeout):
            working_copy = self.get(dimension, cx, cz)
            yield working_copy
            # An exception raised by the caller propagates out of the yield so the write below is skipped.
            self.set(dimension, cx, cz, working_copy)

    def get(self, dimension: str, cx: int, cz: int) -> Chunk:
        """
        Get a deep copy of the chunk data.
        If you want to edit the chunk, use :meth:`edit` instead.

        Currently a stub that returns a new empty :class:`Chunk`.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :return: A unique copy of the chunk data.
        """
        return Chunk()

    def set(self, dimension: str, cx: int, cz: int, chunk: Chunk):
        """
        Overwrite the chunk data.
        You must lock access to the chunk before setting it otherwise an exception may be raised.
        If you want to edit the chunk, use :meth:`edit` instead.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :param chunk: The chunk data to set.
        :raises:
            RuntimeError: If the level is not editable in this context.
            LockNotAcquired: If the chunk is already locked by another thread.
        """
        if not self._level.editable:
            raise RuntimeError("The level is not editable in this context.")
        chunk_lock = self.__get_lock((dimension, cx, cz))
        # Never wait here. The lock is re-entrant so this succeeds if this thread already holds it.
        if not chunk_lock.acquire(False):
            raise LockNotAcquired("Cannot set a chunk if it is locked by another thread.")
        try:
            chunk = deepcopy(chunk)
            # TODO set the chunk and notify listeners
        finally:
            chunk_lock.release()

    def on_change(self, callback):
        """A notification system for chunk changes."""
        raise NotImplementedError

class Level:
    """A level whose edits are gated by a context-local editable flag and a token lock."""

    def __init__(self):
        self._data = PrivateLevel()

    @property
    def editable(self) -> bool:
        """Is the level editable in this context."""
        private = self._data
        return private.editable

    @contextmanager
    def edit(self, transaction_token: str = None):
        """
        Make the level editable in this context.

        Depending on the configuration this may block until other operations have completed.

        >>> with level.edit():
        >>>     # edit the level
        >>> # Level is no longer editable

        :param transaction_token: Optional UUID string. If an operation uses threads, using the same transaction_token will allow them to run in parallel.
        """
        # Operations configured to run in parallel all share one token so they never block each other.
        # The caller's token is not used in that case.
        lock_token = "" if ParallelEditing else transaction_token
        private = self._data
        with private.edit_lock.lock(lock_token):
            previous = private.editable_var.set(True)
            try:
                yield
            finally:
                # Restore whatever the flag was before entering the block.
                private.editable_var.reset(previous)

    @cached_property
    def chunk(self) -> ChunkStorage:
        """The chunk storage for this level. Created on first access and then reused."""
        return ChunkStorage(self._data)
gentlegiantJGC commented 1 year ago

@Podshot raised the issue of deadlocking if two or more threads try to acquire locks that the other thread has already acquired.

The only solution I can see is to implement a custom lock that checks for the deadlock condition and raises an exception. This isn't exactly pretty but it is the best solution I can see. We should also implement an edit_exclusive context manager in the level so that if an operation needs to do more complex locking it can ensure it is the only operation running.

Here is my current implementation of a deadlock-detecting RLock.

from __future__ import annotations
from threading import Thread, RLock, Lock, Condition, get_ident
import time
from typing import Optional
from contextlib import contextmanager

# Module-wide lock held by SafeRLock.acquire and SafeRLock.release while they touch _threads or lock state.
_lock = Lock()
# Map from thread to the locks it has acquired and is waiting on.
# Keyed by the thread identifier returned by threading.get_ident().
_threads: dict[int, ThreadState] = {}

class DeadlockError(Exception):
    """Raised when acquiring a lock would lead to a deadlock."""

class ThreadState:
    """Book-keeping for one thread: the locks it holds and the lock it is blocked on."""

    # The locks this thread has acquired
    locked: set[SafeRLock]
    # The lock this thread is waiting for. Might be None.
    waiting: Optional[SafeRLock]

    def __init__(self):
        # A new thread state holds nothing and waits for nothing.
        self.waiting = None
        self.locked = set()

class SafeRLock:
    """
    RLock that raises a DeadlockError when acquiring if it would deadlock.

    All bookkeeping (owner, acquisition count, waiting threads and the module-level ``_threads``
    map) is read and written only while the module-level ``_lock`` is held.
    """

    _lock: RLock
    _condition: Condition
    _pending: set[int]  # The threads waiting for this lock
    _owner: Optional[int]  # The thread that owns this lock
    _lock_count: int  # How many times the lock has been acquired by its owner

    def __init__(self):
        self._lock = RLock()
        # The condition wraps the module-level lock, not self._lock, so that waiting on it
        # releases the lock that guards the shared bookkeeping.
        self._condition = Condition(_lock)
        self._owner = None
        self._pending = set()
        self._lock_count = 0

    def _would_deadlock(self) -> bool:
        """
        If this lock is acquired would it be a deadlock state.

        Follows the chain "lock -> owning thread -> lock that thread is waiting for" starting at
        this lock. Arriving back at the calling thread means waiting here would close a cycle.
        The module-level lock must be held by the caller.
        """
        this_thread_id = get_ident()
        if self._owner == this_thread_id:
            # This lock is locked by this thread. Not a deadlock.
            return False

        visited = set()
        lock = self
        while lock is not None and lock not in visited:
            # Remember visited locks so a cycle that excludes this thread cannot loop forever.
            visited.add(lock)
            # Find the thread that owns the lock
            owner_thread_id = lock._owner
            if owner_thread_id is None:
                # This lock is not locked. Not a deadlock.
                return False
            if owner_thread_id == this_thread_id:
                return True
            # See which lock it is waiting for. No state or no waiting lock means not a deadlock.
            owner_state = _threads.get(owner_thread_id)
            lock = None if owner_state is None else owner_state.waiting
        return False

    def _try_acquire(self, ident: int) -> bool:
        """
        Take the lock for thread ``ident`` if it is free or already owned by that thread.

        The module-level lock must be held by the caller.

        :return: True if the lock was taken, False if another thread owns it.
        """
        if not self._lock.acquire(blocking=False):
            return False
        self._owner = ident
        self._lock_count += 1
        _threads.setdefault(ident, ThreadState()).locked.add(self)
        return True

    def acquire(self, blocking=True, timeout=-1):
        """
        Acquire the lock. The owning thread may acquire it again without blocking.

        :param blocking: If False, return immediately instead of waiting for the lock.
        :param timeout: Seconds to wait for the lock. A negative value or None waits without limit.
        :return: True if the lock was acquired, False if it was not (non-blocking or timed out).
        :raises:
            DeadlockError: If waiting for this lock would deadlock.
        """
        ident = get_ident()
        with _lock:
            # Try and acquire the lock without blocking
            if self._try_acquire(ident):
                return True

            if not blocking:
                # We already tried to acquire without blocking and failed
                return False

            if self._would_deadlock():
                raise DeadlockError("Acquiring this lock would lead to a deadlock.")

            # Condition.wait treats a non-positive timeout as "do not wait at all", so the
            # "wait forever" sentinel must be translated rather than passed straight through.
            if timeout is None or timeout < 0:
                deadline = None
            else:
                deadline = time.monotonic() + timeout

            # Register as waiting so other threads can see this edge when checking for deadlocks.
            thread_state = _threads.setdefault(ident, ThreadState())
            self._pending.add(ident)
            thread_state.waiting = self
            try:
                while True:
                    if deadline is None:
                        self._condition.wait()
                    else:
                        remaining = deadline - time.monotonic()
                        if remaining > 0:
                            self._condition.wait(remaining)
                    # Another thread may have taken the lock between the notification and this
                    # thread resuming, so a wake-up is only a hint. Retry and keep waiting if needed.
                    if self._try_acquire(ident):
                        return True
                    if deadline is not None and time.monotonic() >= deadline:
                        # Timed out
                        return False
            finally:
                # Remove the pending state however the wait ended (acquired, timed out or interrupted).
                self._pending.discard(ident)
                thread_state.waiting = None
                if not thread_state.locked:
                    _threads.pop(ident, None)

    def release(self):
        """
        Release one acquisition of the lock.

        The lock only becomes available to other threads once the owner has released it as many
        times as it acquired it.

        :raises:
            RuntimeError: If the calling thread does not own the lock.
        """
        with _lock:
            ident = get_ident()
            if self._owner != ident or self._lock_count <= 0:
                raise RuntimeError("Lock not owned by thread that tried releasing it.")

            self._lock_count -= 1
            self._lock.release()
            if self._lock_count:
                # Still held re-entrantly. Keep the ownership bookkeeping and do not wake anyone.
                return

            self._owner = None
            # Remove the locked state
            lock_state = _threads.get(ident)
            if lock_state is not None:
                lock_state.locked.discard(self)
                if not lock_state.locked and lock_state.waiting is None:
                    del _threads[ident]

            # Wake one waiting thread so it can try to take the lock.
            self._condition.notify()

    @contextmanager
    def lock(self, blocking=True, timeout=-1):
        """
        Acquire the lock for the duration of a with block.

        The value bound by ``as`` is True if the lock was acquired and False otherwise. The block
        is still entered when the lock was not acquired, so callers that pass ``blocking=False``
        or a timeout must check that value.

        :param blocking: If False, do not wait for the lock.
        :param timeout: Seconds to wait for the lock. A negative value or None waits without limit.
        :raises:
            DeadlockError: If waiting for this lock would deadlock.
        """
        locked = self.acquire(blocking, timeout)
        try:
            yield locked
        finally:
            if locked:
                self.release()

import logging
# Include the thread name in every record so the interleaving of the demo operations can be followed.
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(threadName)s - %(message)s")

# Locks shared by the demo operations below.
# op1 takes l1 then l2, op2 takes l2 then l3 and op3 takes l3 then l1, which forms a cycle.
l1 = SafeRLock()
l2 = SafeRLock()
l3 = SafeRLock()

def op1():
    """Demo operation: take l1, then take l2 while still holding l1, sleeping one second after each."""
    logging.info("op1 getting l1")
    with l1.lock():
        logging.info("op1 got l1")
        time.sleep(1)
        logging.info("op1 getting l2")
        with l2.lock():
            # Logged after the lock is held, so this reports "got" like the other operations do.
            logging.info("op1 got l2")
            time.sleep(1)

def op2():
    """Demo operation: take l2, then take l3 while still holding l2, sleeping one second after each."""
    log = logging.info
    pause = time.sleep

    log("op2 getting l2")
    with l2.lock():
        log("op2 got l2")
        pause(1)
        log("op2 getting l3")
        with l3.lock():
            log("op2 got l3")
            pause(1)

def op3():
    """Demo operation: take l3, then take l1 while still holding l3, sleeping one second after each."""
    log = logging.info
    pause = time.sleep

    log("op3 getting l3")
    with l3.lock():
        log("op3 got l3")
        pause(1)
        log("op3 getting l1")
        with l1.lock():
            log("op3 got l1")
            pause(1)

def main():
    """Run the three demo operations, each on its own thread, and wait for all of them to end."""
    # Create every thread before starting any of them.
    workers = [Thread(target=operation) for operation in (op1, op2, op3)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    logging.info("finished")

# Only run the demo when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()