sile / wstcp

WebSocket to TCP proxy written in Rust
MIT License

Proxy appears to lock up at high message rates #4

Open unicolet opened 3 years ago

unicolet commented 3 years ago

@sile we seem to have run into an issue with wstcp where it stops forwarding messages when it receives WebSocket messages at a "high" rate. We have pinpointed the threshold at about 90ms between messages: sending each message less than 90ms after the previous one reliably locks up wstcp.

We have looked at everything we could, and the problem seems to be in wstcp: we tried replacing wstcp with another WebSocket-to-TCP proxy and the problem disappeared, confirming our suspicion.

A packet trace shows that wstcp receives the message but never passes it on to the TCP server on the other side. We will gather more info to help resolve this issue and comment here as soon as we have news.

unicolet commented 3 years ago

If it helps, the same does not happen in the other direction (TCP -> WebSocket).

sile commented 3 years ago

Sorry for the delayed response and thank you for reporting this issue. I'll try to reproduce this problem (it would be very helpful if you could share a toy program or commands to reproduce that).

unicolet commented 3 years ago

@sile I will try, and no need to apologize :-)

unicolet commented 3 years ago

I tried, unsuccessfully, to reproduce the problem with the following setup. I believe it's not sending fast enough, but Node.js will not let me go lower than 1 ms for setInterval. I think in our case we were sending every 0.1 to 0.5 ms.

I built wstcp from master with the latest Rust (after a rustup update), then ran it as:

./target/debug/wstcp 127.0.0.1:9999

I opened nc -l 9999 to receive the TCP data, then tried to send messages as fast as possible with:

const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:13892');
const buffer = Buffer.from("1234567890123456", "utf-8");
ws.on('open', function open() {
        ws.on('message', function incoming(message) {
          console.log('received: %s', message);
        });
        let loop = 0; // counts messages sent so far
        setInterval(function () { ws.send(buffer); loop++; }, 1);
});

result: wstcp works just fine

I'll keep trying

unicolet commented 3 years ago

I also tried a Go client that can go below 1 ms, such as this one, with nc -l 9999 running locally:

// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
    "flag"
    "fmt"
    "log"
    "net/url"
    "os"
    "os/signal"
    "time"

    "github.com/gorilla/websocket"
)

var addr = flag.String("addr", "localhost:13892", "http service address")

func main() {
    flag.Parse()
    log.SetFlags(0)

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}
    log.Printf("connecting to %s", u.String())

    c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
    if err != nil {
        log.Fatal("dial:", err)
    }
    defer c.Close()

    done := make(chan struct{})

    go func() {
        defer close(done)
        for {
            _, message, err := c.ReadMessage()
            if err != nil {
                log.Println("read:", err)
                return
            }
            log.Printf("recv: %s", message)
        }
    }()

    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-done:
            fmt.Println("done")
            return
        case t := <-ticker.C:
            fmt.Println("t")
            err := c.WriteMessage(
                websocket.BinaryMessage,
                []byte(t.String()+"\n"))
            if err != nil {
                log.Println("write:", err)
                return
            }
        case <-interrupt:
            log.Println("interrupt")

            // Cleanly close the connection by sending a close message and then
            // waiting (with timeout) for the server to close the connection.
            err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
            if err != nil {
                log.Println("write close:", err)
                return
            }
            select {
            case <-done:
            case <-time.After(time.Second):
            }
            return
        }
    }
}

but I still can't reproduce the issue. The data we send is binary, if that helps :-(

unicolet commented 3 years ago

we're trying to debug the issue ourselves :crossed_fingers:

sile commented 3 years ago

Thanks!

unicolet commented 3 years ago

So we've spent some time on it, but we're as clueless as before. All I can offer is some strace output, which we can't make much of. These are the final lines, which I suppose should show where the issue is:

[pid 1736043] 13:52:55.784792 sched_yield() = 0
[pid 1736043] 13:52:55.784843 sched_yield() = 0
[pid 1736043] 13:52:55.784894 sched_yield() = 0
[pid 1736043] 13:52:55.785007 clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=10000}, 0x7fb2c02167c0) = 0
[pid 1736043] 13:52:55.785149 futex(0x564f39ef509c, FUTEX_WAIT_PRIVATE, 0, NULL <unfinished ...>
[pid 1734559] 13:52:55.794556 <... epoll_wait resumed>[{EPOLLIN|EPOLLOUT, {u32=2, u64=2}}], 1000, -1) = 1
[pid 1734559] 13:52:55.794764 futex(0x564f39ef5098, FUTEX_WAKE_PRIVATE, 1) = 1
[pid 1736050] 13:52:55.794810 <... futex resumed>) = 0
[pid 1736050] 13:52:55.794849 futex(0x564f39ef5040, FUTEX_WAKE_PRIVATE, 1 <unfinished ...>
[pid 1734559] 13:52:55.794864 epoll_wait(5,  <unfinished ...>
[pid 1736050] 13:52:55.794959 <... futex resumed>) = 0
[pid 1736050] 13:52:55.795015 recvfrom(4, "\202\217\2145\222a\3315\226a\2155\223a\211%\243W\232\2558", 4096, 0, NULL, NULL) = 21
[pid 1736050] 13:52:55.795091 recvfrom(4, 0x564f39ecfca5, 4075, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736050] 13:52:55.795120 recvfrom(4, 0x564f39ecfca5, 4075, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736050] 13:52:55.795170 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736050] 13:52:55.795194 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736050] 13:52:55.795273 recvfrom(4, 0x564f39ecfc90, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736050] 13:52:55.795311 recvfrom(4, 0x564f39ecfc90, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736050] 13:52:55.795351 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736050] 13:52:55.795387 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736050] 13:52:55.795427 sendto(8, "U\0\4\0\1\0\1\0\5\02016\26\230\252", 15, MSG_NOSIGNAL, NULL, 0) = 15
[pid 1736050] 13:52:55.795544 sched_yield() = 0
[pid 1736050] 13:52:55.795588 sched_yield() = 0
[pid 1736050] 13:52:55.795616 sched_yield() = 0
[pid 1736050] 13:52:55.795662 clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=10000}, 0x7fb2bb3f87c0) = 0
[pid 1736050] 13:52:55.795788 futex(0x564f39ef509c, FUTEX_WAIT_PRIVATE, 0, NULL <unfinished ...>
[pid 1734559] 13:52:55.804975 <... epoll_wait resumed>[{EPOLLIN|EPOLLOUT, {u32=2, u64=2}}], 1000, -1) = 1
[pid 1734559] 13:52:55.805128 futex(0x564f39ef5098, FUTEX_WAKE_PRIVATE, 1 <unfinished ...>
[pid 1736048] 13:52:55.805193 <... futex resumed>) = 0
[pid 1734559] 13:52:55.805200 <... futex resumed>) = 1
[pid 1736048] 13:52:55.805207 futex(0x564f39ef5040, FUTEX_WAIT_PRIVATE, 2, NULL <unfinished ...>
[pid 1734559] 13:52:55.805216 futex(0x564f39ef5040, FUTEX_WAKE_PRIVATE, 1 <unfinished ...>
[pid 1736048] 13:52:55.805224 <... futex resumed>) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1734559] 13:52:55.805251 <... futex resumed>) = 0
[pid 1736048] 13:52:55.805261 futex(0x564f39ef5040, FUTEX_WAKE_PRIVATE, 1 <unfinished ...>
[pid 1734559] 13:52:55.805286 epoll_wait(5,  <unfinished ...>
[pid 1736048] 13:52:55.805299 <... futex resumed>) = 0
[pid 1736048] 13:52:55.805333 recvfrom(4, "\202\217\r\331Y#X\331]#\f\331X#\10\311h\24\32C\363", 4096, 0, NULL, NULL) = 21
[pid 1736048] 13:52:55.805426 recvfrom(4, 0x564f39ecfca5, 4075, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736048] 13:52:55.805454 recvfrom(4, 0x564f39ecfca5, 4075, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736048] 13:52:55.805493 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736048] 13:52:55.805521 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736048] 13:52:55.805592 recvfrom(4, 0x564f39ecfc90, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736048] 13:52:55.805631 recvfrom(4, 0x564f39ecfc90, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736048] 13:52:55.805686 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736048] 13:52:55.805718 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736048] 13:52:55.805746 sendto(8, "U\0\4\0\1\0\1\0\5\02017\27\232\252", 15, MSG_NOSIGNAL, NULL, 0) = 15
[pid 1736048] 13:52:55.805834 sched_yield() = 0
[pid 1736048] 13:52:55.805879 sched_yield() = 0
[pid 1736048] 13:52:55.805908 sched_yield() = 0
[pid 1736048] 13:52:55.805949 clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=10000}, 0x7fb2bb7fa7c0) = 0
[pid 1736048] 13:52:55.806075 futex(0x564f39ef509c, FUTEX_WAIT_PRIVATE, 0, NULL <unfinished ...>
[pid 1734559] 13:52:55.815478 <... epoll_wait resumed>[{EPOLLIN|EPOLLOUT, {u32=2, u64=2}}], 1000, -1) = 1
[pid 1734559] 13:52:55.815575 futex(0x564f39ef5098, FUTEX_WAKE_PRIVATE, 1) = 1
[pid 1736044] 13:52:55.815757 <... futex resumed>) = 0
[pid 1734559] 13:52:55.815792 epoll_wait(5,  <unfinished ...>
[pid 1736044] 13:52:55.815803 futex(0x564f39ef5040, FUTEX_WAKE_PRIVATE, 1) = 0
[pid 1736044] 13:52:55.815879 recvfrom(4, "\202\217\325\344Yq\200\344]q\324\344Xq\320\364hI\315x\363", 4096, 0, NULL, NULL) = 21
[pid 1736044] 13:52:55.816019 recvfrom(4, 0x564f39ecfca5, 4075, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736044] 13:52:55.816056 recvfrom(4, 0x564f39ecfca5, 4075, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736044] 13:52:55.816115 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736044] 13:52:55.816146 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736044] 13:52:55.816262 recvfrom(4, 0x564f39ecfc90, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736044] 13:52:55.816288 recvfrom(4, 0x564f39ecfc90, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736044] 13:52:55.816324 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736044] 13:52:55.816363 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736044] 13:52:55.816398 sendto(8, "U\0\4\0\1\0\1\0\5\02018\30\234\252", 15, MSG_NOSIGNAL, NULL, 0) = 15
[pid 1736044] 13:52:55.816546 sched_yield() = 0
[pid 1736044] 13:52:55.816578 sched_yield() = 0
[pid 1736044] 13:52:55.816655 sched_yield() = 0
[pid 1736044] 13:52:55.816694 clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=10000}, 0x7fb2bbffe7c0) = 0
[pid 1736044] 13:52:55.816810 futex(0x564f39ef509c, FUTEX_WAIT_PRIVATE, 0, NULL <unfinished ...>
[pid 1734559] 13:52:55.825883 <... epoll_wait resumed>[{EPOLLIN|EPOLLOUT, {u32=2, u64=2}}], 1000, -1) = 1
[pid 1734559] 13:52:55.825980 futex(0x564f39ef5098, FUTEX_WAKE_PRIVATE, 1) = 1
[pid 1736045] 13:52:55.826020 <... futex resumed>) = 0
[pid 1736045] 13:52:55.826110 futex(0x564f39ef5040, FUTEX_WAKE_PRIVATE, 1 <unfinished ...>
[pid 1734559] 13:52:55.826126 epoll_wait(5,  <unfinished ...>
[pid 1736045] 13:52:55.826137 <... futex resumed>) = 0
[pid 1734559] 13:52:55.826161 <... epoll_wait resumed>[{EPOLLIN|EPOLLOUT, {u32=2, u64=2}}], 1000, -1) = 1
[pid 1736045] 13:52:55.826247 recvfrom(4,  <unfinished ...>
[pid 1734559] 13:52:55.826262 epoll_wait(5,  <unfinished ...>
[pid 1736045] 13:52:55.826358 <... recvfrom resumed>"\202\217lB\357\3449B\353\344mB\356\344iR\336\335u\334E\202\2212\271\10Rg\271\16R3"..., 4096, 0, NULL, NULL) = 44
[pid 1736045] 13:52:55.826433 recvfrom(4, 0x564f39ecfcbc, 4052, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736045] 13:52:55.826473 recvfrom(4, 0x564f39ecfcbc, 4052, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736045] 13:52:55.826531 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736045] 13:52:55.826558 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736045] 13:52:55.826652 recvfrom(4, 0x564f39ecfcbc, 4052, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736045] 13:52:55.826676 recvfrom(4, 0x564f39ecfcbc, 4052, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736045] 13:52:55.826732 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736045] 13:52:55.826770 recvfrom(8, 0x564f39ef01e0, 4096, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 1736045] 13:52:55.826807 sendto(8, "U\0\4\0\1\0\1\0\5\02019\31\236\252", 15, MSG_NOSIGNAL, NULL, 0) = 15
[pid 1736045] 13:52:55.826916 sched_yield() = 0
[pid 1736045] 13:52:55.826987 sched_yield() = 0
[pid 1736045] 13:52:55.827051 sched_yield() = 0
[pid 1736045] 13:52:55.827083 clock_nanosleep(CLOCK_REALTIME, 0, {tv_sec=0, tv_nsec=10000}, 0x7fb2bbdfd7c0) = 0
[pid 1736045] 13:52:55.827198 futex(0x564f39ef509c, FUTEX_WAIT_PRIVATE, 0, NULL

unicolet commented 3 years ago

Resuming work on this: I built wstcp from 0.1.5 (before #2) and it still shows the issue. Interestingly, I cannot reproduce the issue with another WebSocket-to-TCP proxy, efrecon/websockify (https://hub.docker.com/r/efrecon/websockify).

unicolet commented 3 years ago

@sile I've done some debugging and I think I have finally found something interesting. I added print statements until I came across this subtle difference when wstcp triggers the issue (a dropped message):

In https://github.com/sile/wstcp/blob/master/src/channel.rs#L198:L198 I noticed that the message that gets lost seems to be returned together with the previous message in a single read in https://github.com/sile/wstcp/blob/master/src/channel.rs#L216 . The printout shows a FrameDecoder payload of 38 bytes, which hints at 2 messages lumped together: one message is 15 bytes, and the other should be 2 bytes longer, i.e. 17 bytes. I can't explain the extra 6 bytes (38 - 15 = 23, and 23 - 17 = 6), but perhaps they indicate a compound message, or are the header of the next WebSocket message?

Then only the first 15 bytes are sent to the TCP destination and the remaining bytes seem to have been lost.

Note that we send the messages separately from the client, so this aggregation is not something we do intentionally on our side.

Could it be that wstcp's message-processing logic does not handle this case correctly?

Printout of the problematic message:

handle_ws_stream
handle_ws_stream frame_encoder.is_idle
handle_ws_stream encode_to_write_buf
handle_ws_stream decode_from_read_buf
decode buf[38]=[15, 48, 28, 179, 91, 48, 25, 179, 95, 32, 41, 138, 67, 174, 178, 130, 145, 42, 117, 42, 39, 127, 117, 44, 39, 43, 117, 43, 39, 47, 101, 89, 83, 69, 5, 33, 201, 128]
decode self.buf=[]
returning 15
handle_ws_stream ok
write_decoded_data self.buf[15]=[85, 0, 4, 0, 1, 0, 1, 0, 5, 16, 49, 57, 25, 158, 170]
wrote only 15
handle_ws_stream
handle_ws_stream frame_encoder.is_idle
handle_ws_stream encode_to_write_buf
handle_ws_stream decode_from_read_buf
finish_decoding self.buf=[]
handle_ws_stream ok
Jul 26 13:31:57.798 DEBG Received frame: Data, relay_addr: 172.27.1.166:35126, client_addr: 127.0.0.1:37850, server_addr: 10.10.2.188:60100, proxy_addr: 0.0.0.0:13892

You can see the println! statements in https://github.com/sile/wstcp/compare/master...unicolet:issue_4?expand=1

unicolet commented 3 years ago

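AFAIU, 6 bytes could very well be a WebSocket frame header (a 2-byte base header plus the 4-byte masking key that client-to-server frames carry), so the 38 bytes could be 2 WebSocket messages lumped together.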
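To check that arithmetic, here is a small sketch of the RFC 6455 frame header layout. It is not wstcp's own decoder; `FrameHeader`, `parse_header` and `unmask` are illustrative names. Applied to the debug log above, the bytes at offset 15 of the 38-byte buffer are 130 (0x82: FIN set, binary opcode) and 145 (0x91: mask bit set, 7-bit length 17), so that frame occupies 2 + 4 + 17 = 23 bytes, exactly the 38 - 15 left over. The same layout fits the strace: each 21-byte read is 2 + 4 + 15, and the 44-byte read is one 21-byte frame followed by one 23-byte frame. Unmasked, the second frame's 17 bytes start with 85 and end with 170, like the 15-byte message that was forwarded.

```rust
/// The parts of a WebSocket frame header (RFC 6455, section 5.2)
/// needed to find where one frame ends and the next begins.
#[derive(Debug, PartialEq)]
pub struct FrameHeader {
    pub fin: bool,
    pub opcode: u8,
    pub mask: Option<[u8; 4]>,
    pub payload_len: u64,
    /// Header size in bytes, including the masking key when present.
    pub header_len: usize,
}

/// Parses the header at the start of `buf`; `None` means "need more bytes".
pub fn parse_header(buf: &[u8]) -> Option<FrameHeader> {
    if buf.len() < 2 {
        return None;
    }
    let fin = (buf[0] & 0x80) != 0;
    let opcode = buf[0] & 0x0F;
    let masked = (buf[1] & 0x80) != 0;
    // 7-bit length; 126 and 127 announce a 16-bit or 64-bit extended length.
    let (payload_len, mut pos) = match buf[1] & 0x7F {
        126 if buf.len() >= 4 => (u16::from_be_bytes([buf[2], buf[3]]) as u64, 4),
        127 if buf.len() >= 10 => {
            let mut b = [0u8; 8];
            b.copy_from_slice(&buf[2..10]);
            (u64::from_be_bytes(b), 10)
        }
        126 | 127 => return None, // extended length not fully received yet
        n => (n as u64, 2),
    };
    let mut mask = None;
    if masked {
        if buf.len() < pos + 4 {
            return None;
        }
        mask = Some([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
        pos += 4;
    }
    Some(FrameHeader { fin, opcode, mask, payload_len, header_len: pos })
}

/// Unmasks a client-to-server payload in place (RFC 6455, section 5.3).
pub fn unmask(payload: &mut [u8], mask: [u8; 4]) {
    for (i, b) in payload.iter_mut().enumerate() {
        *b ^= mask[i % 4];
    }
}
```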

unicolet commented 3 years ago

It definitely looks like wstcp does not process the remaining bytes in ws_rbuf. They are not processed because, on the next loop iteration, this condition (payload.header.is_none()) is false:

https://github.com/unicolet/wstcp/blob/issue_4/src/frame.rs#L415

So wstcp tries (again) to parse a payload, but fails. The header is not none because it's the result from the previous invocation which has still not been flushed out by calling finish_decoding. @sile, this is very much beyond my skills.
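To make the suspected failure mode concrete, here is a minimal sketch of a decoder that keeps per-frame state in an `Option` header field and, after emitting each payload, clears that field and loops again, so a second frame that arrived in the same read is decoded instead of being left in the buffer. The types and names (`Decoder`, `feed`, `parse_header`) are hypothetical and do not reflect wstcp's actual code in frame.rs or channel.rs. The sketch assumes client-to-server frames, ignores opcodes, fragmentation and control frames, and enforces no payload-size limit.

```rust
/// Hypothetical per-connection decoder; NOT wstcp's real types.
#[derive(Default)]
pub struct Decoder {
    /// Bytes received from the WebSocket socket but not yet consumed.
    buf: Vec<u8>,
    /// Header of the frame being decoded: (header_len, payload_len, mask).
    header: Option<(usize, usize, Option<[u8; 4]>)>,
}

impl Decoder {
    /// Appends one read's worth of bytes and returns every payload that is
    /// now complete, in order. Partial frames stay buffered for the next call.
    pub fn feed(&mut self, bytes: &[u8]) -> Vec<Vec<u8>> {
        self.buf.extend_from_slice(bytes);
        let mut out = Vec::new();
        loop {
            if self.header.is_none() {
                match parse_header(&self.buf) {
                    Some(h) => self.header = Some(h),
                    None => break, // not even a full header yet
                }
            }
            let (hlen, plen, mask) = self.header.expect("set above");
            let end = hlen.saturating_add(plen);
            if self.buf.len() < end {
                break; // payload incomplete: keep `header` for the next read
            }
            let mut payload = self.buf[hlen..end].to_vec();
            if let Some(m) = mask {
                for (i, b) in payload.iter_mut().enumerate() {
                    *b ^= m[i % 4];
                }
            }
            self.buf.drain(..end);
            // Reset per-frame state and loop again, so a second frame that
            // arrived in the same read is decoded now instead of sitting in
            // `buf` until more bytes arrive.
            self.header = None;
            out.push(payload);
        }
        out
    }
}

/// Parses (header_len, payload_len, mask) from the start of `buf`, or
/// returns None if the header is not complete yet (RFC 6455, section 5.2).
fn parse_header(buf: &[u8]) -> Option<(usize, usize, Option<[u8; 4]>)> {
    if buf.len() < 2 {
        return None;
    }
    let (plen, mut pos) = match buf[1] & 0x7F {
        126 if buf.len() >= 4 => (u16::from_be_bytes([buf[2], buf[3]]) as usize, 4),
        127 if buf.len() >= 10 => {
            let mut b = [0u8; 8];
            b.copy_from_slice(&buf[2..10]);
            (u64::from_be_bytes(b) as usize, 10)
        }
        126 | 127 => return None,
        n => (n as usize, 2),
    };
    let mut mask = None;
    if (buf[1] & 0x80) != 0 {
        if buf.len() < pos + 4 {
            return None;
        }
        mask = Some([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
        pos += 4;
    }
    Some((pos, plen, mask))
}
```

Design note: keeping `header` across calls is what allows a frame split over two reads to be resumed; the sketch only avoids the lost-message symptom because the field is cleared before the loop checks the buffer again.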

Reproducing

I suppose the Go program above cannot reproduce the issue because it adds a small delay between messages. The client used in our tests appears to skip the delay before the last message, but I'm not 100% sure that is the cause.
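Following that hypothesis, one way to force the coalescing without relying on timer resolution is to bypass the WebSocket client library and write several complete frames with a single write call, so that on loopback they very likely arrive in one recvfrom. The sketch below uses only the Rust standard library. The function names, the fixed Sec-WebSocket-Key (the sample nonce from RFC 6455), the `/` request path, and the assumption that wstcp accepts this minimal handshake are all illustrative. TCP does not preserve write boundaries, so one write_all makes it likely, not guaranteed, that the frames are read together.

```rust
use std::io::{self, Read, Write};
use std::net::TcpStream;

/// Builds one masked binary frame (FIN set), as a client must send it.
/// Payloads above 65535 bytes are not handled in this sketch.
pub fn masked_frame(payload: &[u8], mask: [u8; 4]) -> Vec<u8> {
    let mut f = vec![0x82]; // FIN + binary opcode
    if payload.len() < 126 {
        f.push(0x80 | payload.len() as u8); // mask bit + 7-bit length
    } else {
        assert!(payload.len() <= u16::MAX as usize, "64-bit lengths not handled");
        f.push(0x80 | 126); // mask bit + "16-bit length follows"
        f.extend_from_slice(&(payload.len() as u16).to_be_bytes());
    }
    f.extend_from_slice(&mask);
    f.extend(payload.iter().enumerate().map(|(i, b)| b ^ mask[i % 4]));
    f
}

/// Performs a bare WebSocket handshake with `addr`, then sends every payload
/// as its own frame, all concatenated into ONE write_all call.
pub fn send_coalesced(addr: &str, payloads: &[&[u8]]) -> io::Result<()> {
    let mut stream = TcpStream::connect(addr)?;
    let request = format!(
        "GET / HTTP/1.1\r\nHost: {addr}\r\nUpgrade: websocket\r\nConnection: Upgrade\r\n\
         Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\nSec-WebSocket-Version: 13\r\n\r\n"
    );
    stream.write_all(request.as_bytes())?;
    // Read the handshake response up to the blank line ending the headers.
    let mut resp = Vec::new();
    let mut chunk = [0u8; 1024];
    while !resp.windows(4).any(|w| w == b"\r\n\r\n") {
        let n = stream.read(&mut chunk)?;
        if n == 0 {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "closed during handshake"));
        }
        resp.extend_from_slice(&chunk[..n]);
    }
    if !resp.starts_with(b"HTTP/1.1 101") {
        return Err(io::Error::new(io::ErrorKind::Other, "upgrade rejected"));
    }
    let mut batch = Vec::new();
    for (i, p) in payloads.iter().enumerate() {
        batch.extend(masked_frame(p, [i as u8, 0x5A, 0xA5, 0x3C]));
    }
    stream.write_all(&batch)?;
    stream.flush()
}
```

For example, with wstcp listening on 127.0.0.1:13892 and nc -l 9999 behind it, `send_coalesced("127.0.0.1:13892", &[&b"first\n"[..], &b"second\n"[..]])` should make both lines appear in nc if the proxy handles coalesced frames.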

sile commented 3 years ago

Thanks for your detailed investigation! I'll look into the details and try to fix it this weekend.

unicolet commented 3 years ago

@sile any news? thanks in advance :-)

sile commented 3 years ago

@unicolet Sorry, there has been no progress, as I haven't had enough time to address this problem 😢 I'll notify you when the status changes.

unicolet commented 3 years ago

@sile that's understandable :) thanks!

sile commented 3 years ago

Hi @unicolet, sorry for the very delayed response.

I had some time to read the code (and the issue comments above) and tried to reproduce the issue, but failed... It seems too difficult to solve this issue without a program that reproduces it (or a full packet dump of the data sent to wstcp).

> The header is not none because it's the result from the previous invocation which has still not been flushed out by calling finish_decoding.

I still can't understand why this situation occurs. Judging from the debug log (https://github.com/sile/wstcp/issues/4#issuecomment-886626161), FramePayloadDecoder::finish_decoding() does seem to be called (the log message "finish_decoding self.buf=[]" indicates that). The finish_decoding method sets the FramePayloadDecoder::header field to None by calling self.header.take() two lines below the debug print, so the header field should always be None after a successful finish_decoding call. Please let me know if I've misunderstood something.
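This reasoning rests on the semantics of Option::take, which moves the value out and leaves None behind. The sketch below uses a hypothetical `PayloadDecoder` whose names only mirror the ones in this discussion; it shows that take() does reset the header, and also one generic way a header can still be Some on the next loop iteration: a code path that returns before take() is reached. Whether wstcp has such a path is not established here.

```rust
/// Hypothetical stand-in for FramePayloadDecoder: the field and method names
/// only mirror the ones discussed in this thread, not wstcp's real code.
pub struct PayloadDecoder {
    /// Expected payload length of the frame in progress, if any.
    pub header: Option<usize>,
    /// Payload bytes collected so far.
    pub buf: Vec<u8>,
}

impl PayloadDecoder {
    /// Returns the complete payload and resets `header` to None, or returns
    /// None, leaving `header` untouched, while the payload is incomplete.
    pub fn finish_decoding(&mut self) -> Option<Vec<u8>> {
        let expected = self.header?;
        if self.buf.len() < expected {
            // Early return: `header` is still Some(..) on the next call.
            return None;
        }
        // Option::take moves the value out and leaves None behind.
        let _previous = self.header.take();
        Some(self.buf.drain(..expected).collect())
    }
}
```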

unicolet commented 3 years ago

@sile no problem :-)

I can send a packet dump to your email address :+1: In the meantime, we have replaced wstcp with https://github.com/novnc/websockify, which apparently does not have the issue.

Regarding the debug log comment, please take my observations with a pinch of salt, as the wstcp codebase is a little beyond my skills (and the time I'd need to acquire them). I hope the packet capture will shed some light :crossed_fingers: I'll comment here as soon as I've sent the capture.

sile commented 3 years ago

Thanks a lot!