deepflowio / deepflow

eBPF Observability - Distributed Tracing and Profiling
https://deepflow.io
Apache License 2.0
2.97k stars 333 forks source link

[BUG] wasm插件中请求三方接口失败 #8504

Open axnhhhh opened 9 hours ago

axnhhhh commented 9 hours ago

Search before asking

DeepFlow Component

Agent

What you expected to happen

wasm插件中请求三方接口“/detect/test?business=cto”,用于判断此请求是否为业务自定义异常场景,但无法成功发送请求。agent报错:

image

wasm代码如下:

package main

import (
    "bufio"
    "bytes"
    "compress/gzip"
    "encoding/json"
    "fmt"
    "github.com/deepflowio/deepflow-wasm-go-sdk/sdk"
    "io"
    "net"
    "net/http"
    "net/url"
    "regexp"
    "strings"
)

// Address of the external classification ("bertner") service that
// interfaceResponseIntention contacts over a raw TCP connection.
const (
    // BERTNER_BASE_HOST is the host dialed and also sent as the HTTP Host header.
    BERTNER_BASE_HOST = "test.domain.com"
    // BERTNER_BASE_PORT is the TCP port, kept as a string for net.JoinHostPort.
    BERTNER_BASE_PORT = "80"
    // BERTNER_BASE_PATH is the request target (path plus query) of the POST request.
    BERTNER_BASE_PATH = "/detect/test?business=cto"
)

// respPriorityMsgKeys lists the JSON keys that formatRespData probes first when
// looking for a human-readable message in a response body.
var respPriorityMsgKeys = []string{"errMsg", "errorMessage", "msg", "message", "error_msg", "errmsg", "err", "error"}

// respOtherMsgKeys lists the keys probed after respPriorityMsgKeys.
var respOtherMsgKeys = []string{"status", "ActionStatus", "detail", "info", "reasonList"}

// main logs an init message and registers the plugin's parser with the SDK.
func main() {
    sdk.Info("on http status rewrite wasm plugin init")

    var plugin parser
    sdk.SetParser(plugin)
}

// parser is the plugin type registered through sdk.SetParser. It embeds
// sdk.DefaultParser and defines its own hook methods on top of it.
type parser struct {
    sdk.DefaultParser
}

// OnHttpReq performs no work on HTTP requests and returns sdk.ActionNext().
// NOTE(review): HookIn only lists the HTTP response hook point, so this method
// is presumably never invoked by the agent — confirm against the SDK.
func (p parser) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action {
    return sdk.ActionNext()
}

// OnHttpResp parses the captured payload as an HTTP response and hands it to
// onResp, which decides whether the response should be reported as an error.
//
// It aborts without producing any info when the payload cannot be fetched,
// cannot be parsed as an HTTP response, or already carries a status >= 400.
func (p parser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action {
    payload, err := ctx.BaseCtx.GetPayload()
    if err != nil {
        return sdk.ActionAbortWithErr(err)
    }

    r, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(payload)), nil)
    if err != nil || r == nil {
        // Not a parsable HTTP response (for example a payload that does not
        // start at the status line); nothing to rewrite.
        return sdk.ActionAbort()
    }
    // http.ReadResponse requires the caller to close the body once done.
    defer r.Body.Close()

    // If the response is already an error, stop this plugin and stop the traversal.
    if r.StatusCode >= 400 {
        return sdk.ActionAbort()
    }
    return onResp(r)
}

// OnCheckPayload always returns protocol number 0 and an empty protocol name.
// NOTE(review): presumably this tells the agent that the plugin does not claim
// any custom protocol — confirm against the SDK documentation.
func (p parser) OnCheckPayload(ctx *sdk.ParseCtx) (protoNum uint8, protoStr string) {
    return 0, ""
}

// OnParsePayload performs no parsing and returns sdk.ActionNext().
// NOTE(review): HookIn does not list a payload-parse hook point, so this method
// is presumably never invoked by the agent — confirm against the SDK.
func (p parser) OnParsePayload(ctx *sdk.ParseCtx) sdk.Action {
    return sdk.ActionNext()
}

// HookIn reports the hook points this plugin registers for: only the HTTP
// response hook.
func (p parser) HookIn() []sdk.HookBitmap {
    hooks := make([]sdk.HookBitmap, 0, 1)
    hooks = append(hooks, sdk.HOOK_POINT_HTTP_RESP)
    return hooks
}

