protocol / beyond-bitswap


Bitswap test: Fetching files in parallel #45

Closed dgrisham closed 3 years ago

dgrisham commented 3 years ago

NOTE: I left the first few paragraphs as background, but the UPDATE section below describes the core issue more directly.

I'm running a custom Bitswap data exchange test with 3 nodes in a triangle topology that I'm calling trade, adapted from the transfer test in this repo. Each node uploads a unique file, then downloads each of its peers' files (so each user shares one file with each of their peers, and downloads one file from each peer). This means a total of 6 file exchanges happen over a given run.
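To make the exchange count concrete: every ordered pair of distinct nodes is one transfer, so a full mesh of 3 nodes yields 3 × 2 = 6 exchanges. A trivial sketch (illustrative names, not part of the test framework):

```go
package main

import "fmt"

// countExchanges returns the number of one-way file transfers in a full-mesh
// topology of n nodes, where every node sends its file to every peer.
func countExchanges(n int) int {
	return n * (n - 1)
}

func main() {
	// 3 nodes in a triangle: each of the 3 nodes sends to 2 peers.
	fmt.Println(countExchanges(3)) // 6
}
```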

One difference from the transfer test is that, when a user downloads a file from one of their peers, I don't have the user save that file on disk. Once all of the files are exchanged, each user drops the files they fetched from their peers and disconnects. At the start of each run, the peers (re)connect to one another.

When trying to set the run count > 1 for this test, a run after the first (usually the second, at least with large enough files being exchanged) will time out. This usually presents as 5 of the 6 files being exchanged, while the 6th hangs and never finishes. The pair of peers that fails seems to vary. The strange thing about this is that the file must have already been sent from the user hosting it to one of the peers, since 5/6 succeed, yet for some reason it never finishes sending to the other peer. I set the long_lasting test parameter to true to get more of a sense of what might be happening. Every peer seems to remain alive, but eventually no data is flowing between them.

I tried making it so that the users all clean out the file they are responsible for sharing at the end of each run and re-add that file to the Bitswap datastore (then the peers re-exchange file CIDs from there), but that doesn't seem to have an effect. I can get more than 2 runs to succeed if I make the filesize small enough; sometimes it'll still fail at a later run. For my purposes, sharing somewhat large files is useful, so I'd rather figure out a better solution if possible.

I've come at this from various angles -- ensuring that all of the nodes are syncing at the right steps, clearing out nearly all of the relevant data at the end of each run and reinitializing, making sure the nodes are alive and that they all dial each other before they attempt to exchange data, making the pubsub channel that each node uses to publish its file CIDs unique per run (which is sort of moot because the CIDs don't change, but I did this so I could also re-add the file to Bitswap, since that happens in the same test utility function), etc. This also happens whether I run the tests locally or in Docker.

One thing worth noting (that I just considered while writing this): I am attempting to fetch these files in parallel on each node. Maybe I'm hitting some sort of deadlock with that? The call to transferNode.Fetch is the call that hangs and each node calls that function from one goroutine for each file they want to fetch, so perhaps it's something about trying to fetch the same file concurrently. I haven't attempted to replicate this issue in the transfer test, which doesn't fetch files concurrently, so perhaps that would be a good route to go.

UPDATE: That seems to be the issue -- the transfer test works fine with 50 runs in a row. However, my test fails, and I believe calling transferNode.Fetch to download multiple files in parallel is the issue. I'm guessing that I'm hitting some sort of deadlock in that function, or a weird race condition that only sometimes arises, and for some reason never happens in the first run (at least as far as I've seen).

So my question (sorry for the noise, wanted to leave everything for documentation/in case my understanding is incorrect): Is there a way to easily fetch multiple files at once with a Bitswap node in a test? Should I modify the Bitswap test node's implementation of the Fetch() function? For convenience, the Fetch() function is currently implemented here.

Pinging everyone who seems to have contributed to the transfer test (@hannahhoward @adlrocha @daviddias @aarshkshah1992), as well as @Stebalien

dgrisham commented 3 years ago

Ah, I think I figured it out, adapted from daghelpers.go in go-ipld-format:

```go
// Assumed imports for this method:
//
//   "context"
//   "errors"
//
//   cid "github.com/ipfs/go-cid"
//   files "github.com/ipfs/go-ipfs-files"
//   "github.com/ipfs/go-merkledag"
//   unixfile "github.com/ipfs/go-unixfs/file"

func (n *BitswapNode) FetchAll(ctx context.Context, cids []cid.Cid, _ []PeerInfo) ([]files.Node, []error) {
	// Fetch each DAG fully before materializing any files.
	for _, c := range cids {
		if err := merkledag.FetchGraph(ctx, c, n.dserv); err != nil {
			return nil, []error{err}
		}
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// GetMany delivers the requested nodes in arbitrary order on one channel.
	nodechan := n.dserv.GetMany(ctx, cids)

	fNodes := make([]files.Node, len(cids))
	errs := make([]error, len(cids))

	for count := 0; count < len(cids); {
		select {
		case opt, ok := <-nodechan:
			if !ok {
				// Channel closed before all nodes arrived.
				for i := range errs {
					errs[i] = errors.New("NodeChan empty")
				}
				return nil, errs
			}

			if opt.Err != nil {
				errs[count] = opt.Err
				return nil, errs
			}

			// Match the received node back to its position in cids.
			nd := opt.Node
			c := nd.Cid()
			for i, lnk_c := range cids {
				if c.Equals(lnk_c) {
					count++
					unixF, err := unixfile.NewUnixfsFile(ctx, n.dserv, nd)
					if err != nil {
						errs[i] = err
						continue
					}
					fNodes[i] = unixF
				}
			}
		case <-ctx.Done():
			// Context cancelled: return whatever completed so far.
			return fNodes, nil
		}
	}

	return fNodes, errs
}
```
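For anyone following along, the fan-in pattern here -- matching results arriving on a single channel back against the list of requested keys, advancing a counter once per match -- can be illustrated standalone (strings stand in for CIDs; this is a sketch of the pattern, not the real `GetMany` API):

```go
package main

import "fmt"

// collect matches results arriving on ch against the requested keys, the
// same counting pattern FetchAll uses over GetMany's result channel.
func collect(keys []string, ch <-chan string) map[string]bool {
	got := make(map[string]bool, len(keys))
	for count := 0; count < len(keys); {
		k, ok := <-ch
		if !ok {
			break // channel closed early: some keys never arrived
		}
		for _, want := range keys {
			if k == want {
				got[k] = true
				count++
			}
		}
	}
	return got
}

func main() {
	keys := []string{"cid1", "cid2", "cid3"}
	ch := make(chan string, 3)
	// Results may arrive in any order, as with GetMany.
	ch <- "cid2"
	ch <- "cid3"
	ch <- "cid1"
	fmt.Println(len(collect(keys, ch))) // 3
}
```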
dgrisham commented 3 years ago

One more note -- I did have to clear the published files out of my Bitswap node and re-publish them for every run. A relatively small price to pay, of course; it's just a matter of sifting through the details of what's necessary for this kind of test.