openconfig / lemming

An OpenConfig reference device implementation
Apache License 2.0
30 stars 9 forks source link

Subscribe() requires a target while the spec says it shouldn't #263

Closed vincentbernat closed 11 months ago

vincentbernat commented 1 year ago

Hey!

Subscribe() requires a target, while the spec says it should not. In 2.2.2.1:

This field is optional for clients.

Commercial implementations do not require a target. Currently, Subscribe() implementation is delegated to openconfig/gnmi which requires a target. See https://github.com/openconfig/gnmi/issues/151.

vincentbernat commented 8 months ago

This does not seem to solve the issue for me. Looking at the code in #291, I don't see how this could solve that.

I use the following test:

package gnmi_test

import (
    "context"
    "fmt"
    "net"
    "testing"
    "time"

    "akvorado/common/helpers"

    "github.com/openconfig/gnmic/pkg/api"
    "github.com/openconfig/gnmic/pkg/formatters"
    fgnmi "github.com/openconfig/lemming/gnmi"
    "google.golang.org/grpc"
)

// SpawnGNMIServer starts a gRPC server with the lemming gNMI service
// (created with the name "fake") listening on an ephemeral loopback port, and
// returns the listener's address as a string. The server is stopped through
// t.Cleanup when the test completes.
func SpawnGNMIServer(t *testing.T) string {
    t.Helper()

    srv := grpc.NewServer()
    if _, err := fgnmi.New(srv, "fake", nil); err != nil {
        t.Fatalf("fgnmi.New() error:\n%+v", err)
    }

    // Port 0 lets the OS choose a free port; the caller learns it from the
    // returned address.
    listener, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        t.Fatalf("Listen() error:\n%+v", err)
    }

    // Serve in the background; the cleanup registered below stops the server.
    go srv.Serve(listener)
    t.Cleanup(srv.Stop)

    return listener.Addr().String()
}

// TestGnmiServer spawns the fake gNMI server, connects a gnmic target to it
// over an insecure channel, and runs a ONCE subscription for each tested
// encoding. All values from the returned events are merged into a single map
// and compared against the expected hostname leaves.
func TestGnmiServer(t *testing.T) {
    listenAddr := SpawnGNMIServer(t)
    t.Logf("Listen address: %s", listenAddr)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    target, err := api.NewTarget(
        api.Address(listenAddr),
        api.Timeout(time.Second),
        api.Insecure(true),
    )
    if err != nil {
        t.Fatalf("api.NewTarget() error:\n%+v", err)
    }
    if err := target.CreateGNMIClient(ctx); err != nil {
        t.Fatalf("CreateGNMIClient() error:\n%+v", err)
    }
    defer target.Close()

    encodings := []string{"json_ietf", "json"}
    for _, encoding := range encodings {
        t.Run(fmt.Sprintf("subscribe once %s", encoding), func(t *testing.T) {
            // No target option is set on the request: the test exercises a
            // subscription that leaves the prefix target unset.
            req, err := api.NewSubscribeRequest(
                api.Subscription(api.Path("openconfig:/system/config/hostname")),
                api.Subscription(api.Path("openconfig:/system/state/hostname")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/state/ifindex")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/subinterfaces/subinterface[index=*]/state/ifindex")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/state/name")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/subinterfaces/subinterface[index=*]/state/name")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/state/description")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/subinterfaces/subinterface[index=*]/state/description")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/aggregation/state/lag-speed")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/ethernet/state/negotiated-port-speed")),
                api.Subscription(api.Path("openconfig:/interfaces/interface[name=*]/ethernet/state/port-speed")),
                api.Encoding(encoding),
                api.SubscriptionListMode("once"),
            )
            if err != nil {
                t.Fatalf("NewSubscribeRequest() error:\n%+v", err)
            }

            responses, err := target.SubscribeOnce(ctx, req)
            if err != nil {
                t.Fatalf("SubscribeOnce() error:\n%+v", err)
            }

            // Flatten every event of every response into one map; a later
            // value for the same key overwrites an earlier one.
            merged := make(map[string]interface{})
            for _, resp := range responses {
                events, err := formatters.ResponseToEventMsgs("hello", resp, nil)
                if err != nil {
                    t.Fatalf("ResponseToEventMsgs() error:\n%+v", err)
                }
                for _, event := range events {
                    for key, value := range event.Values {
                        merged[key] = value
                    }
                }
            }

            want := map[string]string{
                "/system/config/hostname": "fake",
                "/system/state/hostname":  "fake",
            }
            diff := helpers.Diff(merged, want)
            if diff != "" {
                t.Errorf("Get() (-got, +want):\n%s", diff)
            }
        })
    }
}

