peter-wangxu / persist-queue

A thread-safe disk based persistent queue in Python
BSD 3-Clause "New" or "Revised" License

Queue gets corrupted after power cut #180

Closed jensanjo closed 9 months ago

jensanjo commented 2 years ago

I am using persistqueue.Queue to keep track of a list of pending FTP transfers. After a sudden power-down of the system, the queue cannot be loaded.

from persistqueue import Queue
q = Queue('queue')
$ python qtest.py 
Default tempdir '/tmp/tmpcuapvutq' is not on the same filesystem with queue path 'queue',defaulting to 'queue'.
Traceback (most recent call last):
  File "qtest.py", line 3, in <module>
    q = Queue('queue')
  File "/home/joep/src/queuetest/venv/lib/python3.8/site-packages/persistqueue/queue.py", line 108, in __init__
    self.info = self._loadinfo()
  File "/home/joep/src/queuetest/venv/lib/python3.8/site-packages/persistqueue/queue.py", line 271, in _loadinfo
    info = self.serializer.load(f)
  File "/home/joep/src/queuetest/venv/lib/python3.8/site-packages/persistqueue/serializers/pickle.py", line 28, in load
    return pickle.load(fp)
_pickle.UnpicklingError: invalid load key, '\x00'.
Exception ignored in: <function Queue.__del__ at 0x7fc71eb5e4c0>
Traceback (most recent call last):
  File "/home/joep/src/queuetest/venv/lib/python3.8/site-packages/persistqueue/queue.py", line 313, in __del__
    for to_close in [self.headf, self.tailf]:
AttributeError: 'Queue' object has no attribute 'headf'
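
One plausible reading of the traceback above, not confirmed anywhere in this thread, is that the power cut left the queue's info file filled with NUL bytes instead of pickled data. The sketch below only reproduces that failure mode with the standard library; `info_path` is a hypothetical stand-in file, not the file persist-queue actually writes.

```python
import os
import pickle
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical stand-in for the queue's info file.
    info_path = os.path.join(tmp, "info")

    # Simulate a file whose length was recorded but whose data never
    # reached the disk: it contains only NUL bytes.
    with open(info_path, "wb") as f:
        f.write(b"\x00" * 64)

    error_message = None
    try:
        with open(info_path, "rb") as f:
            pickle.load(f)
    except pickle.UnpicklingError as exc:
        # pickle rejects the first byte because it is not a valid opcode.
        error_message = str(exc)

print(error_message)
```

If the real info file starts with a NUL byte, that would be consistent with the `invalid load key` message reported here.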
jensanjo commented 2 years ago

Here is the test program and queue files to reproduce the problem. qtest.tar.gz

peter-wangxu commented 2 years ago

which version are you using?

jensanjo commented 2 years ago

I am using 0.7.0 from PyPI.

peter-wangxu commented 2 years ago

Thanks. A few more questions:

  1. Which OS kernel and file system are you using (ext4? xfs?), and what is the mount info?
  2. Was the queue running within a container?
  3. Any other details about the environment would be helpful.

@jensanjo

jensanjo commented 2 years ago

Thanks for looking into this.

  1. The system is running mingw64 under Windows 10 Enterprise. File system: ntfs
  2. Not running within a container
  3. I have not tried to reproduce in another environment

I hope this helps!

Kryniowesegryderiusz commented 1 year ago

Is there any update on this? I am having the same error.

jensanjo commented 1 year ago

I switched to the SQLite queue. It looks like that got rid of the issue.

chintal commented 1 year ago

It would be useful to know at what point the exceptions are raised (at queue creation / put / get) and to know what the exception will be. I was using pqueue which had the same issue, and I was able to work around it by catching pickle.UnpicklingError on pqueue.get_nowait(). In that case, I did not mind losing some data. It is certainly not ideal, but the data is lost by that point anyway. The exception otherwise requires manual intervention for the program to even continue operating with future data points. This manual intervention is not possible for my application.

chintal commented 1 year ago

I'm using the following wrapper class to avoid this problem. If anyone wishes to use this, you can. Note that the instantiation signature changes path from a positional arg to a keyword arg.

Note that this is exceedingly dangerous code. Not only does it assume that data loss in the queue is acceptable, it also runs shutil.rmtree without any real sanity check on what is essentially user provided input. Make sure that the path and tempdir you provide are completely disposable. Make a mistake there and you could end up accidentally wiping your entire filesystem.

import os
import pickle
import shutil

from persistqueue import Queue

class TolerantQueue(object):
    def __init__(self, name=None, logger=None, **kwargs):
        self._name = name or 'unspecified'
        self._logger = logger
        self._path = kwargs.pop('path')
        self._tempdir = kwargs.pop('tempdir', None)
        self._kwargs = kwargs
        self._create()

    def _create(self):
        if not os.path.exists(self._path):
            os.makedirs(self._path)
        # tempdir is optional (defaults to None), so only create it when given
        if self._tempdir and not os.path.exists(self._tempdir):
            os.makedirs(self._tempdir)
        try:
            self._actual_queue = Queue(path=self._path, tempdir=self._tempdir, **self._kwargs)
        except pickle.UnpicklingError as e:
            # info file is truncated
            if self._logger:
                # Logger.warn is a deprecated alias of Logger.warning
                self._logger.warning(f"Unpickling error ({e}) opening persisted queue "
                                     f"{self._name}. Nuking. There may be data loss.")
            self._reset()

    def _reset(self):
        self._actual_queue = None
        if self._path:
            shutil.rmtree(self._path, ignore_errors=True)
        if self._tempdir:
            shutil.rmtree(self._tempdir, ignore_errors=True)
        self._create()

    def get(self, *args, **kwargs):
        try:
            return self._actual_queue.get(*args, **kwargs)
        except (EOFError, pickle.UnpicklingError) as e:
            # q00000 like file is truncated or similar
            if self._logger:
                # Logger.warn is a deprecated alias of Logger.warning
                self._logger.warning(f"Unpickling error ({e}) reading persisted queue "
                                     f"{self._name}. Nuking. There may be data loss.")
            self._reset()
            return None

    def __getattr__(self, item):
        return getattr(self._actual_queue, item)
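
The warning above about calling shutil.rmtree on user-provided paths could be softened with a guard like the one sketched here. This is not part of the wrapper as posted; `remove_queue_dir` and `allowed_root` are hypothetical names, and the sketch assumes all disposable queue directories live under one dedicated root directory.

```python
import shutil
from pathlib import Path


def remove_queue_dir(path, allowed_root):
    """Delete `path` only if it lies strictly inside `allowed_root`."""
    target = Path(path).resolve()
    root = Path(allowed_root).resolve()
    # Path.parents lists every ancestor of target, so this check rejects
    # the root itself as well as anything outside of it.
    if root not in target.parents:
        raise ValueError(f"refusing to delete {target}: not inside {root}")
    shutil.rmtree(target, ignore_errors=True)
```

Calling something like this from `_reset` instead of the bare `shutil.rmtree` would turn a misconfigured path into an exception rather than a deleted directory tree.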