After stopping and restarting a pipeline that uses the filesystem connector with persistency enabled, an error indicating broken integrity occurs in the subscribe method, preventing the pipeline from resuming correctly.
1. Set up a pipeline with the following configuration:
   - Filesystem connector (in streaming mode)
   - Persistency enabled (using the filesystem backend)
   - `pw.io.subscribe`
2. Start the pipeline and allow it to load part of the directory.
3. Stop and restart the pipeline.
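The setup above can be sketched roughly as follows. This is a minimal reproduction sketch, not the original pipeline: the directory names, the `InputSchema` field, the `persistent_id` value, and the use of `Config.simple_config` are assumptions and should be checked against the installed Pathway version.

```python
received = []  # events seen by the subscriber, kept for inspection


def on_change(key, row, time, is_addition):
    """Callback for pw.io.subscribe: record every row change."""
    received.append((row, time, is_addition))


def run_pipeline(input_dir="./input", persistence_dir="./PStorage"):
    # Imported here so the callback above can be used without Pathway installed.
    import pathway as pw

    class InputSchema(pw.Schema):
        # Hypothetical field; replace with the real columns of the JSON files.
        value: str

    # Filesystem connector in streaming mode.
    table = pw.io.fs.read(
        input_dir,
        format="json",
        schema=InputSchema,
        mode="streaming",
        # Assumption: in 0.15.x a source is persisted only if it has an id.
        persistent_id="fs_input",
    )
    pw.io.subscribe(table, on_change=on_change)

    # Persistency with the filesystem backend.
    backend = pw.persistence.Backend.filesystem(persistence_dir)
    # Assumption: Config.simple_config is the constructor in the 0.15.x API.
    config = pw.persistence.Config.simple_config(backend)
    pw.run(persistence_config=config)
```

To follow the steps, call `run_pipeline()`, interrupt it after part of the input directory has been read, then call it again with the same `persistence_dir`. Since the issue is non-deterministic, several stop/restart cycles may be needed.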
Removing the persistency directory allows the pipeline to restart without error.
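As a stopgap, the workaround can be scripted before each restart. The helper below is a hypothetical sketch using only the standard library; the function name and the directory path are assumptions. Note that it discards all saved state, so the pipeline reprocesses its input from scratch.

```python
import shutil
from pathlib import Path


def reset_persistence_dir(persistence_dir: str) -> bool:
    """Delete the persistence directory if it exists.

    Returns True if a directory was removed, False if there was nothing to remove.
    """
    path = Path(persistence_dir)
    if not path.is_dir():
        return False
    shutil.rmtree(path)
    return True
```

For example, `reset_persistence_dir("./PStorage")` would be called before starting the pipeline, where `./PStorage` stands for whatever path was passed to the filesystem persistence backend.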
Relevant log output
[2024-10-25T08:24:39]:INFO:Preparing Pathway computation
[2024-10-25T08:24:39]:INFO:Removing obsolete metadata block: 1729837369614-0-0. Actual version: 1729837458401
[2024-10-25T08:24:39]:INFO:Merge the current state with block: StoredMetadata { storage_types: {66322552640100721340350317265675216085: FileSystem}, last_advanced_timestamp: At(Timestamp(1729837458654)) }
[2024-10-25T08:24:39]:INFO:Truncate: Shrink "1729837459006" to 127 bytes
[2024-10-25T08:24:39]:INFO:Truncate: Remove Timestamp(1729837459290)
[2024-10-25T08:24:39]:INFO:Truncate: Remove Timestamp(1729837460416)
[2024-10-25T08:24:39]:INFO:Truncate: Remove Timestamp(1729837466755)
[2024-10-25T08:24:39]:INFO:Reached the greater logical time than preserved (1729837458654). Exiting the rewind after reading 1909 entries
[2024-10-25T08:24:39]:INFO:Reached the end of the snapshot. Exiting the rewind after 920 entries
[2024-10-25T08:24:39]:INFO:Reached the greater logical time than preserved (1729837458654). Exiting the rewind after reading 1909 entries
[2024-10-25T08:24:39]:INFO:Seek the data source to the frontier OffsetAntichain { antichain: {Empty: FilePosition { total_entries_read: 920, path: "foo.json", bytes_offset: 6865 }} }
[2024-10-25T08:24:39]:WARNING:seek for snapshot mode may not work correctly in case deletions take place
[2024-10-25T08:24:39]:INFO:FilesystemReader-otel_2024_fs_2: 0 entries (1 minibatch(es)) have been sent to the engine
[2024-10-25T08:24:47]:INFO:FilesystemReader-otel_2024_fs_2: 4181 entries (4186 minibatch(es)) have been sent to the engine
thread 'pathway:work-0' panicked at src/engine/report_error.rs:83:50:
AssertionError:
...
File "pathway/internals/table_subscription.py", line 134, in on_change_wrapper
assert diff in [-1, 1]
^^^^^^^^^^^^^^^
AssertionError
Steps to reproduce
Note: this issue is non-deterministic.
What did you expect to happen?
Working pipeline after restart.
Version
0.15.2
Docker Versions (if used)
No response
OS
Linux
On which CPU architecture did you run Pathway?
x86-64