And I get:

    server_test.go:86: SubscribeOnce() error:
        rpc error: code = InvalidArgument desc = request must contain a prefix &gnmi.SubscribeRequest{state:impl.MessageState{NoUnkeyedLiterals:pragma.NoUnkeyedLiterals{}, DoNotCompare:pragma.DoNotCompare{}, DoNotCopy:pragma.DoNotCopy{}, atomicMessageInfo:(*impl.MessageInfo)(0xc000844b88)}, sizeCache:0, unknownFields:[]uint8(nil), Request:(*gnmi.SubscribeRequest_Subscribe)(0xc00649e100), Extension:[]*gnmi_ext.Extension(nil)}

Looking at the PR #291, the handling of Subscribe() is still delegated to github.com/openconfig/gnmi, which still requires a target.

wenovus commented 8 months ago

The fix was via a gRPC interceptor: grpc.StreamInterceptor(fgnmi.NewSubscribeTargetUpdateInterceptor(targetName)), see this file in the PR. Can you try if that works for you?

vincentbernat commented 8 months ago

I am using 0e9423a2a8d0 (github.com/openconfig/lemming v0.3.2-0.20231219194849-0e9423a2a8d0 in go.mod), so it should work. I have updated my example to also have the import lines. We can remove the call to helpers.Diff() since we don't get there (and it wouldn't work as the test does not populate the server with data).

vincentbernat commented 8 months ago

OK, I had misunderstood your tip. It works now:

func SpawnGNMIServer(t *testing.T) string {
    t.Helper()

    s := grpc.NewServer(grpc.StreamInterceptor(fgnmi.NewSubscribeTargetUpdateInterceptor("local")))
    _, err := fgnmi.New(s, "fake", nil)
    if err != nil {
        t.Fatalf("fgnmi.New() error:\n%+v", err)
    }
    lgnmi, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        t.Fatalf("Listen() error:\n%+v", err)
    }
    go s.Serve(lgnmi)

    t.Cleanup(func() {
        s.Stop()
    })

    return lgnmi.Addr().String()
}
wenovus commented 8 months ago

It's a good point that fgnmi itself should handle it, but I'm not seeing a way to register the interceptor after the creation of grpc.Server. I'll add this detail to fgnmi.New's doc comments.

vincentbernat commented 8 months ago

How should I populate the server? Looking at the tests, I think they do something similar to this:

    // Build a Set request addressed to target "local" that updates both the
    // config and state hostname leaves to "fake", encoded as json_ietf.
    setReq, err := api.NewSetRequest(
        api.Target("local"),
        api.Update(
            api.Path("openconfig:/system/config/hostname"),
            api.Value("fake", "json_ietf"),
        ),
        api.Update(
            api.Path("openconfig:/system/state/hostname"),
            api.Value("fake", "json_ietf"),
        ),
    )
    if err != nil {
        t.Fatalf("api.NewSetRequest() error:\n%+v", err)
    }
    // Send the request through the existing gnmic target; the response is
    // discarded and only the error is checked.
    _, err = tg.Set(ctx, setReq)
    if err != nil {
        t.Fatalf("Set() error:\n%+v", err)
    }

This code succeeds. However, when subscribing, I get:

ERROR in interceptor stream: rpc error: code = NotFound desc = no such target: "local"
    server_test.go:106: SubscribeOnce() error:
        rpc error: code = NotFound desc = no such target: "local"

For context, my goal is to be able to run unit tests for code acting as a gNMI client.

wenovus commented 8 months ago

Can you try changing the interceptor's "local" argument to "fake" as you have when initializing fgnmi.New?

vincentbernat commented 8 months ago

I did the reverse. I now understand your latest comment. I can move forward from there! Thanks!