vieyahn2017 / shellv

shell command test and study
4 stars 1 forks source link

11.14 使用curl命令登录网站并获取cookie,并将json数据上传 #75

Open vieyahn2017 opened 3 years ago

vieyahn2017 commented 3 years ago

使用curl命令登录网站并获取cookie,并将json数据上传

curl -k -X POST -c cookie.txt --header 'Content-Type: application/json' --header 'Accept: application/json' -d 
        {"username"="admin","password"="123"} https://www.XXX.com

-k的意思是指访问前缀是https的情况下不对服务器证书SSL进行检查访问,如果前缀是http 可以直接curl http://www.xxx.com

-X POST同理有-X PUT,-X DELETE,-X GET等,-X POST是指使用post请求给服务器传递数据。我们将用户名和密码通过curl命令post到指定网站,然后通过 -c cookie.txt将获取的cookie写到cookie.txt的文件里。

curl -kv -X PUT --header 'Content-Type:application/json' --header 'Accept: application/json ' -d {#这里是所传的json值} -b cookie.txt https://www.xxx.com

通过-b cookie.txt获取cookie然后访问网站,传输json数据。

vieyahn2017 commented 3 years ago

curl -k -X POST -c cookie.txt --header 'Content-Type: application/json' --header 'Accept: application/json' -d {"username"="admin","password"="123"} https://www.XXX.com

-k的意思是指访问前缀是https的情况下不对服务器证书SSL进行检查访问,如果前缀是http 可以直接curl http://www.xxx.com

-X POST同理有-X PUT,-X DELETE,-X GET等,-X POST是指使用post请求给服务器传递数据。我们将用户名和密码通过curl命令post到指定网站,然后通过 -c cookie.txt将获取的cookie写到cookie.txt的文件里。

curl -kv -X PUT --header 'Content-Type:application/json' --header 'Accept: application/json ' -d {#这里是所传的json值} -b cookie.txt https://www.xxx.com

通过-b cookie.txt获取cookie然后访问网站,传输json数据。

直接以参数传递cookie

curl -k -X GET --cookie 'JSESSIONID=41228ABC03DA2C432D412E09DEBD250691B7F2EABA959013103BF6922CD0E0E7' -H "Content-type: application/json" https://3.25.68.10:18531/raidInfo/v1?nodeCode=0b8fa67d817d40c991bad0be7d84c5d6

vieyahn2017 commented 3 years ago

CURL基于cookie的自动登录脚本

weixin_33971130 2012-01-11 09:21:00 145 收藏 文章标签: shell 放在crontab中自动执行登录,有时候挺有用的。

#!/bin/bash

# cookie 临时文件
COOKIEFILE=cookie_tmp.txt

# 登录
curl $LOGINURL -d "username=$USERNAME&password=$PASSWORD&autologin=1" -c $COOKIEFILE -s

RESPONSE=`curl $UPDATEURL -s -b $COOKIEFILE | grep $USERNAME`
TIME=` date + '%Y-%m-%d %H:%M:%S' `
echo "$TIME | $RESPONSE" >> $LOGFILE

完整注释和代码见 http://github.com/panweizeng/home/blob/master/code/shell/curl_login

vieyahn2017 commented 3 years ago

性能测试11.05 仅登录

for ((i=1; i<=100; i++))
do
    echo $i
    sleep 1
    curl -k -X POST -H "Content-type: application/json"  https://$(netstat -ano | grep 1831 | awk 'NR==1{print $4}')/login/v1.0 -d ' {"userName": "admin",    "password": "123456" } '  && echo && echo
done

性能测试11.14 use cookie

mkdir -p /test_cookie
for ((i=1; i<=100; i++))
do
    echo $i
    sleep 1
    cookie_file=/test_cookie/cookie_${i}.txt
    echo "login:"
    curl -k -c ${cookie_file} -X POST -H "Content-type: application/json"  https://$(netstat -ano | grep 1531 | awk 'NR==1{print $4}')/login/v1.0 -d ' {"userName": "admin",    "password": "123456" } '  && echo 
    echo "test:"
    curl -k -b ${cookie_file} -X POST -H "Content-type: application/json"  https://$(netstat -ano | grep 1531 | awk 'NR==1{print $4}')/video/v1.0 -d ' {"cameraCode": "x#x", "mediaURLParam": {"broadCastType": 0, "packProtocolType": 1, "protocolType": 2, "serviceType": 1, "streamType": 1, "transMode": 0}} '  && echo && echo
done
vieyahn2017 commented 3 years ago

性能测试 python


# _*_ coding:utf-8 _*_
import os
import sys
import socket
import requests
import time
import datetime
import re
import json
import csv
import getopt
import traceback
import threading
import multiprocessing
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# env info #
HOST=""  # 最新:用shell命令获取
PORT=1531
USERNAME="admin"
PASSWORD="123456"

# 调试开关,输出详细的异常信息
TRACE_SWITCH = False

############################
# 性能测试并发参数

# 每个进程执行次数
CONCURRENT_TIMES = 10
# 并发进程数量
CONCURRENT_NUM = 10
# 每条进程中的并行线程数量
CON_THREAD_COUNT = 5

############################

def get_host_ip():
    # CMD = "docker exec -ti $(docker ps | grep vcnapi | awk 'NR==1{print $1}')  ifconfig eth3 | grep 'inet ' | awk 'NR==1{print $2}'"  # 直接取eth3也可以,但是不严谨
    CMD = "docker exec -ti $(docker ps | grep vcnapi | awk 'NR==1{print $1}')  netstat -ano | grep 18531 | awk '{print $4}'  | cut -d ':' -f1 "
    process  = os.popen(CMD, 'r', 1)
    output = process.read()
    process.close()
    listen_ip = output.strip()
    if re.match(r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$", listen_ip):
        return listen_ip
    else:
        print("get listen ip error.")
        sys.exit()

HOST = get_host_ip()

def ping_ip(ip):
    """
    ping不通:    4 packets transmitted, 0 received, 100% packet loss, time 2998ms
    ping得通:    4 packets transmitted, 4 received, 0% packet loss, time 2999ms
    """
    CMD = "ping {} -c 4".format(ip)
    process  = os.popen(CMD, 'r', 1)
    output = process.read()
    process.close()
    if "0 received" in output:
        print(output)
        print("connect to {} error.".format(ip))
        sys.exit()

# ping_ip(HOST)

def check_connect(ip, port):
    """ socket try connect """
    socket.setdefaulttimeout(4)
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    try:
        s.connect((ip, port))
        s.shutdown(2)
        # print( '\033[1;32m{} is OK.\033[0m'.format(ip))
        return False
    except socket.timeout:
        print( '\033[1;33m{} is down or network time out.\033[0m'.format(ip))
        return True
    except:
        print( '\033[1;31m{} is down!!!\033[0m'.format(ip))
        return True

if check_connect(HOST, PORT):
    sys.exit()

############################

def login():
    login_url = "https://{0}:{1}/loginInfo/login/v1.0".format(HOST, PORT)
    r_0 = requests.post(login_url,
                        json={"userName": USERNAME, "password": PASSWORD},
                        headers={"Content-Type": "application/json"},
                        verify=False)

    if r_0.status_code != 200:
        print("login failed %s" % r_0.status_code)
        return

    r_1 = r_0.json()
    if r_1["resultCode"] != 0:
        print("login failed: %s " % r_1)

    print("login success: %s " % r_1)

    # print(r_0.cookies)
    # <<class 'requests.cookies.RequestsCookieJar'>[<Cookie JSESSIONID=CA5B9A57BDB765BC56D308D29225A49087592968D0CE900FD75EEAA0482BCC64 for 3.181.2.12/>]>
    JSESSIONID = dict(r_0.cookies)["JSESSIONID"]
    cookies = {"JSESSIONID": JSESSIONID}
    headers = {"Content-Type": "application/json"}
    return {"headers": headers, "cookies": cookies}

def get_run(url, login_info):
    prefix = "https://{0}:{1}/".format(HOST, PORT)
    uri = prefix + url
    # print(uri)
    try:
        result = requests.session().get(uri, headers=login_info["headers"], cookies=login_info["cookies"], verify=False)
    except Exception:
        if TRACE_SWITCH:
            traceback.print_exc()
        print("http get exception")
        return
    if result.status_code != 200:
        print(result.content)
        print("http error: %s" % (result.status_code))
    else:
        res = result.json()
        return res, uri, result.elapsed.total_seconds()

def get_product_info(login_info, is_show_result=True):
    res, uri, _ = get_run("productInfo/v1", login_info)
    if is_show_result:
        print("\n" + uri)
        print(res)

def get_domain_list(login_info, is_show_result=True):
    res, uri, _ = get_run("domainList/v1", login_info)
    if is_show_result:
        print("\n" + uri)
        print(res)
    try:
        nodes = res["domainList"]["domain"][0]["nodes"]["node"]
        for node in nodes:
            if int(node.get("serviceType")) == 25:
                return node["nodeCode"]
        print("query Safevideo nodeCode(serviceType=25) failed")
        sys.exit()
    except Exception:
        if TRACE_SWITCH:
            traceback.print_exc()
        print("query Safevideo nodeCode(serviceType=25) error")
        sys.exit()

# 记录日志文件
def write_result(file_obj, data):
    header = []
    if isinstance(data, dict):
        header = data.keys()
    if header != []:
        writer = csv.DictWriter(file_obj, fieldnames=header)
        writer.writerow(data)

def time_wrapper(func):
    """
    最开始准备用time_wrapper来计算执行时间
    后来发现requests返回结果带着执行时间 result.elapsed.total_seconds()
    """
    def wrapper(*args, **kwargs):
        start_time = time.time()
        func(*args, **kwargs)
        over_time = time.time()
        total_time = over_time - start_time
        print("{} run {} s".format(func.__name__, total_time))
    return wrapper

#@time_wrapper
def get_disk_info(login_info, nodeCode, result_obj_file=None, is_show_result=False):
    res, uri, execute_time = get_run("diskInfo/v1?nodeCode=" + nodeCode, login_info)

    if result_obj_file:
        write_result(result_obj_file, {'URL': uri, 
                                       'StartTime': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                                       'ExecuteTime': execute_time,
                                       'Result': res})
    if is_show_result:
        print("\n" + uri)
        print(res)

#@time_wrapper
def get_raid_info(login_info, nodeCode, result_obj_file=None, is_show_result=False):
    res, uri, execute_time = get_run("raidInfo/v1?nodeCode=" + nodeCode, login_info)

    if result_obj_file:
        write_result(result_obj_file, {'URL': uri, 
                                       'StartTime': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                                       'ExecuteTime': execute_time,
                                       'Result': res})
    if is_show_result:
        print("\n" + uri)
        print(res)

def base_test():
    login_info = login()
    get_product_info(login_info)
    nodeCode = get_domain_list(login_info)
    # print(nodeCode)
    get_disk_info(login_info, nodeCode, is_show_result=True)
    get_raid_info(login_info, nodeCode, is_show_result=True)

# # 每个进程执行次数
# CONCURRENT_TIMES = 10
# # 并发进程数量
# CONCURRENT_NUM = 10
# # 每条进程中的并行线程数量
# CON_THREAD_COUNT = 5

# 接口调用次数 理论上为 CONCURRENT_TIMES * CONCURRENT_NUM
# 但是需保证CONCURRENT_TIMES是CON_THREAD_COUNT的整数倍,否则添加数量会比预想的多一点
# 最终,多线程模拟添加的数量准确结果是 CONCURRENT_NUM * (CONCURRENT_TIMES + CONCURRENT_TIMES % CON_THREAD_COUNT)
# 比如,CONCURRENT_TIMES=5 CONCURRENT_NUM=3 CON_THREAD_COUNT=2 添加的不是15条,而是18条 ( 3*(5+1)=18 )
# 再如,CONCURRENT_TIMES=9 CONCURRENT_NUM=4 CON_THREAD_COUNT=3 添加的是36条 (4*9=36)

# times单个进程里的总请求次数,thd_cnt每条进程中的并行线程数量
def performance_run(times, thd_cnt, func, login_info, nodeCode, flag="diskInfo"):
    i = 0
    result_obj_file = open("ResultLog/test_result_{}.csv".format(flag), 'ab')
    while 1:
        thd_lst = []
        for cu in range(thd_cnt):
            thd = threading.Thread(target=func, args=(login_info, nodeCode, result_obj_file))
            i += 1
            thd_lst.append(thd)
        for thd in thd_lst:
            thd.start()
        for thd in thd_lst:
            thd.join()
        if i >= times:
            result_obj_file.close()
            break

def performance_main(func, login_info, nodeCode, flag="diskInfo"):
    if not os.path.exists("ResultLog"):
        os.makedirs("ResultLog")
    # 移除原有记录
    if os.path.exists("ResultLog/test_result_{}.csv".format(flag)):
        os.renames("ResultLog/test_result_{}.csv".format(flag), "ResultLog/test_result_{}__{}.csv".format(flag, int(time.time())))

    thread_num = CONCURRENT_NUM    # 并发进程数量
    times = CONCURRENT_TIMES       # 每个进程执行次数
    thd_cnt = CON_THREAD_COUNT     # 每条进程中的并行线程数量

    pool = multiprocessing.Pool(processes=thread_num)
    for i in range(thread_num):
        pool.apply_async(performance_run,
                         (times, thd_cnt, func, login_info, nodeCode, flag))
    pool.close()
    pool.join()

    # result_obj_file = open("ResultLog/test_result.csv", 'ab')
    # result_obj_file.close()
    # 不能在并发任务下发这里打开/关闭文件,会报错I/O operation on closed file

def performance_test():
    login_info = login()
    nodeCode = get_domain_list(login_info, is_show_result=False)
    # nodeCode="51ae2150a1fa4226ae35a810d4bbe872"
    # print(nodeCode)
    performance_main(get_disk_info, login_info, nodeCode, flag="diskInfo")
    print("test result: test_result_diskInfo.csv")
    performance_main(get_raid_info, login_info, nodeCode, flag="raidInfo")
    print("test result: test_result_raidInfo.csv")

if __name__ == "__main__":
    if len(sys.argv) == 1:
        # 接口基本测试
        base_test()
    else:
        # 性能测试
        performance_test()