/*
onResp decides whether a successful-looking HTTP response should be reported as
a server error, based on the content of its JSON body.

Steps:

 1. read the (optionally gzip-compressed) body;
 2. decode it as a JSON object and extract a message with formatRespData;
 3. ask the external classifier (interfaceResponseIntention) whether that
    message describes an error.

When the classifier reports an error the response is rewritten as follows:

    response_code   -> 500
    response_status -> derived from the code by getStatus
    response_result -> the whole HTTP response body

In every other case (unreadable or empty body, non-JSON body, classifier
failure, no error detected) the original status code is reported unchanged via
normalResp.
*/
func onResp(r *http.Response) sdk.Action {
    body, err := readBody(r)
    if err != nil {
        sdk.Warn("Error reading body: %v", err)
        return normalResp(r)
    }
    if len(body) == 0 {
        // An empty body is not a failure; there is simply nothing to classify.
        return normalResp(r)
    }

    var m map[string]interface{}
    if err := json.Unmarshal(body, &m); err != nil {
        sdk.Warn("Error unmarshalling resp JSON: %v", err)
        return normalResp(r)
    }

    hasError, err := interfaceResponseIntention(formatRespData(m))
    if err != nil || !hasError {
        return normalResp(r)
    }

    code := int32(500)
    respStatus := getStatus(code)

    return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{
        {
            Resp: &sdk.Response{
                Status: &respStatus,
                Code:   &code,
                Result: string(body),
            },
        },
    })
}

// formatRespData extracts a human-readable message from a decoded JSON
// response body.
//
// The keys in respPriorityMsgKeys are tried first, then those in
// respOtherMsgKeys. If responseBody["data"] is itself a JSON object it is
// searched before the top level. Only string values are accepted. Key matching
// is case-insensitive, so a configured key such as "errMsg" matches "errMsg",
// "errmsg" or "ERRMSG" in the body. It returns "" when no message is found.
func formatRespData(responseBody map[string]interface{}) string {
    // Build a fresh slice so the package-level key lists are never aliased or
    // mutated by append.
    allMsgKeys := make([]string, 0, len(respPriorityMsgKeys)+len(respOtherMsgKeys))
    allMsgKeys = append(allMsgKeys, respPriorityMsgKeys...)
    allMsgKeys = append(allMsgKeys, respOtherMsgKeys...)

    // stringValue returns the string stored under key, ignoring case.
    stringValue := func(data map[string]interface{}, key string) (string, bool) {
        // Fast paths: the lower-cased spelling, then the exact spelling.
        if s, ok := data[strings.ToLower(key)].(string); ok {
            return s, true
        }
        if s, ok := data[key].(string); ok {
            return s, true
        }
        // Slow path: any other casing. Map iteration order is random, so pick
        // the lexicographically smallest matching key to stay deterministic.
        best, found := "", false
        for k, v := range data {
            if !strings.EqualFold(k, key) {
                continue
            }
            if _, ok := v.(string); !ok {
                continue
            }
            if !found || k < best {
                best, found = k, true
            }
        }
        if !found {
            return "", false
        }
        return data[best].(string), true
    }

    extractValueFromKeys := func(data map[string]interface{}, keys []string) (string, bool) {
        for _, key := range keys {
            if s, ok := stringValue(data, key); ok {
                return s, true
            }
        }
        return "", false
    }

    // Try to get the message from the nested "data" object first.
    if data, ok := responseBody["data"].(map[string]interface{}); ok {
        if message, found := extractValueFromKeys(data, allMsgKeys); found {
            return message
        }
    }
    // If "data" is a string (or anything else), skip it and fall back to the
    // top level of the response body.
    if message, found := extractValueFromKeys(responseBody, allMsgKeys); found {
        return message
    }

    return ""
}

/*
interfaceResponseIntention asks the external classification service whether
message describes an abnormal (error) response.

The message is sent as the form field "text" in an HTTP POST written by hand
over a raw TCP connection to BERTNER_BASE_HOST:BERTNER_BASE_PORT. The result is
true when the "label" value in the reply ends with "_error".

Connection and write failures are logged and also returned as a non-nil error.
An empty reply or an empty reply body yields (false, nil).

NOTE(review): opening sockets with net.Dial may not be supported inside the
agent's wasm sandbox — confirm against the deepflow-wasm-go-sdk documentation.
NOTE(review): the connection has no deadline, so a stalled server blocks this
call; consider conn.SetDeadline once the dial itself is known to work.
*/
func interfaceResponseIntention(message string) (bool, error) {
    formData := url.Values{}
    formData.Set("text", message)

    // Open the TCP connection.
    conn, err := net.Dial("tcp", net.JoinHostPort(BERTNER_BASE_HOST, BERTNER_BASE_PORT))
    if err != nil {
        sdk.Warn("Error connecting to bertner server: %v", err)
        return false, fmt.Errorf("connecting to bertner server: %w", err)
    }
    defer conn.Close()

    // Send the HTTP request.
    request := buildPostRequest(BERTNER_BASE_HOST, BERTNER_BASE_PATH, formData.Encode())
    if _, err := conn.Write([]byte(request)); err != nil {
        sdk.Warn("Error sending request to bertner server: %v", err)
        return false, fmt.Errorf("sending request to bertner server: %w", err)
    }

    // Read the whole reply; the request asks for "Connection: close".
    response := readResponse(conn)
    if len(response) == 0 {
        return false, nil
    }

    // Strip the HTTP headers to get the reply body.
    body := extractResponseBody(response)
    if len(body) == 0 {
        return false, nil
    }
    sdk.Info("bertner server response body: %v", body)

    // The classifier marks error messages with a label ending in "_error".
    return strings.HasSuffix(extractLabel(body), "_error"), nil
}

