ipfs / boxo

A set of reference libraries for building IPFS applications and implementations in Go.
https://github.com/ipfs/boxo#readme
Other
214 stars 96 forks source link

Issue when adding file with a block service initialized with bitswap. `panic: runtime error` #575

Closed GeorgePapageorgakis closed 9 months ago

GeorgePapageorgakis commented 9 months ago

Hello,

I am new in IPFS and I am trying to setup a new boxo ipfs network in my app using bitswap based on the example Downloading a UnixFS file

However, I cannot find any documentation about how am I supposed to get the file from the peer. The example was just a server client with offline.Exchange blockstore. I am at a point where my app crashes when I try to attach a file to DAG in the case where I have already instatiated the blockservice.New.. with BitSwap instead of offline.Exchange(bs).

Here are some logs and code.

Here is the peer from whom I send the file using the "offline.Exchange(bs)"

19:23:29.937 GoLog   E  2024-02-02T17:23:29.936Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:82  IPFS BitSwap Service Running...
19:25:06.545 GoLog   E  2024-02-02T17:25:06.542Z    DEBUG   rendezvouz  networkmodule/chat.go:166   sending through streams 3029977 Bytes
19:25:06.545 GoLog   E  2024-02-02T17:25:06.543Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:122 AddFileToIPFSNetwork...
19:25:06.546 GoLog   E  2024-02-02T17:25:06.546Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:146 AddFileToIPFSNetwork done, cid: bafybeichui6s6lu26uragxswp43oekfzc5urfijehd3dwvfq3zlo7fw46e
19:25:06.546 GoLog   E  2024-02-02T17:25:06.546Z    DEBUG   rendezvouz  networkmodule/networkmodule.go:101  sendDataStream: New File CID: bafybeichui6s6lu26uragxswp43oekfzc5urfijehd3dwvfq3zlo7fw46e
19:25:06.546 GoLog   E  2024-02-02T17:25:06.546Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:152 GetFileBytesFromIpfs: in...
19:25:06.546 GoLog   E  2024-02-02T17:25:06.546Z    DEBUG   unixfs_file_cid ipfs/unixfs_file_cid.go:159 GetFileBytesFromIpfs: get DAGService Node of the CID.
19:25:06.546 GoLog   E  2024-02-02T17:25:06.546Z    DEBUG   unixfs_file_cid ipfs/unixfs_file_cid.go:166 GetFileBytesFromIpfs: Convert the DAG node to a UnixFS node.
19:25:06.546 GoLog   E  2024-02-02T17:25:06.546Z    DEBUG   unixfs_file_cid ipfs/unixfs_file_cid.go:173 GetFileBytesFromIpfs: Read the file data from UnixFs node.
19:25:06.548 GoLog   E  2024-02-02T17:25:06.547Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:184 GetFileBytesFromIpfs: out.
19:25:06.548 GoLog   E  2024-02-02T17:25:06.547Z    INFO    rendezvouz  networkmodule/networkmodule.go:108  sendDataStream bytes for CID, original length = 3029977 - from ipfs length = 3029977
19:25:06.548 GoLog   E  2024-02-02T17:25:06.547Z    INFO    rendezvouz  networkmodule/networkmodule.go:111  sendDataStream peerID = 12D3KooWKAHtDxcEfNsEhgGVVsoKhhrPVXAfYzCMKxD88dtSqqLP

On the other side the other peer that I send the CID to, cannot find it as expected:

