JXtreehouse / python_lessions

6 stars 3 forks source link

kazoo模块 #1

Open AlexZ33 opened 5 years ago

AlexZ33 commented 5 years ago

kazoo是zookeeper的python模块,使用python编写的。使用python写运维平台的就可以用上

pip install kazoo

因为kazoo使用纯python实现zookeeper的协议,所以不必安装Python zookeeper C的各种依赖。


#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
参考:http://kazoo.readthedocs.io/en/latest/basic_usage.html#creating-nodes
'''

from kazoo.client import KazooClient

# Create a client and start it
zk = KazooClient(hosts='10.14.86.178:2181')
zk.start()

# Now you can do the regular zookepper API calls
# Ensure some paths are created required by your application
# zk.ensure_path("/app/someservice")

#watchers
# @zk.ChildrenWatch('/mysql/host')
# def mysql_watcher(children):
#     print('/mysql/host changed :%s'%children)

#获取指定节点的值
# print(zk.get('/mysql/host',watch=mysql_watcher2))
# print(zk.get('/mysql/port',watch=None))
# print(zk.get('/mysql/user',watch=None))
# print(zk.get('/mysql/pwd',watch=None))

'''
Reading Data
Methods:
    exists()
    get()           获取key对应的value值
    get_children()  获取节点下的key信息
'''
# Determine if a node exists
# if zk.exists("/redis/group1"):
#     # Do something
#     print(zk.get_children('/redis/group1'))
#     print(zk.get('/redis/group1/host'))

'''
Creating Nodes
Methods:
    ensure_path()   插入节点
    create()        创建节点key,value
'''
# zk.ensure_path('/mysql/test')
# zk.create('/mysql/test/node',b'node value')

'''
Updating Data
Methods:
    set()
'''
# zk.set('/mysql/test/node',b'value')
# print(zk.get('/mysql/test/node'))

'''
Deleting Nodes
Methods:
    delete()
'''
zk.delete('/mysql/test',recursive=True)

# In the end, stop it
zk.stop()

使用


from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start() //没有异常的话就意味着连接上zookeeper
children = zk.get_children('/')
zk.stop()

连接状态 zk.state

有三个值,LOST、CONNECTED、SUSPENDED image

zk.connected

已连接上返回True 没有连接上返回False

zk.get(path, watch=None)

获取指定节点的值,节点不存在触发NoNodeError异常

zk.get_children(path, watch=None)

获取指定节点子节点信息 image

web展示结果

image

后台逻辑

def base(basedir, id, pid):
    '''
    id:当前节点的id,ID的组成是 深度 + 兄弟节点序号 + 父节点ID
    pid: 父节点id
    name: 节点名称
    children: 子节点个数
    deep: 深度
    url: 节点的路径
    '''
    deep = ''
    try:
        if not zk.connected:
            zk.start()
        item = {}
        if basedir == '/':
            children = ['redis','game','uc']
        else:
            children = zk.get_children(basedir)
        cut = basedir.split('/')
        deep = len(cut)-1
        name = cut[deep]
        if len(children) == 0:
            childrenlen = 0
        else:
            childrenlen = len(children)
        item = {'id': id, 'pid': pid, 'name': name, 'children':childrenlen, 'deep':deep, 'url': basedir}
        datas.append(item)
        for i in xrange(len(children)):
            path = os.path.join(basedir, children[i])
            cut = path.split('/')
            n_id = str(len(cut)-1) + str(i) + str(id)
            base(path, n_id, id)
    except Exception, e:
        data = str(e)
    return datas
def get_children(req):
    status = 0
    message = ''
    data = []
    global datas
    datas = []
    basedir = '/'
    try:
        data = base(basedir, '10', '0')
    except Exception, e:
        status = -1
        message = str(e)
    zk.stop()
    result = stringify(status, message, data)
    return HttpResponse(result)

参考

参考资料: https://kazoo.readthedocs.org/en/latest/install.html http://sapser.github.io/python/zookeeper/2014/07/24/zookeeper-kazoo/

AlexZ33 commented 5 years ago

基本用法

连接处理 使用kazoo,首先需要实例化一个KazooClient对象并建立连接,代码如下

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")

zk.start()

默认情况下,KazooClient会连接本地zookeeper服务,端口号为2181。 当zookeeper服务异常时(服务异常或服务未启动等),zk.start()会不断尝试重新连接,直到连接超时。

连接一旦建立,无论是间歇性连接丢失(网络闪断等)或zookeeper会话过期,KazooClient会不断尝试重新连接。 我们可以通过stop命令显式的中断连接:


zk.stop()
AlexZ33 commented 5 years ago

会话状态

Kazoo的客户端在与zookeeper服务会话的过程中,通常会在以下三种状态之间相互切换:CONNECTED、 SUSPENDED、LOST。

当KazooClient实例第一次被创建时,它的状态为LOST,一旦连接建立成功,状态随即被切换为CONNECTED。

在整个会话的生命周期里,伴随着网络闪断、服务端zookeeper异常或是其他什么原因,导致客户端与服务端出现断开的情况,KazooClient的状态切换成SUSPENDED,与此同时,KazooClient会不断尝试重新连接服务端,一旦连接成功,状态再次回到CONNECTED。

AlexZ33 commented 5 years ago

kazoo状态监听

添加状态监听事件,实时监听客户端与服务端的会话状态,使得一旦发生连接中断、连接恢复或会话过期等情景时,我们能及时作出相应的处理。其使用方法如下:

def conection_listener (state) :
  if state == "Lost"
     # Register somewhere that the session was lost
     pass
  elif state == "SUSPENDED":
     # Handle being disconnected from Zookeeper
     pass
  else: 
     # Handle being connected/reconnected to Zookeeper
     pass
zk = KazooClient(hosts=“127.0.0.1:2181”)
zk.add_listenner(connection_listener)

当使用kazoo.recipe.lock.Lock或创建临时节点时,非常建议大家添加状态监听,以便我们的代码能够正确处理连接中断或Zookeeper会话丢失的情形。

AlexZ33 commented 5 years ago

zookeeper的增删改查

创建节点

读取数据
- exits(): 检查节点是否存在
- get(): 获取节点数据以及节点状态的详细信息
- get_children(): 获取指定节点的所有子节点
更新数据
- set():更新指定节点的信息。
删除节点
- delete():删除指定节点

# 监听器
kazoo可以在节点上添加监听,使得在节点或节点的子节点发生变化时进行触发。
kazoo支持两种类型的添加监听器的方式,一种是zookeeper原生支持的,其用法如下:

def test_watch_data(event): print("this is a watcher for node data.") zk.get_children("/china", watch=test_watch_children)


另一种方式是通过python修饰器的原理实现的,支持该功能的方法有:
- ChildrenWatch:当子节点发生变化时触发
- DataWatch:当节点数据发生变化时触发

@zk.ChildrenWatch("/china") def watch_china_children(children): print("this is watch_china_children %s" % children)

@zk.DataWatch("/china") def watch_china_node(data, state): print("china node is %s" % data)

AlexZ33 commented 5 years ago

kazoo事务

自v3.4以后,zookeeper支持一次发送多个命令,这些命令作为一个原子进行提交,要么全部执行成功,要么全部失败。

使用方法如下:

transaction = zk.transaction()
transaction.check('/china/hebei', version=3)
transaction.create('/china/shanxi', b"thi is shanxi.")
results = transaction.commit()