vieyahn2017 / iBlog

44 stars 0 forks source link

10.26 python判断IP地址是否ping通 #370

Closed vieyahn2017 closed 3 months ago

vieyahn2017 commented 3 years ago

python判断IP地址是否ping通

vieyahn2017 commented 3 years ago
def ping_ip(ip):
    """
    Ping ``ip`` four times and terminate the program if it does not answer.

    Typical summary lines printed by ``ping``:
    unreachable:  4 packets transmitted, 0 received, 100% packet loss, time 2998ms
    reachable:    4 packets transmitted, 4 received, 0% packet loss, time 2999ms

    :param ip: address (or host name) handed to the system ``ping`` command.
    :raises SystemExit: with status 1 when ``ping`` reports failure or cannot
        be started at all.
    """
    # Imported locally so the function does not rely on a file-level import.
    import subprocess

    try:
        # Argument list instead of a shell string: a crafted ``ip`` value
        # cannot inject extra shell commands.  stderr is merged into stdout so
        # messages such as "unknown host" are shown to the user as well.
        process = subprocess.Popen(["ping", "-c", "4", str(ip)],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        output = process.communicate()[0]
        # ping exits non-zero when no reply was received or on any error.
        # Searching the text for "0 received" would also match "10 received"
        # and would miss failures that print no summary line at all.
        failed = process.returncode != 0
    except OSError as exc:
        # The ping executable itself could not be run.
        output = "cannot run ping: {}".format(exc)
        failed = True

    if failed:
        print(output)
        print("connect to {} error.".format(ip))
        # Non-zero status so calling scripts can detect the failure.
        sys.exit(1)

# Abort early: ping_ip() exits the program when HOST does not answer ping.
ping_ip(HOST)
vieyahn2017 commented 3 years ago

def check_connect(ip, port):
    """
    Try to open a TCP connection to ``ip``:``port``.

    Note the inverted result: ``False`` means the connection succeeded and
    ``True`` means it failed (the caller exits on a truthy result).

    :param ip: IPv4 address or host name to connect to.
    :param port: TCP port number.
    :return: ``False`` when the connection was established, ``True`` on a
        timeout or any other error.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Per-socket timeout: socket.setdefaulttimeout() would change the timeout
    # of every socket created afterwards anywhere in the process.
    s.settimeout(4)
    try:
        s.connect((ip, port))
        s.shutdown(2)  # 2 == SHUT_RDWR
        return False
    except socket.timeout:
        print( '\033[1;33m{} is down or network time out.\033[0m'.format(ip))
        return True
    except Exception:
        # Exception rather than a bare except, so Ctrl-C and sys.exit() are
        # not swallowed and reported as "down".
        print( '\033[1;31m{} is down!!!\033[0m'.format(ip))
        return True
    finally:
        # Always release the file descriptor, whatever the outcome.
        s.close()

# check_connect() returns True when the TCP connection FAILED, so a truthy
# result means the service is unreachable and the script stops here.
if check_connect(HOST, PORT):
    sys.exit()
vieyahn2017 commented 3 years ago

使用 socket的代码抄自

用Python测试Ping主机IP和某端口是否开放 https://blog.csdn.net/ddrfan/article/details/88743854

若苗瞬 2019-03-22 16:12:41 2392 收藏 1 分类专栏: RHEL Python General Coding 文章标签: python 端口 ping linux 版权 文章目录 使用Python方法 命令和返回 pnp.py代码 使用命令方法 telnet wget SSH curl

vieyahn2017 commented 3 years ago

容器外获取ip的方法参考


def get_host_ip():
    """
    Return the IPv4 address reported by netstat inside the "xxx" container.

    Runs ``netstat -ano`` through ``docker exec`` in the first container whose
    ``docker ps`` line matches "xxx", keeps the lines mentioning 18531 and
    takes the part of the 4th column before the ':'.

    :return: the first output line that is a valid dotted-quad IPv4 address.
    :raises SystemExit: with status 1 when no such line is produced.
    """
    # Alternative: reading eth3 directly also works, but is less rigorous:
    # CMD = "docker exec -ti $(docker ps | grep xxx| awk 'NR==1{print $1}')  ifconfig eth3 | grep 'inet ' | awk 'NR==1{print $2}'"
    CMD = "docker exec -ti $(docker ps | grep xxx| awk 'NR==1{print $1}')  netstat -ano | grep 18531 | awk '{print $4}'  | cut -d ':' -f1 "
    process = os.popen(CMD, 'r', 1)
    try:
        output = process.read()
    finally:
        # Close the pipe even if reading fails.
        process.close()

    ipv4 = re.compile(r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$")
    # netstat may print several matching lines (listener plus connections).
    # Matching the whole output against the anchored pattern would then fail,
    # so each line is checked on its own and the first valid address wins.
    for line in output.splitlines():
        listen_ip = line.strip()
        if ipv4.match(listen_ip):
            return listen_ip

    print("get listen ip error.")
    # Non-zero status so calling scripts can detect the failure.
    sys.exit(1)

# Resolved once when the module is loaded; get_host_ip() exits the program
# if no valid address can be determined.
HOST = get_host_ip()
vieyahn2017 commented 3 years ago

def login():
    """
    Log in to the service and return the settings needed for later requests.

    Posts USERNAME / PASSWORD as JSON to ``/loginInfo/login/v1.0`` on
    HOST:PORT (TLS certificate verification is disabled).

    :return: ``{"headers": ..., "cookies": ...}`` carrying the JSESSIONID
        cookie of the new session, or ``None`` when the login failed.
    """
    login_url = "https://{0}:{1}/loginInfo/login/v1.0".format(HOST, PORT)
    r_0 = requests.post(login_url,
                        json={"userName": USERNAME, "password": PASSWORD},
                        headers={"Content-Type": "application/json"},
                        verify=False)

    if r_0.status_code != 200:
        print("login failed %s" % r_0.status_code)
        return None

    r_1 = r_0.json()
    if r_1["resultCode"] != 0:
        print("login failed: %s " % r_1)
        # Stop here: falling through would also print "login success" and
        # hand back session data for a login the server rejected.
        return None

    print("login success: %s " % r_1)

    # r_0.cookies is a RequestsCookieJar, e.g.
    # [<Cookie JSESSIONID=CA5B...CC64 for 3.181.2.12/>]
    session_id = dict(r_0.cookies).get("JSESSIONID")
    if session_id is None:
        # Report the problem instead of crashing with a KeyError.
        print("login failed: no JSESSIONID cookie in response")
        return None

    cookies = {"JSESSIONID": session_id}
    headers = {"Content-Type": "application/json"}
    return {"headers": headers, "cookies": cookies}

def get_run(url, login_info):
    """
    Send a GET request for ``url`` (relative to ``https://HOST:PORT/``).

    :param url: path relative to the server root.
    :param login_info: dict with "headers" and "cookies" entries, as returned
        by login().
    :return: ``(parsed_json, full_url, elapsed_seconds)`` when the server
        answers 200; ``None`` on a request error or any other status.
    """
    uri = "https://{0}:{1}/".format(HOST, PORT) + url
    try:
        # requests.get() closes the session it creates internally, whereas
        # requests.session().get() left a new, never-closed Session (and its
        # connection pool) behind on every call.
        result = requests.get(uri, headers=login_info["headers"], cookies=login_info["cookies"], verify=False)
    except Exception:
        if TRACE_SWITCH:
            traceback.print_exc()
        print("http get exception")
        return None
    if result.status_code != 200:
        print(result.content)
        print("http error: %s" % (result.status_code))
        return None
    return result.json(), uri, result.elapsed.total_seconds()
vieyahn2017 commented 3 years ago

change_user_pwd.py


# _*_ coding:utf-8 _*_
import os
import sys
import requests
import time
import datetime
import csv
import re
import json
import hashlib
# import base64
import getopt
import traceback
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

def get_ivdweb_ip():
    """
    Return the IPv4 address of eth3 inside the "viidgw" container.

    ``ifconfig eth3`` is run through ``docker exec`` in the first container
    whose ``docker ps`` line matches "viidgw"; the second field of the first
    "inet " line is taken as the address.  If the result is not a valid
    dotted-quad address an error is printed and the program exits.
    """
    # Alternative source of the address (cluster configuration file):
    # docker exec -ti $(docker ps | grep viidgw | awk 'NR==1{print $1}')  cat /usr/local/iGET/config/cluster/clusterConfig.properties | grep 'clusterOuterIp' | awk -F '=' '{print $2}'
    ipv4_pattern = r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
    command = "docker exec -ti $(docker ps | grep viidgw | awk 'NR==1{print $1}')  ifconfig eth3 | grep 'inet ' | awk 'NR==1{print $2}'"

    pipe = os.popen(command, 'r', 1)
    raw = pipe.read()
    pipe.close()

    address = raw.strip()
    if not re.match(ipv4_pattern, address):
        print("get ivdweb ip error.")
        sys.exit()
    return address

# Address of the target service, resolved from the viidgw container when the
# module is loaded (get_ivdweb_ip() exits the program on failure).
HOST = get_ivdweb_ip()
# Account whose password is changed by this script.
USERNAME = "super_admin"
# Current password: used to log in and sent as "verifyPassword".
# NOTE(review): credentials are hard-coded in the source - consider reading
# them from the environment or from command-line options instead.
PASSWORD_0 = "Change_Me"
# New password to set for USERNAME.
PASSWORD = "Safe12#$"

# When True, the request helpers print tracebacks and request details.
TRACE_SWITCH = False

def headers_Authorization(headersstr, username, password):
    """
    Build the ``Authorization`` header value answering a Digest challenge.

    The response hash is computed for ``POST /VIID/System/Register`` with
    empty ``nc`` and ``cnonce`` values:
    ``md5(md5(user:realm:password) : nonce : : : qop : md5(POST:uri))``.

    :param headersstr: ``WWW-Authenticate`` header of the 401 response; it
        must carry quoted ``realm``, ``qop`` and ``nonce`` values.
    :param username: account name.
    :param password: account password.
    :return: the complete ``Digest ...`` header value.
    :raises IndexError: when realm/qop/nonce cannot be found by name and the
        header holds fewer than three quoted values.
    """
    def md5_hex(text):
        # hashlib only accepts bytes on Python 3; a Python 2 ``str`` already
        # is bytes and is passed through untouched.
        if not isinstance(text, bytes):
            text = text.encode("utf-8")
        return hashlib.md5(text).hexdigest()

    uri = "/VIID/System/Register"

    # Look the fields up by name, so neither their order nor the separator
    # style ("," versus ", ") of the challenge matters.
    pairs = re.findall(r'(\w+)\s*=\s*"([^"]*)"', headersstr)
    fields = dict((key.lower(), value) for key, value in pairs)
    if "realm" in fields and "qop" in fields and "nonce" in fields:
        realm = fields["realm"]
        qop = fields["qop"]
        nonce = fields["nonce"]
    else:
        # Fallback kept for compatibility: first three quoted values, taken
        # positionally as realm, qop, nonce.
        quoted = re.findall(r'"([^"]*)"', headersstr)
        realm, qop, nonce = quoted[0], quoted[1], quoted[2]

    ha1 = md5_hex(username + ":" + realm + ":" + password)
    ha2 = md5_hex("POST:" + uri)
    # The three consecutive colons are the empty nc and cnonce fields.
    response = md5_hex(ha1 + ":" + nonce + ":::" + qop + ":" + ha2)

    Authorization = "Digest " + "username=\"" + username + "\",realm=\"" + realm + "\",nonce=\"" + \
                    nonce + "\",uri=\"" + uri + "\",qop=" + \
                    qop + ",nc=" + ",cnonce=\"\"" + ",response=\"" + response + "\"" + ",opaque=\"\""
    return Authorization

def login_b003(username, password, deviceID):
    """
    Send the first register request and build the headers for the second one.

    The request is posted without credentials.  When the server answers 401,
    an ``Authorization`` header computed from its ``WWW-Authenticate``
    challenge is added; for any other status the status code is printed and
    the base headers are returned as they are.

    :param username: account name used for the digest response.
    :param password: account password used for the digest response.
    :param deviceID: value sent as ``RegisterObject.DeviceID``.
    :return: dict of request headers.
    """
    register_url = "https://" + HOST + "/VIID/System/Register"
    request_headers = {"Content-Type": "application/VIID+JSON", "user-identify": "1"}
    payload = {"RegisterObject": {"DeviceID": deviceID}}

    response = requests.post(register_url, json=payload, headers=request_headers, verify=False)
    if response.status_code != 401:
        print("status_code  %s" % response.status_code)
        return request_headers

    challenge = response.headers["WWW-Authenticate"]
    request_headers["Authorization"] = headers_Authorization(challenge, username, password)
    return request_headers

def get_headers(username, password):
    """
    Register with the server and return headers for authenticated requests.

    Uses login_b003() to obtain the headers for the register request, posts
    that request, and on success adds the ``viid-token`` response header to
    them.

    :return: the header dict when the server answers 200, 201 or 202;
        otherwise the response body is printed and the HTTP status code (an
        int) is returned instead.
    """
    device_id = "31000051511191256666"
    register_url = "https://" + HOST + "/VIID/System/Register"

    auth_headers = login_b003(username=username, password=password, deviceID=device_id)
    response = requests.post(register_url,
                             headers=auth_headers,
                             json={"RegisterObject": {"DeviceID": device_id}},
                             verify=False)

    if response.status_code not in (200, 201, 202):
        print(response.text)
        return response.status_code

    auth_headers['viid-token'] = response.headers['viid-token']
    return auth_headers

def get_run(url, session, headers):
    """
    GET ``https://HOST/VIID/<url>`` through ``session``.

    :param url: path below ``/VIID/``.
    :param session: object whose ``get`` method performs the request.
    :param headers: request headers to send.
    :return: the decoded JSON body when the status is 200, otherwise ``None``
        (after printing what went wrong).
    """
    target = "https://" + HOST + "/VIID/" + url
    try:
        response = session.get(target, headers=headers, verify=False)
    except Exception:
        if TRACE_SWITCH:
            traceback.print_exc()
        print("get, no response")
        return

    if response.status_code == 200:
        return response.json()

    print(response.content)
    print("http error: %s" % (response.status_code))

def _send_json(send, body, url, headers):
    """
    Shared implementation of post_data() and put_data().

    Sends ``body`` as JSON to ``https://HOST/VIID/<url>`` with ``send`` (a
    bound ``session.post`` / ``session.put``) and evaluates the ``StatusCode``
    of the JSON answer.

    :return: ``True`` when StatusCode is 0, ``False`` when it is not, ``None``
        when the request failed or the answer could not be interpreted.
    """
    URL = "https://" + HOST + "/VIID/" + url
    start_time = time.time()
    try:
        result = send(URL, json=body, headers=headers, verify=False)
        r_json = json.loads(result.text)
    except Exception:
        if TRACE_SWITCH:
            traceback.print_exc()
        print("error, no response.")
        return None

    try:
        # StatusCode is either at the top level or nested under
        # ResponseStatusObject.
        try:
            status_code = int(r_json["StatusCode"])
        except (KeyError, TypeError, ValueError):
            status_code = int(r_json["ResponseStatusObject"]["StatusCode"])
    except (KeyError, TypeError, ValueError):
        # Neither shape matched: report it instead of crashing.
        if TRACE_SWITCH:
            traceback.print_exc()
        print("error, unexpected response.")
        print(result.content)
        return None

    if TRACE_SWITCH:
        print(URL)
        print(body)

    elapsed = result.elapsed.total_seconds()
    if status_code != 0:
        print("http error: %s" % (result.status_code))
        print(result.content)
        print({'StartTime': start_time, 'ExecuteTime': elapsed, 'Result': "Error %s" % result.status_code})
        return False

    # The body is already parsed; wrap it in a tuple so %-formatting always
    # treats it as one value.
    print({'StartTime': start_time, 'ExecuteTime': elapsed, 'Result': "OK %s" % (r_json,)})
    return True


def post_data(body, url, headers, session):
    """
    POST ``body`` as JSON to ``https://HOST/VIID/<url>`` and print the outcome.

    :param body: JSON-serialisable request body.
    :param url: path below ``/VIID/``.
    :param headers: request headers to send.
    :param session: object whose ``post`` method performs the request.
    :return: ``True`` when the answer's StatusCode is 0, ``False`` when it is
        not, ``None`` when the request failed or the answer was unusable.
    """
    return _send_json(session.post, body, url, headers)


def put_data(body, url, headers, session):
    """
    PUT ``body`` as JSON to ``https://HOST/VIID/<url>`` and print the outcome.

    :param body: JSON-serialisable request body.
    :param url: path below ``/VIID/``.
    :param headers: request headers to send.
    :param session: object whose ``put`` method performs the request.
    :return: ``True`` when the answer's StatusCode is 0, ``False`` when it is
        not, ``None`` when the request failed or the answer was unusable.
    """
    return _send_json(session.put, body, url, headers)

def change_pwd(userName, verifyPassword, newPassword):
    """
    Change the password of ``userName``.

    :param userName: account whose password is changed; also used to log in.
    :param verifyPassword: the account's current password.
    :param newPassword: the password to set.
    :return: the result of put_data() (``True`` on success, ``False`` when
        the server rejected the change), or ``None`` when the login or the
        request failed.
    """
    # Log in as that user first.
    headers = get_headers(userName, verifyPassword)
    if TRACE_SWITCH:
        print(headers)
    # get_headers() returns the HTTP status code instead of a header dict
    # when the login fails; sending the request with it would only produce a
    # misleading "no response" error.
    if not isinstance(headers, dict):
        print("login failed: %s" % headers)
        return None

    url = 'User/Member/Password/' + userName
    body = {"newPassword": newPassword, "verifyPassword": verifyPassword}
    post_session = requests.session()
    try:
        return put_data(body, url, headers, post_session)
    finally:
        # Release the session's pooled connections.
        post_session.close()

if __name__ == "__main__":
    # Replace USERNAME's current password (PASSWORD_0) with PASSWORD.
    change_pwd(USERNAME, PASSWORD_0, PASSWORD)
vieyahn2017 commented 3 years ago

end

vieyahn2017 commented 3 years ago

python脚本监控ping回应值与丢包率

http://blog.chinaunix.net/uid-26978448-id-3411107.html

nagios-plugins自带有一个check_ping二进制文件可以检测丢包率和ping的回应值,但由于领导要求所有监控脚本改用python写,所以这里用python重新写了一个。代码如下:

#cat check_ping.py
点击(此处)折叠或打开

#!/usr/bin/env python
# coding: utf-8
# coding: cp950

'''
Create Date: 2012-11-06
Version: 1.0
Description: Detection host survival
Author: Victor
'''

''' Please run the script with root '''

import ping
import sys

def help():
  """Print the command-line usage of this script."""
  # print(...) with one parenthesised argument runs on Python 2 and Python 3.
  # The usage line now names the three arguments the script reads from
  # sys.argv (host, loss threshold, response-time threshold).
  print('''
Usage:
%s <host> <packet-loss-threshold-%%> <max-response-time-ms>
''' % (sys.argv[0]))

# Exit codes used below: 0 = OK, 1 = Warning, 2 = Critical, 3 = usage error.
try:
  # Validate every argument BEFORE pinging: a missing or non-numeric
  # threshold is reported at once instead of after ten pings (or as a
  # traceback for a non-numeric value).
  host = sys.argv[1]
  loss_threshold = int(sys.argv[2])
  time_threshold = int(sys.argv[3])
except (IndexError, ValueError):
  help()
  sys.exit(3)

# result[0] is reported below as the packet-loss percentage and result[1] as
# the maximum response time in ms.
result = ping.quiet_ping(host, timeout=2, count=10, psize=64)
loss = int(result[0])
if loss == 100:
  print('Critical - 宕机, 丢包率:%s%% | 报警阀值: >= %s%% 或 >=%s ms' % (result[0], loss_threshold, time_threshold))
  sys.exit(2)

max_time = round(result[1], 2)
if loss < loss_threshold and int(result[1]) < time_threshold:
  print('OK - 丢包率:%s%%, 最大响应时间:%s ms | 报警阀值: >= %s%% 或 >=%s ms' % (result[0], max_time, loss_threshold, time_threshold))
  sys.exit(0)
else:
  # Exact negation of the test above, so no further "Unknown" branch can
  # ever be reached.
  print('Warning - 丢包率:%s%%, 最大响应时间:%s ms | 报警阀值: >= %s%% 或 >=%s ms' % (result[0], max_time, loss_threshold, time_threshold))
  sys.exit(1)

该脚本使用了 ping 模块。ping 模块默认没有安装,可使用 easy_install 进行安装:先在 http://pypi.python.org/pypi/setuptools 下载与 python 版本对应的 setuptools(用来提供 easy_install 命令),下载好后赋予执行权限,直接执行即可安装。接下来安装 ping 模块:

easy_install ping

安装成功后即可正常运行本脚本。直接执行脚本不加参数可以查看脚本用法:`python check_ping.py`。带参数运行的示例:`python check_ping.py www.baidu.com 50 200`

其中 50 表示丢包率阈值(单位:%),即丢包率达到这个值后报警;200 表示最大响应时间阈值(单位:ms),即 ping 某个地址的响应时间达到这个值后报警。