samuelcolvin / watchfiles

Simple, modern and fast file watching and code reload in Python.
https://watchfiles.helpmanual.io
MIT License

Fix data loss issue by ensuring proper locking and clearing of changes #314

Open koseki2580 opened 1 day ago


Summary

This pull request addresses a data loss issue by optimizing the timing of lock management. Specifically, it ensures that data retrieval and clearing operations are performed within the same lock scope, preventing data loss caused by concurrent modifications.

Changes

Modified lock management to ensure data is cleared within the same lock scope immediately after retrieval. Previously, the lock was released after retrieving data, allowing other threads to modify the data before it was cleared. This caused data loss.

Problem Description

The following code retrieves data from changes after acquiring a lock, releases the lock, and then reacquires the lock to clear the data. This creates a window where other threads can add new events to changes, resulting in data loss.

let py_changes = slf
    .borrow()
    .changes
    .lock()
    .unwrap()
    .to_owned()
    .into_pyobject(py)?
    .into_any()
    .unbind();
// the lock is released here; clear() must reacquire it
slf.borrow().clear();

impl RustNotify {
    fn clear(&self) {
        self.changes.lock().unwrap().clear();
    }
}

As a result, any new events added to changes during this gap are not retrieved correctly, leading to data loss.
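The race itself is language-independent. A minimal Python sketch (hypothetical names; the interleaving is replayed deterministically instead of relying on real thread timing) reproduces the same lost update:

```python
import threading

changes: set = set()      # stands in for RustNotify.changes
lock = threading.Lock()   # stands in for the Mutex around it

def snapshot() -> set:
    # first lock scope: copy the pending changes (the .to_owned() step)
    with lock:
        return set(changes)

def clear() -> None:
    # second lock scope: wipe the set (mirrors RustNotify::clear)
    with lock:
        changes.clear()

# Replay the interleaving shown in the logs, step by step:
with lock:
    changes.add((1, "0.txt"))  # event arrives before the read
seen = snapshot()              # reader copies its snapshot
with lock:
    changes.add((1, "1.txt"))  # event lands in the gap between the two locks
clear()                        # ...and is wiped before anyone retrieved it

assert seen == {(1, "0.txt")}  # '1.txt' was silently lost
```

Because `snapshot` and `clear` each take the lock separately, the writer's insert between them is perfectly legal yet its event is never reported.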

Reproduction Steps

Add Logging

Insert logging statements at the following points to visualize the issue:

// use custom log
use chrono::Local;
fn log_with_timestamp(message: &str) {
    let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S%.6f").to_string();
    eprintln!("[{}] {}", timestamp, message);
}

log_with_timestamp("detect change");
changes_clone.lock().unwrap().insert((change, path)); // 178
log_with_timestamp("add changes log");
log_with_timestamp("before get changes (base)");
let py_changes = slf // 331
    .borrow()
    .changes
    .lock()
    .unwrap()
    .to_owned()
    .into_pyobject(py)?
    .into_any()
    .unbind();
log_with_timestamp("after get changes (base)");
log_with_timestamp("before clear (base)");
slf.borrow().clear(); // 340
log_with_timestamp("after clear (base)");

Logs Observed

When running the application in a test environment, the following logs indicate the problem.

[2024-12-01 18:55:28.420324] after get changes (base)
[2024-12-01 18:55:28.420329] add changes log
[2024-12-01 18:55:28.420405] detect change
[2024-12-01 18:55:28.420432] add changes log
[2024-12-01 18:55:28.420372] before clear (base)

[2024-12-01 18:55:45.140948] after get changes (base)
[2024-12-01 18:55:45.140972] detect change
[2024-12-01 18:55:45.141018] add changes log
[2024-12-01 18:55:45.141045] detect change
[2024-12-01 18:55:45.141072] add changes log
[2024-12-01 18:55:45.140976] before clear (base)

These logs demonstrate that:

  1. Data is retrieved from changes (to be passed to Python).
  2. New events are added to changes.
  3. changes is cleared.

This order of operations causes data loss: the events added in step 2 are cleared in step 3 without ever having been retrieved in step 1.

File Creation Script
# create_file.py
import argparse
import os
import shutil
import time
from pathlib import Path

def create_file(create_time_s: int):
    # Create a file
    basepath = Path(__file__).parent
    output_path = basepath / 'output'
    shutil.rmtree(output_path, ignore_errors=True)

    output_path.mkdir(exist_ok=True)
    end_time = time.time() + create_time_s
    cnt = 0
    while time.time() < end_time:
        file_path = output_path / f'{cnt}.txt'
        with open(file_path, 'w') as f:
            f.write('Hello, World!')
            f.flush()
        cnt += 1
        os.remove(file_path)
        # time.sleep(0.001)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Create files')
    parser.add_argument('-t', '--time', type=int, default=1, help='Create time in seconds')
    args = parser.parse_args()
    create_file(args.time)
Script Using watchfiles to Monitor Changes
# check_dir.py
import shutil
from pathlib import Path

from watchfiles import watch

def check_dir():
    basepath = Path(__file__).parent
    check_path = basepath / 'output'
    shutil.rmtree(check_path, ignore_errors=True)

    check_path.mkdir(exist_ok=True)
    output_file = basepath / 'detect_output_files.txt'
    output_file.unlink(missing_ok=True)
    with open(output_file, 'a') as f:
        for changes in watch(str(check_path), debounce=100):
            for change in changes:
                f.write(f'{change[1]}\n')

if __name__ == '__main__':
    check_dir()
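Since create_file.py names files sequentially (0.txt, 1.txt, ...), the loss can be quantified by diffing detect_output_files.txt against that range. The helper below is illustrative only (`count_missed` and its arguments are not part of the repro scripts):

```python
from pathlib import Path

def count_missed(log_file: Path, files_created: int) -> int:
    """Count created files that check_dir.py never reported."""
    # Names that were reported (one path per line in the log).
    seen = {Path(line).stem for line in log_file.read_text().splitlines()
            if line.strip()}
    # create_file.py writes 0.txt .. N-1.txt, so every stem should appear.
    return sum(1 for i in range(files_created) if str(i) not in seen)
```

A non-zero count on an unpatched build corresponds to events that fell into the gap between retrieval and clearing.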
Subprocess Script to Run Logging
# run_check.py
# (naming this file subprocess.py would shadow the stdlib module imported below)
import subprocess
from pathlib import Path

basepath = Path(__file__).parent
log_path = basepath / 'watch.log'

def run_subprocess():
    # Run a subprocess
    run_path = basepath / 'check_dir.py'
    with open(log_path, 'w') as f:
        p = subprocess.Popen(['python', str(run_path)], stdout=f, stderr=subprocess.STDOUT)
        print(f'subprocess pid: {p.pid}')
        p.wait()

if __name__ == '__main__':
    run_subprocess()
Script to Extract Specific Log Entries
import os
import re
from pathlib import Path

read_files = ['watch_base.log', 'watch_fix.log']

basepath = Path(__file__).parent

def extract_between_patterns(file_path):
    # Collect the log lines between "after get changes" and "before clear";
    # sections longer than two lines show events interleaved into the gap.
    get_changes_pattern = r'\[.*\] after get changes \(.*\)'
    clear_pattern = r'\[.*\] before clear \(.*\)'

    result = []

    is_write = False

    output_file_prefix = 'result_'
    filename = os.path.basename(file_path)
    output_file = os.path.dirname(file_path) + '/' + output_file_prefix + filename
    with open(file_path, 'r') as file:
        for line in file:
            if re.search(get_changes_pattern, line):
                is_write = True
            if re.search(clear_pattern, line):
                is_write = False
                if len(result) <= 2:
                    result = []
                    continue
                with open(output_file, 'a') as f:
                    for r in result:
                        f.write(r)
                    f.write(line)
                    f.write('\n')
                    result = []
            if is_write:
                result.append(line)

for file in read_files:
    extract_between_patterns(basepath / file)

Fixed Implementation

The fix ensures that data retrieval and clearing are performed under the same lock:

let py_changes = {
    let borrowed = slf.borrow();
    let mut locked_changes = borrowed.changes.lock().unwrap();
    let py_changes = locked_changes.to_owned().into_pyobject(py)?.into_any().unbind();
    // Clear the changes while holding the lock
    locked_changes.clear();
    py_changes
};
Ok(py_changes)

This approach eliminates the data loss by ensuring that no other thread can modify changes between retrieval and clearing.
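In the same Python sketch terms, the fix collapses the two lock scopes into one: snapshot and clear happen atomically, leaving no window for a writer (`drain` is a hypothetical helper, not watchfiles API):

```python
import threading

changes: set = set()
lock = threading.Lock()

def drain() -> set:
    # single lock scope: copy and clear atomically, mirroring the Rust fix
    with lock:
        seen = set(changes)
        changes.clear()  # still holding the lock: no writer can slip in
        return seen

with lock:
    changes.add((1, "0.txt"))
first = drain()
assert first == {(1, "0.txt")}  # everything pending was retrieved...
with lock:
    assert not changes          # ...and nothing unseen was discarded
```

Any event inserted after `drain` releases the lock simply waits in `changes` for the next call, so no event is ever cleared unreported.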