networkservicemesh / sdk

Apache License 2.0
33 stars 36 forks source link

Create utilities to use iperf3 with NSM #1043

Open Bolodya1997 opened 3 years ago

Bolodya1997 commented 3 years ago

Overview

It can be helpful for a user to run iperf3 on an existing kernel-...-kernel connection to get network performance details for it. Here is exactly what we want:

  1. The Endpoint runs an iperf3 server on the dst IP.
  2. The Client is able to start an iperf3 client connected to the dst IP with some parameters and get the results back as a stream.
  3. After any healing, with or without a dst IP change, the Client is still able to do [2] once the healing completes.

Design

For [1] we simply create a server chain element that starts an iperf3 server bound to the dst IP on Request and stops it on Close.

package iperf

type iperfServer {
    ctx context.Context
    runningServers Map // connID string -> cancel context.CancelFunc
}

func (s *iperfServer) Request(ctx, request) {
    conn := next.Request(ctx, request)
    if _, ok := runningServers.Load(connID); !ok {
        // we don't have kernel interface at the moment, so we will start server when it appears
        runningServers.Store(connID, startIperf3Server(conn.GetConnectionContext().GetIPContext().GetDstIP())
    }
    return conn
}

func (s *iperfServer) startIperf3Server(ip string) cancel {
    ctx, cancel := context.WithCancel(s.ctx)
    go func() {
        waitForIPAppears(ctx)
        bash.Run(ctx, fmt.Sprintf("iperf3 -s --bind %s", ip))
    }
    return cancel
}

// Close cancels and forgets the iperf3 server tracked for the connection, if
// there is one, and then forwards the Close down the chain.
func (s *iperfServer) Close(ctx, conn) {
    if cancel, ok := s.runningServers.LoadAndDelete(conn.GetID()); ok {
        cancel()
    }
    return next.Close(ctx, conn)
}

For [2] we don't actually need a networkservice client; we need something like this:

package iperf

// Request carries the parameters for a single iperf3 client run.
type Request struct {
    // proto selects the protocol — presumably TCP vs UDP; confirm against ProtoType.
    proto ProtoType
    // bandwidth is the requested bandwidth — units not specified in this sketch.
    bandwidth int
    ...
}

// Response is one item of the result stream delivered on the channel returned by Client.Request.
type Response struct {
    // err is the error attached to this item, if any.
    err error
    // message is the iperf data attached to this item.
    message IperfData
}

// Client starts iperf client runs against the dst IP recorded for a connection.
// NOTE(review): ipMap is read in Request and written from the monitor goroutine in
// iperfClient.init — looks like it needs synchronization; confirm.
// NOTE(review): iperfClient.init calls c.client.cancel(connID), which is not declared here.
type Client struct {
    // ctx is the parent context for every run started by Request.
    ctx context.Context
    ipMap map[string]string // connID -> dst IP
}

func (c *Client) Request(connID string, request *Request) (<-chan *Response, context.CancelFunc) {
    ctx, cancel := context.WithCancel(c.ctx)
    respCh := make(chan *Response)
    go func() {
        var outCh, errCh chan string
        go func() {
            doResults(respCh, outCh, errCh)
        }
        bash.Run(ctx, fmt.Sprintf("iperf -c %s %s", c.ipMap[connID], requestToFlags(request)),
            bash.WithOut(outCh),
            bash.WithErr(errCh))
        close(respCh)
    }
    return cancel
}

But we also need to support [3], so we need to use a monitor to receive dst IP updates, and therefore we need a client chain element for it:

package iperf

// iperfClient is the client chain element that keeps client's connID -> dst IP map up to date.
type iperfClient struct {
    // ctx is the context intended for the connection monitor started in init.
    ctx context.Context
    // client is the iperf Client whose ipMap is updated from monitor events.
    client *Client
}

func (c *iperfClient) init(conn grpc.Conn) {
    monitorClient := monitor.MonitorConnections(s.ctx, conn)
    go func() {
        update := <-monitorClient
        for conn := range update.Conns {
            if dstIP := conn.GetConnectionContext().GetIPContext().GetDstIP(); dstIP != c.client.ipMap[conn.GetID()] {
                c.client.cancel(conn.GetID())
                c.client.ipMap[conn.GetID()] = dstIP
            }
        }
    }
}

// Request is the chain element's Request handler; its body is elided in this sketch.
func (c *iperfClient) Request(ctx, request) { ... }
// Close is the chain element's Close handler; its body is elided in this sketch.
func (c *iperfClient) Close(ctx, conn) { ... }
Bolodya1997 commented 3 years ago

@edwarnicke cc