Closed chenws1012 closed 1 month ago
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Synchronize Nacos services.

Lists every service in a namespace on a source Nacos server, fetches the
instances of each service, and registers those instances on a target Nacos
server.

Version: 0.2
Author: WenShun Chen
ref api:https://nacos.io/zh-cn/docs/open-api.html
"""
import json

import requests

# Seconds to wait for any single HTTP call, so that a stalled Nacos server
# cannot hang the whole sync indefinitely.
REQUEST_TIMEOUT = 10

# Number of service names requested per page of the service list.
SERVICE_PAGE_SIZE = 1000


def _get_json(url, params):
    """GET *url* with *params* and return the decoded JSON body.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection errors or timeout.
    """
    response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
    # Fail with the real HTTP error instead of an obscure JSON/KeyError later.
    response.raise_for_status()
    return response.json()


def get_service_list(nacos_host, namespace_id):
    """Return the names of all services in *namespace_id* on *nacos_host*.

    Pages through the service list until every service reported by the
    server has been collected, so namespaces with more than one page of
    services are not truncated.

    Args:
        nacos_host: Base URL of the Nacos server, without a trailing slash.
        namespace_id: Namespace whose services are listed.

    Returns:
        A list of service names (the concatenated "doms" entries).
    """
    services = []
    page_no = 1
    while True:
        params = {
            "namespaceId": namespace_id,
            "pageNo": page_no,
            "pageSize": SERVICE_PAGE_SIZE,
        }
        data = _get_json(nacos_host + '/nacos/v1/ns/service/list', params)
        page = data["doms"]
        services.extend(page)
        # Stop on a short (or empty) page, or once the reported total has
        # been reached; a missing "count" ends the loop after the first page.
        if len(page) < SERVICE_PAGE_SIZE or len(services) >= data.get("count", 0):
            break
        page_no += 1
    return services


def get_service_instance(nacos_host, namespace_id, service_name):
    """Return the instances of *service_name* as reported by *nacos_host*.

    Args:
        nacos_host: Base URL of the Nacos server, without a trailing slash.
        namespace_id: Namespace the service belongs to.
        service_name: Name of the service to query.

    Returns:
        The "hosts" list from the instance-list response; each entry is a
        dict describing one instance.
    """
    params = {
        "namespaceId": namespace_id,
        "serviceName": service_name,
    }
    data = _get_json(nacos_host + '/nacos/v1/ns/instance/list', params)
    return data["hosts"]


def register_service_instance(nacos_host, namespace_id, service_name, host, port):
    """Register one instance of *service_name* on *nacos_host*.

    The server's response body is printed; a failed registration does not
    raise on HTTP error status, so one bad instance does not abort the sync.

    Args:
        nacos_host: Base URL of the target Nacos server.
        namespace_id: Namespace to register into.
        service_name: Name of the service the instance belongs to.
        host: IP address of the instance.
        port: Port of the instance.
    """
    params = {
        "namespaceId": namespace_id,
        "serviceName": service_name,
        "ip": host,
        "port": port,
        "weight": 1,
        # The Open API parameter is "enabled"; the former "enable" key was
        # not a recognised parameter.
        "enabled": True,
        "healthy": True,
        # NOTE(review): ephemeral instances are normally kept alive by client
        # heartbeats - confirm the target keeps instances registered this way.
        "ephemeral": True,
        # Marks the instance as created by this sync script.
        "metadata": json.dumps(
            {"preserved.register.source": "nacos-sync-python"}
        ),
    }
    response = requests.post(
        nacos_host + '/nacos/v1/ns/instance',
        params=params,
        timeout=REQUEST_TIMEOUT,
    )
    print('register status:' + response.text)


def main():
    """Copy every service instance from the source Nacos to the target."""
    nacos_host_from = 'https://nacos-test-new.xxx.com'
    nacos_host_to = 'https://nacos-test.xxx.com'
    namespace_id = 'test'

    service_list = get_service_list(nacos_host_from, namespace_id)
    for server in service_list:
        print(server)
        # Query the service's instances and register them on the other Nacos.
        instance_list = get_service_instance(nacos_host_from, namespace_id, server)
        for instance in instance_list:
            register_service_instance(
                nacos_host_to, namespace_id, server,
                instance['ip'], instance['port'],
            )
            print(instance)


if __name__ == '__main__':
    main()
It is not an issue