woshikid / blog

Apache License 2.0
8 stars 1 forks source link

RabbitMQ学习笔记 #185

Open woshikid opened 2 years ago

woshikid commented 2 years ago

安装EPEL源

yum install epel-release

安装RabbitMQ Server

yum install rabbitmq-server

启用管理界面

rabbitmq-plugins enable rabbitmq_management

启动RabbitMQ

systemctl start rabbitmq-server

访问RabbitMQ管理界面 http://localhost:15672/ 默认用户名/密码:guest/guest

用户操作

rabbitmqctl change_password guest password
rabbitmqctl add_user devops password
rabbitmqctl set_user_tags devops administrator
rabbitmqctl list_users
rabbitmqctl delete_user devops

消息机制

生产者 -> 交换机(路由规则) --> 队列 --> 消费者

交换机

Java client

POM

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

自定义配置

spring:
  rabbitmq:
    host: localhost # 默认为localhost
    port: 5672 # 默认为5672或5671(SSL)
    username: guest # 默认为guest
    password: password # 默认为guest
    listener:
      simple:
        retry: # 开启重试后,消息处理失败时不再放回队列
          enabled: true # 默认为false
          max-attempts: 3 # 默认为3次=初始1次+重试2次
        default-requeue-rejected: false # 消息处理失败时重新放回队列,默认为true
        concurrency: 4 # 监听器并发线程数,默认为1

定义队列

@Bean
public Queue queue() {
    // name: 队列名
    // durable: 持久化
    // exclusive: 独占,仅当前连接可见,连接断开后删除
    // autoDelete: 所有消费者都断开后自动删除
    // 如服务器已有该队列则不会覆盖
    return new Queue("testQueue", false, false, false);
    //return new Queue("testQueue", false, false, false, Map.of("x-max-length", 5)); // 达到队列最大长度后最早的消息将被丢弃
}

定义交换机

@Bean
public DirectExchange directExchange() {
    // name: 交换机名
    // durable: 持久化
    // autoDelete: 所有绑定都解除后自动删除
    // 如服务器已有该交换机则不会覆盖
    return new DirectExchange("directExchange", false, false);
}

@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange", false, false);
}

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topicExchange", false, false);
}

@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headersExchange", false, false);
}

绑定路由

@Bean
public Binding bindingDirect() {
    return BindingBuilder.bind(queue()).to(directExchange()).with("routingKey");
}

@Bean
public Binding bindingFanout() {
    return BindingBuilder.bind(queue()).to(fanoutExchange());
}

@Bean
public Binding bindingTopic() {
    // . 用来分隔路径中的名字,* 用来匹配路径中的名字,# 用来匹配所有剩余路径
    return BindingBuilder.bind(queue()).to(topicExchange()).with("foo.*");
    //return BindingBuilder.bind(queue()).to(topicExchange()).with("foo.#");
}

@Bean
public Binding bindingHeaders() {
    Map<String, Object> headers = new HashMap<>();
    headers.put("key1", "value1");
    headers.put("key2", "value2");
    return BindingBuilder.bind(queue()).to(headersExchange()).whereAll(headers).match(); // 全部匹配
    //return BindingBuilder.bind(queue()).to(headersExchange()).whereAny(headers).match(); // 任意匹配
}

RabbitTemplate

@Autowired
private RabbitTemplate rabbitTemplate;

发送消息

rabbitTemplate.convertAndSend("directExchange", "routingKey", "message");
rabbitTemplate.convertAndSend("fanoutExchange", null, "message");
rabbitTemplate.convertAndSend("topicExchange", "foo.bar", "message");

MessageProperties properties = new MessageProperties();
properties.setHeader("key1", "value1");
properties.setHeader("key2", "value2");
Message message = new Message("message".getBytes(), properties);
rabbitTemplate.convertAndSend("headersExchange", null, message);

rabbitTemplate.convertAndSend("headersExchange", null, "message", message -> {
    message.getMessageProperties().getHeaders().putAll(Map.of("key1", "value1", "key2", "value2"));
    return message;
});

接收消息

// 立即接收,没有消息则返回null
rabbitTemplate.receiveAndConvert("testQueue");
// 同步接收,阻塞直到收到消息或超时,超时返回null
rabbitTemplate.receiveAndConvert("testQueue", 1000);

监听消息

@Component
@RabbitListener(queues = "testQueue", errorHandler = "errorHandler")
// 自动定义队列,如服务器已有该队列则不会覆盖
//@RabbitListener(queuesToDeclare = @Queue("testQueue")) // 默认durable = "true"
//@RabbitListener(queuesToDeclare = @Queue(name = "testQueue", arguments = @Argument(name = "x-message-ttl", value = "10000", type = "java.lang.Long")))
// 自动定义队列/交换机/路由,如服务器已有该队列/交换机则不会覆盖
//@RabbitListener(bindings = @QueueBinding(value = @Queue("testQueue"), exchange = @Exchange("directExchange"), key = "routingKey")) // exchange默认type = "direct", durable = "true"
public class RabbitConsumer {

    @RabbitHandler // 支持重载以处理不同类型的消息
    public void process(String message) {
        System.out.println(message);
    }
}

异常处理 消息处理失败时默认重新放回队列导致无限重试

@Bean
public RabbitListenerErrorHandler errorHandler() {
    return (amqp, message, exception) -> {
        System.out.println(exception);
        return null;
    };
}

死信队列

@Bean
public Queue queue() {
    // 消息被丢弃时将进入死信队列(需自行定义死信队列)
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", "DLQExchange");
    arguments.put("x-dead-letter-routing-key", "DLQ");
    return new Queue("testQueue", false, false, false, arguments);
}

消息有效期

@Bean
public Queue queue() {
    return new Queue("testQueue", false, false, false, Map.of("x-message-ttl", 10000));
}

rabbitTemplate.convertAndSend("directExchange", "routingKey", "message", message -> {
    message.getMessageProperties().setExpiration("10000");
    return message;
});