nats-io / nats-general

General NATS Information
Apache License 2.0

Jetstream Client API General Design #51

Open ColinSullivan1 opened 4 years ago

ColinSullivan1 commented 4 years ago

Design the general Jetstream client API for publishing, acknowledging, etc.

ripienaar commented 4 years ago

For discussion, I have this in Go; function names etc. are all up for discussion.

This is starting to look a lot like the entire jsm consumer admin API and would mean a ton of copy/paste.

Message Acknowledgement and Metadata

Basic Acks

err := m.Ack()
err = m.Nak()
err = m.AckProgress()
err = m.AckNext("_INBOX.xxx")
err = m.AckTerm()
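Each of these presumably becomes a plain publish of a short payload to the message's reply subject. A minimal sketch, assuming the payloads `+ACK`, `-NAK`, `+WPI` (progress), `+NXT` and `+TERM` used by the JetStream server's ack protocol; `AckKind`, `ackPayload` and `buildAck` are hypothetical names for illustration, not a nats.go API:

```go
package main

import "fmt"

// AckKind enumerates the ack variants proposed above (hypothetical type).
type AckKind int

const (
	AckAck AckKind = iota
	AckNak
	AckProgress
	AckNext
	AckTerm
)

// ackPayload returns the body published to the message's reply subject.
// The values are assumed to match the server's ack protocol.
func ackPayload(k AckKind) ([]byte, error) {
	switch k {
	case AckAck:
		return []byte("+ACK"), nil
	case AckNak:
		return []byte("-NAK"), nil
	case AckProgress:
		return []byte("+WPI"), nil
	case AckNext:
		return []byte("+NXT"), nil
	case AckTerm:
		return []byte("+TERM"), nil
	}
	return nil, fmt.Errorf("unknown ack kind %d", int(k))
}

// ackPublish describes the single publish an ack turns into.
type ackPublish struct {
	Subject string // the JetStream reply subject of the message being acked
	Reply   string // only set for AckNext: where the next message should go
	Data    []byte
}

func buildAck(jsReply string, k AckKind, nextInbox string) (ackPublish, error) {
	data, err := ackPayload(k)
	if err != nil {
		return ackPublish{}, err
	}
	p := ackPublish{Subject: jsReply, Data: data}
	if k == AckNext {
		p.Reply = nextInbox
	}
	return p, nil
}

func main() {
	// example reply subject; its token layout does not matter here
	p, err := buildAck("$JS.ACK.ORDERS.new.1.5.5", AckNext, "_INBOX.xxx")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("PUB %s %s %s\n", p.Subject, p.Reply, p.Data)
}
```

For `AckNext`, the reply set on the ack publish is the inbox that should receive the next message, which is what `m.AckNext("_INBOX.xxx")` would need to carry.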

Acks with options for reliable acks

Timeout based on time.Duration()

err := m.Ack(nats.AckWaitDuration(5*time.Second))

Timeout based on a context - timeout or deadline context

err := m.Ack(nats.AckWaitContext(ctx))

JS Originating Message Metadata

These parse the message metadata on demand: the first access parses and caches it, so messages that are not from JetStream, or whose metadata is never read, incur no performance cost.

We considered lazily adding these as headers, but with some of them being int and time.Time values it was just annoying: users would have to do additional parsing.

func (m *Msg) JSStream() string
func (m *Msg) JSConsumer() string
func (m *Msg) JSDelivered() int
func (m *Msg) JSStreamSequence() int
func (m *Msg) JSConsumerSequence() int
func (m *Msg) JSTimeStamp() time.Time

Publishing

This publishes to a stream, which must already exist, waiting for 2 seconds for that specific stream to respond.

It does not yet support multiple ack responses.

err := nc.Publish("js.str", []byte("hello"), nats.ExpectAckFromStream("STREAM", 2*time.Second))

Requests

Request is unchanged: I think we don't want to intercept the responses, but we do want to help parse them.

msg, _ := nc.Request("js.str", []byte("hello"), 2 * time.Second)
ack, _ := nats.ParsePublishAck(msg) // returns the server's message on -ERR, or a generic error for any other non +OK reply
fmt.Printf("Ack from %s with sequence %d\n", ack.Stream, ack.Sequence)

Subscribes

For subscribe, I tried Subscribe(subj, cb, ...opts) in the past, where opts created the entire consumer right there. I found it really annoying, but it's probably OK if people like that.

Basic

This will ensure that if cfg is durable, it is updated to deliver to ib; if it is ephemeral, it is created with ib as its delivery subject.

sub, err := nc.Subscribe(ib, cb, nats.WithConsumer(cfg))
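The durable/ephemeral rule above can be captured as a small decision function. A sketch with a hypothetical, trimmed `ConsumerConfig`; a real `WithConsumer` would follow up by sending the corresponding create or update request to the server:

```go
package main

import "fmt"

// ConsumerConfig is a trimmed, hypothetical consumer configuration.
type ConsumerConfig struct {
	Durable        string // empty means ephemeral
	DeliverSubject string
}

const (
	actionUpsertDurable   = "create or update durable consumer"
	actionCreateEphemeral = "create ephemeral consumer"
)

// applyDeliverySubject points a copy of cfg at ib and reports which request
// the client would send to the server.
func applyDeliverySubject(cfg ConsumerConfig, ib string) (ConsumerConfig, string) {
	cfg.DeliverSubject = ib // cfg is passed by value, so the caller's config is untouched
	if cfg.Durable != "" {
		return cfg, actionUpsertDurable
	}
	return cfg, actionCreateEphemeral
}

func main() {
	cfg, action := applyDeliverySubject(ConsumerConfig{Durable: "NEW"}, "_INBOX.abc")
	fmt.Printf("%s, delivering to %s\n", action, cfg.DeliverSubject)
}
```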

Template style

Here I have a typical consumer config and make just some slight modifications to it.

ordersConsumer := nats.NewConsumer(.....) // perhaps with sampling etc, can be passed to nats.WithConsumer() if complete

// later

newOrders, _ := nc.Subscribe(ib, cb, nats.NewConsumerFromDefault(stream, ordersConsumer, nats.FilterStreamBySubject("ORDERS.new")))

dispatchedOrders, _ := nc.Subscribe(ib, cb, nats.NewConsumerFromDefault(stream, ordersConsumer, nats.FilterStreamBySubject("ORDERS.dispatched")))
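`NewConsumerFromDefault` can be a value copy of the template plus the usual functional options, so the template is never mutated between subscriptions. A sketch with a hypothetical `ConsumerConfig` and a local `FilterStreamBySubject` modelled on the names above:

```go
package main

import "fmt"

// ConsumerConfig is a trimmed, hypothetical consumer configuration.
type ConsumerConfig struct {
	Stream          string
	FilterSubject   string
	SampleFrequency string
	MaxDeliver      int
}

// ConsumerOpt modifies a config in place.
type ConsumerOpt func(*ConsumerConfig)

// FilterStreamBySubject restricts the consumer to one subject of the stream.
func FilterStreamBySubject(subj string) ConsumerOpt {
	return func(c *ConsumerConfig) { c.FilterSubject = subj }
}

// NewConsumerFromDefault copies tmpl, binds the copy to stream and applies
// opts to it. The copy is shallow: if the config grows slices or maps,
// they must be cloned here too or the template would be shared.
func NewConsumerFromDefault(stream string, tmpl ConsumerConfig, opts ...ConsumerOpt) ConsumerConfig {
	cfg := tmpl
	cfg.Stream = stream
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	ordersConsumer := ConsumerConfig{SampleFrequency: "100%", MaxDeliver: 5}
	newOrders := NewConsumerFromDefault("ORDERS", ordersConsumer, FilterStreamBySubject("ORDERS.new"))
	dispatchedOrders := NewConsumerFromDefault("ORDERS", ordersConsumer, FilterStreamBySubject("ORDERS.dispatched"))
	fmt.Printf("%+v\n%+v\n", newOrders, dispatchedOrders)
}
```

Both derived configs inherit `SampleFrequency` and `MaxDeliver` from `ordersConsumer` and differ only in `FilterSubject`.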

General Helpers and Admin

These are probably the absolute minimum we need to help people a bit:

sub := nats.NextSubject("ORDERS", "new")
err := nc.NextMessage("ORDERS", "new", 1, "_INBOX.xxx")
cs, _ := nc.ConsumerStatus("ORDERS", "new")
err = nc.DeleteConsumer("ORDERS", "new")
consumers, _ := nc.ListConsumers("ORDERS")
err = nc.NewConsumer(stream, cfg)
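`NextSubject` and `NextMessage` are thin wrappers: build the consumer's next-message subject and publish a batch size with the inbox as reply. A sketch assuming the subject layout of current nats-server, `$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>` (which may differ from the server this thread targeted), and a plain decimal batch size as the body; the `publisher` interface mirrors nats.go's `PublishRequest` so the sketch runs without a server:

```go
package main

import (
	"fmt"
	"strconv"
)

// publisher is the one connection method the helpers need.
type publisher interface {
	PublishRequest(subj, reply string, data []byte) error
}

// NextSubject builds the subject used to pull messages from a consumer
// (assumed layout, see above).
func NextSubject(stream, consumer string) string {
	return fmt.Sprintf("$JS.API.CONSUMER.MSG.NEXT.%s.%s", stream, consumer)
}

// NextMessage asks the consumer to deliver batch messages to inbox.
func NextMessage(nc publisher, stream, consumer string, batch int, inbox string) error {
	if batch < 1 {
		return fmt.Errorf("batch must be at least 1, got %d", batch)
	}
	return nc.PublishRequest(NextSubject(stream, consumer), inbox, []byte(strconv.Itoa(batch)))
}

// recorder captures what would have been sent to the server.
type recorder struct{ subj, reply, data string }

func (r *recorder) PublishRequest(subj, reply string, data []byte) error {
	r.subj, r.reply, r.data = subj, reply, string(data)
	return nil
}

func main() {
	r := &recorder{}
	if err := NextMessage(r, "ORDERS", "new", 1, "_INBOX.xxx"); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("PUB %s %s %s\n", r.subj, r.reply, r.data)
}
```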

Pull Helper

I think we will inevitably need a helper for pull subscribers that maintains a local buffer, does health checks, etc. I'm not sure exactly how this will look, so this is just spitballing.

// 100 message buffer
newOrders, err := nats.PullSubscribe("ORDERS", "new", 100, nats.WithConsumer(cfg)) // last is optional

// allows graceful close and shutdown
go func() {
  <-ctx.Done()
  newOrders.Close()
}()

for msg := range newOrders.NextMsg() {
  // do stuff

  // acks, and other ack types that make sense, have to go through the subscriber here so it can
  // intelligently handle things, e.g. using +NXT or a bigger pull if we are falling behind;
  // this could also be a pull-specific msg with the same methods, I guess
  newOrders.Ack(msg)
}
ColinSullivan1 commented 4 years ago

This is a great writeup, thanks @ripienaar. Throwing out a few ideas...

I propose we make the JetStream-specific APIs clearly identifiable to avoid confusion. Documentation would make it clear that they only work when JetStream is enabled.

sub, err := nc.StreamConsumerNextSubject("ORDERS", "new")
err = nc.StreamConsumerNextMessage("ORDERS", "new", 1, "_INBOX.xxx")
sts, err := nc.StreamConsumerStatus("ORDERS", "new")
scl, err := nc.StreamConsumerList("ORDERS")
err = nc.StreamConsumerDelete("ORDERS", "new")

One could consider creating a stream consumer object and placing the APIs there rather than on the connection itself, but this creates separation between requesting a message and receiving one on the connection.

sc, err := nc.GetStreamConsumer("ORDERS", "new")
err = sc.NextMessage(1, "_INBOX.xxx")
sts, err := sc.Status()
err = sc.Delete()

var scl []*StreamConsumer
scl, err = nc.StreamConsumerList("ORDERS")

I'm wondering if we could use the no-responder API in these, or ping for presence of a jetstream enabled server. One of the most frequently encountered issues with NATS streaming was a problem identifying whether a NATS streaming connect call timed out, the wrong cluster ID was used, or streaming wasn't running/enabled.

I'm concerned users will get into trouble with the Request acting as a publish. Users may get confused since it is a request with the JS server vs a request with another application... imagine both in the same app, where one request is a JS publish, and another is to a user provided service.

I suggest something like:

// sync publish
ack, err := nc.PublishAndVerifyStorage("subj", payload, 2 * time.Second) 
// (or PublishWithAck, PublishAndVerifyStream, PublishWithStreamAck, etc.)
// In the future could add expected # of acks in case a KV store, object store, multiple streams, etc is behind that subject.

(Note that API could work for KV store, ObjectStore, etc).

The async publish could be the Publish APIs you have above, other languages can overload or add additional signatures.

PullSubscribe could be something like StreamConsumerSubscribe.

About packaging, I strongly advise putting these APIs in the same package as the core NATS APIs, because they are tightly coupled with core NATS. By virtue of being in the same package, assembly, jar, etc., users will make that association, and JetStream won't be perceived as a bolt-on/add-on, as NATS Streaming was by some users. Client and server functionality would also be versioned in step. We could also someday expect to see JS-related flags in the INFO handshake returned by the server.

ripienaar commented 4 years ago

I like this direction, did you take a look at JSM @ColinSullivan1? much can be pillaged from there.

@derekcollison what do you think of the direction Colin proposes, specifically not extending the existing Publish() etc with JS specific stuff, this is from a recent chat we had and this was the consensus among a bunch of us.

derekcollison commented 4 years ago

I think JetStream Publish is just normal publish waiting on the proper response from JetStream. Lots of ways to do this, not a huge fan of new APIs with super long names though.. ;)

ripienaar commented 4 years ago

From yesterday's call: I was not able to do the wrapper in nats.go I promised, as that would create Go import cycles.

Here is a test of what such an integration would look like. Note JetStreamPublish: I did not want to do a generic one since we don't know what KV etc. will look like.

I am also adding a JSMsg object and moving all the ack handling and related functionality there, so msg.JSMsg() creates one from an existing *Msg, and from there you can access all the JS message features.

There's a branch here https://github.com/nats-io/nats.go/compare/master...ripienaar:jsm_wrap that would not compile but would show what I am on about

Other things we discussed: I'd make the validator an injectable dependency so that only jsm.go pulls in the JSON schema machinery, while nats.go would have a null validator and none of the validation dependencies. So ignore those here.

func TestJetStream(t *testing.T) {
    srv := startJetStream(t)
    defer os.RemoveAll(srv.JetStreamConfig().StoreDir)
    defer srv.Shutdown()

    nc, err := Connect(srv.ClientURL())
    if err != nil {
        t.Fatalf("connect failed: %v", err)
    }
    defer nc.Close()

    if !nc.IsJetStreamEnabled() {
        t.Fatalf("JetStream is not enabled")
    }

    str, err := nc.NewStreamFromDefault("ORDERS", DefaultStream, StreamSubjects("ORDERS.*"))
    if err != nil {
        t.Fatalf("stream create failed: %s", err)
    }

    // the callback runs on the subscription's goroutine: count with sync/atomic and
    // report with t.Errorf, since t.Fatalf must only be called from the test goroutine
    var received int32
    ib := NewInbox()
    _, err = nc.Subscribe(ib, func(m *Msg) {
        jsm, err := m.JSMsg()
        if err != nil {
            t.Errorf("failed to create JSMsg: %s", err)
            return
        }

        err = jsm.Ack(AckWaitDuration(2 * time.Second))
        if err != nil {
            t.Errorf("Ack was not acknowledged: %s", err)
            return
        }

        atomic.AddInt32(&received, 1)
    })
    if err != nil {
        t.Fatalf("subscribe failed: %s", err)
    }

    // ephemeral push consumer; it must be told to deliver to ib
    // (DeliverySubject is assumed here, named after the jsm.go option)
    _, err = str.NewConsumer(DeliverySubject(ib), ConsumerFilterStreamBySubject("ORDERS.new"))
    if err != nil {
        t.Fatalf("consumer create failed: %s", err)
    }

    for i := 0; i < 10; i++ {
        // also str.Publish("ORDERS.new", []byte(...), 2*time.Second) will wait 2 seconds and verify str.Name() received it
        _, err = nc.JetStreamPublish("ORDERS.new", []byte(fmt.Sprintf("new order %d", i)), PublishRequiresStream("ORDERS"))
        if err != nil {
            t.Fatalf("publish failed: %s", err)
        }
    }

    si, err := str.State()
    if err != nil {
        t.Fatalf("failed to get stream state: %s", err)
    }

    if si.Msgs != 10 {
        t.Fatalf("expected 10 messages got %d", si.Msgs)
    }

    // poll for delivery rather than relying on a fixed 10ms sleep
    deadline := time.Now().Add(2 * time.Second)
    for atomic.LoadInt32(&received) < 10 && time.Now().Before(deadline) {
        time.Sleep(10 * time.Millisecond)
    }
    if got := atomic.LoadInt32(&received); got != 10 {
        t.Fatalf("expected 10 messages got %d", got)
    }
}
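The injectable validator mentioned before the test could be a one-method interface with a no-op default, so nats.go carries no JSON schema dependency while jsm.go injects a schema-backed implementation. A sketch; `Validator`, `nullValidator`, `strictValidator` (with its single rule), `StreamConfig` and `client` are all hypothetical:

```go
package main

import (
	"errors"
	"fmt"
)

// Validator checks a configuration before it is sent to the server.
type Validator interface {
	Validate(v any) error
}

// nullValidator accepts everything: the dependency-free default.
type nullValidator struct{}

func (nullValidator) Validate(any) error { return nil }

// StreamConfig is a trimmed, hypothetical stream configuration.
type StreamConfig struct {
	Name     string
	Subjects []string
}

// strictValidator stands in for a schema-backed validator from jsm.go.
type strictValidator struct{}

func (strictValidator) Validate(v any) error {
	cfg, ok := v.(StreamConfig)
	if !ok {
		return fmt.Errorf("unsupported type %T", v)
	}
	if cfg.Name == "" {
		return errors.New("stream name is required")
	}
	return nil
}

// client holds whichever validator was injected.
type client struct{ validator Validator }

func newClient(v Validator) *client {
	if v == nil {
		v = nullValidator{}
	}
	return &client{validator: v}
}

func (c *client) NewStream(cfg StreamConfig) error {
	if err := c.validator.Validate(cfg); err != nil {
		return fmt.Errorf("invalid stream config: %w", err)
	}
	// a real client would send the create request here
	return nil
}

func main() {
	c := newClient(strictValidator{})
	fmt.Println(c.NewStream(StreamConfig{Subjects: []string{"ORDERS.*"}}))
}
```

With no validator injected, invalid configs simply reach the server, which still rejects them; the client-side validator only gives earlier, friendlier errors.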