automerge / automerge-repo-rs

MIT License
38 stars 6 forks source link

My repository only accepts 10 changes #70

Open pierreprinetti opened 4 months ago

pierreprinetti commented 4 months ago

I am trying to build a Taskwarrior clone with Automerge, Automerge-repo and Autosurgeon.

However, I can only do so many changes to my repository, until it starts appearing frozen. I don't know whether the problem is the insertion, or the listing, and I get no errors in either operation.

The reproducer

I have tried to remove as much as possible of my own logic, and only leave in the glue code and a minimal reproducer.

This program opens (or creates) an automerge repository in ./data, adds one item, and prints a list of all items.

The first 10 executions work as intended. From the 11th on, the list stays at 10 items.

rm -rf ./data
cargo build
for i in {1..11}; do
    ./target/debug/tsk
done

The full code is here: https://codeberg.org/pierreprinetti/tsk/src/branch/debug

and here below I paste the contents of main.src for completeness.

use anyhow::Result;
use automerge_repo::{DocumentId, StorageError};
use autosurgeon::{hydrate, reconcile};
use futures::future::BoxFuture;
use std::path::PathBuf;
use task::{Task, Tasks};

mod task {
    use autosurgeon::{Hydrate, Reconcile};
    use std::fmt;

    #[derive(Default, Debug, Clone, Reconcile, Hydrate, PartialEq)]
    pub struct Annotation {
        created: String,
        text: String,
    }

    impl fmt::Display for Annotation {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "{}", self.text)
        }
    }

    #[derive(Default, Debug, Clone, Reconcile, Hydrate, PartialEq)]
    pub struct Task {
        pub description: String,
    }

    impl Task {
        pub fn new(name: &str) -> Self {
            return Task {
                description: name.to_string(),
            };
        }
    }

    #[derive(Default, Debug, Clone, Reconcile, Hydrate, PartialEq)]
    pub struct Tasks {
        pub tasks: Vec<Task>,
    }
}

struct Fs {
    fs_storage: automerge_repo::fs_store::FsStore,
}

impl Fs {
    pub fn new(p: std::path::PathBuf) -> Self {
        let fs_storage =
            automerge_repo::fs_store::FsStore::open(p).expect("could not open the storage");
        Fs { fs_storage }
    }
}

impl automerge_repo::Storage for Fs {
    fn get(&self, id: DocumentId) -> BoxFuture<'static, Result<Option<Vec<u8>>, StorageError>> {
        Box::pin(futures::future::ready(
            self.fs_storage
                .get(&id)
                .map_err(move |_| StorageError::Error),
        ))
    }

    fn list_all(&self) -> BoxFuture<'static, Result<Vec<DocumentId>, StorageError>> {
        Box::pin(futures::future::ready(
            self.fs_storage.list().map_err(move |_| StorageError::Error),
        ))
    }

    fn append(
        &self,
        id: DocumentId,
        changes: Vec<u8>,
    ) -> BoxFuture<'static, Result<(), StorageError>> {
        Box::pin(futures::future::ready(
            self.fs_storage
                .append(&id, &changes)
                .map_err(move |_| StorageError::Error),
        ))
    }

    fn compact(&self, _: DocumentId, _: Vec<u8>) -> BoxFuture<'static, Result<(), StorageError>> {
        Box::pin(futures::future::ready(Ok(())))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    println!("--- start ---");
    tracing_subscriber::fmt::init();
    let repo_handle = {
        let data_dir = Fs::new(PathBuf::from("./data"));
        let repo = automerge_repo::Repo::new(None, Box::new(data_dir));
        repo.run()
    };
    let doc_handle = {
        let docs = repo_handle
            .list_all()
            .await
            .expect("failed to list documents");
        if docs.is_empty() {
            let d = repo_handle.new_document();
            let tasks = Tasks { tasks: Vec::new() };
            d.with_doc_mut(|doc| {
                let mut tx = doc.transaction();
                reconcile(&mut tx, &tasks).unwrap();
                tx.commit();
            });
            d
        } else {
            let doc_id = &docs[0];
            repo_handle
                .load(doc_id.clone())
                .await
                .expect("failed to open the document")
                .unwrap()
        }
    };

    let t = Task::new(&format!("New item"));
    doc_handle.with_doc_mut(|doc| {
        let mut tasks: Tasks = hydrate(doc).unwrap();
        tasks.tasks.push(t);
        let mut tx = doc.transaction();
        reconcile(&mut tx, &tasks).unwrap();
        tx.commit();
    });

    doc_handle.with_doc(|doc| {
        let tasks: Tasks = hydrate(doc).unwrap();
        for (i, task) in tasks.tasks.iter().enumerate() {
            println!("{} {}", i + 1, task.description);
        }
    });

    repo_handle
        .stop()
        .expect("failed to cleanly close the repository");

    Ok(())
}

