kagxin / blog

个人博客:技术、随笔、生活
https://github.com/kagxin/blog/issues
7 stars 0 forks source link

RabbitMQ #40

Open kagxin opened 4 years ago

kagxin commented 4 years ago

消息通信的三个概念

AMQP三元素

交换机

队列

绑定

虚拟主机

docker-compose

version: "3"
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    hostname: mymq
    environment:
      - RABBITMQ_DEFAULT_USER=root
      - RABBITMQ_DEFAULT_PASS=root
    ports:
      - 15672:15672
      - 5672:5672
    restart: always
    volumes:
      - /Users/kangxin/docker/rabbitmq/data:/var/lib/rabbitmq

rabbitmq management web 插件在15672端口,rabbitmq服务在5672端口

示例

topic交换机例子 绑定关系如图

image

这样路由键是hola.one 的消息会进入到队列hello-queue中 路由键符合hola.* 模式的都会进入hello-queue2中

生产者

credentials = pika.PlainCredentials("root", "root") conn_params = pika.ConnectionParameters("127.0.0.1", credentials=credentials)

conn_broker = pika.BlockingConnection(conn_params) channel = conn_broker.channel()

channel.exchange_declare(exchange="hello-exchange", exchange_type="topic", passive=False, durable=True, auto_delete=False)

msg_props = pika.BasicProperties() msg_props.content_type = "text/plain" # 声明消息类型

channel.basic_publish(body='i am message routing_key: hola.two', exchange="hello-exchange", properties=msg_props, routing_key="hola.two") # 发布一条消息路由键是 hola.two

channel.basic_publish(body='i am message routing_key: hola.one', exchange="hello-exchange", properties=msg_props, routing_key="hola.one") # 发布一条消息路由键是 hola.one

#### 消费者
```python
import pika

credentials = pika.PlainCredentials("root", "root")
conn_params = pika.ConnectionParameters('127.0.0.1', credentials=credentials)

conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()

channel.exchange_declare(exchange="hello-exchange", exchange_type="topic", passive=False, durable=True,
                         auto_delete=False)  # 声明交换机
channel.queue_declare("hello-queue")  # 声明队列
channel.queue_declare("hello-queue2")  # 声明队列

# 确定绑定关系,只接受routingkey为hola.one的消息
channel.queue_bind(queue="hello-queue", exchange="hello-exchange", routing_key="hola.one")

# 确定绑定关系,接受所有符合 hola.* 模式的消息
channel.queue_bind(queue="hello-queue2", exchange="hello-exchange", routing_key="hola.*")

def msg_consumer(channel, method, header, body):  # 消息处理函数, 处理hello-queue队列中消息
    print(f'msg_consumer: bind routing_key: hola.one, message: {body.decode()}')
    channel.basic_ack(delivery_tag=method.delivery_tag)

def msg_consumer_all(channel, method, header, body):  # 消息处理函数,处理hello-queue2队列中消息
    print(f'msg_consumer_all: bind routing_key: hola.*, message: {body.decode()}')
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume("hello-queue", msg_consumer, consumer_tag="hello-consumer")  # 注册回调函数
channel.basic_consume("hello-queue2", msg_consumer_all, consumer_tag="all-consumer")  # 注册回调函数
print('start...')
channel.start_consuming()  # 启动消费

控制台日志

start...
msg_consumer: bind routing_key: hola.one, message: i am message routing_key: hola.one
msg_consumer_all: bind routing_key: hola.*, message: i am message routing_key: hola.two
msg_consumer_all: bind routing_key: hola.*, message: i am message routing_key: hola.one

两条消息都进入了队列 hello-queue2