reactor / reactor-rabbitmq

Reactor RabbitMQ
Apache License 2.0

Can we have a Receiver example with no Sender? #106

Open gregturn opened 4 years ago

gregturn commented 4 years ago

Reading the samples to grok the API, I found it confusing to see Sender-based code smack dab inside the ReceiveSample.

Motivation

A clear separation of Sender from Receiver seems to me the cleanest way to demonstrate the API. It drove me to using Spring AMQP's AmqpAdmin to put together the exchange, queue, and binding.

Additional context

I am doing a side-by-side comparison of Spring AMQP vs. Reactor RabbitMQ and learning what "magic" is provided by Spring Boot + Spring AMQP and its annotations.

acogoluegnes commented 4 years ago

The sender is used to create the queue the receiver will consume from. Which API would you suggest using to create this queue? The raw Java client?

gregturn commented 4 years ago

Are you saying RabbitMQ's APIs do this?

Looks like it's a call into Channel.queueDeclare, which RabbitTemplate simply executes like this:

    this.rabbitTemplate.execute((channel) -> {
        DeclareOk[] declared = this.declareQueues(channel, queue);
        return declared.length > 0 ? declared[0].getQueue() : null;
    });

This ultimately ends up going to the Rabbit client's Channel.queueDeclare API, which presumably is where Sender ends up as well.

In fact, couldn't this code simply migrate to the Receiver class?

    public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification specification, ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = getChannelMonoForResourceManagement(options);

        AMQP.Queue.Declare declare;
        if (specification.getName() == null) {
            declare = new AMQImpl.Queue.Declare.Builder()
                .queue("")
                .durable(false)
                .exclusive(true)
                .autoDelete(true)
                .arguments(specification.getArguments())
                .build();
        } else {
            declare = new AMQImpl.Queue.Declare.Builder()
                .queue(specification.getName())
                .durable(specification.isDurable())
                .exclusive(specification.isExclusive())
                .autoDelete(specification.isAutoDelete())
                .passive(specification.isPassive())
                .arguments(specification.getArguments())
                .build();
        }

        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(declare);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage(future))
            .flatMap(command -> Mono.just((AMQP.Queue.DeclareOk) command.getMethod()))
            .publishOn(resourceManagementScheduler);
    }

gregturn commented 4 years ago

If I'm using an anonymous queue for each Receiver instance, how can the sender even know all the queues that need to be created?
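
To make the anonymous-queue case concrete: with a server-named queue, the name only exists once the broker has answered the declaration, so it is the declaring side that learns it. The following is a minimal sketch of that behaviour, assuming a hypothetical in-memory stand-in for the broker. `AnonymousQueueSketch`, `Declared`, and `declare` are illustrative names and not part of Reactor RabbitMQ or the RabbitMQ Java client; the defaults mirror the null-name branch of the `declareQueue` method quoted earlier in the thread.

```java
import java.util.UUID;

// Hypothetical stand-in for a broker's reply to a queue declaration.
// Nothing here belongs to Reactor RabbitMQ or the RabbitMQ Java client.
class AnonymousQueueSketch {

    // Settings the imitation broker reports back for a declared queue.
    static final class Declared {
        final String name;
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;

        Declared(String name, boolean durable, boolean exclusive, boolean autoDelete) {
            this.name = name;
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }
    }

    // A null name requests a server-named queue: non-durable, exclusive,
    // auto-delete, with a generated name (same defaults as the quoted method).
    static Declared declare(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        if (name == null) {
            // RabbitMQ prefixes generated names with "amq.gen-"; the UUID
            // suffix is only an imitation of the broker's random part.
            return new Declared("amq.gen-" + UUID.randomUUID(), false, true, true);
        }
        return new Declared(name, durable, exclusive, autoDelete);
    }

    public static void main(String[] args) {
        // Two receivers each declare their own anonymous queue.
        Declared first = declare(null, false, false, false);
        Declared second = declare(null, false, false, false);
        System.out.println(first.name);
        System.out.println(second.name);
    }
}
```

Each call yields a different name, which is why a sender that declares queues up front cannot enumerate them in advance.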

acogoluegnes commented 4 years ago

I mean the RabbitMQ Java client (ConnectionFactory, Channel, etc), e.g. https://github.com/reactor/reactor-rabbitmq/blob/9794c4c9a897c632b6af410f0ac771208f2f0ecf/src/test/java/reactor/rabbitmq/SenderTests.java#L50-L56.

gregturn commented 4 years ago

So why not put that into Receiver, since that's where queue declarations typically happen?

acogoluegnes commented 4 years ago

As suggested in the comments on #107, Sender contains the resource management methods, on the initial assumption that they are all "write" operations.

> So why not put that into Receiver, since that's where queue declarations typically happen?

Not sure. Exchanges and queues need to be bound in some way. Following this reasoning, where should binding management go?

gregturn commented 4 years ago

Agree that you could create an AmqpAdmin type thing for that. In lieu of that, I'd lean toward letting Sender declare exchanges and then having Receiver finish by creating queues and binding them to an exchange, since receivers are the ones with the information needed to make that judgment.

For more comparison, I adore using this tactic when I use Spring AMQP:

    @RabbitListener(bindings = @QueueBinding( //
            value = @Queue, //
            exchange = @Exchange("hacking-spring-boot"), //
            key = "new-items-spring-amqp"))

Putting aside the automated tie-in to method invocation, this tells Spring AMQP to create an anonymous queue and bind it to an exchange with a key.

I'm not saying you need to adopt annotation-based bindings, but the Receiver indeed has the "rest" of the information needed to form the connection through the broker. ¯\_(ツ)_/¯
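
The split proposed in this thread (the sender declares the exchange; each receiver declares its own anonymous queue and binds it with a routing key) can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `TopologySketch` and all of its methods are hypothetical, it imitates a direct exchange rather than talking to a broker, and it does not represent an API of Reactor RabbitMQ, Spring AMQP, or the RabbitMQ Java client.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of a direct exchange; not a real client API.
class TopologySketch {

    // exchange name -> (routing key -> names of bound queues)
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();
    // queue name -> messages delivered so far
    private final Map<String, List<String>> queues = new HashMap<>();

    // Sender side: declares the exchange it will publish to.
    void declareExchange(String exchange) {
        bindings.computeIfAbsent(exchange, e -> new HashMap<>());
    }

    // Receiver side: declares an anonymous queue, binds it to the exchange,
    // and returns the generated name so the receiver can consume from it.
    String declareAnonymousQueueAndBind(String exchange, String routingKey) {
        Map<String, List<String>> byKey = bindings.get(exchange);
        if (byKey == null) {
            throw new IllegalStateException("Unknown exchange: " + exchange);
        }
        String queue = "amq.gen-" + UUID.randomUUID();
        queues.put(queue, new ArrayList<>());
        byKey.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
        return queue;
    }

    // Sender side: publishes by exchange and key only, never by queue name.
    void publish(String exchange, String routingKey, String message) {
        Map<String, List<String>> byKey = bindings.get(exchange);
        if (byKey == null) {
            throw new IllegalStateException("Unknown exchange: " + exchange);
        }
        for (String queue : byKey.getOrDefault(routingKey, List.of())) {
            queues.get(queue).add(message);
        }
    }

    // Returns a snapshot of the messages routed to the given queue.
    List<String> messagesIn(String queue) {
        return List.copyOf(queues.getOrDefault(queue, List.of()));
    }

    public static void main(String[] args) {
        TopologySketch broker = new TopologySketch();
        broker.declareExchange("hacking-spring-boot");
        String q1 = broker.declareAnonymousQueueAndBind("hacking-spring-boot", "new-items-spring-amqp");
        String q2 = broker.declareAnonymousQueueAndBind("hacking-spring-boot", "new-items-spring-amqp");
        broker.publish("hacking-spring-boot", "new-items-spring-amqp", "item-1");
        System.out.println(broker.messagesIn(q1));
        System.out.println(broker.messagesIn(q2));
    }
}
```

In this model the publishing side never sees a queue name, which matches the point made above that the receiver holds the "rest" of the information needed to complete the route.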