JXtreehouse / python_lessions

6 stars 3 forks source link

Python学习笔记: hmac算法 #13

Open AlexZ33 opened 4 years ago

AlexZ33 commented 4 years ago

https://www.liaoxuefeng.com/wiki/1016959663602400/1183198304823296

通过哈希算法,我们可以验证一段数据是否有效,方法就是对比该数据的哈希值,例如,判断用户口令是否正确,我们用保存在数据库中的password_md5对比计算md5(password)的结果,如果一致,用户输入的口令就是正确的。

为了防止黑客通过彩虹表根据哈希值反推原始口令,在计算哈希的时候,不能仅针对原始输入计算,需要增加一个salt来使得相同的输入也能得到不同的哈希,这样,大大增加了黑客破解的难度。

如果salt是我们自己随机生成的,通常我们计算MD5时采用md5(message + salt)。但实际上,把salt看做一个“口令”,加salt的哈希就是:计算一段message的哈希时,根据不通口令计算出不同的哈希。要验证哈希值,必须同时提供正确的口令。

这实际上就是Hmac算法:Keyed-Hashing for Message Authentication。它通过一个标准算法,在计算哈希的过程中,把key混入计算过程中。

和我们自定义的加salt算法不同,Hmac算法针对所有哈希算法都通用,无论是MD5还是SHA-1。采用Hmac替代我们自己的salt算法,可以使程序算法更标准化,也更安全。

Python自带的hmac模块实现了标准的Hmac算法。我们来看看如何使用hmac实现带key的哈希。

我们首先需要准备待计算的原始消息message,随机key,哈希算法,这里采用MD5,使用hmac的代码如下:

>>> import hmac
>>> message = b'Hello, world!'
>>> key = b'secret'
>>> h = hmac.new(key, message, digestmod='MD5')
>>> # 如果消息很长,可以多次调用h.update(msg)
>>> h.hexdigest()
'fa4ee7d173f2d97ee79022d1a7355bcf'

可见使用hmac和普通hash算法非常类似。hmac输出的长度和原始哈希算法的长度一致。需要注意传入的key和message都是bytes类型,str类型需要首先编码为bytes。

AlexZ33 commented 4 years ago

auth.py

import requests
import time
import datetime
import hmac
import hashlib
import base64
import json
from urllib.parse import urlparse

class Client():

    _STRING_TO_SIGN = "{method}\n{host}\n{uri}\n{timestamp}\n"
    _AUTH_HEADER = "APIKey={api_key},Signature={sig},Timestamp={timestamp}"

    def __init__(self, AK, SK):
        self.ak = AK
        self.sk = SK

    def _wrapper_header(self, url, headers):
        # get time
        TIMESTAMP = datetime.datetime.utcnow().isoformat() + "+00:00"

        parsed_url = urlparse(url)
        host = parsed_url.netloc
        uri = parsed_url.path
        # print("netloc:", parsed_url.netloc)
        string_to_sign = self._STRING_TO_SIGN.format(
            method="GET",
            host=parsed_url.netloc,
            uri=uri,
            timestamp=TIMESTAMP
        )

        headers_default = {
            "User-Agent": "default",
            "Content-Type": "application/json;charset=UTF-8"
        }

        for header in headers_default:
            if header in headers:
                headers_default[header] = headers[header]

        for key in sorted(headers_default):
            string_to_sign += headers_default[key] + "\n"

        # print(string_to_sign)
        key = bytes(self.sk, 'UTF-8')
        raw_sig = hmac.new(key, string_to_sign.encode("utf-8"), hashlib.sha256).digest()
        # raw_sig = bytes(raw_sig, 'UTF-8')
        encoded_sig = base64.urlsafe_b64encode(raw_sig)

        encoded_sig = str(encoded_sig, 'UTF-8')
        # print(encoded_sig)
        headers_default['Authorization'] = self._AUTH_HEADER.format(
            api_key = self.ak,
            sig = encoded_sig,
            timestamp = TIMESTAMP
        )

        return headers_default

    def request(self, method, url, headers={}, body=""):
        action_d = {
            "get": requests.get,
            "post": requests.post,
            "put": requests.put,
            "delete": requests.delete,
            "patch": requests.patch,
        }
        headers = self._wrapper_header(url, headers)
        if method == "get":
            resp = action_d["get"](url, headers=headers)
        else:
            if method not in action_d:
                return "This method is not support."
            resp = action_d["get"](url, headers=headers, json=body)
        return resp
