ethereum / utp

uTorrent transport protocol

Full send_buf prevents sending of FIN #44

Open njgheorghita opened 1 year ago

njgheorghita commented 1 year ago

I've spent some time playing around with this library. Basically, I've been running a bare-bones app locally that creates a sender & receiver uTP socket and tries to tx 100000 bytes. This transfer has failed consistently - while it might be user error, I'm fairly confident I'm using the library in the way it's intended...

The repeated behavior I'm seeing is that the sender / initiator processes the tx successfully and closes the stream - however, the recipient never fully processes the stream (via read_to_eof) and hangs until it times out.

It appears to me as though a FIN is never sent from the sender -> recipient. This is because the sender never clears its send_buf. The evaluations here and here always fail since send_buf is never emptied, so the FIN is never sent. Something appears to be happening here where the send_buf is not properly cleared.

I still have some understanding to improve upon, so I'm not sure if these thoughts make any sense.

But from what I can tell from experimenting with this library, it's consistent behavior that's preventing the sending of the FIN packet. Mostly just leaving this here so that I can try and organize my thoughts & I'll pick it back up after the weekend, but if anybody has any clues / ideas / pointers on what I'm misunderstanding - that's definitely appreciated.

jacobkaufmann commented 1 year ago

can you share calling code to reproduce?

njgheorghita commented 1 year ago

Yup, I basically just created a new cargo project and this is the main.rs


```rust
use std::env;
use std::net::SocketAddr;

// NOTE: these crate paths are a best guess at the imports this example needs
// (the types are used below as `UtpSocket` and `ConnectionConfig`); adjust to
// wherever the crate actually exports them.
use utp_rs::conn::ConnectionConfig;
use utp_rs::socket::UtpSocket;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();
    let args: Vec<String> = env::args().collect();
    match args[1].as_str() {
        "receiver" => receiver().await,
        "sender" => sender().await,
        _ => panic!("invalid argument"),
    }
}

async fn sender() {
    // bind a standard UDP socket. (transport is over a `tokio::net::UdpSocket`.)
    let socket_addr = SocketAddr::from(([127, 0, 0, 1], 3400));
    let udp_based_socket = UtpSocket::bind(socket_addr).await.unwrap();
    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

    // connect to a remote peer over uTP.
    let remote = SocketAddr::from(([127, 0, 0, 1], 3401));
    let config = ConnectionConfig::default();
    let mut stream = udp_based_socket.connect(remote, config).await.unwrap();

    // write data to the remote peer over the stream.
    let data = vec![0xef; 100_000];
    let n = stream.write(data.as_slice()).await.unwrap();
}

async fn receiver() {
    // bind a standard UDP socket. (transport is over a `tokio::net::UdpSocket`.)
    let socket_addr = SocketAddr::from(([127, 0, 0, 1], 3401));
    let udp_based_socket = UtpSocket::bind(socket_addr).await.unwrap();

    // accept a connection from a remote peer.
    let config = ConnectionConfig::default();
    let mut stream = udp_based_socket.accept(config).await.unwrap();

    // read data from the remote peer until the peer indicates there is no data left to write.
    let mut data = vec![];
    let n = stream.read_to_eof(&mut data).await.unwrap();
}
```

I run the experiment by starting the sender via `cargo run -- sender` in one window and then running `cargo run -- receiver` in another window immediately after.

jacobkaufmann commented 1 year ago

huh, well when I substitute 100k here, things still appear to work correctly. one thing I can think of is whether your code is running with the multi-threaded tokio runtime. could you verify that you have the rt-multi-thread flag active?

to answer your question about the send buffer, the internal representation is a VecDeque<Vec<u8>> so that bytes from different, consecutive writes are not sent in the same DATA packet. there is some justification here. the internal representation could be different though. it's more of an implementation detail.
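
To make that concrete, here is a minimal sketch of the idea (illustrative only; not the library's actual type or method names):

```rust
use std::collections::VecDeque;

// Illustrative sketch of a send buffer where each `write` keeps its own entry,
// so one DATA packet never mixes bytes from two different writes.
struct SendBufSketch {
    pending: VecDeque<Vec<u8>>,
}

impl SendBufSketch {
    fn write(&mut self, data: &[u8]) {
        self.pending.push_back(data.to_vec());
    }

    /// Take up to `max_payload` bytes, drawing only from the front (oldest) write.
    fn next_payload(&mut self, max_payload: usize) -> Option<Vec<u8>> {
        let front = self.pending.front_mut()?;
        let n = front.len().min(max_payload);
        let payload: Vec<u8> = front.drain(..n).collect();
        if front.is_empty() {
            self.pending.pop_front();
        }
        Some(payload)
    }
}
```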

njgheorghita commented 1 year ago

:+1: on send buffer

Yeah, I also substituted 100k into that test and everything worked. I was running with rt-multi-thread active and experienced the same behavior after changing #[tokio::main] -> #[tokio::main(flavor = "multi_thread", worker_threads = 10)].

In #45 I explained some weird behavior I observed in my terminal. And since in that PR we have a single test to verify against, which is easier to debug, I'll revisit this issue once we resolve whatever's going on there.

emhane commented 1 year ago

I believe your fix to this test shows that utp manages to transfer 100k bytes without problem, right? @njgheorghita

KolbyML commented 1 year ago

There is a stall bug I found in the library which prevents all writes if a large enough vector is submitted. Here is a PR which resolves it: https://github.com/ethereum/utp/pull/56

emhane commented 1 year ago

another curious thing with this idle_timeout: the first test passes and the second test fails, and at first sight I can't figure out why. I tried setting the idle timeout to a minute; the second test still didn't pass, and still due to an 'idle timeout' error.

njgheorghita commented 1 year ago

@emhane IIUC, Rust runs all tests concurrently... this could be the source of the issue? If so, I do think that would confirm the flakiness of this library w/ handling concurrent txs. In trin we use the serial_test library in situations where we want to control the order in which tests are run. It'd be interesting to know whether the second test still fails if you run them serially.
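
For reference, serial_test usage looks roughly like this (test names here are made up); alternatively, `cargo test -- --test-threads=1` forces serial execution without any code changes:

```rust
use serial_test::serial;

// With #[serial], these tests run one at a time instead of concurrently,
// so the two uTP transfers never run at the same time.
#[tokio::test]
#[serial]
async fn transfer_first() {
    // ... first uTP transfer test body ...
}

#[tokio::test]
#[serial]
async fn transfer_second() {
    // ... second uTP transfer test body ...
}
```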

emhane commented 1 year ago

> @emhane IIUC, Rust runs all tests concurrently... this could be the source of the issue? If so, I do think that would confirm the flakiness of this library w/ handling concurrent txs. In trin we use the serial_test library in situations where we want to control the order in which tests are run. It'd be interesting to know whether the second test still fails if you run them serially.

I am running them separately, clone the branch and you tell me what's up? @njgheorghita

emhane commented 1 year ago

> another curious thing with this idle_timeout: the first test passes and the second test fails, and at first sight I can't figure out why. I tried setting the idle timeout to a minute; the second test still didn't pass, and still due to an 'idle timeout' error.

I believe the read loop blocks.

njgheorghita commented 1 year ago

> I am running them separately, clone the branch and you tell me what's up? @njgheorghita

Idk.. cloned it and saw the same behavior you're describing. However... in test 2... if I tokio::spawn(..) the `let tx = tx_stream.write(data.as_slice());` as a new task, then the tests pass.
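
Roughly, that workaround looks like this (a sketch; `tx_stream`, `rx_stream`, and the payload are assumed to be set up as in that test, and error handling is just unwraps):

```rust
// spawn the write on its own tokio task so it can make progress concurrently
// with the read on the other stream, instead of the read loop blocking it
let data = vec![0xef; 1_000_000];
let write_handle = tokio::spawn(async move {
    // `tx_stream` is the sending UtpStream from the test, moved into the task
    let mut tx_stream = tx_stream;
    tx_stream.write(data.as_slice()).await
});

// meanwhile, drive the read on the receiving stream
let mut received = vec![];
rx_stream.read_to_eof(&mut received).await.unwrap();

// then join the writer
let _written = write_handle.await.unwrap().unwrap();
```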

This could be evidence that "The read loop blocks."...

Furthermore, I've only experienced bugs when trin uses uTP to read streams (e.g. trin -> fluffy txs work flawlessly). Could be further evidence that the bug resides in the reading...

No coherent thoughts here, just thinking out loud

KolbyML commented 1 year ago

I am running the suite's 10k / 10k case from your chart, Nick. [image omitted]

This is my test branch (nothing is modified except adding a few debug logs and the test suite): https://github.com/KolbyML/utp/commit/861e314c04a3b924b83754860d88c887c003b9c4

Notice `State::Established { sent_packets, .. } | State::Closing { sent_packets, .. } => {` here in the code.

The issue appears to be an incorrect handling of overflow.

The important part to look for is how `CircularRangeInclusive { start: 14871, end: 14850, exhausted: false }` changes, then becomes matching, and then we get the `received ACK for unsent packet err=InvalidAckNum` error. You have to scroll right to see this in the log.

2023-05-25T20:27:46.656381Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14827 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14850, exhausted: false }
2023-05-25T20:27:46.658154Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14828 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14850, exhausted: false }
2023-05-25T20:27:46.660831Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14829 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14850, exhausted: false }
2023-05-25T20:27:46.662667Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14830 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14850, exhausted: false }
2023-05-25T20:27:46.665624Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14831 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14850, exhausted: false }
2023-05-25T20:27:46.667766Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14832 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14850, exhausted: false }
2023-05-25T20:27:46.669714Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14833 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14861, exhausted: false }
2023-05-25T20:27:46.671460Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14834 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14861, exhausted: false }
2023-05-25T20:27:46.673338Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14835 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14861, exhausted: false }
2023-05-25T20:27:46.675125Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14836 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14861, exhausted: false }
2023-05-25T20:27:46.676836Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14837 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14861, exhausted: false }
2023-05-25T20:27:46.678562Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14838 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14861, exhausted: false }
2023-05-25T20:27:46.680393Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14839 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14861, exhausted: false }
2023-05-25T20:27:46.682953Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx2 14840 :: 60899 :: CircularRangeInclusive { start: 14871, end: 14871, exhausted: false }
2023-05-25T20:27:46.682996Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: timeout fsdxx3 14840
2023-05-25T20:27:46.683010Z  WARN uTP{send=2466 recv=2465}: utp_rs::conn: resetting connection: received ACK for unsent packet err=InvalidAckNum

Should send 1 megabyte: Kind(NotConnected)
thread 'stream::test::test_transfer_1_megabyte' panicked at 'Should send 1 megabyte: Kind(NotConnected)', src/stream.rs:163:22

The NotConnected error doesn't matter (it is just caused by how I am testing); if I run the test suite with 100 MB the same issue occurs, it just gets a timeout error instead. So I can replicate this issue and get either error depending on how I trigger it (this is because the errors assume a bug-free library).

No matter what I do, the issue always occurs after the overflow, which definitely signifies there is a logic issue somewhere.
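
For context, the intended semantics of a wrapping (circular) inclusive range over u16 sequence numbers, as printed in the log above, look roughly like this - a sketch of the behaviour only, not the library's actual CircularRangeInclusive code:

```rust
// A circular inclusive range over u16: when `end` has wrapped past `start`
// (e.g. start=14871, end=14850 in the log above), the range covers
// start..=u16::MAX plus 0..=end.
fn circular_contains(start: u16, end: u16, value: u16) -> bool {
    if start <= end {
        value >= start && value <= end
    } else {
        value >= start || value <= end
    }
}
```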

Here is how this ties in with our other discoveries:

This makes sense, since the bug always happened with long concurrent transfers. seq_nr/ack_nr are supposed to overflow, but this will only happen after we send 65536 packets, which happens under long sustained loads.

KolbyML commented 1 year ago

Here is a demo of the overflow bug - 65535 passes and 65536 fails: https://github.com/ethereum/utp/pull/60

jacobkaufmann commented 1 year ago

I will echo a comment I left on #56: the send buffer capacity should be configurable to account for the maximum payload length that you could conceivably transfer in your program. this is why I asked nick in #45 about the size of the payloads. as also mentioned there, the send buffer currently has a fixed capacity, and any attempted writes beyond that capacity will fail due to the issue described in #56.
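
As a hypothetical sketch of the "configurable capacity, fail fast instead of stalling" idea - the names below are invented for illustration and are not an existing API of this library:

```rust
// Hypothetical: reject a write up front if it cannot fit in the configured
// send buffer capacity, rather than letting it stall.
fn check_write_fits(
    buffered_bytes: usize,
    payload_len: usize,
    max_send_buf_bytes: usize, // imagined ConnectionConfig option
) -> std::io::Result<()> {
    if buffered_bytes + payload_len > max_send_buf_bytes {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "write exceeds configured send buffer capacity",
        ));
    }
    Ok(())
}
```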

emhane commented 1 year ago

I believe the root of the problem is that most of the code is sync, i.e. blocking, and is sometimes called in a somewhat recursive way, e.g. process_writes calling process_writes again through indirection.

What happens now is that the window drops to 0 when running two streams on the same machine and sending more than 1.1 MB before the other stream can progress (the first stream is always Poll::Ready to make progress, so to speak, since it is sync, but it still waits on its 'remote' counterpart on the same machine to send ACKs). My understanding so far of the congestion control algorithm is that if the sender gets no ACKs, the window is eventually decreased to zero, which then stops transmission of new packets?

If the sender and the receiver are on different machines, the program won't go into a deadlock for unidirectional streams so long as packets aren't lost too often, i.e. as long as the sender gets the ACKs so it can finish writing and move on to the next thing. The first fix may hence be to make sure a failed connection is handled properly, so that another connection is established within the same stream if the connection fails (as in unreliable network conditions), without losing the stream state. To allow concurrent streams to work smoothly, each stream can be run in a blocking thread, which means all the streams can make progress all the way to the end concurrently. Otherwise we get some weird idle timeouts, as each stream will write/read until it's done before another can proceed, so when running many streams concurrently in the same program, some unlucky streams will always wait too long for the other streams to finish a write or a read and will time out.

Another way: since most functions called from, for example, process_writes take &mut self, we can't run each process_writes call in a blocking thread, but we could make a connection per event dealt with in the loop of a running Connection. We would then have at least two connections per stream, one for reads and one for writes, with each of those connections running in a separate blocking thread. This possibly makes the most sense, as it is from Connection that most of the sync code is called. It would also keep bidirectional streams from interfering with each other, i.e. process_writes won't have to finish before giving process_reads a chance to start, eliminating the risk that process_reads times out when more than 1.1 MB is sent while it waits for process_writes to finish synchronously (or possibly the limit is less than 1.1 MB if other streams are running concurrently with this one).

Possibly we want to try some of these approaches out before making all the code called from Connection async, which is naturally always an option. In that case, I suggest making reads and writes streams by implementing the Stream trait on some new structs for this, or on SendBuffer and ReceiveBuffer, so reads and writes can progress partly concurrently in an async context.
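
As a rough illustration of that last suggestion, a futures Stream over received chunks could look something like this (hypothetical types; a real implementation would also need to register the waker before returning Pending):

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

// Hypothetical wrapper around a receive buffer that yields chunks as they
// arrive, instead of blocking until EOF.
struct RecvChunks {
    chunks: VecDeque<Vec<u8>>,
    eof: bool,
}

impl Stream for RecvChunks {
    type Item = Vec<u8>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this.chunks.pop_front() {
            Some(chunk) => Poll::Ready(Some(chunk)),
            None if this.eof => Poll::Ready(None),
            // a real implementation would store `_cx.waker()` here so the
            // connection can wake the task when new data arrives
            None => Poll::Pending,
        }
    }
}
```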

njgheorghita commented 1 year ago

This seems quite promising! Nice digging!

I just ran some local tests and observed that the bug in #55 occurs when the window is 0, both when it panics & before, as the timeout keeps doubling up to the max value of 134217728s.

> The first fix may hence be to make sure a failed connection is handled properly, so that another connection is established within the same stream if the connection fails

I was under the impression that a Stream and a Connection have more of a 1-1 relationship rather than 1-many. At least in trin, we start & shut down a stream for each uTP tx. Maybe I'm wrong? I'm not sure the complexity of handling multiple connections inside a single stream is justified, as opposed to simply terminating the stream if the connection fails. Although, maybe the overhead of opening a new stream justifies this complexity?

@emhane Given the observed fact that trin -> fluffy uTP txs work flawlessly, how would you reconcile that with this hypothesis? It would appear to me as though the blocking (or at least the problematic blocking) occurs on the receiver side, in some way preventing it from sending ACKs.

Curious to hear @jacobkaufmann 's thoughts on this

jacobkaufmann commented 1 year ago

it will take some more time for me to go through emilia's analysis, but I can confirm there is a 1:1 relationship between stream and connection. the connection is essentially the state machine, and the stream is the interface to the underlying connection.

emhane commented 1 year ago

@njgheorghita yes precisely, it's 1-1 stream-connection right now; I didn't state otherwise, but I opened up exploration of 1-to-many. this is my understanding of the relationship between the components. that fluffy-trin works but not trin-trin is definitely interesting - how much data are you sending in those tests?

great, @jacobkaufmann, I'm sure with all our heads together we can send 1 GB within weeks.

emhane commented 1 year ago

this is also quite curious and also points in the direction that something is blocking on the receiving side: when I pass the expected EOF to a read, the second test that failed for us, @njgheorghita, works...

KolbyML commented 1 year ago

> great, @jacobkaufmann, I'm sure with all our heads together we can send 1 GB within weeks.

What do you mean by send 1 GB - send 1 GB during a single connection, or send 1 GB in total across multiple connections?

emhane commented 1 year ago

@KolbyML on one stream, so that would mean chunking any larger data into chunks of 1 MB and opening a new connection for each chunk, serially I guess, but sending them all in parallel from one stream would be cool - then the data would arrive in no time 😎

as for why the first test works and the second doesn't: a stream must be manually shut down or dropped for the FIN to be sent and for a write and read to be successful.
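
Putting those two points together, the serial chunking idea might look roughly like this (a sketch reusing the sender example from earlier in the thread; the close() call stands in for whatever explicit shutdown, or simply dropping the stream, triggers the FIN):

```rust
// inside an async fn, with `udp_based_socket` bound as in the sender example above
let remote = SocketAddr::from(([127, 0, 0, 1], 3401));
let data = vec![0xef; 1_000_000_000]; // the hypothetical 1 GB payload

for chunk in data.chunks(1024 * 1024) {
    // one fresh connection per 1 MB chunk, opened serially
    let config = ConnectionConfig::default();
    let mut stream = udp_based_socket.connect(remote, config).await.unwrap();
    stream.write(chunk).await.unwrap();
    // make sure the FIN goes out before moving on to the next chunk
    stream.close().await.unwrap();
}
```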

KolbyML commented 1 year ago

> @KolbyML on one stream, so that would mean chunking any larger data into chunks of 1 MB and opening a new connection for each chunk, serially I guess, but sending them all in parallel from one stream would be cool - then the data would arrive in no time 😎

😎 We can do that after we resolve the halt/deadlock bug I found and after we fix the overflow bug I found (I am working on a PR to fix both).

I see you proposed a solution for the halt/deadlock bug; I left a comment with an issue I think your solution has.

But yeah it should be doable to send 1 GB soon after the overflow bug is fixed. I think we should avoid making new connections serially, and just fix the bug, but yeah!

Parallel connections probably wouldn't make transfers faster and would add complexity:

  1. if the socket isn't constrained by congestion, it will already be fast; if it is, parallel connections will make it slower
  2. adding orchestration for making multiple streams to the same user for one data flow - wouldn't that be a lot of complexity?

emhane commented 1 year ago

>> @KolbyML on one stream, so that would mean chunking any larger data into chunks of 1 MB and opening a new connection for each chunk, serially I guess, but sending them all in parallel from one stream would be cool - then the data would arrive in no time 😎
>
> 😎 We can do that after we resolve the halt/deadlock bug I found and after we fix the overflow bug I found (I am working on a PR to fix both).

I found all the same bugs on my end, naturally, because they are the bugs blocking sending more data.

> I see you proposed a solution for the halt/deadlock bug; I left a comment with an issue I think your solution has.

thanks for that, I fixed it.

> But yeah it should be doable to send 1 GB soon after the overflow bug is fixed. I think we should avoid making new connections serially, and just fix the bug, but yeah!

sure, I like your attitude. let's squash 'em all.

> Parallel connections probably wouldn't make transfers faster and would add complexity:
>
>   1. if the socket isn't constrained by congestion, it will already be fast; if it is, parallel connections will make it slower

it would speed up the transfer, for example, when other data is being transferred in one other window too, as then the transfer would be allowed 2 or 3 separate windows instead of 1 or 2. but this is not where we start, you're absolutely right. and if all connections do that, i.e. try to double, triple, etc. their windows, then there is no gain, you're quite right.

>   2. adding orchestration for making multiple streams to the same user for one data flow - wouldn't that be a lot of complexity?

a little bit of complexity, but nothing new. like the congestion control model used again for multiple streams, more or less, to sort the streams at the destination. however, it would require all clients to interpret the data in a special way, which is a protocol change, and those take time to get consensus on, so nvm for this case.

#62 - how will that buffer increase the amount available to transfer? does it mask the seq numbers or something like that, so we get more values from a u16? I couldn't find the reference implementation.

KolbyML commented 1 year ago

> I found all the same bugs on my end, naturally, because they are the bugs blocking sending more data.

Of course people using the same buggy library would experience the same bugs

> I couldn't find the reference implementation.

These are considered the reference implementations: https://github.com/bittorrent/libutp and https://github.com/arvidn/libtorrent/blob/RC_2_0/src/utp_stream.cpp

If you want something simpler to read but highly similar to the reference implementations, read fluffy's implementation: https://github.com/status-im/nim-eth/blob/master/eth/utp/utp_socket.nim

As for the current problem, I think diagrams will help explain this :sunglasses:

> https://github.com/ethereum/utp/pull/62, how will that buffer increase the amount available to transfer? does it mask the seq numbers or something like that, so we get more values from a u16?

To understand this, I think it will help for me to explain how this implementation and the reference implementations differ.

how ethereum/utp works

This implementation works by starting a vector with the first data packet we sent and adding every outgoing packet to this vector until we have stored 2^16 sent packets. We can check whether a packet we sent was acked because we store that as data in every packet struct (we shouldn't do this). We can also check whether the ack_num the receiver sent is valid by checking if it is within the range init_seq_num..=init_seq_num + outgoing_buffer.len() (which is also bad, since it relies on us knowing how many packets we have sent in total; in TCP we can send an unlimited number of packets, and uTP should and does do this too in working implementations). The outcome of this process is that we end up with an outgoing buffer of 2^16 packets, 98% of which we no longer need to know about since the rx confirmed it has them. Nevertheless, we use this knowledge to calculate all the data we need, but there are ways to calculate this information without knowing the info of every single packet we sent.
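
A sketch of the bookkeeping described above (illustrative names, not the actual code):

```rust
// The outgoing buffer grows with every sent packet and acked packets are never
// dropped, so validating an ack_num ends up depending on the *total* number of
// packets sent - which stops being representable in a u16 after 2^16 packets.
struct GrowingOutgoingBuffer {
    init_seq_num: u16,
    packets: Vec<Vec<u8>>, // one entry per packet ever sent
}

impl GrowingOutgoingBuffer {
    fn ack_num_in_expected_range(&self, ack_num: u16) -> bool {
        // init_seq_num..=init_seq_num + packets.len(), as described above;
        // once packets.len() reaches 2^16 this end value wraps back around,
        // and the check starts rejecting perfectly valid sequence numbers.
        let end = self.init_seq_num.wrapping_add(self.packets.len() as u16);
        if self.init_seq_num <= end {
            ack_num >= self.init_seq_num && ack_num <= end
        } else {
            ack_num >= self.init_seq_num || ack_num <= end
        }
    }
}
```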

I will now do a demo of how ethereum/utp works. Imagine this is our u16 sequence number index and we happened to start at 0 (normally this is random); our outgoing buffer will be -> and grow as packets are added:

0-----1-----2-----3-----4-----5-----6-----7-----8-----9-----10-----11-----12-----13-----14-----15-----16
->

Our outgoing_buffer (sent_packets.packets in the code) will start here^

[image] At this point we have sent 2^16 data packets; if we send one more, we will exceed the u16 range.

Problems?

  1. we store all the packets we no longer need, since the rx confirmed it got them; this consumes lots of memory
  2. since we store all packets, we heavily rely on and encapsulate data which shouldn't be kept in those data structs
  3. we check if an ack_num is valid by checking if it is in the range init_seq_num..=(init_seq_num + outgoing_buffer.len()), which is bad since we shouldn't need to know how many packets we have sent in total
  4. when we have sent 2^16 data packets, the packets we stored in the outgoing_buffer will have overlapping sequence numbers (index numbers) if we send any more
  5. since we already have the bad habits of keeping all sent packets and knowing the initial sequence number, this allows us to write code which relies on more bad habits, digging ourselves a deeper hole

how the reference uTP implementations and TCP work

> how will that buffer increase the amount available to transfer? does it mask the seq numbers or something like that, so we get more values from a u16?

We don't need more values, since our outgoing buffer only has to track cur_send_window's worth of packets; once a packet is confirmed received, we no longer need to track it.

Instead of growing a list of every packet ever sent, the reference implementations use a sliding window, which is a window of packets not yet confirmed to be received. This is all the information we need. Let's say there are currently 2k sent-but-unacked data packets at any given time.

Imagine this is our u16 sequence number index and we happened to start at 0 (normally this is random); <-----> will be our sliding window of unacked packets, which we have to track in case we have to resend the data:

0-----1-----2-----3-----4-----5-----6-----7-----8-----9-----10-----11-----12-----13-----14-----15-----16
<----->

Our sliding window will start here^

[image] This is a picture representing our sliding window (i.e. the unacked packets in the outgoing_buffer; only unacked packets are stored in the outgoing buffer).

The picture above shows us sending 2^16 plus a few data packets, but as you can imagine, this could go on forever (this is how TCP and uTP work).
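
A minimal sketch of that sliding-window bookkeeping (illustrative names, not the library's actual types): only sent-but-unacked packets are retained, and sequence numbers are compared with wrapping arithmetic so the window can circle the u16 space indefinitely.

```rust
use std::collections::VecDeque;

// Only packets that have been sent but not yet acked are kept, keyed by
// their (wrapping) u16 sequence number; front = oldest unacked packet.
struct SlidingWindow {
    unacked: VecDeque<(u16, Vec<u8>)>,
}

impl SlidingWindow {
    fn on_send(&mut self, seq_nr: u16, payload: Vec<u8>) {
        self.unacked.push_back((seq_nr, payload));
    }

    /// A cumulative ack for `ack_nr` lets us drop everything up to and
    /// including that sequence number; the buffer never grows beyond the
    /// current send window, no matter how many packets the connection sends.
    fn on_ack(&mut self, ack_nr: u16) {
        while let Some((seq, _)) = self.unacked.front() {
            // distance computed with wrapping arithmetic, so seq numbers can
            // roll over 65535 -> 0 indefinitely
            if ack_nr.wrapping_sub(*seq) < u16::MAX / 2 {
                self.unacked.pop_front();
            } else {
                break;
            }
        }
    }
}
```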

Final Notes

I hope I explained things well; if not, please ask more questions :). The sliding window is what I want to implement - this will allow us to send unlimited data, just like a TCP connection would. I hope this answers what #62 is trying to achieve (of course it will be a multi-part PR). Increasing the size of seq_num isn't the solution; allowing us to use a circulating window is the answer!