hodgesds / iouring-go

io_uring support for go
MIT License
202 stars 10 forks source link

How does a client use the ring object? #7

Open awzhgw opened 4 years ago

awzhgw commented 4 years ago

package main

import (
    "flag"
    "fmt"
    "io"
    "log"
    "math/rand"
    "net"
    "github.com/hodgesds/iouring-go"
    "net/http"
    _ "net/http/pprof"
    "os"
    "path"
    "sync/atomic"
    "time"
)

// Command-line flags shared by the server and client roles.
var (
	// size is the length in bytes of each message the client sends and of
	// each chunk the server reads from a connection and writes to disk.
	size = flag.Int64("size", 64, "default write size")
	// rootDir is the directory in which the server creates its output file.
	rootDir = flag.String("root", "/data0", "default write dir")
	// uring selects the io_uring listener and file writer instead of the
	// plain net/os ones.
	uring = flag.Bool("iouring", true, "is used iouring")
	// role is "server" to accept connections; any other value runs the client.
	role = flag.String("role", "server", "default role")
	// addr1 is the address the client dials.
	addr1 = flag.String("remote", "127.0.0.1:8888", "default remote addr")
)

// Process-wide state.
var (
	// port is the TCP port the server listens on.
	port = 8888
	// ring is the shared io_uring instance; inita sets it when -iouring is true.
	ring *iouring.Ring
)

// inita creates the package-level io_uring ring when the -iouring flag is
// set and does nothing otherwise. It must run after flag.Parse. A failure to
// create the ring terminates the process.
func inita() {
	if !*uring {
		return
	}

	params := &iouring.Params{Features: iouring.FeatNoDrop}
	r, err := iouring.New(8192, params, iouring.WithID(100000))
	if err != nil {
		log.Fatalf("init failed %v", err)
	}
	ring = r

	// NOTE(review): whatever FastOpenAllowed reports is discarded here;
	// confirm whether its result should be checked.
	iouring.FastOpenAllowed()
}

// init seeds the top-level math/rand generator from the current wall-clock
// time in nanoseconds.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// letterRunes is the alphabet RandStringRunes draws from: the lower- and
// upper-case ASCII letters.
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// RandStringRunes returns a string of n runes, each chosen from letterRunes
// with the top-level math/rand generator.
func RandStringRunes(n int) string {
	out := make([]rune, n)
	alphabet := len(letterRunes)
	for i := 0; i < n; i++ {
		out[i] = letterRunes[rand.Intn(alphabet)]
	}
	return string(out)
}

// servePprof serves http.DefaultServeMux (on which the blank-imported
// net/http/pprof package registers its handlers) on pprofPort. It blocks
// while serving and terminates the process if the server cannot listen.
func servePprof(pprofPort int) {
	if err := http.ListenAndServe(fmt.Sprintf(":%v", pprofPort), nil); err != nil {
		// Report the port that actually failed; the client branch used to
		// print the server's port here.
		log.Printf("cannot listen pprof %v err %v", pprofPort, err)
		os.Exit(1)
	}
}

// main parses the flags, initialises io_uring if requested and then runs
// either the server or the client.
//
// Server (-role=server): accepts connections forever and hands each one to
// Write in its own goroutine.
//
// Client (any other role): dials -remote and writes the same random payload
// of -size bytes in a loop until a write fails.
func main() {
	// The two roles use different pprof ports so that a server and a client
	// can run on the same host.
	const (
		serverPprofPort = 8822
		clientPprofPort = 8821
	)

	flag.Parse()
	inita()

	if *role == "server" {
		go servePprof(serverPprofPort)

		l, err := Listen()
		if err != nil {
			log.Fatal(err)
		}
		for {
			conn, err := l.Accept()
			if err != nil {
				log.Printf("Accept error %v", err)
				continue
			}
			go Write(conn)
		}
	}

	go servePprof(clientPprofPort)

	conn, err := Connect(*addr1)
	if err != nil {
		log.Fatalf("connect error %v", err)
	}
	defer conn.Close()

	writeData := []byte(RandStringRunes(int(*size)))
	for {
		if _, err := conn.Write(writeData); err != nil {
			log.Printf("write error %v", err)
			return
		}
	}
}

func Listen()( l net.Listener,err error){
    if *uring{
        fmt.Printf("listening on port: %d\n", port)
        l, err := ring.SockoptListener(
            "tcp",
            fmt.Sprintf(":%d", port),
            func(err error) {
                log.Println(err)
            },
            iouring.SOReuseport,
        )
        if err != nil {
            log.Fatal(err)
        }
        return l,err
    }else {
        l,err=net.Listen("tcp",fmt.Sprintf(":%d", port))
        return l,err
    }
}

