nats-io / stan.go

NATS Streaming System
https://nats.io
Apache License 2.0

Non-idiomatic use of interfaces #233

Open JensRantil opened 5 years ago

JensRantil commented 5 years ago

I am reading the godoc and notice that the library defines the interfaces Conn and Subscription. The idea with interfaces in Go is that they generally should be defined by the user of a library, not the library author. See this and this answer from one of the most influential Go developers out there. The current approach of defining the interfaces in the library and having Connect(...) return a Conn runs counter to that.

On a practical level, if https://godoc.org/github.com/nats-io/go-nats-streaming#Connect had returned an actual (public) implementation I would be able to click on Conn.Publish(subject string, data []byte) error and end up in the actual implementation. This would allow me to much quicker understand how something works.

Given the above, I propose removing the Conn interface and instead renaming the conn implementation to Conn. The same applies to Subscription. I understand that it's a small non-backwards-compatible change, but it will 1) remove one unnecessary level of abstraction and 2) still let people who rely on the interface easily define it themselves.

kozlovic commented 5 years ago

@JensRantil We actually did not define an interface in core NATS, thought that was a mistake, and so defined one in stan. But yes, in hindsight I am not sure it was the right thing to do. Not sure what @derekcollison has to say about this.

To prove the point made above that the user should be defining the interface: it is true that even with core NATS (where we don't define an interface), the user can define one and "override" any method. Here is an example:

type Publisher interface {
    Publish(subj string, data []byte) error
}

type myConn struct {
    Publisher
}

func (c *myConn) Publish(subj string, data []byte) error {
    fmt.Printf("@@IK: publish on %q: %q\n", subj, data)
    return c.Publisher.Publish(subj, data)
}

func TestIvan(t *testing.T) {
    s := RunServerOnPort(TEST_PORT)
    defer s.Shutdown()

    nc, err := Connect(fmt.Sprintf("localhost:%d", TEST_PORT))
    if err != nil {
        t.Fatalf("Error connecting: %v", err)
    }
    defer nc.Close()

    sub, err := nc.SubscribeSync("foo")
    if err != nil {
        t.Fatalf("Error on subscribe: %v", err)
    }

    mc := &myConn{Publisher: nc}
    if err := mc.Publish("foo", []byte("hello")); err != nil {
        t.Fatalf("Error on publish: %v", err)
    }

    m, err := sub.NextMsg(time.Second)
    if err != nil {
        t.Fatalf("Error on nextMsg: %v", err)
    }
    fmt.Printf("@@IK: m=%q\n", m.Data)
}

This would produce the output:

=== RUN   TestIvan
@@IK: publish on "foo": "hello"
@@IK: m="hello"
--- PASS: TestIvan (0.04s)
PASS

There was no need for the library to define an interface for the user to be able to mock or override any method of their choice.

alexrudd commented 5 years ago

I'm just looking into using NATS atm and this was one of the first things I noticed. This more generally maps to the Go principle of: “Accept interfaces, return structs”.
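As a rough illustration of that principle, here is a minimal, self-contained sketch. All names in it (Conn, Connect, Publisher, Announce, recorder) are hypothetical stand-ins and not the stan.go API: the "library" side returns a concrete struct, and the consuming code declares only the one-method interface it needs.

```go
package main

import "fmt"

// Conn is a hypothetical concrete type that a library would return.
type Conn struct {
	sent []string
}

// Connect returns the concrete *Conn rather than an interface.
func Connect() *Conn { return &Conn{} }

// Publish records the message; a real client would write to the network.
func (c *Conn) Publish(subj string, data []byte) error {
	c.sent = append(c.sent, fmt.Sprintf("%s:%s", subj, data))
	return nil
}

// Publisher is declared by the consumer and lists only what it needs.
type Publisher interface {
	Publish(subj string, data []byte) error
}

// Announce accepts the small interface, so *Conn and test fakes both work.
func Announce(p Publisher, name string) error {
	return p.Publish("greetings", []byte("hello "+name))
}

// recorder is a test fake that satisfies Publisher without library support.
type recorder struct{ subjects []string }

func (r *recorder) Publish(subj string, _ []byte) error {
	r.subjects = append(r.subjects, subj)
	return nil
}

func main() {
	c := Connect()
	if err := Announce(c, "gopher"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(c.sent)

	r := &recorder{}
	_ = Announce(r, "gopher")
	fmt.Println(r.subjects)
}
```

Because Connect returns *Conn, documentation tools link straight to the implementation, while callers that want to mock it can still do so through their own Publisher.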

derekcollison commented 5 years ago

This probably has some merit. I started NATS with the Go language when it was 0.52, so some of the early patterns have carried over that we would do differently if we were starting today with Go 1.12 or 1.13 (tip).

I am a big believer that we always need to maintain a level of backward compatibility, but there may be some opportunities for us to improve for sure.

Thanks for the feedback, and we will review internally and see what opportunities exist.

alexrudd commented 5 years ago

An approach I've seen work in similar projects is to keep the master branch backward compatible and develop the new package in a v2 branch with a v2 module name suffix. This keeps things safe for older versions of go get whereas those using go modules can pull the v2 package.

Once modules are more widely adopted then it becomes easy for people to pin to the version they want and the v2 branch can be merged into master.

thomasf commented 5 years ago

Yeah, this package probably needs an overhaul.

I was going to write another issue for this but after having read the code a bit more it seems it needs a whole new API.

The sc.Subscribe(...) method also lacks a useful error return. It just goes off into the background, and there are no Error() or Wait() methods defined to get or discover whether the subscription fails. The best you can do is create a small monitor goroutine which checks .IsValid() periodically, but then there is no way to get an actual error out of it, just a boolean state, so you don't know if it was because of .Close() or some failure.

    sub, err := sc.Subscribe("mysubject", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s\n", string(m.Data))
    }, stan.DurableName(fs.DurableName))

I guess a more idiomatic implementation could work something like this:

    // does not start the subscription 
    sub, err := sc.Subscribe("mysubject", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s\n", string(m.Data))
    }, stan.DurableName(fs.DurableName))

    err = sub.Start(ctx) // actually starts the subscription and blocks until it ends for whatever reason.
    if err != nil {
        // check if the error is a connection closed or other error type and handle it accordingly
    }

I guess the underlying nats package can maybe provide some error handler hooks which can make this happen as well but a more tidy and idiomatic interface for this package would be better.

kozlovic commented 5 years ago

@thomasf Not quite. sc.Subscribe() does return an error if the server rejects the subscription (for instance because the maximum number of subscriptions has been reached on that channel, or the durable is already registered, etc.). If no error is returned, a subscription can stop for only two reasons. Either the user closes the subscription or the connection, in which case your application should have its own way to know/synchronize that state; or the connection to the server is lost, in which case you are notified if you registered the appropriate callback with SetConnectionLostHandler().

thomasf commented 5 years ago

I'm just saying that it's easier for me if the function does not go off into a goroutine unless I ask it to. I'm OK with it blocking completely until there is an error to process. Btw, sub.IsValid() returns true even if the nats connection is closed; I don't know if that is intended behaviour or not.

I wrote a small wrapper package, currently internal to an application I am writing, to get easier, context-aware interaction with stan.go. It works something like this:


    // this context controls the whole stan/nats connection life cycle and
    // cancels all subscriptions when cancel is called or the connection
    // closes. Uses SetConnectionLostHandler among other things.
    ctx, cancel := context.WithCancel(context.Background())

    sc, err := stanctx.Connect(ctx, "test-cluster", "client-123", stan.NatsURL(flags.Nats.NatsURL))
    if err != nil {
        log.Fatal(err)
    }

    sub, err := sc.Subscribe("foobar",
        stan.DurableName(flags.Nats.DurableName),
        stan.AckWait(flags.Nats.AckWait),
        stan.MaxInflight(flags.Nats.MaxInFlight),
        stan.SetManualAckMode())
    if err != nil {
        log.Fatal(err)
    }
    chanCtx, chanCancel := context.WithCancel(ctx)
    errCh := make(chan error, 1)
    go func() {
        // Start receiving from a channel (this actually calls the original stan
        // Subscribe function). Blocks until the nats connection is lost or the
        // sub is closed, has an error, or is stopped. This context can be used
        // to cancel this individual subscription.
        errCh <- sub.StartChan(chanCtx, ch)
        // There is also a
        // .StartFunc(ctx context.Context, cb stan.MsgHandler) error
        // but I would rather default to dealing with channels.
    }()

    go func() {
        // toy example closing the channels context after a minute here
        time.Sleep(time.Minute)
        chanCancel()
    }()

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    select {
    case <-interrupt:
        cancel()
    case err := <-errCh:
        log.Println(err)

    }

kozlovic commented 5 years ago

I recently added documentation about those subscription functions, which relate not to the streaming subscription but to the low-level NATS subscription. Here is an excerpt from sub.go:

// These functions have been added for expert-users that need to get details
// about the low level NATS Subscription used internally to receive messages
// for this streaming subscription. They are documented in the Go client
// library: https://godoc.org/github.com/nats-io/nats.go#Subscription.ClearMaxPending

Please do not use IsValid() as an indication of whether the streaming subscription is valid. As I said, a subscription is valid until you close it or until the connection is declared lost, which is reported through the handler registered with the SetConnectionLostHandler() option.