I hope you can make sense of this...

pierreprinetti commented 4 months ago

Note that if the inserts are made in a loop in a single execution, then the repo can grow normally past 10 items.

pierreprinetti commented 4 months ago

Here's a version of the same program, with 15 insertions in a loop. It works the first time. But running it again really messes things up.

use anyhow::Result;
use automerge_repo::{DocumentId, StorageError};
use autosurgeon::{hydrate, reconcile};
use futures::future::BoxFuture;
use std::path::PathBuf;
use task::{Task, Tasks};

mod task {
    use autosurgeon::{Hydrate, Reconcile};
    use std::fmt;

    #[derive(Default, Debug, Clone, Reconcile, Hydrate, PartialEq)]
    pub struct Annotation {
        created: String,
        text: String,
    }

    impl fmt::Display for Annotation {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "{}", self.text)
        }
    }

    #[derive(Default, Debug, Clone, Reconcile, Hydrate, PartialEq)]
    pub struct Task {
        pub description: String,
    }

    impl Task {
        pub fn new(name: &str) -> Self {
            return Task {
                description: name.to_string(),
            };
        }
    }

    #[derive(Default, Debug, Clone, Reconcile, Hydrate, PartialEq)]
    pub struct Tasks {
        pub tasks: Vec<Task>,
    }
}

struct Fs {
    fs_storage: automerge_repo::fs_store::FsStore,
}

impl Fs {
    pub fn new(p: std::path::PathBuf) -> Self {
        let fs_storage =
            automerge_repo::fs_store::FsStore::open(p).expect("could not open the storage");
        Fs { fs_storage }
    }
}

impl automerge_repo::Storage for Fs {
    fn get(&self, id: DocumentId) -> BoxFuture<'static, Result<Option<Vec<u8>>, StorageError>> {
        Box::pin(futures::future::ready(
            self.fs_storage
                .get(&id)
                .map_err(move |_| StorageError::Error),
        ))
    }

    fn list_all(&self) -> BoxFuture<'static, Result<Vec<DocumentId>, StorageError>> {
        Box::pin(futures::future::ready(
            self.fs_storage.list().map_err(move |_| StorageError::Error),
        ))
    }

    fn append(
        &self,
        id: DocumentId,
        changes: Vec<u8>,
    ) -> BoxFuture<'static, Result<(), StorageError>> {
        Box::pin(futures::future::ready(
            self.fs_storage
                .append(&id, &changes)
                .map_err(move |_| StorageError::Error),
        ))
    }

    fn compact(&self, _: DocumentId, _: Vec<u8>) -> BoxFuture<'static, Result<(), StorageError>> {
        Box::pin(futures::future::ready(Ok(())))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    println!("--- start ---");
    tracing_subscriber::fmt::init();
    let repo_handle = {
        let data_dir = Fs::new(PathBuf::from("./data"));
        let repo = automerge_repo::Repo::new(None, Box::new(data_dir));
        repo.run()
    };
    let doc_handle = {
        let docs = repo_handle
            .list_all()
            .await
            .expect("failed to list documents");
        if docs.is_empty() {
            let d = repo_handle.new_document();
            let tasks = Tasks { tasks: Vec::new() };
            d.with_doc_mut(|doc| {
                let mut tx = doc.transaction();
                reconcile(&mut tx, &tasks).unwrap();
                tx.commit();
            });
            d
        } else {
            let doc_id = &docs[0];
            repo_handle
                .load(doc_id.clone())
                .await
                .expect("failed to open the document")
                .unwrap()
        }
    };

    for n in 1..=15 {
        let t = Task::new(&format!("Item {}.", n));
        doc_handle.with_doc_mut(|doc| {
            let mut tasks: Tasks = hydrate(doc).unwrap();
            tasks.tasks.push(t);
            let mut tx = doc.transaction();
            reconcile(&mut tx, &tasks).unwrap();
            tx.commit();
        });
    }

    doc_handle.with_doc(|doc| {
        let tasks: Tasks = hydrate(doc).unwrap();
        for (i, task) in tasks.tasks.iter().enumerate() {
            println!("{} {}", i + 1, task.description);
        }
    });

    repo_handle
        .stop()
        .expect("failed to cleanly close the repository");

    Ok(())
}