rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams
MIT License
170 stars 20 forks source link

Check version when serverProperties is not empty and < 3.11.0 #345

Closed Gsantomaggio closed 3 months ago

Gsantomaggio commented 4 months ago

Fixes: https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/344

When the serverProperties["version"] is not empty the client now checks the version. If the version is >=3.11 it is possible to enable the exchangeVersion function.

Fixes another small bug when the single active consumer is enabled

cc @hiimjako

Gsantomaggio commented 4 months ago

@LucasZhanye can you please try this PR?

LucasZhanye commented 4 months ago

@LucasZhanye can you please try this PR?

Yes, I can

I try it later, and I will reply it after I finished

LucasZhanye commented 4 months ago

@Gsantomaggio I try it, it is work well! the output log is : 2024/07/30 10:49:39 [info] - Server version is less than 3.11.0, skipping command version exchange.

And I want to know how long it will release a new version for it ?

LucasZhanye commented 4 months ago

@LucasZhanye can you please try this PR?

oh, I find another problem.

Now I can DeclareStream,NewProducer,but producer.Send is not take effect.

my code is :

const (
    EVENTSTREAM = "events"
)

type Event struct {
    Name string
}

func main() {
    // COnnect to the Stream Plugin on Rabbimq
    env, err := stream.NewEnvironment(
        stream.NewEnvironmentOptions().
            SetHost("10.2.3.200").
            SetPort(15552).
            SetUser("guest").
            SetPassword("guest"))
    if err != nil {
        panic(err)
    }
    // Declare the stream, Set segmentsize and Maxbytes on stream
    err = env.DeclareStream(EVENTSTREAM, stream.NewStreamOptions().
        SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)).
        SetMaxLengthBytes(stream.ByteCapacity{}.MB(2)))

    if err != nil {
        panic(err)
    }

    // Create a new Producer
    producerOptions := stream.NewProducerOptions()
    producerOptions.SetProducerName("producer")

    // Batch 100 Events in the same Frame, and the SDK will handle everything
    // DEDPULICATION DOES NOT WORK WITH SUBENTRY
    producerOptions.SetSubEntrySize(100)
    producerOptions.SetCompression(stream.Compression{}.Gzip())

    producer, err := env.NewProducer(EVENTSTREAM, producerOptions)
    if err != nil {
        panic(err)
    }

    // Publish 6001 messages
    for i := 0; i <= 6001; i++ {
        // fmt.Println("i = ", i)
        event := Event{
            Name: "test",
        }

        data, err := json.Marshal(event)
        if err != nil {
            panic(err)
        }

        message := amqp.NewMessage(data)
        // Apply properties to our message
        props := &amqp.MessageProperties{
            CorrelationID: uuid.NewString(),
        }
        message.Properties = props
        // Set Publishing ID to prevent Deduplication
        message.SetPublishingId(int64(i))
        // Sending the message
        if err := producer.Send(message); err != nil {
            panic(err)
        }
    }

    producer.Close()
}

But the rabbitmq management show as below: image

the queue message is zero,but I send 6002

Gsantomaggio commented 3 months ago

@LucasZhanye, I cannot reproduce the issue.

Screenshot 2024-07-30 at 09 04 02

You should check the server logs and also try to remove these values:

SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)).
SetMaxLengthBytes(stream.ByteCapacity{}.MB(2)))

