lukso-network / lukso-orchestrator

Orchestrating the dance of vanguard and pandora.
Apache License 2.0

Implement a van subscriber for new pending blocks. #22

Closed atif-konasl closed 3 years ago

atif-konasl commented 3 years ago

Overview

To verify blocks, the orchestrator needs the new block header from the pandora client as well as from the vanguard client. The orchestrator client therefore subscribes to a trusted vanguard client for new block headers. When it is notified of a new pending block, it caches the block and stores it in the db; the key is the slot number and the value is the blockHash and blockHeader.
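
The slot-keyed cache described above could be sketched as follows. This is a minimal illustration using only the standard library, not the orchestrator's actual storage code: the names PendingHeader, SlotCache, Put and Get are hypothetical, and the header is kept as opaque bytes because the issue does not fix an encoding.

```go
package main

import (
	"container/list"
	"fmt"
)

// PendingHeader is a hypothetical record for one vanguard block:
// the value stored under a slot number.
type PendingHeader struct {
	BlockHash [32]byte
	Header    []byte // serialized block header; the encoding is left open here
}

// entry pairs a slot with its record inside the eviction list.
type entry struct {
	slot   uint64
	header PendingHeader
}

// SlotCache is a small LRU cache keyed by slot number.
type SlotCache struct {
	capacity int
	order    *list.List               // front = most recently used
	items    map[uint64]*list.Element // slot -> element in order
}

// NewSlotCache returns an empty cache that holds at most capacity slots.
func NewSlotCache(capacity int) *SlotCache {
	return &SlotCache{
		capacity: capacity,
		order:    list.New(),
		items:    make(map[uint64]*list.Element),
	}
}

// Put inserts or updates the record for a slot and evicts the least
// recently used slot once the capacity is exceeded.
func (c *SlotCache) Put(slot uint64, h PendingHeader) {
	if el, ok := c.items[slot]; ok {
		el.Value.(*entry).header = h
		c.order.MoveToFront(el)
		return
	}
	c.items[slot] = c.order.PushFront(&entry{slot: slot, header: h})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).slot)
	}
}

// Get returns the record for a slot and marks it as recently used.
func (c *SlotCache) Get(slot uint64) (PendingHeader, bool) {
	el, ok := c.items[slot]
	if !ok {
		return PendingHeader{}, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).header, true
}

func main() {
	cache := NewSlotCache(2)
	cache.Put(1, PendingHeader{Header: []byte("h1")})
	cache.Put(2, PendingHeader{Header: []byte("h2")})
	cache.Put(3, PendingHeader{Header: []byte("h3")}) // evicts slot 1

	_, ok := cache.Get(1)
	fmt.Println("slot 1 cached:", ok)
	h, _ := cache.Get(3)
	fmt.Println("slot 3 header:", string(h.Header))
}
```

The same record could be written to the db under the slot number so that an evicted slot can still be looked up there.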

Implementation

  1. Implement the subscription method. API definition: van_subscribe: StreamNewPendingBlocks in the vanguard client.
  2. When the orchestrator node bootstraps, it subscribes to the van client through the van_subscribe: StreamNewPendingBlocks event API.
  3. If the connection fails, there should be an option to retry the connection as well as the subscription.
  4. If the connection succeeds and blocks arrive from vanguard, store them in an LRU cache. There will be a dedicated cache for that: slot -> block
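
The retry behaviour in step 3 could look roughly like the sketch below. It is standard-library only; retry and runExample are hypothetical names, the subscribe closure stands in for dialing vanguard and opening the stream, and the linearly growing delay is one possible policy rather than something the issue prescribes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry calls op until it succeeds, the attempts are used up, or ctx is
// cancelled. The wait grows linearly: delay, 2*delay, 3*delay, ...
func retry(ctx context.Context, attempts uint, delay time.Duration, op func() error) error {
	if attempts == 0 {
		attempts = 1 // always try at least once
	}
	var err error
	for i := uint(1); i <= attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i == attempts {
			break // no point in waiting after the last attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Duration(i) * delay):
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

// runExample simulates a vanguard node that becomes reachable on the
// third attempt and reports how many attempts were made.
func runExample() (int, error) {
	calls := 0
	subscribe := func() error {
		calls++
		if calls < 3 {
			return errors.New("vanguard not reachable")
		}
		return nil
	}
	err := retry(context.Background(), 5, 10*time.Millisecond, subscribe)
	return calls, err
}

func main() {
	calls, err := runExample()
	fmt.Println("attempts:", calls, "error:", err)
}
```

Passing the node's context means a shutdown interrupts the wait instead of blocking until the next attempt.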

atif-konasl commented 3 years ago

Client code could look like this:

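// pendingBlockSubscription subscribes to vanguard's StreamNewPendingBlocks
// stream and receives blocks in a background goroutine tracked by wg.
func (c *GRPCClient) pendingBlockSubscription(wg *sync.WaitGroup) {
    stream, err := c.beaconClient.StreamNewPendingBlocks(
        c.ctx,
        &ptypes.Empty{},
    )
    if err != nil {
        log.WithError(err).Fatal("Failed to subscribe to StreamNewPendingBlocks")
    }

    log.Info("Successfully subscribed to new pending block event")
    wg.Add(1) // assumes the caller waits on wg and does not call Add itself
    go func() {
        defer wg.Done()
        for {
            vanBlock, err := stream.Recv()
            if err != nil {
                log.WithError(err).Error("Failed to receive new pending block")
                return
            }
            // TODO: store vanBlock in the slot -> block cache.
            log.WithField("block", vanBlock).Debug("Received new pending block")
        }
    }()
}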
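
// GRPCClient wraps a gRPC connection to a vanguard node together with
// the beacon chain and validator service clients built on top of it.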
type GRPCClient struct {
    ctx             context.Context
    c               *grpc.ClientConn
    dialOpts        []grpc.DialOption
    beaconClient    ethpb.BeaconChainClient
    validatorClient ethpb.BeaconNodeValidatorClient
}

// Dial connects a client to the given URL.
func Dial(ctx context.Context, rawurl string, grpcRetryDelay time.Duration,
    grpcRetries uint, maxCallRecvMsgSize int) (*GRPCClient, error) {

    dialOpts := constructDialOptions(
        maxCallRecvMsgSize,
        "",
        grpcRetries,
        grpcRetryDelay,
    )
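    if dialOpts == nil {
        // Returning (nil, nil) would hand the caller a nil client without an error.
        return nil, errors.New("could not construct gRPC dial options")
    }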

    c, err := grpc.DialContext(ctx, rawurl, dialOpts...)
    if err != nil {
        log.Errorf("Could not dial endpoint: %s, %v", rawurl, err)
        return nil, err
    }

    return &GRPCClient{
        ctx,
        c,
        dialOpts,
        ethpb.NewBeaconChainClient(c),
        ethpb.NewBeaconNodeValidatorClient(c),
    }, nil
}

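// Close tears down the underlying gRPC connection.
func (ec *GRPCClient) Close() {
    if err := ec.c.Close(); err != nil {
        log.WithError(err).Error("Failed to close gRPC connection")
    }
}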

// constructDialOptions constructs a list of grpc dial options
func constructDialOptions(
    maxCallRecvMsgSize int,
    withCert string,
    grpcRetries uint,
    grpcRetryDelay time.Duration,
    extraOpts ...grpc.DialOption,
) []grpc.DialOption {
    var transportSecurity grpc.DialOption
    if withCert != "" {
        creds, err := credentials.NewClientTLSFromFile(withCert, "")
        if err != nil {
            log.Errorf("Could not get valid credentials: %v", err)
            return nil
        }
        transportSecurity = grpc.WithTransportCredentials(creds)
    } else {
        transportSecurity = grpc.WithInsecure()
        log.Warn("You are using an insecure gRPC connection. If you are running your beacon node and " +
            "validator on the same machines, you can ignore this message. If you want to know " +
            "how to enable secure connections, see: https://docs.prylabs.network/docs/prysm-usage/secure-grpc")
    }

    if maxCallRecvMsgSize == 0 {
        maxCallRecvMsgSize = 10 * 5 << 20 // Default: 50 MiB
    }

    dialOpts := []grpc.DialOption{
        transportSecurity,
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
            grpc_retry.WithMax(grpcRetries),
            grpc_retry.WithBackoff(grpc_retry.BackoffLinear(grpcRetryDelay)),
        ),
    }

    dialOpts = append(dialOpts, extraOpts...)
    return dialOpts
}
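
To connect the receive loop to the slot -> block cache, the loop can be written against a small interface and tested without a running vanguard node. The sketch below rests on assumptions: Block, blockStream, consume and fakeStream are hypothetical names, and Block only stands in for whatever message StreamNewPendingBlocks actually returns.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Block is a hypothetical stand-in for the vanguard block message.
type Block struct {
	Slot uint64
	Hash string
}

// blockStream mirrors the Recv method of a gRPC server-streaming client.
type blockStream interface {
	Recv() (*Block, error)
}

// consume reads blocks until the stream ends. It returns nil when the
// server closes the stream cleanly (io.EOF) and the error otherwise, so
// the caller can decide whether to resubscribe.
func consume(stream blockStream, store func(*Block)) error {
	for {
		blk, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("receive pending block: %w", err)
		}
		store(blk)
	}
}

// fakeStream replays a fixed list of blocks, then reports io.EOF.
type fakeStream struct {
	blocks []*Block
}

func (f *fakeStream) Recv() (*Block, error) {
	if len(f.blocks) == 0 {
		return nil, io.EOF
	}
	blk := f.blocks[0]
	f.blocks = f.blocks[1:]
	return blk, nil
}

func main() {
	bySlot := make(map[uint64]*Block) // stand-in for the slot -> block cache
	stream := &fakeStream{blocks: []*Block{
		{Slot: 7, Hash: "0xaa"},
		{Slot: 8, Hash: "0xbb"},
	}}
	err := consume(stream, func(b *Block) { bySlot[b.Slot] = b })
	fmt.Println("cached slots:", len(bySlot), "error:", err)
}
```

Returning the error instead of only logging it lets the caller trigger the retry path from step 3 when the stream breaks.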

blazejkrzak commented 3 years ago

I moved it to done; if that is not correct, please restore it.