gorakhargosh / watchdog

Python library and shell utilities to monitor filesystem events.
http://packages.python.org/watchdog/
Apache License 2.0
6.63k stars 698 forks source link

Polling Observer crashes on network loss. #452

Open AlexEshoo opened 6 years ago

AlexEshoo commented 6 years ago

I am using the PollingObserver class to monitor changes to a location on a Windows network share, and I need it to be reliable enough to ride through network loss events. Currently, the polling thread crashes when a network loss event occurs and does not recover from it.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "C:\Program Files (x86)\Python36-32\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "C:\repo\app\env\lib\site-packages\watchdog\observers\api.py", line 146, in run
    self.queue_events(self.timeout)
  File "C:\repo\app\env\lib\site-packages\watchdog\observers\polling.py", line 91, in queue_events
    new_snapshot = self._take_snapshot()
  File "C:\repo\app\env\lib\site-packages\watchdog\observers\polling.py", line 74, in <lambda>
    self.watch.path, self.watch.is_recursive, stat=stat, listdir=listdir)
  File "C:\repo\app\env\lib\site-packages\watchdog\utils\dirsnapshot.py", line 207, in __init__
    st = stat(path)
OSError: [WinError 64] The specified network name is no longer available: '\\\\10.51.130.132\\testShare'

Additionally, observer.is_alive() still returns True because the observer thread itself is still running, so I cannot detect at the application layer that the polling thread has died, and therefore cannot restart it.

The observer should be able to ride through this event and continue attempting connection, or the observer thread should stop and raise some exception that can be caught and acted on.

QuantumEnergyE commented 6 years ago

Work normally after NFS or CIFS recovery from exception. #459

QuantumEnergyE commented 6 years ago

You can rewrite the PollingObserver, catch the WinError 64 exception in the queue_events function, and keep retrying until the service is restored.

class NetPollingObserver(BaseObserver):
    """
    Observer that detects file system changes by polling, using
    :class:`NetPollingEmitter` as its emitter class.

    :param timeout:
        Timeout forwarded unchanged to ``BaseObserver``.
    """

    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        # The only difference from a plain BaseObserver is the emitter class
        # that gets plugged in.
        super(NetPollingObserver, self).__init__(
            emitter_class=NetPollingEmitter,
            timeout=timeout,
        )