also you should always enable the confirmation and wait before close the producer since the send is async

    producerOptions := stream.NewProducerOptions()
    producerOptions.SetProducerName("producer")

    // Batch 100 Events in the same Frame, and the SDK will handle everything
    // DEDPULICATION DOES NOT WORK WITH SUBENTRY
    producerOptions.SetSubEntrySize(100)
    producerOptions.SetCompression(stream.Compression{}.Gzip())

    producer, err := env.NewProducer(EVENTSTREAM, producerOptions)
    if err != nil {
        panic(err)
    }
    chPublishConfirm := producer.NotifyPublishConfirmation()
    totalConfirmed := int32(0)
    chFinished := make(chan struct{})
    go func() {
        for confirmed := range chPublishConfirm {
            for _, msg := range confirmed {
                if msg.IsConfirmed() {
                    atomic.AddInt32(&totalConfirmed, 1)
                    if atomic.LoadInt32(&totalConfirmed) == 6001 {
                        chFinished <- struct{}{}
                    }
                } else {
                    fmt.Printf("message %s failed \n  ", msg.GetMessage().GetData())
                }

            }
        }
    }()

    // Publish 6001 messages
    for i := 0; i <= 6001; i++ {
        // fmt.Println("i = ", i)
        event := Event{
            Name: "test",
        }

        data, err := json.Marshal(event)
        if err != nil {
            panic(err)
        }

        message := amqp.NewMessage(data)
        // Apply properties to our message
        props := &amqp.MessageProperties{
            CorrelationID: uuid.NewString(),
        }
        message.Properties = props
        // Set Publishing ID to prevent Deduplication
        message.SetPublishingId(int64(i))
        // Sending the message
        if err := producer.Send(message); err != nil {
            panic(err)
        }
    }

    <-chFinished

    producer.Close()
    env.Close()
    fmt.Printf("Closed with total Confirmed: %d\n", totalConfirmed)
LucasZhanye commented 3 months ago

@LucasZhanye, I cannot reproduce the issue.

Screenshot 2024-07-30 at 09 04 02

You should check the server logs and also try to remove these values:

SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)).
SetMaxLengthBytes(stream.ByteCapacity{}.MB(2)))

also you should always enable the confirmation and wait before close the producer since the send is async

  producerOptions := stream.NewProducerOptions()
  producerOptions.SetProducerName("producer")

  // Batch 100 Events in the same Frame, and the SDK will handle everything
  // DEDPULICATION DOES NOT WORK WITH SUBENTRY
  producerOptions.SetSubEntrySize(100)
  producerOptions.SetCompression(stream.Compression{}.Gzip())

  producer, err := env.NewProducer(EVENTSTREAM, producerOptions)
  if err != nil {
      panic(err)
  }
  chPublishConfirm := producer.NotifyPublishConfirmation()
  totalConfirmed := int32(0)
  chFinished := make(chan struct{})
  go func() {
      for confirmed := range chPublishConfirm {
          for _, msg := range confirmed {
              if msg.IsConfirmed() {
                  atomic.AddInt32(&totalConfirmed, 1)
                  if atomic.LoadInt32(&totalConfirmed) == 6001 {
                      chFinished <- struct{}{}
                  }
              } else {
                  fmt.Printf("message %s failed \n  ", msg.GetMessage().GetData())
              }

          }
      }
  }()

  // Publish 6001 messages
  for i := 0; i <= 6001; i++ {
      // fmt.Println("i = ", i)
      event := Event{
          Name: "test",
      }

      data, err := json.Marshal(event)
      if err != nil {
          panic(err)
      }

      message := amqp.NewMessage(data)
      // Apply properties to our message
      props := &amqp.MessageProperties{
          CorrelationID: uuid.NewString(),
      }
      message.Properties = props
      // Set Publishing ID to prevent Deduplication
      message.SetPublishingId(int64(i))
      // Sending the message
      if err := producer.Send(message); err != nil {
          panic(err)
      }
  }

  <-chFinished

  producer.Close()
  env.Close()
  fmt.Printf("Closed with total Confirmed: %d\n", totalConfirmed)

Thanks so much!

Now I run main.go on local system, but the rabbitmq server is on remote, I have a problem when NewProducer, the log is: "panic: Authentication Failure"

But I run main.go on the remote system which the rabbitmq server on , it runs ok.

LucasZhanye commented 3 months ago

The debug log is:

