Open ZhuangRenyang opened 1 year ago
双击运行otp_win64_24.3.4.6.exe,点击下一步下一步即可,可以更改安装位置,需要注意路径不要有中文或空格
他的作用类似JDK,因为rabbitMQ是erlang语言开发的,所以需要环境支持
配置环境变量,在path中配置: 自己安装erlang的路径\bin
双击运行rabbitmq-server-3.10.1.exe,下一步即可,安装好后进入以下目录 在该目录下打开dos窗口,输入以下运行命令
rabbitmq-plugins enable rabbitmq_management
启动结束后,访问:http://localhost:15672 用户名和密码都是:guest
接受生产者发送的消息,并根据binding规则将消息路由给服务器中的队列。
ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的类型有三种:direct、fanout和topic。
消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到对应的queue则数据会丢失),等待消费者来取。
它表示的是Exchange和Message Queue是通过binding key进行联系的,这个关系是固定。
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与ExchangeType及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里
创建一个RabbitMQ的项目,用来学习rabbitmq,需要添加依赖
<!--添加RabbitMQ依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.12.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.4</version> </dependency>
只有一个消费者
package cn.tedu.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //模拟简单模式的生产者 public class Producer { public static void main(String[] args)throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 /* 第一个参数:队列名称 第二个参数:是否是一个持久队列 第三个参数:是否是一个独占队列 第四个参数:是否自动删除 第五个参数:其他参数属性的设置 */ cc.queueDeclare("helloworld",false,false,false,null); //4.准备数据 String msg = "hello world" + System.currentTimeMillis(); //5.发送数据 /* 第一个参数:先忽略 第二个参数:先写成队列名称 第三个参数:其他属性设置 第四个参数:消息数据,需要转成byte[]类型 */ cc.basicPublish("","helloworld",null,msg.getBytes()); nc.close(); } }
package cn.tedu.rabbitmq.simple; import com.rabbitmq.client.*; import java.io.IOException; //简单模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 //如果该队列已经在服务器中存在,那么会忽略创建该队列的命令 cc.queueDeclare("helloworld",false,false,false,null); //4.消费消息 //处理数据的回调函数 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); System.out.println("==========================="); } }; //取消接收数据的回调函数 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume("helloworld",true,deliverCallback,cancelCallback); } }
多个消费者,从同一个队列中接受消息
负载均衡,消息会轮询发送给所有消费者
合理的分发消息
手动ack
消息回执
向服务器发送一个通知,告诉服务器一条消息已经处理完毕
服务器可以通过ack,知道消费者是空闲还是繁忙
qos=1
每次抓取的消息数量
消息处理完毕之前,不会抓取新消息
手动ack模式下才有效
消息持久化:在服务器端,持久保存消息,避免因为服务器宕机而造成消息丢失
package cn.tedu.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; //模拟工作队列模式的生产者 public class Producer { public static void main(String[] args)throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 /* 第一个参数:队列名称 第二个参数:是否是一个持久队列 第三个参数:是否是一个独占队列 第四个参数:是否自动删除 第五个参数:其他参数属性的设置 */ cc.queueDeclare("work_queue",false,false,false,null); //4.准备数据,发送数据 while(true){ System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); if (msg.equals("quit")) break; cc.basicPublish("","work_queue",null,msg.getBytes()); } nc.close(); } }
package cn.tedu.rabbitmq.work; import com.rabbitmq.client.*; import java.io.IOException; //工作队列模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 //如果该队列已经在服务器中存在,那么会忽略创建该队列的命令 cc.queueDeclare("work_queue",false,false,false,null); //4.消费消息 //处理数据的回调函数 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); //处理消息的时候,遇到一个‘.’,暂停1s,模拟实际处理消息的阻塞场景 for (int i = 0; i < msg.length(); i++) { //charAt(i) if(msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("消息处理完毕"); } }; //取消接收数据的回调函数 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume("work_queue",true,deliverCallback,cancelCallback); } }
package cn.tedu.rabbitmq.work; import com.rabbitmq.client.*; import java.io.IOException; //工作队列模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 //如果该队列已经在服务器中存在,那么会忽略创建该队列的命令 cc.queueDeclare("work_queue",false,false,false,null); //4.消费消息 //处理数据的回调函数 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); //处理消息的时候,遇到一个‘.’,暂停1s,模拟实际处理消息的阻塞场景 for (int i = 0; i < msg.length(); i++) { //charAt(i) if(msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("消息处理完毕"); //发送消息确认(消息回执) //第一个参数:消息的标签,需要从消息中获取 //第二个参数:是否确认多条信息,false,只确认一条消息 cc.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }; //取消接收数据的回调函数 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; //第二个参数:ack的设置,默认是autoAck,false是手动ack //设置消费者每次只获取一条消息 cc.basicQos(1); cc.basicConsume("work_queue",false,deliverCallback,cancelCallback); } }
如果消息保存在服务器端的内存中,是不安全的,如果服务器宕机,信息会丢失
停止rabbitmq服务:rabbitmq-service stop或者rabbitmqctl stop 启动rabbitmq服务:rabbitmq-service start
消息数据持久化、消息队列持久化
但是需要注意持久化队列时,不能直接修改队列属性,因为队列一定创建属性就固定了,不可以进行修改
可以选择新建一个别名队列或者删除该队列重新创建
队列持久化:cc.queueDeclare("task_queue",true,false,false,null);
数据持久化:cc.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
把消息群发给所有消费者,同一条消息所有消费者都可以收到
fanout类型的交换机
生产者:定义交换机,向交换机发送数据
消费者:定义交换机,定义自已独占的非持久的自动删除随机队列,绑定,正常从随机队列中接受数据
package cn.tedu.rabbitmq.publishsubscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; //模拟发布订阅模式的生产者 public class Producer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义交换机 //如果服务器中没有,就创建,有就直接使用 //第一个参数:交换机名称 //第二个参数:交换机类型 cc.exchangeDeclare("ps_exchange","fanout"); //4.向交换机发送数据,交换机只是接受发送数据,并不保存数据 //如果没有消费者接受,数据会丢失 while (true){ System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); if (msg.equals("quit")) break; cc.basicPublish("ps_exchange","",null,msg.getBytes()); } nc.close(); } }
package cn.tedu.rabbitmq.publishsubscribe; import com.rabbitmq.client.*; import java.io.IOException; //发布订阅模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义交换机 //如果服务器中没有,就创建,有就直接使用 //第一个参数:交换机名称 //第二个参数:交换机类型 cc.exchangeDeclare("ps_exchange","fanout"); //4.定义队列--随机队列名,否持久,独占、自动删除的 String queue = cc.queueDeclare().getQueue();//无参构造就可以满足我们的需求 //5.绑定交换机和队列 /* 第一个参数:队列名称 第二个参数:交换机名称 第三个参数:队列和交换机绑定的关系 */ cc.queueBind(queue,"ps_exchange",""); //6.处理接收到的消息 //处理数据 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到:"+msg); System.out.println("消息处理完毕"); } }; //取消数据时的回调函数 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
通过关键字匹配,来决定把消息发送到哪个队列
生产者:定义direct类型的交换机,向交换机发送数据并携带路由键
消费者:定义交换机,定义队列,用绑定键来绑定交换机和队列,正常接收消息
package cn.tedu.rabbitmq.route; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; //模拟路由模式的生产者 public class Producer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义direct类型的交换机 cc.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT); //4.发送消息,携带路由键 while(true){ Scanner scanner = new Scanner(System.in); System.out.print("输入消息:"); String msg = scanner.nextLine(); System.out.print("路由键:"); String routingKey = scanner.nextLine(); if(msg.equals("quit")) break; cc.basicPublish("route_exchange",routingKey,null,msg.getBytes()); System.out.println("消息发送成功"); } nc.close(); } }
package cn.tedu.rabbitmq.route; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Scanner; //模拟路由模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义direct类型的交换机 cc.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT); //4.定义队列 String queue = cc.queueDeclare().getQueue(); //5.绑定交换机和队列(重复绑定多次) System.out.print("输入绑定键,用空格隔开:"); String bindingKeys = new Scanner(System.in).nextLine(); String[] keys = bindingKeys.split("\\s+"); for (String bindingKey : keys) { cc.queueBind(queue,"route_exchange",bindingKey); } //处理数据 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { String msg = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(msg+"-"+routingKey); System.out.println("===================="); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
和路由模式相同,具有特殊的关键字规则
topic类型的交换机实现这种特殊路由规则
aaa.bbb.ccc.ddd
*.ccc.ddd.eee
"*"可以通配单个单词
"#"可以通配零个或多个单词
package cn.tedu.rabbitmq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; //模拟主题模式的生产者 public class Producer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义direct类型的交换机 cc.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC); //4.发送消息,携带路由键 while(true){ Scanner scanner = new Scanner(System.in); System.out.print("输入消息:"); String msg = scanner.nextLine(); System.out.print("路由键:"); String routingKey = scanner.nextLine(); if(msg.equals("quit")) break; cc.basicPublish("topic_exchange",routingKey,null,msg.getBytes()); System.out.println("消息发送成功"); } nc.close(); } }
package cn.tedu.rabbitmq.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Scanner; //模拟主题模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义direct类型的交换机 cc.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC); //4.定义队列 String queue = cc.queueDeclare().getQueue(); //5.绑定交换机和队列(重复绑定多次) System.out.print("输入绑定键,用空格隔开:"); String bindingKeys = new Scanner(System.in).nextLine(); String[] keys = bindingKeys.split("\\s+"); for (String bindingKey : keys) { cc.queueBind(queue,"topic_exchange",bindingKey); } //处理数据 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { String msg = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(msg+"-"+routingKey); System.out.println("===================="); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
实现原理
两个队列
调用队列
返回队列:每个客户端,都需要有自己的返回队列
返回队列的队列名
在调用消息数据中,携带返回队列名
根据返回队列名,向正确的返回队列来发送计算结果
关联ID
用来匹配计算结果和调用
如果连续发送多次调用,获得计算结果时,需要直到这个节骨欧式按一次调用的结果
客户端发送调用时,携带一个关联ID
服务器端返回结果时,也携带这个关联ID
客户端:多线程异步处理结果
主线程
发送调用信息
需要计算结果时,要取结果
次线程:等待接受结果
线程之间传递数据,可以使用BlockingQueue
集合工具
这个集合中,添加了线程的等待和通知
如果没有数据,取数据时会暂停等待
有多个子类:比如ArrayBlockingQueue
package cn.tedu.rabbitmq.rpc; import com.rabbitmq.client.*; import com.rabbitmq.client.AMQP.BasicProperties; import java.io.IOException; public class RPCServer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //1.接受客户端发送的调用信息(正整数n) //2.执行算法求第n个斐波那契数 //3.向客户端发送计算结果 //定义调用队列 cc.queueDeclare("rpc_queue",false,false,false,null); //从调用队列中取调用信息 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { //从delivery取出正整数n String msg = new String(delivery.getBody()); String replyTo = delivery.getProperties().getReplyTo();//返回队列名 String correlationId = delivery.getProperties().getCorrelationId();//关联id long fbnqs = fbnqs(Integer.parseInt(msg)); BasicProperties basicProperties = new BasicProperties.Builder().correlationId(correlationId).build(); cc.basicPublish("",replyTo,basicProperties,(""+fbnqs).getBytes()); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume("rpc_queue",true,deliverCallback,cancelCallback); } //服务:求第n个斐波那契数 //1 1 2 3 5 8 13 21 34 55 ...... //递归效率低,可以用来模拟服务器端的耗时运算 static long fbnqs(int n){ if(n==1 || n==2) return 1; return fbnqs(n-1)+fbnqs(n-2); } }
package cn.tedu.rabbitmq.rpc; import com.rabbitmq.client.*; import com.rabbitmq.client.AMQP.BasicProperties; import java.io.IOException; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class RPCClient { static BlockingQueue<Long> q = new ArrayBlockingQueue<Long>(10); public static void main(String[] args) throws Exception{ System.out.print("输入求第几个斐波那契数:"); int n = new Scanner(System.in).nextInt(); long fbnqs = fbnqs(n); System.out.println("第"+n+"个的斐波那契数是:"+fbnqs); } //异步调用服务器,从服务器中获取结果 private static long fbnqs(int n) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //定义调用队列 cc.queueDeclare("rpc_queue",false,false,false,null); //返回队列 String replyTo = cc.queueDeclare().getQueue(); //关联id String cid = UUID.randomUUID().toString(); BasicProperties basicProperties = new BasicProperties.Builder() .replyTo(replyTo) .correlationId(cid) .build(); cc.basicPublish("","rpc_queue",basicProperties,(""+n).getBytes()); //模拟执行其他运算,不等待计算结果 System.out.println("调用消息已经发送"); System.out.println("模拟执行其他运算,不立即等待计算结果"); //获取计算结果 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { //处理数据之前,先对比关联id if(cid.equals(delivery.getProperties().getCorrelationId())){ String msg = new String(delivery.getBody()); long fbnqs = Integer.parseInt(msg); q.offer(fbnqs); cc.getConnection().close(); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(replyTo,true,deliverCallback,cancelCallback); return q.take(); } }
消息队列-RabbitMQ
安装rabbitMQ
双击运行otp_win64_24.3.4.6.exe,点击下一步下一步即可,可以更改安装位置,需要注意路径不要有中文或空格
他的作用类似JDK,因为rabbitMQ是erlang语言开发的,所以需要环境支持
配置环境变量,在path中配置: 自己安装erlang的路径\bin
双击运行rabbitmq-server-3.10.1.exe,下一步即可,安装好后进入以下目录 在该目录下打开dos窗口,输入以下运行命令
启动结束后,访问:http://localhost:15672 用户名和密码都是:guest
rabbitMQ的基本概念
Exchange
接受生产者发送的消息,并根据binding规则将消息路由给服务器中的队列。
ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的类型有三种:direct、fanout和topic。
Message Queue
消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到对应的queue则数据会丢失),等待消费者来取。
Binding Key
它表示的是Exchange和Message Queue是通过binding key进行联系的,这个关系是固定。
Routing Key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与ExchangeType及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里
RabbitMQ六种工作模式
创建一个RabbitMQ的项目,用来学习rabbitmq,需要添加依赖
简单模式
只有一个消费者
生产者
消费者
工作队列模式
多个消费者,从同一个队列中接受消息
负载均衡,消息会轮询发送给所有消费者
合理的分发消息
手动ack
消息回执
向服务器发送一个通知,告诉服务器一条消息已经处理完毕
服务器可以通过ack,知道消费者是空闲还是繁忙
qos=1
每次抓取的消息数量
消息处理完毕之前,不会抓取新消息
手动ack模式下才有效
消息持久化:在服务器端,持久保存消息,避免因为服务器宕机而造成消息丢失
生产者
消费者
合理分发
持久化
如果消息保存在服务器端的内存中,是不安全的,如果服务器宕机,信息会丢失
消息数据持久化、消息队列持久化
但是需要注意持久化队列时,不能直接修改队列属性,因为队列一定创建属性就固定了,不可以进行修改
可以选择新建一个别名队列或者删除该队列重新创建
队列持久化:cc.queueDeclare("task_queue",true,false,false,null);
数据持久化:cc.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
发布订阅模式
把消息群发给所有消费者,同一条消息所有消费者都可以收到
fanout类型的交换机
生产者:定义交换机,向交换机发送数据
消费者:定义交换机,定义自已独占的非持久的自动删除随机队列,绑定,正常从随机队列中接受数据
生产者
消费者
路由模式
通过关键字匹配,来决定把消息发送到哪个队列
生产者:定义direct类型的交换机,向交换机发送数据并携带路由键
消费者:定义交换机,定义队列,用绑定键来绑定交换机和队列,正常接收消息
生产者
消费者
主题模式
和路由模式相同,具有特殊的关键字规则
topic类型的交换机实现这种特殊路由规则
aaa.bbb.ccc.ddd
*.ccc.ddd.eee
.ddd
"*"可以通配单个单词
"#"可以通配零个或多个单词
生产者
消费者
RPC模式
实现原理
两个队列
调用队列
返回队列:每个客户端,都需要有自己的返回队列
返回队列的队列名
在调用消息数据中,携带返回队列名
根据返回队列名,向正确的返回队列来发送计算结果
关联ID
用来匹配计算结果和调用
如果连续发送多次调用,获得计算结果时,需要直到这个节骨欧式按一次调用的结果
客户端发送调用时,携带一个关联ID
服务器端返回结果时,也携带这个关联ID
客户端:多线程异步处理结果
主线程
发送调用信息
需要计算结果时,要取结果
次线程:等待接受结果
线程之间传递数据,可以使用BlockingQueue
集合工具
这个集合中,添加了线程的等待和通知
如果没有数据,取数据时会暂停等待
有多个子类:比如ArrayBlockingQueue
服务器
客户端