liftbridge-io / go-liftbridge

Go client for Liftbridge. https://github.com/liftbridge-io/liftbridge
Apache License 2.0

Need help with nats and liftbridge pub-sub #35

Closed: asimpatnaik closed this issue 4 years ago

asimpatnaik commented 4 years ago

Hi, we have the NATS server running with credentials. We publish data directly to NATS on a subject and subscribe using go-liftbridge. This worked while we were using "github.com/liftbridge-io/liftbridge-grpc/go". Since migrating to "github.com/liftbridge-io/liftbridge-api/go", the subscription has stopped working. Subscribing with the NATS client works, but subscribing with Liftbridge does not. I don't see a way to pass the credentials when connecting with go-liftbridge. Please let me know if I'm missing anything here.

Regards, Asim.

tylertreat commented 4 years ago

The Liftbridge client does not connect to NATS, only Liftbridge's gRPC API. Can you explain what you mean by "stopped working"? Do you see errors?

asimpatnaik commented 4 years ago


Thanks for your response. Below is the info that I have:

nats-server version: 2.0.0-alpha

NATS was run with the command:

    gnatsd -m 6969 --user "userid" --pass "pwd"

liftbridge.conf contents:

    host: <x.x.x.x>

    # Define NATS cluster to connect to.
    nats {
        servers: ["nats://userid:pwd@0.0.0.0:4222"]
    }

Liftbridge was run with the command:

    liftbridge --raft-bootstrap-seed --config liftbridge.conf
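As a side note on where the credentials live in this setup: they are embedded in the NATS URL that the Liftbridge server reads from its config, so the Go client, which only talks to the gRPC API, never sees them. The sketch below uses only the Go standard library to show how such a URL splits into user, password, and host. The helper name `natsCredentials` is made up for illustration and is not part of NATS or Liftbridge.

```go
package main

import (
	"fmt"
	"net/url"
)

// natsCredentials is a hypothetical helper that extracts the user, password,
// and host from a NATS server URL such as the one in liftbridge.conf.
func natsCredentials(raw string) (user, pass, host string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", "", err
	}
	if u.User != nil {
		user = u.User.Username()
		pass, _ = u.User.Password()
	}
	return user, pass, u.Host, nil
}

func main() {
	// URL copied from the config in this thread.
	user, pass, host, err := natsCredentials("nats://userid:pwd@0.0.0.0:4222")
	if err != nil {
		panic(err)
	}
	fmt.Printf("user=%s pass=%s host=%s\n", user, pass, host)
}
```

Running this prints `user=userid pass=pwd host=0.0.0.0:4222`, which matches the `--user` and `--pass` flags given to gnatsd above.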

Subscribe Go code snippet:

func subscribe() {
    // Create Liftbridge client.
    addrs := []string{"x.x.x.x:9292"}
    client, err := lift.Connect(addrs) // The control doesn't move beyond this point
    if err != nil {
        panic(err)
    }
    defer client.Close()

    var (
        subject = "mysub"
        name    = "mystream"
    )
    if err := client.CreateStream(context.Background(), subject, name); err != nil {
        if err != lift.ErrStreamExists {
            panic(err)
        }
    }

    ctx := context.Background()
    if err := client.Subscribe(ctx, name, func(msg *proto.Message, err error) {
        if err != nil {
            panic(err)
        }
        fmt.Println(time.Unix(0, msg.Timestamp), msg.Offset, string(msg.Key), string(msg.Value))
    }); err != nil {
        panic(err)
    }

    <-ctx.Done()
}

Publish Go code snippet:

func publish() {
    NC, err := nats.Connect("x.x.x.x:4222", nats.UserInfo("userid", "pwd"))
    if err != nil {
        Logger.Error(err.Error())
        Logger.Panic("Could not connect to NATS server", zap.Error(err))
    }

    err = NC.Publish("mysub", somemsg)
    if err != nil {
        Logger.Error("NATS options sending failed too")
        Logger.Error(err.Error())
    }

    NC.Close()
}

Execution stops at the following line in the subscribe function:

    client, err := lift.Connect(addrs)

I debugged and found that execution stops at the point marked below in the file transport.go:

func (s *Stream) waitOnHeader() error {
    if s.headerChan == nil {
        // On the server headerChan is always nil since a stream originates
        // only after having received headers.
        return nil
    }
    select { // This is where it stops
    case <-s.ctx.Done():
        return ContextErr(s.ctx.Err())
    case <-s.headerChan:
        return nil
    }
}

I have tried my best to explain the issue. Kindly help. Also, please let me know how I can check whether there is an error and how I can see the published messages in the console.

asimpatnaik commented 4 years ago

Kindly let me know if the provided info helped you nail the issue. I'm yet to figure out what I have missed.

tylertreat commented 4 years ago

I haven't been able to reproduce this. Are you able to telnet to the Liftbridge server?

$ telnet x.x.x.x 9292

asimpatnaik commented 4 years ago

I am able to telnet. I tried running the code below on the same server where NATS and Liftbridge are running:

package main

import (
    "fmt"

    "github.com/liftbridge-io/go-liftbridge"
)

func main() {
    _, err := liftbridge.Connect([]string{"x.x.x.x:9292"})
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println("success")
    }
}

It just got stuck and never printed anything on the console.


asimpatnaik commented 4 years ago

I finally got it running. It looks like a version-related issue: I took the latest Liftbridge and everything seems to be working fine.

tylertreat commented 4 years ago

Glad you got it working!