// readResponse reads everything the server sends on conn until EOF or a read
// error and returns it as one string.
//
// The reply is read line by line: each line's terminator ("\n" or "\r\n") is
// replaced by a single "\n", so the HTTP header/body separator becomes "\n\n".
// A read error is logged and whatever was read up to that point is returned.
func readResponse(conn net.Conn) string {
    // bufio.Scanner rejects lines longer than 64 KiB by default, which a
    // single-line JSON body can exceed; allow lines of up to 1 MiB.
    const maxLineBytes = 1 << 20

    scanner := bufio.NewScanner(conn)
    scanner.Buffer(make([]byte, 0, 4096), maxLineBytes)

    var response strings.Builder
    for scanner.Scan() {
        response.Write(scanner.Bytes())
        response.WriteByte('\n')
    }
    if err := scanner.Err(); err != nil {
        sdk.Warn("Error reading response: %v", err)
    }
    return response.String()
}

// extractResponseBody returns the part of response that follows the first
// blank line ("\n\n"), with surrounding whitespace removed. It returns "" when
// response contains no blank line.
func extractResponseBody(response string) string {
    const separator = "\n\n"

    at := strings.Index(response, separator)
    if at < 0 {
        return ""
    }
    // Everything after the separator is the body; trim stray whitespace.
    return strings.TrimSpace(response[at+len(separator):])
}

// labelPattern matches a JSON member of the form "label": "<value>" and
// captures the value. It is compiled once rather than on every call.
var labelPattern = regexp.MustCompile(`"label"\s*:\s*"([^"]+)"`)

// extractLabel returns the value of the first "label" member found in body, or
// "" when there is none.
func extractLabel(body string) string {
    if matches := labelPattern.FindStringSubmatch(body); len(matches) > 1 {
        return matches[1] // the captured label value
    }
    return ""
}

// buildPostRequest renders a complete HTTP/1.1 POST request as text: request
// line, Host, a form-urlencoded Content-Type, Content-Length of data,
// "Connection: close", a blank line, and finally data as the body.
func buildPostRequest(host, path, data string) string {
    var out bytes.Buffer

    // Request line and headers, each terminated by CRLF.
    fmt.Fprintf(&out, "POST %s HTTP/1.1\r\n", path)
    fmt.Fprintf(&out, "Host: %s\r\n", host)
    out.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
    fmt.Fprintf(&out, "Content-Length: %d\r\n", len(data))
    out.WriteString("Connection: close\r\n")

    // Blank line, then the body.
    out.WriteString("\r\n")
    out.WriteString(data)

    return out.String()
}

// readBody returns the bytes of r.Body, decompressing them first when the
// Content-Encoding header is exactly "gzip".
//
// The only error reported is a failure to initialise the gzip reader, in which
// case the returned slice is nil. Errors while reading are ignored and
// whatever was read is returned with a nil error.
func readBody(r *http.Response) ([]byte, error) {
    if r.Header.Get("Content-Encoding") != "gzip" {
        // Uncompressed body: read it as-is, best effort.
        plain, _ := io.ReadAll(r.Body)
        return plain, nil
    }

    zr, err := gzip.NewReader(r.Body)
    if err != nil {
        sdk.Warn("failed to create gzip reader: %v", err)
        return nil, err
    }

    // Best effort as well: keep whatever could be inflated.
    inflated, _ := io.ReadAll(zr)
    zr.Close()
    return inflated, nil
}

// normalResp reports the response with its original HTTP status code and the
// status derived from that code by getStatus; no result text is attached.
func normalResp(r *http.Response) sdk.Action {
    var (
        httpCode = int32(r.StatusCode)
        outcome  = getStatus(httpCode)
    )

    info := &sdk.L7ProtocolInfo{
        Resp: &sdk.Response{
            Status: &outcome,
            Code:   &httpCode,
        },
    }
    return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{info})
}

/*
getStatus maps an HTTP status code to a response status:
codes in [200, 400) are treated as OK,
codes in [400, 500) are treated as client errors,
and every other code is treated as a server error.
*/
func getStatus(statusCode int32) sdk.RespStatus {
    switch {
    case 200 <= statusCode && statusCode < 400:
        return sdk.RespStatusOk
    case 400 <= statusCode && statusCode < 500:
        return sdk.RespStatusClientErr
    default:
        return sdk.RespStatusServerErr
    }
}

How to reproduce

No response

DeepFlow version

agent: v6.5.9 server: v6.5.9

DeepFlow agent list

No response

Kubernetes CNI

No response

Operation-System/Kernel version

No response

Anything else

No response

Are you willing to submit a PR?

Code of Conduct

axnhhhh commented 9 hours ago

相同 tcp请求方式,本地测试是OK的,且网络无隔离

image