// Connect dials addr over TCP with a one-second timeout and returns the
// connection.
//
// On a TCP connection it then disables Nagle's algorithm, sets linger to 0
// and enables keep-alive. These options are best effort: a failure to apply
// one is logged and the connection is still returned.
//
// A dial failure is logged and returned with a nil connection.
func Connect(addr string) (conn net.Conn, err error) {
	conn, err = net.DialTimeout("tcp", addr, time.Second)
	if err != nil {
		log.Println(fmt.Sprintf("Dail to %v err %v", addr, err))
		return nil, err
	}

	// Assert once and check the result instead of asserting unchecked before
	// every call.
	tcp, ok := conn.(*net.TCPConn)
	if !ok {
		return conn, nil
	}
	if optErr := tcp.SetNoDelay(true); optErr != nil {
		log.Printf("set no-delay on %v: %v", addr, optErr)
	}
	if optErr := tcp.SetLinger(0); optErr != nil {
		log.Printf("set linger on %v: %v", addr, optErr)
	}
	if optErr := tcp.SetKeepAlive(true); optErr != nil {
		log.Printf("set keep-alive on %v: %v", addr, optErr)
	}
	return conn, nil
}

// reportIOPS logs once per second the value accumulated in cnt since the
// previous report and resets it to zero; it returns when done is closed.
func reportIOPS(cnt *uint64, done <-chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Swap reads and resets in one atomic step, so increments that
			// land between the read and the reset are not lost.
			log.Printf("iops is %v", atomic.SwapUint64(cnt, 0))
		case <-done:
			return
		}
	}
}

// Write handles one accepted connection: it repeatedly reads exactly -size
// bytes from conn and writes them to <root>/1.txt, through the io_uring file
// writer when -iouring is set and through the *os.File otherwise. A
// background goroutine logs the number of completed writes per second.
//
// The handler returns when reading from conn fails, including when the peer
// closes the connection, and then closes conn, the file and the reporter.
// Failing to open or write the output file still terminates the process, as
// before.
func Write(conn net.Conn) {
	defer conn.Close()

	dst, err := os.OpenFile(path.Join(*rootDir, "1.txt"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer dst.Close()

	// Both modes write through an io.Writer so there is a single copy loop.
	var w io.Writer = dst
	if *uring {
		// NOTE(review): the value returned by FileReadWriter is not closed
		// here, only the underlying file is; confirm whether it needs its
		// own Close.
		rw, err := ring.FileReadWriter(dst)
		if err != nil {
			log.Fatal(err)
		}
		w = rw
	}

	var cnt uint64
	done := make(chan struct{})
	defer close(done) // stops the reporter when this handler returns
	go reportIOPS(&cnt, done)

	data := make([]byte, *size)
	for {
		if _, err := io.ReadFull(conn, data); err != nil {
			// A peer that disconnects must end only this handler, not the
			// whole server. A clean close (io.EOF) is not worth logging.
			if err != io.EOF {
				log.Printf("read from conn error %v", err)
			}
			return
		}
		if *uring {
			fmt.Println(fmt.Sprintf("recive data %v", string(data)))
		}
		if _, err := w.Write(data); err != nil {
			log.Fatalf("write error %v", err)
		}
		atomic.AddUint64(&cnt, 1)
	}
}

This is my test code, but I find that the server receives only 2 packets and then cannot receive any more.

Why?
hodgesds commented 4 years ago

The server implementation is a little buggy currently. What needs to happen is that after every operation on the socket, a repoll io_uring request has to be made to re-enable polling on the socket (you can see the code here). There are a few edge cases with the current socket handling, and I'm pretty sure some things are broken, especially when using clients with keep-alive connections. I've noticed some of the same errors in my testing as well, but haven't had time to look into them more. I've been trying to get a good API for doing batches of operations that work in a single Enter call, which is what will really give performance compared to the Read/Write operations that exist today (you can see the extra overhead in the benchmarks).

I'm on holiday this weekend, but this looks like a good test case so I'll continue to test with it as I have time. There's also WithDebug that can be passed into the ring as well (although much of that debugging needs to be worked on).

awzhgw commented 4 years ago

@hodgesds I am an R&D engineer working on distributed storage, and I am very interested in your project. Our project, github.com/chubaofs/chubaofs, has now entered the CNCF. It is written in Go. Would you be willing to perfect your iouring library and contribute it to us? Or join us to build a world-class storage project?

hodgesds commented 4 years ago

Chubaofs looks really interesting! I have to check with my employer to double check licensing/contribution policies but it seems like this library could be useful.

I've just gotten around to getting an initial version of multiple writer (fan out) support, but the benchmarks seem to show some work still needs to be done. The same goes for the batching with the WithDeadline ring option, which also has a benchmark.

I think this project has a lot of potential, but there are quite a few things that need to be worked on to get it production ready. I think I will be looking for contributors and also try to work on making sure that users of the project are up to date with what the current state of the project is.

awzhgw commented 4 years ago

@hodgesds
I mean: once your iouring-go code base is ready, our chubaofs can use it directly.