class NetPollingEmitter(PollingEmitter):
    """
    Platform-independent emitter that polls a directory to detect file
    system changes and skips a polling round when the watched network
    location is temporarily unavailable.

    Snapshots are taken with :class:`NetDirectorySnapshot`. When taking a
    snapshot fails with error code 64 ("The specified network name is no
    longer available" on Windows), no events are queued for that round and
    the previous snapshot is kept, so the next call to
    :meth:`queue_events` simply tries again.

    :param event_queue:
        Passed unchanged to ``EventEmitter.__init__``.
    :param watch:
        Passed unchanged to ``EventEmitter.__init__``; its ``path`` and
        ``is_recursive`` attributes select what is snapshotted.
    :param timeout:
        Passed unchanged to ``EventEmitter.__init__``; ``queue_events``
        treats the timeout it is given as the polling interval.
    :param stat:
        Stat function handed to :class:`NetDirectorySnapshot`.
    :param listdir:
        Directory listing function handed to :class:`NetDirectorySnapshot`.
    """

    # Error code that signals loss of the network share.
    _NETWORK_LOSS_CODE = 64

    def __init__(self,
                 event_queue,
                 watch,
                 timeout=DEFAULT_EMITTER_TIMEOUT,
                 stat=default_stat,
                 listdir=os.listdir):
        # EventEmitter.__init__ is called directly (not PollingEmitter's),
        # presumably so that _take_snapshot can be bound to
        # NetDirectorySnapshot below.
        EventEmitter.__init__(self, event_queue, watch, timeout)
        self._snapshot = None
        self._lock = threading.Lock()
        self._take_snapshot = lambda: NetDirectorySnapshot(
            self.watch.path, self.watch.is_recursive, stat=stat, listdir=listdir)

    @classmethod
    def _is_network_loss(cls, error):
        """
        Return ``True`` if ``error`` reports loss of the network share.

        On Windows the native error code is exposed as ``winerror`` while
        ``errno`` only holds an approximate POSIX translation, so comparing
        ``errno`` alone never matches ``[WinError 64]``. The ``errno``
        comparison is kept so that errors previously recognised are still
        recognised.
        """
        code = cls._NETWORK_LOSS_CODE
        return getattr(error, 'winerror', None) == code or error.errno == code

    def queue_events(self, timeout):
        """
        Wait for ``timeout``, take a fresh snapshot and queue one event per
        difference from the previous snapshot.

        Returns early without queueing anything when the emitter was
        stopped, or when the snapshot could not be taken because the network
        share is unavailable. Any other ``OSError`` is re-raised.
        """
        # We don't want to hit the disk continuously.
        # timeout behaves like an interval for polling emitters.
        if self.stopped_event.wait(timeout):
            return

        with self._lock:
            if not self.should_keep_running():
                return

            # Get event diff between fresh snapshot and previous snapshot.
            # Update snapshot.
            try:
                new_snapshot = self._take_snapshot()
            except OSError as ex:
                if self._is_network_loss(ex):
                    # Keep the old snapshot; the next poll retries.
                    return
                raise
            events = DirectorySnapshotDiff(self._snapshot, new_snapshot)
            self._snapshot = new_snapshot

            # Files.
            for src_path in events.files_deleted:
                self.queue_event(FileDeletedEvent(src_path))
            for src_path in events.files_modified:
                self.queue_event(FileModifiedEvent(src_path))
            for src_path in events.files_created:
                self.queue_event(FileCreatedEvent(src_path))
            for src_path, dest_path in events.files_moved:
                self.queue_event(FileMovedEvent(src_path, dest_path))

            # Directories.
            for src_path in events.dirs_deleted:
                self.queue_event(DirDeletedEvent(src_path))
            for src_path in events.dirs_modified:
                self.queue_event(DirModifiedEvent(src_path))
            for src_path in events.dirs_created:
                self.queue_event(DirCreatedEvent(src_path))
            for src_path, dest_path in events.dirs_moved:
                self.queue_event(DirMovedEvent(src_path, dest_path))

class NetDirectorySnapshot(DirectorySnapshot):
    """
    A snapshot of stat information of files in a directory.

    Any ``OSError`` raised by ``stat`` or ``listdir`` while the snapshot is
    being built is not suppressed; it propagates to the caller.

    :param path:
        The directory path for which a snapshot should be taken.
    :type path:
        ``str``
    :param recursive:
        ``True`` if the entire directory tree should be included in the
        snapshot; ``False`` otherwise.
    :type recursive:
        ``bool``
    :param walker_callback:
        .. deprecated:: 0.7.2

        A function with the signature ``walker_callback(path, stat_info)``
        which will be called for every entry in the directory tree.
    :param stat:
        Use custom stat function that returns a stat structure for path.
        Currently only st_dev, st_ino, st_mode and st_mtime are needed.
    :param listdir:
        Use custom listdir function. See ``os.listdir`` for details.
    """

    def __init__(self,
                 path,
                 recursive=True,
                 walker_callback=(lambda p, s: None),
                 stat=default_stat,
                 listdir=os.listdir):
        self._stat_info = {}
        self._inode_to_path = {}

        # Record the watched root itself before descending into it.
        root_stat = stat(path)
        self._stat_info[path] = root_stat
        self._inode_to_path[(root_stat.st_ino, root_stat.st_dev)] = path

        def walk(directory):
            # Errors from listdir/stat are intentionally left uncaught so
            # that they reach whoever requested the snapshot.
            child_paths = [os.path.join(directory, name)
                           for name in listdir(directory)]
            children = [(child, stat(child)) for child in child_paths]

            # Emit every direct child first, then descend.
            for child_entry in children:
                yield child_entry
            if not recursive:
                return
            for child, child_stat in children:
                if S_ISDIR(child_stat.st_mode):
                    for nested_entry in walk(child):
                        yield nested_entry

        for entry_path, entry_stat in walk(path):
            inode_key = (entry_stat.st_ino, entry_stat.st_dev)
            self._inode_to_path[inode_key] = entry_path
            self._stat_info[entry_path] = entry_stat
            walker_callback(entry_path, entry_stat)