metalbear-co / mirrord

Connect your local process and your cloud environment, and run local code in cloud conditions.
https://mirrord.dev
MIT License

RabbitMQ Splitting #2602

Open aviramha opened 2 months ago

aviramha commented 2 months ago

Similar to #2066

Currently in design/planning. Questions to potential users:

  1. How are you managing your RabbitMQ? (Cloud provider, external provider (if so, which?), or deployed in k8s)
  2. How can we obtain RabbitMQ admin credentials (to create queues)?
  3. How is the configuration set from the application side? (queue name, broker, credentials, etc.)

nabeelpaytrix commented 1 month ago

We're very much interested in this feature!

  1. We are running Amazon MQ with the RabbitMQ engine
  2. We store RabbitMQ credentials within the Kubernetes cluster as Secrets. These Secrets are then mounted onto RabbitMQ clients as environment variables in the pod, e.g.
      spec:
        env:
          - name: RABBIT_MQ_PWD
            valueFrom:
              secretKeyRef:
                name: amazonmq-password
                key: password
                optional: false

For microservices, you can require that a Secret exists in the cluster, in the same namespace as the intercepted service.

  3. We use the Spring RabbitMQ Starter library, which by default generates queues if they do not already exist when attempting to publish or consume from a non-existent exchange/queue.

Broker and credentials are configured using application properties in Spring Boot, e.g.

spring:
  rabbitmq:
    host: rabbitmq.domain.com
    port: 5672
    virtual-host: /
    username: ${RABBIT_MQ_USERNAME:defaultUser}
    password: ${RABBIT_MQ_PWD:defaultPass}
    ssl:
      enabled: true
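
To make the `${NAME:default}` syntax in the properties above concrete, here is a minimal sketch of how such a placeholder falls back to its default when the environment variable is not set. It is a simplified illustration, not Spring's actual resolver (which also handles nested placeholders and multiple property sources); the class name `PlaceholderSketch` and its `resolve` method are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only: resolves ${NAME} and
// ${NAME:default} placeholders against a map of environment variables.
final class PlaceholderSketch {

    // Group 1 is the variable name, group 2 the optional default (null if absent).
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String value, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            String fallback = m.group(2);
            // Prefer the environment value; otherwise use the inline default.
            String resolved = env.containsKey(name) ? env.get(name) : fallback;
            if (resolved == null) {
                throw new IllegalArgumentException("No value or default for " + name);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(resolved));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // With RABBIT_MQ_USERNAME unset, this prints the inline default.
        String username = resolve("${RABBIT_MQ_USERNAME:defaultUser}", System.getenv());
        System.out.println("username = " + username);
    }
}
```

In the setup described here, the pod's `RABBIT_MQ_PWD` variable comes from the `amazonmq-password` Secret, so the default is only used when that variable is missing, for example when the application runs outside the cluster.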

Example publish code:

private final reactor.rabbitmq.Sender sender;

@Traceable(step = "trigger", inLogLevel = INFO, outLogLevel = INFO)
public Mono<Void> triggerEvent(final CustomEvent message, final String routingKey) {
    // Mark the message as persistent and attach any custom headers.
    final AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .deliveryMode(MessageDeliveryMode.toInt(MessageDeliveryMode.PERSISTENT))
        .headers(/*Set headers here*/)
        .build();

    final OutboundMessage outboundMessage = new OutboundMessage(
        exchangeName,
        routingKey,
        props,
        message.toByteArray()
    );

    // Publish errors are swallowed here: the returned Mono completes empty on failure.
    return sender.send(Mono.just(outboundMessage)).onErrorResume(e -> Mono.empty());
}

Example consume code:

@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(
            value = "my-queue-which-may-or-may-not-exist", durable = "true"),
        exchange = @Exchange(value = "my-exchange-which-may-or-may-not-exist", type = ExchangeTypes.TOPIC),
        key = "my-binding-key"
    )
)
@Traceable(step = "subscriber", inLogLevel = Level.INFO, outLogLevel = Level.INFO)
public Mono<Void> consumeEvent(CustomEvent message, @Headers Map<String, Object> headers) {
    // Code for processing the RabbitMQ `message` and `headers`
    return Mono.empty(); // placeholder so the example compiles
}

aviramha commented 1 month ago

Thank you @nabeelpaytrix, this is perfect and will help us a lot in building something that can be used on your end.