Scille / parsec-cloud

Open source Dropbox-like file sharing with full client encryption!
https://parsec.cloud

A better pattern for handling requests to the local database #1715

Open vxgmichel opened 3 years ago

vxgmichel commented 3 years ago

PR #1713 fixes specific issues (#1698, #1699, #1700 and partially #1701) but a general solution would be much better.

A possible pattern for all methods that use the local database (i.e. methods in ManifestStorage, BlockStorage and ChunkStorage) would be:

async def read_and_slash_or_write_method_without_cache(self, key, value):
    localdb = self.localdb

    # Declare serde and database logic in a sync function
    def _thread_target():
        serialized = serialize(value)
        with localdb.threading_lock_and_get_cursor() as cursor:
            cursor.execute(some_statement, (key, serialized))
            rows = cursor.execute(some_other_statement, (key,)).fetchall()
        return deserialize(rows)

    # Should not fetch or change any attribute of `self`
    assert 'self' not in _thread_target.__code__.co_freevars

    # Run the sync function in a dedicated thread (no capacity limit)
    return await self.localdb.run_in_thread(_thread_target)
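To make the pattern concrete, here is a self-contained sketch. It is only an illustration under assumptions: `asyncio.to_thread` and an in-memory sqlite3 connection stand in for the real trio-based `LocalDB.run_in_thread`, and the `LocalDB`, `Storage` and `kv` names are hypothetical, not the actual parsec API.

```python
import asyncio
import sqlite3
import threading


class LocalDB:
    """Hypothetical stand-in for the real LocalDB (in-memory sqlite3)."""

    def __init__(self):
        # check_same_thread=False lets worker threads use this connection,
        # as long as accesses are serialized by the lock below
        self._conn = sqlite3.connect(":memory:", check_same_thread=False)
        self._lock = threading.Lock()
        self._conn.execute("CREATE TABLE kv (key TEXT PRIMARY KEY, value BLOB)")


class Storage:
    def __init__(self, localdb):
        self.localdb = localdb

    async def write_and_read_back(self, key, value):
        # Bind locally: the closure must not capture `self`
        localdb = self.localdb

        def _thread_target():
            serialized = value.encode()  # stand-in for real serialization
            with localdb._lock:
                localdb._conn.execute(
                    "INSERT OR REPLACE INTO kv VALUES (?, ?)", (key, serialized)
                )
                localdb._conn.commit()
                rows = localdb._conn.execute(
                    "SELECT value FROM kv WHERE key = ?", (key,)
                ).fetchall()
            return rows[0][0].decode()  # stand-in for real deserialization

        # The closure only captures `localdb`, `key` and `value`, never `self`
        assert 'self' not in _thread_target.__code__.co_freevars

        # Run the sync function in a dedicated thread
        return await asyncio.to_thread(_thread_target)


async def main():
    storage = Storage(LocalDB())
    return await storage.write_and_read_back("a", "hello")


result = asyncio.run(main())
print(result)  # hello
```

The key point is the `localdb = self.localdb` binding at the top: it guarantees the thread never touches attributes of `self`, which the assert then enforces mechanically.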

Note:

Result:

Pros:

Cons:

vxgmichel commented 3 years ago

Extra note:

It is not trivial to manage those accesses to the local database while maintaining a cache in a consistent way, like we do for the manifests. Luckily, the tricky cache handling logic and this proposal can be handled separately. Here's a possible solution.

Let's use both existing _cache and _cache_ahead_of_localdb data structures, along with a new dictionary of locks:

def __init__(self):
    self._cache = {}
    self._cache_ahead_of_localdb = {}
    self._cache_locks = defaultdict(trio.Lock)
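The `defaultdict` gives each key its own lock on first access, so operations on independent keys never contend. A minimal sketch of that behavior, with `asyncio.Lock` standing in for `trio.Lock`:

```python
import asyncio
from collections import defaultdict


async def main():
    locks = defaultdict(asyncio.Lock)
    # Same key -> same lock instance; different keys -> independent locks
    a1 = locks["manifest-a"]
    a2 = locks["manifest-a"]
    b = locks["manifest-b"]
    return a1 is a2, a1 is b


same_key, cross_key = asyncio.run(main())
print(same_key, cross_key)  # True False
```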

Here's the implementation of the synchronous cache methods:

def _read_from_cache(self, key):
    return self._cache.get(key, None)

def _write_to_cache(self, key, value, extras=()):
    self._cache[key] = value
    self._cache_ahead_of_localdb.setdefault(key, []).extend(extras)

def _is_cache_ahead_of_localdb(self, key):
    return key in self._cache_ahead_of_localdb

def _tag_as_up_to_date_with_localdb(self, key):
    return self._cache_ahead_of_localdb.pop(key, None)

def _get_extras(self, key):
    return self._cache_ahead_of_localdb.get(key, None)
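To show how these helpers track the "cache ahead of localdb" state, here is a stand-alone copy of them (the `CacheState` class name and the sample key/extras are made up for the demo):

```python
class CacheState:
    """Stand-alone copy of the cache helpers, for illustration only."""

    def __init__(self):
        self._cache = {}
        self._cache_ahead_of_localdb = {}

    def _read_from_cache(self, key):
        return self._cache.get(key, None)

    def _write_to_cache(self, key, value, extras=()):
        self._cache[key] = value
        self._cache_ahead_of_localdb.setdefault(key, []).extend(extras)

    def _is_cache_ahead_of_localdb(self, key):
        return key in self._cache_ahead_of_localdb

    def _tag_as_up_to_date_with_localdb(self, key):
        return self._cache_ahead_of_localdb.pop(key, None)

    def _get_extras(self, key):
        return self._cache_ahead_of_localdb.get(key, None)


state = CacheState()
state._write_to_cache("k", b"v1", extras=["chunk-1"])
print(state._is_cache_ahead_of_localdb("k"))  # True: write not flushed yet
print(state._get_extras("k"))                 # ['chunk-1']
state._tag_as_up_to_date_with_localdb("k")
print(state._is_cache_ahead_of_localdb("k"))  # False: localdb is up to date
```

Note that a write always marks the key as ahead of the localdb (even with empty extras), and only an explicit tag call clears that mark.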

The two read/write methods that access the local database follow the pattern proposed earlier:

async def _read_from_localdb(self, key):
    # See the #1715 proposal
    ...

async def _write_to_localdb(self, key, value, extras):
    # See the #1715 proposal
    ...

The tricky cache+localdb logic is locked within two async methods, load_to_cache and flush_to_localdb:

async def load_to_cache(self, key):
    # Make sure the cache hasn't been populated already
    cache_result = self._read_from_cache(key)
    if cache_result is not None:
        return
    # Locks against simultaneous cache and localdb access
    async with self._cache_locks[key]:
        # The cache might have been written in the meantime
        cache_result = self._read_from_cache(key)
        if cache_result is not None:
            return
        # Access the local database
        localdb_result = await self._read_from_localdb(key)
        # The cache might have been written in the meantime
        cache_result = self._read_from_cache(key)
        if cache_result is not None:
            return
        # Otherwise, safely write the cache
        self._write_to_cache(key, localdb_result)

async def flush_to_localdb(self, key):
    # Make sure there is something to flush
    if not self._is_cache_ahead_of_localdb(key):
        return
    # Locks against simultaneous cache and localdb access
    async with self._cache_locks[key]:
        # Make sure there is something to flush, as it might have changed
        if not self._is_cache_ahead_of_localdb(key):
            return
        # Get value and extras
        value = self._read_from_cache(key)
        extras = self._get_extras(key)
        # Write the value to the local database
        await self._write_to_localdb(key, value, extras)
        # Make sure the cache hasn't changed in the meantime
        if value != self._read_from_cache(key):
            return
        # Otherwise safely acknowledge the successful write
        self._tag_as_up_to_date_with_localdb(key)

Finally, the read and write async methods can be implemented as such:

async def read_with_cache(self, key):
    # Load cache entry if not available
    await self.load_to_cache(key)
    # Then read from cache
    return self._read_from_cache(key)

async def write_with_cache(self, key, value, extras=(), flush=False):
    # Write to cache
    self._write_to_cache(key, value, extras)
    # Flush if necessary
    if flush:
        await self.flush_to_localdb(key)
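Putting the pieces together, here is an end-to-end toy version of the whole proposal. It is a sketch under assumptions: the localdb is a plain dict accessed through trivial async methods (no thread, no sqlite), `asyncio.Lock` stands in for `trio.Lock`, and the `CachedStorage` name is hypothetical. The locking and re-check structure matches the load/flush methods above.

```python
import asyncio
from collections import defaultdict


class CachedStorage:
    """Toy end-to-end sketch of the cache + localdb proposal."""

    def __init__(self):
        self._localdb = {}  # stand-in for the real local database
        self._cache = {}
        self._cache_ahead_of_localdb = {}
        self._cache_locks = defaultdict(asyncio.Lock)

    # -- synchronous cache helpers --
    def _read_from_cache(self, key):
        return self._cache.get(key, None)

    def _write_to_cache(self, key, value, extras=()):
        self._cache[key] = value
        self._cache_ahead_of_localdb.setdefault(key, []).extend(extras)

    def _is_cache_ahead_of_localdb(self, key):
        return key in self._cache_ahead_of_localdb

    def _tag_as_up_to_date_with_localdb(self, key):
        return self._cache_ahead_of_localdb.pop(key, None)

    def _get_extras(self, key):
        return self._cache_ahead_of_localdb.get(key, None)

    # -- fake async localdb accessors (real ones would run in a thread) --
    async def _read_from_localdb(self, key):
        return self._localdb.get(key)

    async def _write_to_localdb(self, key, value, extras):
        self._localdb[key] = value

    # -- locked cache+localdb logic --
    async def load_to_cache(self, key):
        if self._read_from_cache(key) is not None:
            return
        async with self._cache_locks[key]:
            if self._read_from_cache(key) is not None:
                return
            localdb_result = await self._read_from_localdb(key)
            if self._read_from_cache(key) is not None:
                return
            self._write_to_cache(key, localdb_result)

    async def flush_to_localdb(self, key):
        if not self._is_cache_ahead_of_localdb(key):
            return
        async with self._cache_locks[key]:
            if not self._is_cache_ahead_of_localdb(key):
                return
            value = self._read_from_cache(key)
            extras = self._get_extras(key)
            await self._write_to_localdb(key, value, extras)
            if value != self._read_from_cache(key):
                return
            self._tag_as_up_to_date_with_localdb(key)

    # -- public API --
    async def read_with_cache(self, key):
        await self.load_to_cache(key)
        return self._read_from_cache(key)

    async def write_with_cache(self, key, value, extras=(), flush=False):
        self._write_to_cache(key, value, extras)
        if flush:
            await self.flush_to_localdb(key)


async def main():
    storage = CachedStorage()
    await storage.write_with_cache("k", "v", flush=True)
    assert not storage._is_cache_ahead_of_localdb("k")  # flushed and tagged
    return await storage.read_with_cache("k"), storage._localdb["k"]


result = asyncio.run(main())
print(result)  # ('v', 'v')
```

The point of the toy version is that the public read/write methods stay trivial: all the subtle work lives in the two locked methods, whose re-checks after each await keep the cache and localdb consistent under concurrency.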