19:19:20.813 GoLog   E  2024-02-02T17:19:20.813Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:82  IPFS BitSwap Service Running...
19:19:45.186 GoLog   E  2024-02-02T17:19:45.185Z    DEBUG   rendezvouz  networkmodule/networkmodule.go:69   got a new stream: 12D3KooWQT-1-7 with protocol: /chat_protocol/2.0.0
19:20:50.112 GoLog   E  2024-02-02T17:20:50.108Z    DEBUG   rendezvouz  networkmodule/networkmodule.go:69   got a new stream: 12D3KooWQT-1-8 with protocol: /file_share/2.0.0
19:20:50.112 GoLog   E  2024-02-02T17:20:50.108Z    INFO    rendezvouz  networkmodule/networkmodule.go:80   readDataStream from peerID = 12D3KooWQT-1-8
19:20:50.113 GoLog   E  2024-02-02T17:20:50.109Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:152 GetFileBytesFromIpfs: in...
19:20:50.113 GoLog   E  2024-02-02T17:20:50.109Z    DEBUG   unixfs_file_cid ipfs/unixfs_file_cid.go:159 GetFileBytesFromIpfs: get DAGService Node of the CID.
19:20:50.113 GoLog   E  2024-02-02T17:20:50.110Z    ERROR   unixfs_file_cid ipfs/unixfs_file_cid.go:162 Error getting node from DAGService:block was not found locally (offline): ipld: could not find bafybeichui6s6lu26uragxswp43oekfzc5urfijehd3dwvfq3zlo7fw46e
19:20:50.113 GoLog   E  2024-02-02T17:20:50.110Z    ERROR   rendezvouz  networkmodule/networkmodule.go:88   readDataStream bytes: p G?=/.??"^V6?(?i?$8?;T??V??? Error: block was not found locally (offline): ipld: could not find bafybeichui6s6lu26uragxswp43oekfzc5urfijehd3dwvfq3zlo7fw46e

However, when I change the BlockService instantiation to pass the bitswap as parameter in startBitSwapServer func: bsrv := blockservice.New(bs, bswap)

My app crashes and I get a runtime error:

20:17:16.946 GoLog   E  2024-02-02T18:17:16.945Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:81  IPFS BitSwap Service Running...
20:17:35.382 GoLog   E  2024-02-02T18:17:35.381Z    DEBUG   rendezvouz  networkmodule/chat.go:166   sending through streams 3029982 Bytes
20:17:35.382 GoLog   E  2024-02-02T18:17:35.381Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:121 AddFileToIPFSNetwork...
20:17:35.382 GoLog   E  2024-02-02T18:19:21.748Z    INFO    unixfs_file_cid ipfs/unixfs_file_cid.go:141 AddFileToIPFSNetwork Adding file to DAG
20:17:35.382 GoLog   E  panic: runtime error: invalid memory address or nil pointer dereference

CODE:

I start the ipfs service from my main module as:

go InitIpfs(ctx, host)

This func sends the file CID in the corresponding peers:

// send byte slice to selected nodes in the network
func sendDataStream(data []byte, peerIDs []peer.ID) {
    var wg sync.WaitGroup
    fileCid, err := myIpfs.AddFileToIPFSNetwork(data)
    if err != nil {
        logger.Error(err)
    }
    logger.Debug("sendDataStream: New File CID: ", fileCid)

    // self TEST
    testBytes, err := myIpfs.GetFileBytesFromIpfs(fileCid.Bytes())
    if err != nil {
        logger.Error("readDataStream bytes length: ", len(data), " Error: ", err)
    }
    logger.Info("sendDataStream bytes for CID, original length = ", len(data), " - from ipfs length = ", len(testBytes))

    for _, peerID := range peerIDs {
        logger.Info("sendDataStream peerID = ", peerID)
        wg.Add(1)
        // defer wg.Done() //ensure call in all exit paths of the goroutine to avoid deadlocks.
        go func(id peer.ID) {
            defer wg.Done()
            s, err := ourHost.NewStream(context.Background(), id, FileShareProtocolID)
            if err != nil {
                networkCallback.OnError(getError(protobuf_classes.ErrorType_ERROR_TYPE_UNPSECIFIED, fmt.Errorf("could not create stream with %s", id)))
                return
            }
            defer s.Close()
            writer := io.Writer(s)
            _, err = writer.Write(fileCid.Bytes())
            if err != nil {
                logger.Error(err)
                return
            }
        }(peerID)
    }

    wg.Wait()
}

here I read the stream on the Receiver peer:

// when a data stream is received (on the receiving end of broadcastDataStream)
func readDataStream(s network.Stream) {
    defer s.Close()
    logger.Info("readDataStream from peerID = ", s.ID())
    buf, err := io.ReadAll(s)
    if err != nil {
        networkCallback.OnError(getError(protobuf_classes.ErrorType_ERROR_TYPE_UNPSECIFIED, err))
        return
    }
    rxBytes, err := myIpfs.GetFileBytesFromIpfs(buf)
    if err != nil {
        logger.Error("readDataStream bytes: ", string(buf), " Error: ", err)
    }
    broadcastChatRoom.Messages <- rxBytes
}

and here is the actual implementation of IPFS based on the example. I am not sure that func AddFileToIPFSNetwork() is correct though:

package networkmodule

import (
    "bytes"
    "context"
    "fmt"
    "io"

    logv2 "github.com/ipfs/go-log/v2"

    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/routing"

    "github.com/multiformats/go-multiaddr"
    "github.com/multiformats/go-multicodec"

    "github.com/ipfs/go-cid"
    "github.com/ipfs/go-datastore"
    dsync "github.com/ipfs/go-datastore/sync"

    "github.com/ipfs/boxo/blockservice"
    blockstore "github.com/ipfs/boxo/blockstore"
    chunker "github.com/ipfs/boxo/chunker"
    "github.com/ipfs/boxo/ipld/merkledag"

    unixfile "github.com/ipfs/boxo/ipld/unixfs/file"
    "github.com/ipfs/boxo/ipld/unixfs/importer/balanced"
    uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"

    ipld "github.com/ipfs/go-ipld-format"

    bitswap "github.com/ipfs/boxo/bitswap"
    bsnet "github.com/ipfs/boxo/bitswap/network"
    "github.com/ipfs/boxo/files"
)

var unixfs_tag string = "unixfs_file_cid"

var ipfsLogger = logv2.Logger(unixfs_tag)
var ctx context.Context
var p2pHost *host.Host
var bitSwap *bitswap.Bitswap
var myDagService ipld.DAGService

func InitIpfs(ct context.Context, host *host.Host) {
    logv2.SetLogLevel(unixfs_tag, "debug")
    ipfsLogger.Info("Initializing IPFS BitSwap Service.")
    ctx = ct
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // ensure context cancel when function exits.

    fullAddr := getHostAddress(*host)
    ipfsLogger.Info("I am ", fullAddr)

    p2pHost = host
    bs, err := startBitSwapServer(*host)
    if err != nil {
        ipfsLogger.Fatal(err)
    }
    bitSwap = bs
    defer bitSwap.Close() // Ensure Bitswap server is closed when function exits.
    ipfsLogger.Info("IPFS BitSwap Service Running...")
    // Run until canceled.
    <-ctx.Done()
    ipfsLogger.Info("IPFS BitSwap Service Terminated!")
}

// Start Node's BitSwap Data Server
func startBitSwapServer(h host.Host) (*bitswap.Bitswap, error) {
    var router routing.ContentRouting
    ipfsLogger.Info("startBitSwapServer: in...")
    // Step 1: Set up a datastore and blockstore
    ds := dsync.MutexWrap(datastore.NewMapDatastore())
    bs := blockstore.NewBlockstore(ds)
    bs = blockstore.NewIdStore(bs) // handle identity multihashes, these don't require doing any actual lookups

    // Step 2: Start the Bitswap network service
    // TODO: ADD leveraging content routing (DHT, IPNI, delegated routing requests, etc.) as we may not know the peer we are fetching from. (?)
    bitSwapNetwork := bsnet.NewFromIpfsHost(h, router)
    bswap := bitswap.New(ctx, bitSwapNetwork, bs)

    // Step 3: Create a block service and a DAG service TODO: remove offline.Exchange(bs)
    bsrv := blockservice.New(bs, bswap)
    myDagService = merkledag.NewDAGService(bsrv) //<------------- why does it crash with bswap (?)
    // Step 4: Start listening on the Bitswap protocol
    bitSwapNetwork.Start(bswap)
    ipfsLogger.Info("startBitSwapServer: out.")
    return bswap, nil
}