AlexZ33 commented 4 years ago

test.py

from request_auth import Client
import requests

AK = ''
SK = ''

def main():
    client = Client(AK, SK)
    url = "url"
    headers = {
        "jx": "jx"
    }
    # request(self, method, url, headers={}, body="")
    resp = client.request('get', url, headers)
    print(resp.text)

if __name__ == '__main__':
    main()
AlexZ33 commented 4 years ago

auth.go

package auth

import (
     "fmt"
    "net/http"
    "io/ioutil"
    "strings"
    "bytes"
    "sort"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/base64"
    "time"
)

const (
    // common parameters
       // 公用参数
    authorizationHeader = "Authorization"
    apiKeyParam         = "APIKey"
    signatureParam      = "Signature"
    timestampParam      = "Timestamp"

    // parsing bits 需要转换为bytes类型的参数
    empty   = ""
    comma   = ","
    space   = " "
    eqSign  = "="
    newline = "\n"
)

type Wrapper struct {
    Ak string
    Sk string
}

func signString(str, ak, sk, timestamp string) string {
    hash := hmac.New(sha256.New, []byte(sk))
    hash.Write([]byte(str))
    signature := base64.URLEncoding.EncodeToString(hash.Sum(nil))
    return "APIKey=" + ak + ",Signature=" + signature + ",Timestamp=" + timestamp
}

func stringToSign(request *http.Request, timestamp string) string {
    var buffer bytes.Buffer

    uriList := strings.Split(request.URL.RequestURI(), "?")
    uriNoArgs := uriList[0]
    // Standard
    buffer.WriteString(request.Method)
    buffer.WriteString(newline)
    buffer.WriteString(request.Host)
    buffer.WriteString(newline)
    buffer.WriteString(uriNoArgs)
    // fmt.Println(request.URL.RequestURI())
    buffer.WriteString(newline)
    buffer.WriteString(timestamp)
    buffer.WriteString(newline)

    // new Headers
    headerOrig := map[string]string{
        "User-Agent": "default",
        "Content-Type": "application/json;charset=UTF-8",
    }
    for k, v := range headerOrig {
        v1 := request.Header.Get(k)
        if v1 == "" {
            headerOrig[k] = v
        }
    }
    keys := make([]string, 0, len(headerOrig))
    for k, v := range headerOrig {
        request.Header.Set(k, v)
        keys = append(keys, k)
    }
    sort.Strings(keys)

    for _, k := range keys {
        header := headerOrig[k]
        buffer.WriteString(header)
        buffer.WriteString(newline)
    }

    return buffer.String()
}

func (w Wrapper) WrapRequest(request *http.Request) {
    currentTime := time.Now().UTC().String()
    timeArray := strings.Split(currentTime, space)
    timeHour := timeArray[1][0 : len(timeArray[1])-3]
    timestamp := timeArray[0] + "T" + timeHour + "+00:00"

    str := stringToSign(request, timestamp)
    authHeader := signString(str, w.Ak, w.Sk, timestamp)
    request.Header.Set("Authorization", authHeader)
}
AlexZ33 commented 4 years ago

main.go

package main

import (
    " auth"  // 上面代码github地址
    "net/http"
    "io/ioutil"
    "strings"
    "fmt"
)

const (
    ak = ""
    sk = ""
)

func main() {
    client := &http.Client{}

    url := "url"
    method := "GET"
    data := ""
    body := strings.NewReader(data)
    request, _ := http.NewRequest(method, url, body)

    wrapper := wrap.Wrapper{
        Ak: ak,
        Sk: sk,
    }
    wrapper.WrapRequest(request)

    response, _ := client.Do(request)
    defer response.Body.Close()
    respBody, _ := ioutil.ReadAll(response.Body)
    fmt.Printf("response data:%v\n",string(respBody))
}