vieyahn2017 / iBlog

44 stars 0 forks source link

3.15 celery/kombu以及 #67

Closed vieyahn2017 closed 4 years ago

vieyahn2017 commented 6 years ago

https://github.com/celery/kombu

vieyahn2017 commented 6 years ago

https://github.com/richardasaurus/celerybeat-sqlalchemy-scheduler

vieyahn2017 commented 6 years ago

https://github.com/benwilber/django-celerybeat-lock

vieyahn2017 commented 6 years ago

https://github.com/bendmorris/leek

highly available distributed celerybeat scheduler

vieyahn2017 commented 6 years ago
INSTALLED_APPS = (
    "kombu.transport.django",
)

django-kombu contains a single transport, djkombu.transport.DatabaseTransport, which is used like this:

>>> from kombu.connection import BrokerConnection
>>> c = BrokerConnection(transport="djkombu.transport.DatabaseTransport")
vieyahn2017 commented 4 years ago

记录两个用pika和kombu实现的rabbitmq队列操作

https://blog.csdn.net/qq_21398167/java/article/details/52625258

用kombu的


import ConfigParser
from kombu import Connection
#from log import logger

class PyRabbitmq(object):
    def __init__(self):
        self.user = 'nova'
        self.conn = None

    def rbt_connection(self):
        cf = ConfigParser.ConfigParser()
        cf.read("/etc/nova/nova.conf") 从配置文件中获取rabbit的主机和密码
        pwd = cf.get("DEFAULT", "rabbit_password")
        hosts = cf.get("DEFAULT", "rabbit_hosts")
        hosts_list = hosts.split(',')   HA中有多个rabbit主机
        for host_list in hosts_list: 依次链接rabbit,知道链接成功
            url = 'amqp://%s:%s@%s//' %(self.user, pwd, host_list)
            try:
                self.conn = Connection(url)
            except Exception as e:
#                logger.error("connect rabbitmq failed: %s" % e)
                 print "1"

    def rbt_disconnect(self) 断开连接
        self.conn.release()
        self.conn = None

    def get_fence_nodes(): 
        simple_queue = self.conn.SimpleQueue('simple_queue') 获取队列名为 <span style="font-family: Arial, Helvetica, sans-serif;">simple_queue 的内容</span>

        if simple_queue.qsize() == 0:
            fence_nodes = 
            simple_queue.close()
            return fence_nodes

        else:
            msg = simple_queue.get(block=True, timeout=1) 得到里面的数据
            fence_nodes = msg.payload
            msg.ack()
            simple_queue.close()
            return fence_nodes

    def set_fence_nodes(fence_nodes):
        simple_queue = self.conn.SimpleQueue('simple_queue') 连接到队列
        simple_queue.put(fence_nodes) 数据传入队列
        simple_queue.close()

用pika 的

import pika
import ConfigParser
import json
#from log import logger

class PyRabbitmq(object):
    def __init__(self):
        self.user = 'nova'
        self.port = 5673
        self.msg_list = None
        self.rbt_connection() 

    def rbt_connection(self):
        cf = ConfigParser.ConfigParser()
        cf.read("/etc/nova/nova.conf") 从配置文件中获取数据
        pwd = cf.get("DEFAULT", "rabbit_password")
        hosts = cf.get("DEFAULT", "rabbit_hosts")
        hosts_list = hosts.split(',')
        for host_list in hosts_list:
            host = host_list.split(':')
            credential = pika.PlainCredentials(self.user, pwd)
            try:
                pid = pika.ConnectionParameters(host[0],
                                                self.port, '/',
                                                credential)
                connection = pika.BlockingConnection(pid)
                self.channel = connection.channel()
                self.channel.exchange_declare(exchange='first', type='fanout')
                self.channel.queue_declare(queue='fence_nodes')
                self.channel.queue_bind(exchange='first', queue='fence_nodes')
#                return channel
            except Exception as e:
#                logger.error("connect rabbitmq failed: %s" % e)
                 print "1"

    def callback(self, ch, method, properties, body):
        print body
        self.msg_list = body

    def publish(self, msg_list): 传入数据
#        channel = self.rbt_connection()
#        channel.exchange_declare(exchange='first', type='fanout')
#        channel.queue_declare(queue='fence_nodes')
#        channel.queue_bind(exchange='first', queue='fence_nodes')
        msg = json.dumps(msg_list)
        print msg
        self.channel.basic_publish(exchange='first',
                              routing_key='',
                              body=msg)

    def consume(self):获取数据
#        channel = self.rbt_connection()
#        channel.queue_declare(queue='fence_nodes')
        self.channel.basic_consume(self.callback, queue='fence_nodes',
                              no_ack=True)
        if not self.msg_list:
            return self.msg_list
        else:
            msg = json.loads(self.msg_list)
            return msg