func getHostAddress(h host.Host) string {
    // Build host multiaddress
    hostAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().String()))
    // build a full multiaddress to reach this host by encapsulating both addresses:
    addr := h.Addrs()[0]
    return addr.Encapsulate(hostAddr).String()
}

// creates a UnixFS node from the file bytes and adds it to the DAG.
// Returns the CID of the created UnixFS node.
func AddFileToIPFSNetwork(fileBytes []byte) (cid.Cid, error) {
    ipfsLogger.Info("AddFileToIPFSNetwork...")
    unixfsImportParams := uih.DagBuilderParams{
        Maxlinks:  uih.DefaultLinksPerBlock, // Default max of 174 links per block
        RawLeaves: true,                     // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper
        CidBuilder: cid.V1Builder{ // Use CIDv1 for all links
            Codec:    uint64(multicodec.DagPb),
            MhType:   uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function
            MhLength: -1,                          // Use the default hash length for the given hash function (in this case 256 bits)
        },
        Dagserv: myDagService,
        NoCopy:  false,
    }
    fileReader := bytes.NewReader(fileBytes)
    ufsBuilder, err := unixfsImportParams.New(chunker.NewSizeSplitter(fileReader, chunker.DefaultBlockSize))
    if err != nil {
        return cid.Undef, err
    }
    ipfsLogger.Info("AddFileToIPFSNetwork Insert file data to DAG...")
    nd, err := balanced.Layout(ufsBuilder)
    if err != nil {
        return cid.Undef, err
    }
    ipfsLogger.Info("AddFileToIPFSNetwork done, cid: ", nd.Cid().String())
    return nd.Cid(), nil
}

// Request from IPFS network and download the file identified by its Content ID
func GetFileBytesFromIpfs(cidBytes []byte) ([]byte, error) {
    ipfsLogger.Info("GetFileBytesFromIpfs: in...")
    c, err := cid.Cast(cidBytes)
    if err != nil {
        ipfsLogger.Error("Error decoding CID:", err)
        return nil, err
    }
    // Get the DAG node for the CID
    ipfsLogger.Debug("GetFileBytesFromIpfs: get DAGService Node of the CID.")
    node, err := myDagService.Get(ctx, c)
    if err != nil {
        ipfsLogger.Error("Error getting node from DAGService:", err)
        return nil, err
    }
    // Convert the DAG node to a UnixFS node
    ipfsLogger.Debug("GetFileBytesFromIpfs: Convert the DAG node to a UnixFS node.")
    unixFSNode, err := unixfile.NewUnixfsFile(ctx, myDagService, node)
    if err != nil {
        ipfsLogger.Error("Error converting DAG node to UnixFS node:", err)
        return nil, err
    }
    // Read the file data from the UnixFS node
    ipfsLogger.Debug("GetFileBytesFromIpfs: Read the file data from UnixFs node.")
    var buf bytes.Buffer
    if file, ok := unixFSNode.(files.File); ok {
        _, err = io.Copy(&buf, file)
        if err != nil {
            ipfsLogger.Error("Error reading file data:", err)
            return nil, err
        }
    } else {
        return nil, fmt.Errorf("expected a file node, got something else")
    }
    ipfsLogger.Info("GetFileBytesFromIpfs: out.")
    return buf.Bytes(), nil
}
welcome[bot] commented 9 months ago

Thank you for submitting your first issue to this repository! A maintainer will be here shortly to triage and review. In the meantime, please double-check that you have provided all the necessary information to make this process easy! Any information that can help save additional round trips is useful! We currently aim to give initial feedback within two business days. If this does not happen, feel free to leave a comment. Please keep an eye on how this issue will be labeled, as labels give an overview of priorities, assignments and additional actions requested by the maintainers:

Finally, remember to use https://discuss.ipfs.io if you just need general support.