
SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

Claim-based security #882

Open Rupert-RR opened 3 years ago

Rupert-RR commented 3 years ago

I apologise in advance for any inaccuracies, since I am not an expert in either SmallRye Reactive Messaging or AMQP.

For context, I am trying to add AMQP messaging (sending and receiving) functionality to my Quarkus application. There is a development environment which may run locally and another environment which is deployed to Microsoft Azure. The idea therefore is to have an AMQP 1.0 service running locally (such as Artemis) and use the Azure Service Bus in the other environment. Since both services use AMQP 1.0 this should be nicely interchangeable with configuration for the two environments.
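The two-environment setup described above could be sketched with Quarkus configuration profiles as follows. This is only an illustration: the channel names `orders-in` and `orders-out`, the local credentials and the local port are assumptions, and whether the Service Bus half works without claim-based security is exactly what this issue is asking.

```properties
# Local development: an AMQP 1.0 broker such as Artemis
# (host, port and credentials are assumptions for illustration)
%dev.amqp-host=localhost
%dev.amqp-port=5672
%dev.amqp-username=artemis
%dev.amqp-password=artemis

# Deployed environment: Azure Service Bus over TLS
# (placeholder namespace, as used in the example code in this thread)
%prod.amqp-host=sb-name.servicebus.windows.net
%prod.amqp-port=5671
%prod.amqp-use-ssl=true

# Channel definitions shared by both profiles
# (channel names are hypothetical)
mp.messaging.outgoing.orders-out.connector=smallrye-amqp
mp.messaging.outgoing.orders-out.address=an-outgoing-queue
mp.messaging.incoming.orders-in.connector=smallrye-amqp
mp.messaging.incoming.orders-in.address=an-incoming-queue
```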

However, it seems that Azure Service Bus requires claim-based authorisation rather than the simple username/password authorisation used for other services. This seems to be a draft standard supported by Microsoft but I am not sure to what extent it is supported by other libraries/services.

As I understand it, following this description, we:

  1. make an anonymous connection to the service bus
  2. set up an incoming link on the special $cbs queue
  3. set up an outgoing link on the special $cbs queue
  4. craft a special message (or several, for connecting to more than one queue) containing details of the queue to which we wish to connect, signed by a key available from the management console of the Service Bus
  5. send this message on the outgoing link
  6. receive a reply on the incoming link
  7. verify the status of the authorisation by examining the reply for a status code
  8. if the authorisation was successful, we can then, using the same connection, set up all authorised links to all queues authorised by the tokens that we have sent
  9. repeat from step 4 before the expiry of the previous tokens, in order to renew them each time
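Step 9 implies some scheduling logic on the client side. As a minimal sketch (the helper name `delayUntilRenewal` and the five-minute safety margin are assumptions, not part of any library or of the CBS draft), the delay before the next put-token message could be computed like this:

```kotlin
import java.time.Duration
import java.time.Instant

/**
 * Hypothetical helper: how long to wait before sending the next put-token
 * message, so that renewal happens `margin` before the current token expires.
 * Returns Duration.ZERO if that renewal point has already passed.
 */
fun delayUntilRenewal(
    expiry: Instant,
    now: Instant = Instant.now(),
    margin: Duration = Duration.ofMinutes(5) // assumed safety margin
): Duration {
    val renewAt = expiry.minus(margin)
    val delay = Duration.between(now, renewAt)
    return if (delay.isNegative) Duration.ZERO else delay
}
```

The resulting duration could then be handed to whatever timer the client already uses to trigger step 4 again.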

As far as I can see, this is not an authorisation mechanism which is supported by SmallRye (if it is indeed supported, then if you can point me to some documentation my question stops here). Obviously it would be very useful from my point of view if it were supported (and I imagine that I will not be the only one to use the Azure Service Bus in the long term). However, I imagine that it may not be easy to fit into the existing model (does SmallRye share the connection to the same container between all the different channels?).

I can currently connect to the Service Bus, authorise and send/receive messages by directly using the Vert.x AMQP client (and the above steps), but I have not even been able to authorise with the Service Bus by configuring the incoming and outgoing links to the $cbs queue and sending the appropriate message via the injected Emitter corresponding to the outgoing channel. Is this simply because I cannot find the correct configuration, or is this made impossible by the way SmallRye functions?

Are there any plans to support claim-based security?

Thanks in advance.

cescoffier commented 3 years ago

If the Vert.x client supports it, we should be able to support it too.

Do you have the vertx code somewhere?

@gemmellr WDYT?

Rupert-RR commented 3 years ago

I have some very rough code by way of example (this was obviously just to prove that it works so is in no way production ready). It is in Kotlin, but should still be readable I would think.

val options: AmqpClientOptions = AmqpClientOptions()
    .setHost("sb-name.servicebus.windows.net")
    .setPort(5671)
    .setSsl(true)
    .setLogActivity(true) as AmqpClientOptions
val client1: AmqpClient = AmqpClient.create(options)
client1.connect({ ar ->
    if (ar.failed()) {
        println("Unable to connect to the broker")
    } else {
        println("Connection succeeded")
        val connection: AmqpConnection = ar.result()

        connection.createReceiver("\$cbs",
            { done ->
                if (done.failed()) {
                    println("Unable to create receiver")
                } else {
                    val receiver: AmqpReceiver = done.result()
                    receiver.handler({ msg ->
                        // called for every message received on the $cbs link
                        println("Received " + msg.applicationProperties())
                        if (msg.applicationProperties().getInteger("status-code") == 202) {
                            println("authorisation successful")
                            connection.createSender("an-outgoing-queue", { done ->
                                if (done.failed()) {
                                    println("Unable to create a sender")
                                } else {
                                    val sender: AmqpSender = done.result()
                                    val builder: AmqpMessageBuilder = AmqpMessage.create()

                                    println("Sending new message")
                                    val message: io.vertx.amqp.AmqpMessage = builder
                                        .ttl(10000)
                                        .withBody("A test message")
                                        .build()
                                    sender.sendWithAck(message, {
                                        if (it.succeeded()) {
                                            println("Message accepted")
                                        } else {
                                            println("Message not accepted")
                                        }
                                    })
                                }
                            })

                            connection.createReceiver("an-incoming-queue",
                                { done ->
                                    if (done.failed()) {
                                        println("Unable to create receiver")
                                    } else {
                                        val receiver: AmqpReceiver = done.result()
                                        receiver.handler({ msg ->
                                            // called for every message received on the queue
                                            println("Got message: ${msg.unwrap().body.toString()}")
                                        })
                                    }
                                }
                            )
                        }
                    })
                }
            }
        )

        connection.createSender("\$cbs", { done ->
            if (done.failed()) {
                println("Unable to create a sender")
            } else {
                val sender: AmqpSender = done.result()
                val builder: AmqpMessageBuilder = AmqpMessage.create()

                val expiry: Long = Instant.now().epochSecond + (60 * 60)
                val uri: String = "amqp://sb-name.servicebus.windows.net"
                val token: String = this.tokenForURI(uri, "RootManageSharedAccessKey", expiry)
                println("Sending message")
                val message: AmqpMessage = builder
                    .applicationProperties(
                        JsonObject(
                            mapOf(
                                "operation" to "put-token",
                                "type" to "servicebus.windows.net:sastoken",
                                "name" to uri
                            )
                        )
                    )
                    .ttl(10000)
                    .withBody(token)
                    .build()
                sender.sendWithAck(message, {
                    if (it.succeeded()) {
                        println("Message accepted")
                    } else {
                        println("Message not accepted")
                    }
                })
            }
        })
    }
})

and the logic for generating the token, which is specific to Microsoft Azure, is in:

fun sign(text: String): String {
    val key: ByteArray = "HzDVNiUPKevN57NoTqmeYdbJkU4zh7ri2FZlM1Qh634=".toByteArray(StandardCharsets.UTF_8)
    val mac: Mac = Mac.getInstance("HmacSHA256")
    val keySpec: SecretKeySpec = SecretKeySpec(key, "HmacSHA256")
    mac.init(keySpec)
    return Base64.getEncoder().encodeToString(mac.doFinal(text.toByteArray(StandardCharsets.UTF_8)))
}

fun tokenForURI(uri: String, keyName: String, expiry: Long): String {
    val encodedURI: String = URLEncoder.encode(uri, StandardCharsets.UTF_8)
    val signatureString: String = "${encodedURI}\n${expiry}"
    val signature: String = this.sign(signatureString)
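    // Use the charset-aware overload; the single-argument URLEncoder.encode is deprecated
    val token: String = "SharedAccessSignature sr=${encodedURI}&sig=${URLEncoder.encode(signature, StandardCharsets.UTF_8)}&se=${expiry}&skn=${keyName}"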
    return token
}

I presume that we would want to be able to just pass the token itself.
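If only the token itself were passed in, whatever component renews it would still need to know when it expires. A rough sketch of reading the expiry back out of a token in the format produced by `tokenForURI` above (the helper name `sasTokenExpiry` is hypothetical, and this is not an API of SmallRye or Vert.x):

```kotlin
import java.time.Instant

/**
 * Hypothetical helper: extracts the expiry (the `se` field) from a token of
 * the form "SharedAccessSignature sr=...&sig=...&se=...&skn=...".
 * Returns null if the prefix or the `se` field is missing or not a number.
 */
fun sasTokenExpiry(token: String): Instant? {
    val prefix = "SharedAccessSignature "
    if (!token.startsWith(prefix)) return null
    val fields: Map<String, String> = token.removePrefix(prefix)
        .split("&")
        .mapNotNull { part ->
            // Split on the first '=' only, since values may contain '='
            val idx = part.indexOf('=')
            if (idx <= 0) null else part.substring(0, idx) to part.substring(idx + 1)
        }
        .toMap()
    return fields["se"]?.toLongOrNull()?.let { Instant.ofEpochSecond(it) }
}
```

The expiry obtained this way could drive the renewal described in step 9 of the original post.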

I am not sure whether this kind of handling (having a connection which is shared for all channels and authorised before making channel connections) fits into the current model.

No promises because I am not at all familiar with the Smallrye way of doing things but if you have a vague idea of how this might integrate into the existing solution and where changes would need to be made, I could see about getting a PR together.

gemmellr commented 3 years ago

Service Bus doesn't require CBS usage to my knowledge; it supports it. I'm not aware of any AMQP clients (or servers) other than Microsoft's that yet support it directly. I am aware of other AMQP clients that don't support it being used against Service Bus, and of Microsoft themselves creating a fuller JMS server offering based around an existing client that does not support it (https://azure.microsoft.com/en-us/blog/announcing-preview-of-java-message-service-2-over-amqp-on-azure-service-bus/). I don't personally think we should attempt to integrate CBS at this time.

Rupert-RR commented 3 years ago

@gemmellr if you have any links to documentation about other supported authorisation mechanisms for AMQP, I would be grateful. I get the impression that they may support other mechanisms for other protocols, such as JMS, but for AMQP there is only CBS. (I should add that I tried using the SAS key name and value as username and password via the SmallRye configuration and did not get anywhere: the values were seemingly ignored, and the connection was allowed but no links could be set up.)

Is your preference for not integrating CBS at this time because you believe there to be alternatives or because it is not likely to be widely used (or indeed some other reason)?

gemmellr commented 3 years ago

(I would note though that the link in the original post is to an older draft document. The current draft is at https://www.oasis-open.org/committees/document.php?document_id=67918&wg_abbrev=amqp .)

gemmellr commented 3 years ago

I can only refer you to Microsoft's documentation in general; I don't have a specific link. I am not particularly familiar with Service Bus myself, but I am aware of several non-Microsoft AMQP clients that do not support CBS having been used with Service Bus over the years. One example is Qpid JMS (which I co-wrote), which Microsoft are working with per the earlier link. You could perhaps follow the guides around those bits for more assistance.

(As an aside, note that JMS is an API, while AMQP is a protocol. Qpid JMS uses AMQP for its wire protocol, and was used with Service Bus for years before Microsoft recently looked to expand their server-side JMS feature support.)

Rupert-RR commented 3 years ago

@gemmellr ok, thanks for that. The current draft certainly looks fuller (I was going from the link given in Microsoft's own documentation). I will have a look at Qpid JMS to see if that works (if it does that would suggest that I have a configuration problem with Smallrye or there is some other functionality being used that is not exposed by Smallrye).

gemmellr commented 3 years ago

Here is an Azure doc page for older documentation on using qpid-jms with Service Bus (long before the more recent 'JMS2 support' announced above): https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-jms-api-amqp.

The client does nothing special here, as it's entirely unaware it's speaking to Service Bus (and Service Bus can't tell it's the JMS client until after the authentication process either in this case).