hodgesds / iouring-go

io_uring support for go
MIT License

Data corruption using WriteFixed #4

Open ImVexed opened 4 years ago

ImVexed commented 4 years ago

Using this code:

import (
    "unsafe"

    "github.com/hodgesds/iouring-go"
)

type netring struct {
    ring *iouring.Ring
}

func newNetRing() (*netring, error) {
    ring, err := iouring.New(1024)

    if err != nil {
        return nil, err
    }

    return &netring{
        ring,
    }, nil
}

func (n *netring) send(fds []int32, data []byte) error {
    // Every submission shares one buffer: addr points at data's backing array.
    addr := (uint64)(uintptr(unsafe.Pointer(&data[0])))
    length := uint32(len(data))

    // Queue one WriteFixed SQE per fd, all referencing the same addr/length.
    for _, fd := range fds {
        e, commit := n.ring.SubmitEntry()

        e.Opcode = iouring.WriteFixed
        e.Fd = fd
        e.Addr = addr
        e.Len = length

        commit()
    }

    // Submit everything and wait for len(fds) completions.
    return n.ring.Enter(uint(len(fds)), uint(len(fds)), iouring.EnterGetEvents, nil)
}

I'm consistently getting data overwritten as it's sent to the client. I have a test that opens 1000 connections, which all get the same WriteFixed submit entry; however, looking at what they receive:

...
{"id":73,"roomId":1,"userId":876,"authorName":"Spam User 876","content":"...","createdAt":"2020-05-06T05:28:17.19695Z"}
{"id":73,"roomId":1,"userId":876,"authorName":"Spam User 876","content":"...","createdAt":"2020-05-06T05:28:17.19695Z"}
{"id":73,"roomId":1,"userId":876,"authorName":"Spam User 876","content":"...","createdAt":"2020-05-06T05:28:17.19695Z"}
{"id":73,"roomId":1,"userId":876,"authorName":"Spam User 876","content":"...{"id":73,"roomId":1,"userId":876,"authorName":"Spam User 876","content":"...

You'll see that we receive the message properly for some clients, but occasionally receive a message like the last one, where the end of the message is seemingly overwritten by the start of the same message.

I've ensured that there are no duplicate file descriptors causing 2 writes to be queued in the same ring for the same fd.

ImVexed commented 4 years ago

I can work on a repro for this if desired

hodgesds commented 4 years ago

I'll try taking a look at this tonight. I know there are probably some memory model violations that need to be investigated more, and I'm almost certain that the synchronization is not correct. At first glance, using addr like that may violate the memory model of the runtime; I'd check the reference, and if you get any issues with go vet we might have to come up with a clever solution to get around the runtime.
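
For example, something roughly like this (an untested sketch that reuses the fields from the snippet above and additionally needs a "runtime" import) keeps the slice reachable until the enter call has waited for the completion:

func (n *netring) sendOne(fd int32, data []byte) error {
    e, commit := n.ring.SubmitEntry()

    e.Opcode = iouring.WriteFixed
    e.Fd = fd
    // Once the pointer is converted to a uint64 the runtime no longer knows
    // this SQE references data, so the backing array could be collected or
    // reused before the kernel reads from it.
    e.Addr = (uint64)(uintptr(unsafe.Pointer(&data[0])))
    e.Len = uint32(len(data))

    commit()

    err := n.ring.Enter(1, 1, iouring.EnterGetEvents, nil)
    // Keep data live at least until Enter returns, i.e. until the kernel has
    // reported this write complete (minComplete = 1).
    runtime.KeepAlive(data)
    return err
}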

hodgesds commented 4 years ago

You can add some example code and put it in the examples folder so we can have some integration tests.

hodgesds commented 4 years ago

So the one thing I see with the example code is that it isn't advancing the head of the completion queue; see the EntryBy method.
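
Roughly, after the completions have been consumed the application has to publish a new head so the kernel can reuse those slots. A sketch of that bookkeeping (using sync/atomic and stand-in head/tail/mask/entries values, not the Ring's actual fields) looks like this:

// completion is a stand-in for a raw CQE.
type completion struct {
    UserData uint64
    Res      int32
    Flags    uint32
}

// cqHead and cqTail are the mmap'd CQ head/tail pointers, cqMask the ring
// mask, and entries the CQE array.
func reap(cqHead, cqTail *uint32, cqMask uint32, entries []completion) int {
    head := atomic.LoadUint32(cqHead)
    tail := atomic.LoadUint32(cqTail) // only the kernel advances the tail
    n := 0
    for ; head != tail; head++ {
        c := entries[head&cqMask]
        _ = c // handle c.Res / c.UserData here
        n++
    }
    // Publishing the new head is the "advancing" step: it tells the kernel
    // these CQ slots can be reused.
    atomic.StoreUint32(cqHead, head)
    return n
}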

ImVexed commented 4 years ago

I've made a few commits to my https://github.com/ImVexed/iouring-go/tree/buffers branch testing whether RegisterBuffers fixes the memory violation, and it seems to.

I also seem to have found a weird interaction between flushing the headers and immediately hijacking. Adding a delay in between the two seems to fix it.

hodgesds commented 4 years ago

I wonder if there is a race in WriteHeaderNow; you probably need a mutex around fds = append(fds, int32(sf.Fd())), and after the send call in sendMessage the CQ head/tail need to be checked. Other than that, the iovec registry stuff looks good.
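
Something as simple as this would rule the append race out (the names here are guesses at what the branch uses):

var fdsMu sync.Mutex

// addFd serializes registration of new client descriptors.
func addFd(fds *[]int32, sf *os.File) {
    fdsMu.Lock()
    *fds = append(*fds, int32(sf.Fd()))
    fdsMu.Unlock()
}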

ImVexed commented 4 years ago

I took out the WriteHeaderNow and just write a string literal after it's hijacked via:

if _, err := nc.Write([]byte("HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nContent-Type: text/event-stream\r\nTransfer-Encoding: chunked\r\n\r\n")); err != nil {
  log.Fatalln(err.Error())
}

And even with the entire handler mutex-locked, I still get EOFs.

Narrowing it down some more, I seem to get the EOFs even with a Handler that only Hijacks and does nothing more.
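
That is, a handler along these lines (simplified for illustration; the route name is made up):

http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
    hj, ok := w.(http.Hijacker)
    if !ok {
        http.Error(w, "hijacking not supported", http.StatusInternalServerError)
        return
    }
    nc, _, err := hj.Hijack()
    if err != nil {
        log.Println(err)
        return
    }
    // Nothing is written here; after Hijack the server no longer manages nc,
    // so any further writes (and eventually closing it) are up to this code.
    _ = nc
})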

ImVexed commented 4 years ago

Huh, fixed.

Changing HTTP/1.1 to HTTP/1.0, as I saw in examples in socket.io, fixes the unexpected EOF. Regardless, that's fixed.

Though that may be an underlying issue in the SSE client.

ImVexed commented 4 years ago

What specifically needs to be done to the sq/cq after enter is finished to support wrapping?

I've been playing with the Heads/Tails of both and can't seem to get it to wrap properly, even with EntryBy.

hodgesds commented 4 years ago

I've made a few changes to EntryBy, but basically what needs to be done is this: start from the cq Head&Mask and iterate until either the end of the Entries or the Tail&Mask is reached. I don't think EntryBy is quite correct yet, but there is a wrap test. You don't need to update the Tail; the kernel should set that. If you are doing a really large number of reads/writes I'm not sure how the kernel will handle a wrap. There's a bit of extra info in this doc as well.
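
In code the iteration looks roughly like this (a sketch against the same stand-in completion struct and fields as the earlier reap example, since EntryBy itself isn't final yet):

func entryBy(cqHead, cqTail *uint32, cqMask uint32, entries []completion, userData uint64) (completion, bool) {
    tail := atomic.LoadUint32(cqTail)
    // Start from Head&Mask and walk toward Tail&Mask; masking each index is
    // what lets the scan wrap past the end of the Entries array.
    for head := atomic.LoadUint32(cqHead); head != tail; head++ {
        c := entries[head&cqMask]
        if c.UserData == userData {
            return c, true
        }
    }
    return completion{}, false
}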

I've been working in conn.go to get a net listener partially working, but I will take a look at EntryBy. The tricky part with EntryBy is making it safe for concurrent reads, handling acks (which it sort of does now, in a somewhat suboptimal manner, by setting a bit on the Flags), and handling wrap around.

ImVexed commented 4 years ago

Hmm, I can't validate against any of the tests, since most tests on master are failing with Completion entry not found for me. However, even with updating the Cq head, I'm deadlocking once it reaches capacity.

...
Waiting
Sq Head 800
Sq Tail 800
Sq Mask 1023
Sq Dropped 0
Cq Head 800
Cq Tail 800
Cq Mask 2047
Sent 64 bytes to 100 sockets in 585.723µs.
Waiting
Sq Head 900
Sq Tail 900
Sq Mask 1023
Sq Dropped 0
Cq Head 900
Cq Tail 900
Cq Mask 2047
Sent 64 bytes to 100 sockets in 593.123µs.
Waiting
Sq Head 1000
Sq Tail 1000
Sq Mask 1023
Sq Dropped 0
Cq Head 1000
Cq Tail 1000
Cq Mask 2047
Sent 64 bytes to 100 sockets in 787.831µs.

After it reaches 1k it will lock indefinitely. Should I be checking if the remaining space from Cq Tail to Size is big enough for the incoming bulk write? And if so, what should I do once it's not big enough?

hodgesds commented 4 years ago

I've fixed some of the tests on master and the TestRingReadWrap should be catching that case now (it wraps 4x), but it only does a single read at a time.

After it reaches 1k it will lock indefinitely. Should I be checking if the remaining space from Cq Tail to Size is big enough for the incoming bulk write? And if so, what should I do once it's not big enough?

Yeah, do a check, and if the number of SQEs being added is too large, write until the end of the Sq and wrap the rest from the start. I might have fixed a bug in SubmitEntry recently that would make that not work correctly in all wrapping situations.
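
Rough idea, as a sketch only: compute the free SQ slots (ringSize - (sqTail - sqHead), with unsigned subtraction so the counters can wrap) and split the batch when it doesn't fit. This assumes free is nonzero, that send blocks until its batch completes as in the example above, and that the remainder fits in one ring's worth of entries:

func submitAll(n *netring, fds []int32, data []byte, free uint32) error {
    if uint32(len(fds)) <= free {
        return n.send(fds, data)
    }
    // Fill the slots left before the end of the Sq and let that batch
    // complete, then submit the rest, which wraps to the start of the queue.
    if err := n.send(fds[:free], data); err != nil {
        return err
    }
    return n.send(fds[free:], data)
}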

ImVexed commented 4 years ago

That seemed to fix it! However now it's locking up around 2x the limit:

Sent 64 bytes to 100 sockets in 683.526µs.
Waiting
Sq Head 1700
Sq Tail 676
Sq Mask 1023
Sq Dropped 0
Cq Head 1700
Cq Tail 1700
Cq Mask 2047
Sent 64 bytes to 100 sockets in 728.728µs.
Waiting
Sq Head 1800
Sq Tail 776
Sq Mask 1023
Sq Dropped 0
Cq Head 1800
Cq Tail 1800
Cq Mask 2047
Sent 64 bytes to 100 sockets in 787.631µs.
Waiting
Sq Head 1900
Sq Tail 876
Sq Mask 1023
Sq Dropped 0
Cq Head 1900
Cq Tail 1900
Cq Mask 2047
Sent 64 bytes to 100 sockets in 1.453356ms.

hodgesds commented 4 years ago

I would start by using 1 for the min complete when entering the ring with a small ring to test the wrapping action. I need to do some more work to get IORING_SETUP_IOPOLL handled properly in the SubmitEntry.
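
For the first part, a test harness could look something like this (sketch only; fd, addr, and length are assumed to be set up the same way as in the send code at the top of the issue):

// Deliberately tiny ring so the queues wrap after a handful of writes.
ring, err := iouring.New(8)
if err != nil {
    log.Fatal(err)
}
for i := 0; i < 64; i++ {
    e, commit := ring.SubmitEntry()
    e.Opcode = iouring.WriteFixed
    e.Fd = fd     // a writable descriptor, set up elsewhere
    e.Addr = addr // the shared buffer, as in the report above
    e.Len = length
    commit()
    // minComplete = 1: wait for each completion before submitting the next,
    // which makes it obvious exactly where the wrap stops working.
    if err := ring.Enter(1, 1, iouring.EnterGetEvents, nil); err != nil {
        log.Fatal(err)
    }
}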

ImVexed commented 4 years ago

Hmm, could that potentially cause a scenario where Enter returns early with many SQEs not fully processed, and then another write occurs which overwrites the data in the shared buffer before the previous Enter is done using it?

hodgesds commented 4 years ago

From what I've read in the docs, the kernel will copy it over asynchronously. This might be useful as well, but I'm not sure if you care about CQEs getting overwritten?

I'm still working on the design of EntryBy as well; right now it sort of works, but I think it's technically a little racy and I want to see if there is a better way.