ipfs / kubo

An IPFS implementation in Go
https://docs.ipfs.tech/how-to/command-line-quick-start/

GC with cancelled context causes race condition deadlock in pinner #10593

Open LeeTeng2001 opened 1 day ago

LeeTeng2001 commented 1 day ago

Installation method

built from source

Version

Kubo version: 0.33.0-dev
Repo version: 16
System version: arm64/linux
Golang version: go1.23.1

Config

{
  "API": {
    "HTTPHeaders": {}
  },
  "Addresses": {
    "API": "/ip4/127.0.0.1/tcp/5001",
    "Announce": [],
    "AppendAnnounce": [],
    "Gateway": "/ip4/127.0.0.1/tcp/8080",
    "NoAnnounce": [],
    "Swarm": [
      "/ip4/0.0.0.0/tcp/4001",
      "/ip6/::/tcp/4001",
      "/ip4/0.0.0.0/udp/4001/webrtc-direct",
      "/ip4/0.0.0.0/udp/4001/quic-v1",
      "/ip4/0.0.0.0/udp/4001/quic-v1/webtransport",
      "/ip6/::/udp/4001/webrtc-direct",
      "/ip6/::/udp/4001/quic-v1",
      "/ip6/::/udp/4001/quic-v1/webtransport"
    ]
  },
  "AutoNAT": {},
  "AutoTLS": {},
  "Bootstrap": [
    "/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
    "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
    "/ip4/104.131.131.82/udp/4001/quic-v1/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
    "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
    "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
    "/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb"
  ],
  "DNS": {
    "Resolvers": {}
  },
  "Datastore": {
    "BloomFilterSize": 0,
    "GCPeriod": "1h",
    "HashOnRead": false,
    "Spec": {
      "mounts": [
        {
          "child": {
            "path": "blocks",
            "shardFunc": "/repo/flatfs/shard/v1/next-to-last/2",
            "sync": true,
            "type": "flatfs"
          },
          "mountpoint": "/blocks",
          "prefix": "flatfs.datastore",
          "type": "measure"
        },
        {
          "child": {
            "compression": "none",
            "path": "datastore",
            "type": "levelds"
          },
          "mountpoint": "/",
          "prefix": "leveldb.datastore",
          "type": "measure"
        }
      ],
      "type": "mount"
    },
    "StorageGCWatermark": 90,
    "StorageMax": "10GB"
  },
  "Discovery": {
    "MDNS": {
      "Enabled": true
    }
  },
  "Experimental": {
    "FilestoreEnabled": false,
    "Libp2pStreamMounting": false,
    "OptimisticProvide": false,
    "OptimisticProvideJobsPoolSize": 0,
    "P2pHttpProxy": false,
    "StrategicProviding": false,
    "UrlstoreEnabled": false
  },
  "Gateway": {
    "DeserializedResponses": null,
    "DisableHTMLErrors": null,
    "ExposeRoutingAPI": null,
    "HTTPHeaders": {},
    "NoDNSLink": false,
    "NoFetch": false,
    "PublicGateways": null,
    "RootRedirect": ""
  },
  "Identity": {
    "PeerID": "12D3KooWRxQ4bDKJk3ns9WKbBy9mowfUAMJ8HfADPcoH1vCdZKNE"
  },
  "Import": {
    "CidVersion": null,
    "HashFunction": null,
    "UnixFSChunker": null,
    "UnixFSRawLeaves": null
  },
  "Internal": {},
  "Ipns": {
    "RecordLifetime": "",
    "RepublishPeriod": "",
    "ResolveCacheSize": 128
  },
  "Migration": {
    "DownloadSources": [],
    "Keep": ""
  },
  "Mounts": {
    "FuseAllowOther": false,
    "IPFS": "/ipfs",
    "IPNS": "/ipns"
  },
  "Peering": {
    "Peers": null
  },
  "Pinning": {
    "RemoteServices": {}
  },
  "Plugins": {
    "Plugins": null
  },
  "Provider": {
    "Strategy": ""
  },
  "Pubsub": {
    "DisableSigning": false,
    "Router": ""
  },
  "Reprovider": {},
  "Routing": {
    "Methods": null,
    "Routers": null
  },
  "Swarm": {
    "AddrFilters": null,
    "ConnMgr": {},
    "DisableBandwidthMetrics": false,
    "DisableNatPortMap": false,
    "RelayClient": {},
    "RelayService": {},
    "ResourceMgr": {},
    "Transports": {
      "Multiplexers": {},
      "Network": {},
      "Security": {}
    }
  },
  "Version": {}
}

Description

I noticed a deadlock caused by GC. After debugging for a while, I found the problematic part and have a fix for it. To reproduce: download a file with ipfs, run a GC with a cancelled context, then download a new file with ipfs; the new download hangs at 100% while waiting for the pinner to pin the file.
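
For reference, a hypothetical sketch of the trigger; the node field names (Blockstore, Repo, Pinning) are assumptions based on core.IpfsNode and the GC signature quoted below, so treat it as illustration rather than exact repro code:

ctx, cancel := context.WithCancel(context.Background())
cancel() // GC starts with an already-cancelled context

// node is a running *core.IpfsNode with at least one block downloaded.
for res := range gc.GC(ctx, node.Blockstore, node.Repo.Datastore(), node.Pinning, nil) {
    _ = res // the result channel closes almost immediately
}

// From this point on, any operation that pins (e.g. downloading a new
// file) blocks inside the pinner and never completes.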

The root cause is a race condition in GC.

func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn pin.Pinner, bestEffortRoots []cid.Cid) <-chan Result {
    ctx, cancel := context.WithCancel(ctx)
    // ...
    gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots, output)
}

GC invokes ColoredSet, which in turn invokes the pinner's RecursiveKeys to obtain a channel. That channel is then passed into Descendants, which iterates over the results.

func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffortRoots []cid.Cid, output chan<- Result) (*cid.Set, error) {
    // ...
    rkeys := pn.RecursiveKeys(ctx, false)
    err := Descendants(ctx, getLinks, gcs, rkeys)
}

func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots <-chan pin.StreamedPin) error {
    // ...
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case wrapper, ok := <-roots:
            // ...
        }
    }
}

The race condition lies in the interaction between the pinner's streamIndex and Descendants:

// RecursiveKeys returns a channel streaming the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin {
    return p.streamIndex(ctx, p.cidRIndex, detailed)
}

func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin {
    out := make(chan ipfspinner.StreamedPin)

    go func() {
        defer close(out)

        p.lock.RLock()
        defer p.lock.RUnlock()

        cidSet := cid.NewSet()

        err := index.ForEach(ctx, "", func(key, value string) bool {
        // ...
        })
        if err != nil {
            out <- ipfspinner.StreamedPin{Err: err}
        }
    }()

    return out
}

When the passed-in context is cancelled, index.ForEach fails immediately, and the pinner's streamIndex goroutine tries to deliver the resulting error via the unbuffered out channel. On the Descendants side, that send is never received: its select watches both the context and the channel, so once the context is cancelled it returns ctx.Err() without ever reading from out. The streamIndex goroutine therefore blocks forever on the send, and because it still holds p.lock.RLock(), its deferred RUnlock never runs; every later pin operation that needs the write lock then deadlocks, which is why the next download hangs at 100%.
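
The failure shape is easy to reproduce in isolation. Below is a minimal, self-contained model in plain Go (not kubo code, just the same channel-and-lock structure): a producer goroutine holds a read lock while sending on an unbuffered channel, the consumer returns on ctx.Done() without draining, and a later writer then blocks forever:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    var lock sync.RWMutex // plays the role of p.lock

    ctx, cancel := context.WithCancel(context.Background())
    cancel() // the GC context is already cancelled

    out := make(chan error) // unbuffered, like streamIndex's out

    // Producer: stands in for the streamIndex goroutine.
    go func() {
        defer close(out)
        lock.RLock()
        defer lock.RUnlock()
        time.Sleep(10 * time.Millisecond) // let the consumer's select fire first
        out <- ctx.Err()                  // blocks forever: nobody reads out
    }()

    // Consumer: stands in for the select in Descendants. ctx.Done() is
    // already closed, so it returns without reading from out.
    select {
    case <-ctx.Done():
    case <-out:
    }

    time.Sleep(100 * time.Millisecond) // let the producer reach its send

    // A later pin needs the write lock and blocks forever, because the
    // producer still holds the read lock.
    acquired := make(chan struct{})
    go func() {
        lock.Lock()
        defer lock.Unlock()
        close(acquired)
    }()

    select {
    case <-acquired:
        fmt.Println("no deadlock")
    case <-time.After(time.Second):
        fmt.Println("deadlock: write lock never acquired")
    }
}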

My proposed solution is simple: change the streamIndex channel to a buffered one so the final error send cannot block:

func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin {
    out := make(chan ipfspinner.StreamedPin, 1)
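
Assuming the sends inside the elided ForEach callback are already guarded by a select on ctx.Done() (so the loop itself can stop on cancellation; that code isn't shown above), the unguarded error send after the loop is the only send that can block, and a capacity of one is enough:

        if err != nil {
            // With the one-element buffer this send completes even when
            // Descendants has already returned; the deferred RUnlock and
            // close(out) then run, and the unread value is dropped with
            // the channel.
            out <- ipfspinner.StreamedPin{Err: err}
        }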

I'm not sure whether this affects other parts of the infrastructure, though, so please verify the validity of this fix. Thanks!

LeeTeng2001 commented 1 day ago

An alternative solution is to modify Descendants so that, on cancellation, it drains the channel without processing the remaining items:

func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots <-chan pin.StreamedPin) error {
        // ...
    for {
        select {
        case <-ctx.Done():
            for range roots {}
            return ctx.Err()
        case wrapper, ok := <-roots:
            // ...
        }
    }
}
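
Either way, the key is to unblock the producer. Draining works because streamIndex always runs defer close(out), so for range roots terminates once every pending send has completed and the goroutine has released p.lock; the downside is that the drain must be repeated in every consumer of these channels, and Descendants now blocks until the producer finishes. A third option, sketched below on the assumption that nothing else reads out after cancellation, is to guard the final error send in streamIndex with its own select, which needs neither a buffer nor a drain:

        if err != nil {
            select {
            case out <- ipfspinner.StreamedPin{Err: err}:
            case <-ctx.Done():
                // The consumer is gone; drop the error instead of
                // blocking while holding p.lock.
            }
        }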