Azure / go-amqp

AMQP 1.0 client library for Go.
https://github.com/Azure/go-amqp
MIT License
106 stars 57 forks source link

Creating receiver with settle mode second fails when connected to Artemis #97

Closed larssqills closed 2 years ago

larssqills commented 2 years ago

Hi,

I am trying to create a new client which connects to Artemis using AMQP 1.0.

I want the client to explicitly call the 'AcceptMessage' before Artemis removes the message as documented when creating the receiver with settle mode 'second'.

docker-compose.yml

version: '3'

services:
  artemis:
    image: vromero/activemq-artemis
    environment:
      - ARTEMIS_USERNAME=artemis
      - ARTEMIS_PASSWORD=artemis
      - DISABLE_SECURITY=true
    ports:
      - "8161:8161"
      - "61616:61616"
      - "5672:5672"
    restart: always

main.go

package main

import (
    "context"
    "github.com/Azure/go-amqp"
    "github.com/sirupsen/logrus"
)

func main() {
    addr := "127.0.0.1"

    client, err := newClient(addr)
    if err != nil {
        logrus.Fatalf("failed to create client - %s", err.Error())
    }

    session, err := client.NewSession()
    if err != nil {
        logrus.Fatalf("failed to create session - %s", err.Error())
    }

    name := "my.topic"

    receiver, err := session.NewReceiver(
        amqp.LinkName("my-queue"),
        amqp.LinkSourceAddress(name),
        amqp.LinkCredit(1),
        amqp.LinkReceiverSettle(amqp.ModeSecond),
        amqp.LinkSourceCapabilities("shared", "global", "topic"),
        amqp.LinkSourceDurability(amqp.DurabilityUnsettledState),
        amqp.LinkSourceExpiryPolicy(amqp.ExpiryNever),
    )
    if err != nil {
        logrus.Fatalf("failed to create receiver - %s", err.Error())
    }

    logrus.Infof("now listening to events from %s", name)

    for {
        msg, err := receiver.Receive(context.Background())
        if err != nil {
            logrus.Fatalf("reading message from %s artemis failed", name)
        }

        logrus.Infof("received message from artemis on topic %s with payload %s", addr, msg.Value)

        // Implement handle & accept/reject message here
    }
}

func newClient(addr string) (*amqp.Client, error) {
    return amqp.Dial(addr, amqp.ConnContainerID("testing-for-github"))
}

This code results in the following output;

FATA[0000] failed to create receiver - amqp: receiver settlement mode "second" requested, received "first" from server 

It seems this is because of a new implementation feature that checks the settle mode received from the server located at "go-amqp@v0.16.4/link.go:291".

I am not sure if this check is really required and why it needs to be checked?

To check if this is the only thing that blocks my purpose, I tried creating a fork that only has the check removed from the library. This resulted in the following output:

INFO[0000] now listening to events from my.topic        
INFO[0015] received message from artemis on topic 127.0.0.1 with payload test 

The message is correctly received and processed, but it is also directly confirmed, even when I have the receiver's settle mode set to 'second'.

Could you see what I am doing wrong here?

jhendrixMSFT commented 2 years ago

In your sample, you explicitly requested that you want ModeSecond semantics when receiving messages.

amqp.LinkReceiverSettle(amqp.ModeSecond)

The peer (Artemis) is responding that it only supports ModeFirst, and since it can't fulfil the mode that was requested, we return an error. This is important so that you know you're not getting ModeSecond behavior.

Changing to amqp.ModeFirst, or omitting the call to amqp.LinkReceiverSettle() entirely will resolve the issue (assuming you're ok with lack of ModeSecond support).

larssqills commented 2 years ago

Hi jhendrixMSFT,

Thanks for you reaction.

I want to create a "shared durable consumer" in Java terms. This is possible when using the Java client and connecting to Artemis. Assuming the amqp.ModeSecond is needed to make this happen, I do want to use it. Or is there another solution to create a queue, for which multiple clients can connect too (shared) and have durable messages (no auto acknowledge) which require an explicit "acknowledge" of the message?

jhendrixMSFT commented 2 years ago

Which Java client are you using?

jhendrixMSFT commented 2 years ago

Looking at the docs for Artemis, it looks like it can be configured for ModeSecond behavior which is off by default. See the section "Non Transactional Acknowledgements" here. Does this help?

larssqills commented 2 years ago

I am using "org.apache.qpid:qpid-jms-client:0.57.0" as the Java library.

Here it is possible to create a session with a specific "acknowledgement mode" which is set to "CLIENT_ACKNOWLEDGEMENT". So with Artemis I am able to create a shared durable consumer.

Maybe I am missing something, but how would the Artemis documentation help me out here?

jhendrixMSFT commented 2 years ago

I was looking at the below part of their docs. The way I read this, and I could be wrong, is that ModeSecond is disabled by default.

Non Transactional Acknowledgements

If you are acknowledging the delivery of a message at the client side using a non transacted session, Apache ActiveMQ Artemis can be configured to block the call to acknowledge until the acknowledge has definitely reached the server, and a response has been sent back to the client. This is configured with the parameter BlockOnAcknowledge. If this is set to true then all calls to acknowledge on non transacted sessions will block until the acknowledge has reached the server, and a response has been sent back. You might want to set this to true if you want to implement a strict at most once delivery policy. The default value is false

larssqills commented 2 years ago

I have found a solution to setup a receiver which is able to acknowledge the events itself.

amqp.LinkCredit(2),                   // Set to 2 as the client will process 1 event per time so that won't get acknowledged
amqp.LinkBatching(true),              // Enable batching to let the client acknowledge message itself, as it won't reach the credit of 2.
amqp.LinkBatchMaxAge(30*time.Second), // 30 second timeout, or whatever you want to set it too, if the limit is reached, the message will still be acknowledge.

Setting up the receiver with the options above, the client is able to acknowledge the events itself when the event is correctly processed.

This only leaves the unhappy flow, the message will still be dropped/confirmed because the batching is enabled and will sent the disposition (confirm) to the server. This was solved by resending the same payload to the server so a client can pick up the message.

It's not the best solution imo, but it works for the use case I have.

Thanks for thinking along jhendrixMSFT!