automerge / automerge-repo-swift

Extends the Automerge-swift library, providing support for working with multiple Automerge documents at once, with pluggable network and storage providers.
https://swiftpackageindex.com/automerge/automerge-repo-swift/documentation/automergerepo
MIT License
16 stars 2 forks source link

Document added to repo doesn't seem to get state from web socket peer until it sends state of own #106

Closed jessegrosjean closed 1 day ago

jessegrosjean commented 1 month ago

To reproduce the error I'm using CloudCount and:

  1. Create new document.
  2. Copy new document to another computer
  3. Check the "Automerge Repo" checkbox on one computer and increment count
  4. At this point computer one and the repo's web socket peer state should match
  5. Now on computer 2 open the document copy and check the "Automerge Repo" checkbox

At this point I would expect that second document's state to get synced with the web socket peer, but this isn't happening. To see the latest state from the peer I need to first modify the document... only then does syncing start to work.

luxmentis commented 4 days ago

I'm seeing what I think is a variant of the same problem.

In my app, starting with an empty local repo but documents at sync.automerge.org (also a local file system storage provider but I don't think that's relevant):

This quick mod is working for me so far (certainly not proposing this as the fix – I'm not familiar enough with the code for that):

diff --git a/Sources/AutomergeRepo/Repo.swift b/Sources/AutomergeRepo/Repo.swift
index 933621b..0d3b405 100644
--- a/Sources/AutomergeRepo/Repo.swift
+++ b/Sources/AutomergeRepo/Repo.swift
@@ -357,6 +357,7 @@ public final class Repo {
             // sync changes should be invoked as rapidly as possible to allow for best possible
             // collaborative editing experiences.
             let handleObserver = doc.objectWillChange
+                .prepend(())
                 .sink { [weak self] _ in
                     guard let self else { return }
                     self.saveSignalPublisher.send(id)
heckj commented 3 days ago

Nice sleuthing @luxmentis, thank you.

heckj commented 3 days ago

I jumped back in and read around on this, but I'm not yet confident on the sequence of calls to the repo that's leading to the failure scenario - and if these two patterns are identical. Can you help by giving me a little more detail in terms of the state of the two repositories, and what specific calls have been invoked on either side, and in which order?

In the detail from @luxmentis, I think what I'm hearing is:

Is that correct @luxmentis? I want to be super clear on the ordering of creation, connecting (and/or disconnecting), and invocations of find to really understand this.

heckj commented 3 days ago

@jessegrosjean For the cloudCount scenario, could you point out what's happening when "you click the "Automerge Repo" checkbox?" What's that doing, and in what order in CloudCount? I got a bit lost reading through the additional layers in your example app space.

Are there two repos in play here "A" and "B", both on local machines, or three "A", B", and "C" - the first two being macs and the last being an instance of the hosted automerge-repo in Javascript (sync.automerge.org or such?) I first read it as possible just two macs talking over Bonjour, but then I saw "websocket" and thought perhaps there's a third in here, and this is an issue with relaying updates through the server.

luxmentis commented 3 days ago

@heckj Hmm, actually let's ignore my "first doc works, second doc doesn't" statement for the moment (which was partly about the order of things in my client code; explanation below).

Now I wonder if I'm misunderstanding the API. Here's a simple example:

import XCTest
@testable import AutomergeRepo
import Combine

final class SyncTest: XCTestCase {
    private var subs = [AnyCancellable]()

    func testExample() async throws {
        let webSocketProvider = WebSocketProvider(WebSocketProviderConfiguration(reconnectOnError: true, loggingAt: .errorOnly))
        let repo = await Repo(sharePolicy: .agreeable, storage: InMemoryStorage(), networks: [webSocketProvider])
        try await webSocketProvider.connect(to: URL(string: "wss://sync.automerge.org/")!)

        // a document with this id exists
        try await syncAndDisplayHistory(id: "abFmaStDm3D7Yqh57ULbG5fwSoJ", repo: repo)

        try? await Task.sleep(for: .seconds(60))
    }

    func syncAndDisplayHistory(id: String, repo: Repo) async throws {
        let handle = try await repo.find(id: DocumentId(id)!)

        handle.doc.objectWillChange.prepend(()).receive(on: DispatchQueue.main).sink {
            print("\(id) history count: \(handle.doc.getHistory().count)")
        }
        .store(in: &subs)
    }
}

This creates an empty local doc but never fetches from the server (i.e. doc.getHistory().count stays at 0). With the Repo mod from my earlier message, it does fetch the remote content.

Is there something other than find() I'm supposed to do? Or is this a bug?

Part 2; possibly tangential

In the original case I mentioned, the first fetch worked because I happened to be calling fetch() before webSocketProvider.connect(). As mentioned earlier, Repo's peer init code triggered a remote sync. Attempts to fetch subsequent document(s) were after the connect, so didn't benefit from that code. But as I say, perhaps the overall problem is I'm not getting the API.

heckj commented 2 days ago

@luxmentis I took a bit of time and morphed your example test into one that would fit into the integration tests. Because they use a local docker instance of automerge-repo for hosting the WebSocket for their tests, it got updated and broken a up.

I think the reason the prepend() is needed in your system is a slight missed expectation. As soon as you get the DocHandle back from the await repo.find(), it should be the latest version that the server had - and it's history (the list of ChangeHash) at that moment should be what you're expecting. That's what the first test - https://github.com/automerge/automerge-repo-swift/blob/main/IntegrationTests/Tests/IntegrationTestsTests/RepoAndTwoClientWebsocketIntegrationTests.swift#L102-L120 - illustrates.

The objectWillChange publisher won't trigger until a new change comes in over the WebSocket connection. Take a look at the test at https://github.com/automerge/automerge-repo-swift/blob/main/IntegrationTests/Tests/IntegrationTestsTests/RepoAndTwoClientWebsocketIntegrationTests.swift#L122-L159, and you'll see the loosely same flow as your code is testing - but this objectWillChange.sink() setup is verifying that it receives an input on that published when the document is updated.

(FWIW, the "expectation/fulfills setup in the tests can let them operate a TON faster than dropping the Task.sleep(30) kiund of setups in there. It's a fair bit of pomp and circumstance to set them up, but they work nicely.) Worst part of these testing scenarios is that the automerge-repo sync protocol doesn't (yet? - maybe down the road) return a "yeah, you're all synced up" confirmation response when you send a sync message, so it's unavoidable in some of the testing scenarios.

Take a look at the examples in the integration tests, feel free to run them yourself (you'll need to update them to use the public sync server, or run a local instance with Docker - there's a script in the repo for that though - ./scripts/interop.sh). See if that makes a bit more sense.

As far as I'm concerned, there's no issue with you keeping the prepend() in your publisher if you're counting on that trigger to establish and trigger other business logic in your app, but it might be easier if you knew the history was up to date and ready to be used immediately on return from the await repo.find() call.

luxmentis commented 2 days ago

FWIW, the "expectation/fulfills setup in the tests can let them operate a TON faster than dropping the Task.sleep(30) kiund of setups in there

Oh, for sure. Abusing the test format was just a quick and dirty way for me to get you some runnable code. Not intended as an actual test.

Thanks for the integration test, which I've run and seen pass. I think we've ended up talking at cross-purposes a bit because my example code contained a publisher, in addition to the publisher in the original git diff I posted. Here's a simplified version to make communication easier. I'd have patched your integration test, but there's no StorageProvider implementation available. But this will run if you drop it (temporarily!) into the package's main test folder.

func testRepoWithStorage() async throws {
    let repo = Repo(sharePolicy: .agreeable, storage: InMemoryStorage())
    let websocket = WebSocketProvider()
    await repo.addNetworkAdapter(adapter: websocket)
    try await websocket.connect(to: URL(string: "wss://sync.automerge.org/")!)

    let handle = try await repo.find(id: DocumentId("abFmaStDm3D7Yqh57ULbG5fwSoJ")!)

    // This fails: history is empty at this point. But it would succeed if we weren't using the storage.
    XCTAssertFalse(handle.doc.getHistory().isEmpty)
}

So it turns out behaviour is different when the repo has storage. In this example, history is empty after the find() and the test fails. But if you use a repo without storage it becomes equivalent to your integration test and it passes.

The empty history problem isn't specific to the use of InMemoryStorage – my app uses a custom FileSystemStorage implementation and behaves the same, which is what led me here.

Hope that's clearer!

heckj commented 2 days ago

Oh perfect, that's super useful! I'd not fully tested and iterated on the whole end to end interactions with storage providers other than the simple bits with the InMemory stuff in unit tests. This'll be perfect to dig into! Thanks for the code above, and for the detail that it's specific to when there IS a storage provider. That should help narrow it down and we'll get this sorted in some fashion.

luxmentis commented 2 days ago

Sounds good. My emergency fix works for me for now.

By the way, pretty neat sitting with my daughter doing some collaborative editing and watching each others' changes come up!

heckj commented 2 days ago

I've got the repro set up - https://github.com/automerge/automerge-repo-swift/pull/113

Not only is the history not set and available (and hence, I'm sure the document contents aren't quite proper either), but the change notifications are failing when there's a storage provider attached to the repo that invoked the find() to retrieve things.

jessegrosjean commented 1 day ago

> @jessegrosjean For the cloudCount scenario, could you point out what's happening when "you click the "Automerge Repo" checkbox?"

That ends up calling repo.import like this, from within SharingService.share

let handle = try await repo.import(handle: .init(
    id: store.id,
    doc: store.automerge
))

But I agree, lots of things happening in that code, and at the moment I'm working on another approach. I've tried to reproduce the problem in a simpler example. I'm not entirely sure this is all the same setup This isn't the same setup, but I'm trying to simulate what I think is the setup in CloudCount... but in a single app instead of same app running on two computers.

In this code I would expect documentA and documentB to converge, but that doesn't seem to be happening.

let commonServerURL = URL(string: "wss://sync.automerge.org/")!
let commonDocumnetId = DocumentId()
let commonDocumentStartData = Automerge.Document().save()

let repoA = Repo(sharePolicy: SharePolicy.agreeable)
let websocketA = WebSocketProvider(.init(reconnectOnError: true, loggingAt: .tracing))
repoA.addNetworkAdapter(adapter: websocketA)
try await websocketA.connect(to: commonServerURL)
let documentA = try Automerge.Document(commonDocumentStartData)
try documentA.put(obj: .ROOT, key: "test", value: .String("some value"))
_ = try await repoA.import(handle: .init(id: commonDocumnetId, doc: documentA))

let repoB = Repo(sharePolicy: SharePolicy.agreeable)
let websocketB = WebSocketProvider(.init(reconnectOnError: true, loggingAt: .tracing))
repoB.addNetworkAdapter(adapter: websocketB)
try await websocketB.connect(to: commonServerURL)
let documentB = try Automerge.Document(commonDocumentStartData)
_ = try await repoB.import(handle: .init(id: commonDocumnetId, doc: documentB))

// this continues forever, heads never match
while documentA.headsKey != documentB.headsKey {
    print("waiting for sync...")
    assert((try! documentA.get(obj: .ROOT, key: "test")) != nil)
    assert((try! documentB.get(obj: .ROOT, key: "test")) == nil)
    try await Task.sleep(nanoseconds: 1_000_000_000)
}

print("Sync success if reach this point!")
heckj commented 1 day ago

Yeah, slightly different pattern - there's definitely an issue with sync and find() when there's a storage provider added. I'll add a test along these lines into Integration test so we can sort it.

heckj commented 1 day ago

Question for you @jessegrosjean - do have a local extension or something for the example above? The compiler had a little fit for documentA.headsKey - and I wasn't sure how you were setting up that property. A hash or something of the [ChangeHash] from .history()?

jessegrosjean commented 1 day ago

Question for you @jessegrosjean - do have a local extension or something for the example above?

Oops, yes. Just remove headsKey part and compare heads normally I think.

heckj commented 1 day ago

Well, one of the issues is found anyway. On import (or create) there wasn't anything that was pro-actively kicking off an initial sync. It was only when the document was changed that it would be "written" out to any connected peers. In the PR, I've extracted the logic to request a sync into it's own private method and explicitly called it from create(), import(), and clone() after it's been fully resolved locally.

I also went ahead and marked import() with discardableResult, given the way it was used in the example, it seems to make sense in retrospect there.

That solves Jesse's initial issue, but we still have the issue with sync failing to operate as expected when there's a storage adapter in the mix, which I'm still working on tracing & resolving.

heckj commented 1 day ago

I think I have the fix in place. Need to sort out unit tests to matches updated behavior, but it's pretty close. Should have a fix out later today.

heckj commented 1 day ago

@luxmentis The underlying issue - now fixed in the main branch - was that I had mismatched logic in the storage adapter interface. One side always returned a new Document, even if it wasn't available - which caused the resolver logic to never attempt to sync with other peers. That's been fixed, as well as documents are no proactively synced with all peers on creation, a change in behaviour from earlier setup.

luxmentis commented 12 hours ago

@heckj Looking good here. I'm currently chasing some other weirdness, but I don't think it's related.