ipfs / kubo

An IPFS implementation in Go
https://docs.ipfs.tech/how-to/command-line-quick-start/
Other
16.2k stars 3.03k forks source link

Expose PubSub over gRPC #8602

Open Pandapip1 opened 2 years ago

Pandapip1 commented 2 years ago

Checklist

Description

Currently, the HTTP API for PubSub is less than ideal. The fact that you have to send an HTTP GET request to publish something and listen to a stream to subscribe to something is clunky (and slow!), and is exactly what gRPC is meant to solve! I suggest that a new endpoint be added that exposes PubSub using gRPC.

welcome[bot] commented 2 years ago

Thank you for submitting your first issue to this repository! A maintainer will be here shortly to triage and review. In the meantime, please double-check that you have provided all the necessary information to make this process easy! Any information that can help save additional round trips is useful! We currently aim to give initial feedback within two business days. If this does not happen, feel free to leave a comment. Please keep an eye on how this issue will be labeled, as labels give an overview of priorities, assignments and additional actions requested by the maintainers:

Finally, remember to use https://discuss.ipfs.io if you just need general support.

Pandapip1 commented 2 years ago

Since gRPC has more browser support than it did in 2021, I am changing this to request gRPC instead for parity with js-ipfs.

Jorropo commented 2 years ago

@Pandapip1 gRPC is more complicated than websocket, I don't know if it's a thing we want to take on.

Pandapip1 commented 2 years ago

Rationale:

IPFS is really useful for websites. Particularly, online games are often hard to run because they are fairly easily DDoSed, and it's hard to make a well-designed game server. IPFS PubSub just straight up solves this problem, and if you design it well, reduces latency manyfold as well.

IPFS PubSub has many other uses for websites too. But IPFS for online games would be practically the holy grail for amateur game devs (particularly those that hypothetically didn't yet have access to online payments). No need to use an expensive VPS. Just run a static site.

GregoryVPerry commented 1 year ago

Should have a PR for this within a week.

GregoryVPerry commented 1 year ago

So here is the proposed architecture, please let me know if there are any issues with this proposed approach for the PR @Jorropo

Currently IPFS supports a pubsub implementation via:

ipfs daemon --enable-pubsub-experiment  # https://github.com/ipfs/kubo/blob/master/docs/experimental-features.md#ipfs-pubsub

This implementation however is REST-based so there are no options for push currently, where a pubsub subscription could be both initiated and monitored via a streaming method, which in this instance we will be implementing gRPC streaming for the PR.

So the goal of this task is to simply provide an alternative method for both pubsub topic subscription and notification, using gRPC streaming as an alternative method to the current method exposed within https://github.com/ipfs/kubo/blob/master/core/commands/pubsub.go.

First steps would be to fork and then:

make test  # from installed IPFS kubo directory

...to insure that your installation is correct, after pulling the ipfs kubo repo @ https://github.com/ipfs/kubo

From the repo (and per https://github.com/ipfs/kubo#development), main file is:

./cmd/ipfs/main.go

Sample code to add a gRPC server and listener in the main function:

grpcServer := grpc.NewServer()
pubsubGRPCServer := &pubsubGRPCServer{api.PubSubAPI}
RegisterPubSubServiceServer(grpcServer, pubsubGRPCServer)
listener, err := net.Listen("tcp", ":50051")
if err != nil {
    log.Fatalf("failed to listen: %v", err)
}
grpcServer.Serve(listener)

Imports to https://github.com/ipfs/kubo/blob/master/core/commands/pubsub.go:

import (
    "google.golang.org/grpc"
)

A struct for the gRPC service interface:

type pubsubGRPCServer struct {
    api.PubSubAPI
}

Define the gRPC service interface, including the new SubscribeGRPC method:

type PubSubServiceServer interface {
    Subscribe(context.Context, *Topic) (*Subscription, error)
    SubscribeGRPC(stream PubSubService_SubscribeGRPCServer) error
}

Implement the new SubscribeGRPC method in the pubsubGRPCServer struct:

func (s *pubsubGRPCServer) SubscribeGRPC(stream PubSubService_SubscribeGRPCServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        topic := in.Topic

        sub, err := s.PubSub().Subscribe(stream.Context(), topic)
        if err != nil {
            return err
        }
        defer sub.Close()

        for {
            msg, err := sub.Next(stream.Context())
            if err == io.EOF || err == context.Canceled {
                return nil
            } else if err != nil {
                return err
            }

            // send the message to the client
            if err := stream.Send(&Message{Data: msg.Data}); err != nil {
                return err
            }
        }
    }
}

So in this instance we are providing sample golang code that modifies the current IPFS pubsub implementation for a gRPC server and listener, e.g.:

package main

import (
    "context"
    "fmt"
    "net"
    "net/http"

    "google.golang.org/grpc"

    pb "github.com/your/package/path/proto"
)

func main() {
    // ... existing code ...

    // Create a gRPC server
    grpcServer := grpc.NewServer()
    pb.RegisterYourServiceServer(grpcServer, &yourServer{})

    // Create a listener
    listener, err := net.Listen("tcp", ":50051")
    if err != nil {
        fmt.Printf("Failed to listen: %v", err)
    }

    // Register the gRPC server with the http.Server
    httpServer := &http.Server{
        Handler: grpcServer.ServeMux,
    }

    // Start the gRPC server and the http.Server
    go func() {
        if err := grpcServer.Serve(listener); err != nil {
            fmt.Printf("Failed to serve gRPC server: %v", err)
        }
    }()
    if err := httpServer.Serve(httpListener); err != nil {
        fmt.Printf("Failed to serve http server: %v", err)
    }
}

An additional library such as grpc_server.go would look like:

package main

import (
    "fmt"
    "net"

    "google.golang.org/grpc"
    "google.golang.org/grpc/reflection"

    pb "path/to/your/protobuf/files"
)

const (
    port = ":50051"
)

type server struct{}

func (s *server) YourRPCMethod(ctx context.Context, in *pb.YourRequestType) (*pb.YourResponseType, error) {
    // your implementation here
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        fmt.Println("Failed to listen:", err)
    }
    s := grpc.NewServer()
    pb.RegisterYourServer(s, &server{})
    // Register reflection service on gRPC server.
    reflection.Register(s)
    if err := s.Serve(lis); err != nil {
        fmt.Println("Failed to serve:", err)
    }
}

The directory structure for the protos should just be a subdir in the current working directory, protos.

In the main() function the gRPC server is started, and with requests being served via gRPC as well as the native pubsub implementation provided by --experiment

https://blog.ipfs.tech/25-pubsub/
https://github.com/ipfs/js-ipfs/blob/master/docs/core-api/PUBSUB.md
https://github.com/ipfs/kubo/issues/8602

The PR will be submitted under 8602.
Pandapip1 commented 1 year ago

LGTM!

Pandapip1 commented 8 months ago

It's been a year. Any updates?