2024/07/30 15:48:10 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 15:48:10 [debug] - User root, connected to: 10.2.3.200:5552, vhost:test
2024/07/30 15:48:10 [debug] - Read connection failed: read tcp 172.24.127.40:36958->10.2.3.200:5552: use of closed network connection
2024/07/30 15:48:10 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 15:48:10 [debug] - User root, connected to: 10.2.3.200:5552, vhost:test
2024/07/30 15:48:10 [debug] - Read connection failed: read tcp 172.24.127.40:36960->10.2.3.200:5552: use of closed network connection
2024/07/30 15:48:10 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 15:48:10 [debug] - User root, connected to: 10.2.3.200:5552, vhost:test
2024/07/30 15:48:10 [debug] - Read connection failed: EOF
2024/07/30 15:48:10 [debug] - error removing heartbeat: Response #{id} not found
2024/07/30 15:48:10 [debug] - User:root, Authentication Failure
2024/07/30 15:48:10 [debug] - Read connection failed: read tcp 172.24.127.40:36964->10.2.3.200:5552: use of closed network connection
Gsantomaggio commented 3 months ago

2024/07/30 15:48:10 [debug] - User:root, Authentication Failure

I think it is clear where is the problem here

hiimjako commented 3 months ago

In my opinion, the problem of connecting to pre-3.11 servers is solved, as confirmed above. If there are no other technical problems, I think it is mergeable

LucasZhanye commented 3 months ago

Authentication Failure

Hi @Gsantomaggio Maybe I know why it is the queue is zero.

Because I have two rabbitmq server run by docker container, the first is listen on 5552, and the second is listen on 15552

When I exec main.go which is connect to the port 15552, it will send msg to the rabbitmq server which is listen on 5552.

From the log, at last it use 5552:

2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: 10.2.3.200:15552, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35938->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - **User root, connected to: 10.2.3.200:15552**, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35940->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: 10.2.3.200:15552, vhost:test
coordinatorKey =  **localhost:5552**
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: localhost:5552, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35942->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [debug] - waitForInflightMessages, channel: 2374 - pending messages len: 28 - unconfirmed len: 2502 - retry: 0
2024/07/30 16:22:50 [debug] - **Read connection failed: read tcp 127.0.0.1:45690->127.0.0.1:5552**: use of closed network connection
Closed with total Confirmed: 6001

Finally, I see the rabbitmq management which is listen on 5552 , the queue message is increased. image

Gsantomaggio commented 3 months ago

@hiimjako @LucasZhanye merged.

LucasZhanye commented 3 months ago

Authentication Failure

Hi @Gsantomaggio Maybe I know why it is the queue is zero.

Because I have two rabbitmq server run by docker container, the first is listen on 5552, and the second is listen on 15552

When I exec main.go which is connect to the port 15552, it will send msg to the rabbitmq server which is listen on 5552.

From the log, at last it use 5552:

2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: 10.2.3.200:15552, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35938->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - **User root, connected to: 10.2.3.200:15552**, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35940->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: 10.2.3.200:15552, vhost:test
coordinatorKey =  **localhost:5552**
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: localhost:5552, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35942->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [debug] - waitForInflightMessages, channel: 2374 - pending messages len: 28 - unconfirmed len: 2502 - retry: 0
2024/07/30 16:22:50 [debug] - **Read connection failed: read tcp 127.0.0.1:45690->127.0.0.1:5552**: use of closed network connection
Closed with total Confirmed: 6001

Finally, I see the rabbitmq management which is listen on 5552 , the queue message is increased. image

I find the solution on #315

I change my code to below:

addressResolver := stream.AddressResolver{
        Host: "10.2.3.200",
        Port: 15552,
    }

    env, err := stream.NewEnvironment(
        stream.NewEnvironmentOptions().
            SetHost(addressResolver.Host).
            SetPort(addressResolver.Port).
            SetAddressResolver(addressResolver).
            SetUser("root").SetPassword("root").SetVHost("test").
            SetMaxProducersPerClient(5))

It works ok!

Gsantomaggio commented 3 months ago

@LucasZhanye FYI: https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/tag/v1.4.8