def get_fence_nodes():
    rbt_obj = PyRabbitmq()
    fence_nodes = rbt_obj.consume()
    if not fence_nodes:
        fence_nodes = {}
        return fence_nodes
    else:
        rbt_obj.publish(fence_nodes)
        return fence_nodes

def put_fence_nodes(fence_nodes):
    rbt_obj = PyRabbitmq()
    rbt_obj.consume()
    if fence_nodes:
        rbt_obj.publish(fence_nodes)
vieyahn2017 commented 4 years ago

kombu消息框架

https://blog.csdn.net/weixin_37947156/article/details/76372427

重要概念 在Kombu中,有一些重要的概念需要事先了解,有的与RabbitMQ相同,也有的是RabbitMQ中没有的,下面来具体看一下。 Producers: 发送消息给exchange Exchanges: 用于路由消息(消息发给exchange,exchange发给对应的queue)。路由就是比较routing-key(这个message提供)和binding-key(这个queue注册到exchange的时候提供)。使用时,需要指定exchange的名称和类型(direct,topic和fanout)。可以发现,和RabbitMQ中的exchange概念是一样的。 Consumers: consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。 Queues: 接收exchange发过来的消息 Routing keys: 每个消息在发送时都会声明一个routing_key。routing_key的含义依赖于exchange的类型。一般说来,在AMQP标准里定义了四种默认的exchange类型,此外,vendor还可以自定义exchange的类型。但是,我们下面只关注AMQP 0.8版本中定义的三种默认exchange类型,也是最常用的三类exchange。

Direct exchange: 如果message的routing_key和某个consumer中的routing_key相同,就会把消息发送给这个consumer监听的queue中。 Fan-out exchange: 广播模式。exchange将收到的message发送到所有与之绑定的queue中。 Topic exchange: 该类型exchange会将message发送到与之routing_key类型相匹配的queue中。routing_key由一系列“.”隔开的word组成,“*”代表匹配任何word,“#”代表匹配0个或多个word,类似于正则表达式。 可以看到,除了有在AMQP中定义的exchange、queue等概念,Kombu还对消息的发送方Producers和接收方Consumer作了抽象,这在RabbitMQ是没有的。不知道大家还记不记得RabbitMQ中消息的发送和接收的方式。在RabbitMQ中,发送是将指定完exchange、routing_key以及其他一些属性的message用channel.basic_publish方法来实现的,接收则是声明一个queue然后与exchange绑定,并指定callback函数。而在Kombu中消息的发送和接收则会更加简单和形象,更加的面向对象,相信大家在后面的例子中会强烈的感受到。

主要特点 接下来,我们从整体上来了解一下Kombu的主要特点。

支持将不同的消息中间件以插件的方式进行灵活配置。Kombu中使用transport这个术语来表示一个具体的消息中间件(后续均用broker指代)。

transport使用py-amqp、librabbitmq、qpid-python等链接库来实现与RabbitMQ、Qpid等broker的连接。 用C语言实现了一个高性能的rabbitmq链接库——librabbitmq 引入transport这个抽象概念可以使得后续添加对non-AMQP的transport非常简单。当前Kombu中build-in支持有Redis、Beanstalk、Amazon SQS、CouchDB,、MongoDB,、ZeroMQ,、ZooKeeper、SoftLayer MQ和Pyro。 同样,可以使用SQLAlchemy或Django ORM transport来将数据库当作broker使用 In-memory transport for unit testing(没理解= =) 支持对message的编码、序列化和压缩 对各种transport提供一致的异常处理 对connection和channle中的错误提供优雅的处理方案 最重要的就是对所有的broker进行了抽象——transport,为不同的broker提供了一致的解决方案。通过Kombu,开发者可以根据实际需求灵活的选择或更换broker。下表对当前主流broker之间进行了一些对比。for example——Hello World 下面我们以一个Hello World程序作为本篇博客的结尾,看看Kombu是怎么帮助我们实现消息通信的。一些细节看不懂没关系,在后续我们会对里面的每个对象进行深入的学习。

首先是消息发送端hello_publisher.py:

from kombu import Connection
import datetime

# "amqp://guest:guest@localhost:5672//"中的amqp就是上文所提到的transport,
# 后面的部分是连接具体transport所需的参数,具体含义下篇博客中会讲到
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    message = 'helloword, sent at %s' % datetime.datetime.today()
    simple_queue.put(message)
    print('Sent: %s' % message)
    simple_queue.close()

然后是消息接收端hello_consumer.py:

from kombu import Connection

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    message = simple_queue.get(block=True, timeout=1)
    print("Received: %s" % message.payload)
    message.ack()
vieyahn2017 commented 4 years ago

Kombu 源码解析一

