nats-io / stan.go

NATS Streaming System
https://nats.io
Apache License 2.0
706 stars 117 forks source link

Context aware subscription #310

Closed meetme2meat closed 4 years ago

meetme2meat commented 4 years ago

Can we have a context-aware nats(streaming) subscription. This would help us cleanly shutdown our services(during scaled-down window) those are subscribing over a subject.

I see the core nats has few API that allow the message to read with context.

I want to help about how can we use it those API in NATS. So as a starter can anyone share light as what would the right step to implement this with existing Nats context base subscription API.

wallyqs commented 4 years ago

The NATS core client has support for context based cancellation of Requests and sync Subscriptions which can block but the streaming(STAN) client only has async subscriptions so there is no blocking like with NATS calls. Maybe something that could be done is to wrap unsubscribe calls within the cancellation function so that cancel, removes interest along with the cancellation of the context? Something like:

package main

import (
    "context"
    "log"
    "time"

    "github.com/nats-io/stan.go"
)

func main() {
    sc, err := stan.Connect("test-cluster", "client-123")
    if err != nil {
        log.Fatal(err)
    }

    ctx, done := context.WithCancel(context.Background())
    sub1, err := sc.Subscribe("foo", func(m *stan.Msg) {
        log.Println("Received", string(m.Data))
    })

    sub2, err := sc.Subscribe("bar", func(m *stan.Msg) {
        log.Println("Received", string(m.Data))
    })
    cancel := func() {
        log.Println("Unsubscribing...")
        sub1.Unsubscribe()
        sub2.Unsubscribe()

        // Or teardown all subscriptions close connection.
        // sc.Close()
        done()
    }
    sc.Publish("foo", []byte("one"))
    sc.Publish("bar", []byte("two"))

    select {
    case <-time.After(2 * time.Second):
        cancel()
    }

    select {
    case <-ctx.Done():
        log.Println("Exiting...")
    }
}
meetme2meat commented 4 years ago

@wallyqs Sorry for the delayed response. That would do.

Thanks.