https://www.jianshu.com/p/b4c53f3f205f 声明:本文仅限于简书发布,其他第三方网站均为盗版,原文地址: Kombu 源码解析一

Kombu 源码解析一 玩 Python 的同学可能很多都听说过甚至玩过 Celery,Celery 作为 Python 中最流行的异步消息队列可以说是非常得受欢迎。但是,用得多的同学相信也是有一个感触,那就是时不时会遇到莫名奇妙的坑,而且最后都只能通过重启解决,因为要找到原因实在不是一件容易的事情。

而我作为这受伤人群中的一个,想了解一下这个大坑内部的原理,所以就曾经扒过它内部的实现,但是,由于 Celery 确实有点庞大,而且我认为代码实现得也不是很多,所以,只是学习到了一部分的知识,但是,却是收获良多。所以,最近因为各种情况,我这次深扒一次,所以后续将会有一系列的文章是讲 Celery 实现的。

当你尝试查看 Celery 的源码的时候,你会发现有一个你根本绕不过的坑的,那就是 Kombu,Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象,但是,这只是作者的一面说辞,你看完代码就知道了,它是尝试做异步MQ 的兼容 AMQP 的抽象队列。所以,我决定先从 Kombu 的代码写起,而这篇文章是第一篇!

我在这个系列里面讲的都是 Kombu 4.1.0 和 Celery 4.1.0 的事,先提前说一下,以免后续的同学对不上号。

下载 Kombu 代码 Kombu 的代码很好找,因为它和 Celery 是强耦合的,所以,它的代码放在 Celery 的代码组里头,可以很简单得 Github 上找到,所以下载也是很简单了:

git clone https://github.com/celery/kombu.git cd kombu git checkout v4.1.0 这样,你就得到了我这个系列文章里面的 Kombu 的源代码了,而我后面的所有代码来源以及行数都是对应这个版本的代码的!

整体 Review 拿到代码之后,我就先摒弃所有的非代码部分,直接来整体看看 kombu 这个目录下有什么:

可以看到,在 kombu 这个目录里面大部分都是文件,而只有三个文件夹,分别是:

async:异步操作的 函数 和 类 transport:兼容各种 MQ 的类 utils: 一些辅助 函数 和 类 从这个目录中,我们就发现了一些问题,你说你既然是抽象 MQ 的,你要啥异步操作啊,这有点越界了呀喂。

简单添加/获取消息 OK,概览完我们看一些实际的东西,先尝试用 Kombu 写一个生产/消费的代码吧,这个 DEMO 分为两部分

simple_receive.py:一个简单的消费者 simple_send.py:一个简单的生产者 下面看看这两段代码都是怎么写的:

还是老规矩,这两段代码你都可以在我的 Github 中找到。先看看看消费者,消费者的代码比较简单,我们先建立和 MQ 的连接,然后再从 MQ 的指定队列里面将消息拿出来,处理掉。

而生产者则相反,前面也是要先和队列建立连接,但是,有一点不一样的是这里创建了一个 exchange,然后再往队列里面发送消息,而发送消息的同时还是搭配好多参数。

这里就是 Kombu 有意思的一点了,它意图对所有的 MQ 进行抽象,然后通过接口对外暴露出一致的 API,这样我们就不用关心底层用的是什么 MQ 了,Redis/RabbitMQ/MongoDB 之类的随便切换。

Kombu 的 MQ 模型 因为 Kombu 是对 AMQP 进行抽象,所以它必定有抽象的模型,事实上,它大体上和 RabbitMQ 差不多,但是,不完全一样,有一些差别,下面就介绍一下 Konbu 的抽象模型。

在 Kombu 中,存在多个概念,其实我们在前边简单的生产/消费者样例中已经看到了了一些,他们分别是:

Message:生产消费的基本单位,其实就是我们所谓的一条条消息 Connection:对 MQ 连接的抽象,一个 Connection 就对应一个 MQ 的连接 Transport:真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例 Producers: 发送消息的抽象类 Consumers:接受消息的抽象类 Exchange:MQ 路由,这个和 RabbitMQ 差不多,支持 5 类型 Queue:对应的 queue 抽象,其实就是一个字符串的封装 消息是发送到那个 Queue 的 假设我们想要发送一个消息到 Redis 中名为 'test' 的 queue 中,那么 Kombu 是怎么做的,这就设计到 Exchange 的概念了。目前 Kombu 对于不同 MQ 的支持是这样的:

假设我们用的是 Direct,那么我们的 Producer 在生产的时候只需要指定 Queue=test 即可,这样就会发送的 test 这个 queue 中。更多关于 Exchange 的知识可以参考:AMQP 0-9-1 Model Explained

这一篇就先介绍到这里,后续就开始深入到代码层面,看下这个模型中的各个实体是如何实现的。

Reference