nicolasff / webdis

A Redis HTTP interface with JSON output
https://webd.is
BSD 2-Clause "Simplified" License
2.82k stars 307 forks source link

Fix for premature termination of websocket connection. #212

Open majklik opened 2 years ago

majklik commented 2 years ago

If a command waiting for data is used inside the websocket connection (eg XREAD BLOCK 5000 STREAMS mystream $), then Webdis will crash if the WS connection closes before the response from Redis arrives. When the WS connection terminates, the data is freed using cmd_free(), which uses a callback from Redis, so the callback function crashes. The fix adds in the callback functions test that if there is active request to terminate the Redis connection (triggered by redisAsyncDisconnect(ac) during WS closing), the response is no longer processed.

Well, this is a short hack to avoid SIGSEGV. I see a bigger problem here with the correct processing of commands via WS connection (with pipelining).

nicolasff commented 2 years ago

Hi @majklik, thanks a lot for reporting this issue and sending a fix!

I'll go over it this weekend and will get back to you shortly. Not that the code takes a while to review or anything, but I'd like to add a test for it if I can find a way to reproduce the crash.

majklik commented 2 years ago

Hi @nicolasff, there is a test page for crash:

<HTML>
 <HEAD>
  <META CHARSET="UTF-8"/>
  <META NAME="robots" CONTENT="nofollow,noindex"/>
  <META HTTP-EQUIV="Cache-Control" CONTENT="no-cache"/>
 </HEAD>
<BODY>
 <SCRIPT TYPE="text/javascript">
  function testStreamRead() {
    var xrSocket = new WebSocket("wss://develserver/webdis/.json");
    xrSocket.onmessage = function(messageEvent) {
        console.log("JSON XREAD received:", messageEvent.data);
        xrSocket.send(JSON.stringify(["XREAD", "BLOCK", 15000, "COUNT", 5, "STREAMS","mystream", "$"]));
    }
    xrSocket.onopen = function() {
        console.log("JSON XREAD socket connected!");
        xrSocket.send(JSON.stringify(["XREAD", "BLOCK", 15000, "COUNT", 5, "STREAMS","mystream", "$"]));
    }
    xrSocket.onerror = function(event) {
        console.log("JSON XREAD onerror:", event);
    }
    xrSocket.onclose = function(event) {
        console.log("JSON XREAD onclose:", event);
    }
  }
  testStreamRead();

  </SCRIPT>
</BODY>
</HTML>

It works simply - open websocket and repeatedly runs XREAD command in blocking mode (if there is not new records in the stream). If the page is closed during waiting then Webdis crash after BLOCK 15 s timeout in the Redis callback void json_reply(redisAsyncContext *c, void *r, void *privdata).

In the same way Webdis can crash any other command, if the websocket channel is closed and data freeed before Redis callback. The problem is not in SUBSCRIBE mode becouse in this case the callback is called immediatly during redisAsyncDisconnect(). But with other commands the callback is called later when the command finished and returns data, but at this moment private data (struct cmd *cmd) for callback is freeed and callback crash.

nicolasff commented 2 years ago

Thanks for the test page @majklik, this is very helpful.

I was able to reproduce the crash by following your instructions, basically loading the page once, noticing the successful connection, and just reloading the page. A few seconds later the REDIS_DISCONNECTING response came back and Webdis crashed.

Unfortunately your change is not enough to address the actual underlying cause of the crash, being that Redis responds after we've freed all the client objects – as you correctly diagnosed. It does help with this specific scenario, but there are other cases that it doesn't handle.

For example, even with your patch if you:

  1. Load the test page and have the WS client connect
  2. Restart Redis
  3. Reload the page … you still get a crash (not always on the first restart, which is interesting too).

The main thing that's missing here is a way to track that a response is coming back from Redis for a client that has gone away. There's already code that does something similar for WS clients, with a few fields tracking various expected events:

There needs to be something like that for the cmd object that reply callbacks try to send their response to. Thinking about it, I wondered if this wasn't also an issue in cases where a regular HTTP client sends a request, Redis takes a long time to respond, and the client disconnects before we hear back from Redis. While I don't think it could crash Webdis, this made me discover a whole new problem that is also pretty bad: I managed to make Webdis send its response to a different client.

This is the scenario:

  1. Run curl -s "http://127.0.0.1:7379/EVAL/$(printf %s 'for i=1,3000000000,1 do i=i end; return 42' | jq -sRr @uri)/0" – this runs a long for loop with EVAL, and returns 42. The jq is there to encode the script, if you don't have jq installed you can run this instead: curl -s 'http://127.0.0.1:7379/EVAL/for%20i%3D1%2C3000000000%2C1%20do%20i%3Di%20end%3B%20return%2042%0A/0'
  2. As *soon* as the curl command is sent and start blocking as it waits for the response, hit ctrl-c to kill it.
  3. Immediately after killing the first curl, send another request with: curl -s "http://127.0.0.1:7379/SUBSCRIBE/foo"

(Changing the 3000000000 to a larger value can make the EVAL duration longer and help reproduce the issue. On my machine it takes about 8 seconds to run and is enough for me to hit ctrl-c and paste+run the second curl command)

When the EVAL response comes back from Redis, it calls format_send_reply(cmd, …) which sends the reply to cmd->fd. When the first client disconnected, its descriptor (socket) was closed. So when the second client connected to Webdis, it was assigned the same fd as the previous one so the reply from the first command gets sent to the second client.

This is more complex and more serious than I originally thought reading your proposed fix. The (related) bug I described follows a similar scenario to the one you found, but both come down to the same core problem that's been here since the very first days of Webdis. Your report and contribution are much appreciated, but as I look into this more closely it's clear that a fix will require a larger change. Let me dig into it some more today and I'll update this thread once I have a better idea of what it will take to fully resolve these issues.

majklik commented 2 years ago

I'm afraid, that there are some next problematic points. :-( But for your last two mentioned:

This bug with restarted Redis is related to incorrect reaction on Redis callback with signalized error. When you restart Redis then the hiredis library automatically call redisAsyncDisconnect() internally. So when in Redis callback is signalized error then cannt be called any next library call on this context but at this moment the Webdis code call direct redisAsyncDisconnect(ac) or redisAsyncCommand(...) with some delay which can result in SIGSEGV (depends if the context was freeed or not). I've patched this in some way in my installation too.

A nice bug with the leaked FD from one to second Webdis HTTP call. :-) From fast test, maybe somwthing like this diff can help:

diff -ru webdis-master/src/client.c webdis-master_fd/src/client.c
--- webdis-master/src/client.c  2021-12-26 03:42:31.000000000 +0000
+++ webdis-master_fd/src/client.c       2022-01-04 13:34:09.586942011 +0000
@@ -300,6 +300,11 @@
                        cmd_free(cmd);
                }

+               if(c->last_cmd) {
+                       c->last_cmd->fd = -1;
+                       c->last_cmd = NULL;
+               }
+
                close(c->fd);

                http_client_free(c);
diff -ru webdis-master/src/client.h webdis-master_fd/src/client.h
--- webdis-master/src/client.h  2021-12-26 03:42:31.000000000 +0000
+++ webdis-master_fd/src/client.h       2022-01-04 13:33:07.848942573 +0000
@@ -62,6 +62,7 @@
        char *filename; /* content-disposition */

        struct cmd *reused_cmd;
+       struct cmd *last_cmd;

        struct ws_client *ws; /* websocket client */
 };
diff -ru webdis-master/src/cmd.c webdis-master_fd/src/cmd.c
--- webdis-master/src/cmd.c     2021-12-26 03:42:31.000000000 +0000
+++ webdis-master_fd/src/cmd.c  2022-01-04 14:15:26.376476911 +0000
@@ -28,6 +28,10 @@
        c->count = count;
        c->http_client = client;

+       if(client) {
+               client->last_cmd = c;
+       }
+
        c->argv = calloc(count, sizeof(char*));
        c->argv_len = calloc(count, sizeof(size_t));

@@ -56,6 +60,10 @@
        free(c->if_none_match);
        if(c->mime_free) free(c->mime);

+       if(c->http_client && c->http_client->last_cmd==c) {
+                c->http_client->last_cmd=NULL;
+       }
+
        if (c->ac && /* we have a connection */
                (c->database != c->w->s->cfg->database /* custom DB */
                || cmd_is_subscribe(c))) {
diff -ru webdis-master/src/conf.c webdis-master_fd/src/conf.c
--- webdis-master/src/conf.c    2021-12-26 03:42:31.000000000 +0000
+++ webdis-master_fd/src/conf.c 2022-01-04 13:41:25.549938037 +0000
@@ -6,6 +6,7 @@
 #include <unistd.h>
 #include <pwd.h>
 #include <grp.h>
+#include <errno.h>

 #include <jansson.h>
 #include <evhttp.h>
diff -ru webdis-master/src/http.c webdis-master_fd/src/http.c
--- webdis-master/src/http.c    2021-12-26 03:42:31.000000000 +0000
+++ webdis-master_fd/src/http.c 2022-01-04 13:38:09.229939827 +0000
@@ -234,6 +234,10 @@
        char *p;
        int i, ret;

+       if(fd<0) {
+               return;
+       }
+
        r->out_sz = sizeof("HTTP/1.x xxx ")-1 + strlen(r->msg) + 2;
        r->out = calloc(r->out_sz + 1, 1);

It track in the HTTP client called command and when the HTTP connection is closed then invalidate cmd->fd, so this ghost write is not performed on Redis callback.

The use of the EVAL script for tests is not necessary, nice works command like curl -Ns "http://127.0.0.1:7379/BRPOP/nonexistlist/10" - it wait 10 second before returning. :-)

Again, this is in category of hot fix. I§m thinking that code need a bit more changes for proper operation in the disconnecting phase. But maybe first must be fixed some websocket frame handling, which then show up some new problems to take in account.

The added in conf.c - I'm unable compile Webdis without this.

majklik commented 2 years ago

To discuss how to deal with fair closing from the perspective of HTTP (WS) and Redis side, I'm thinking that a possible solution using a structure like this:

struct workingset {
        redisAsyncContext       *ac;            /* Redis connection */
        int                     acstate;        /* connected, working, closing, freeed */
        struct pool             *pool;          /* POOL, if is attached */
        struct http_client      *hc;            /* HTTP client */
        struct cmd              *subscribecmd;  /* subscribe CMD, if is subscribe active */
};

The HTTP and Redis clients would be associated with each other in it, if one party closes, it will be marked as closed in the structure and release its part. When the other party closes, it also deletes its part, and if both the HTTP and Redis structures are marked as released, the entire structure can be destroyed or reused, the release would be performed by the party that closes later. HTTP at the point where it closes its FD, Redis side in the disconnect or ac->dataCleanup callback. The command in the cmd structure, if it was passed via redisAsyncCommand, then this data is released via cmd_free() only in the Redis callback. If it is a subscription, then this command will not be released until the disconnect callback. The link to this structure is kept by the HTTP client, in the Redis part it would be in ac->data. Also in the struct pool, instead of the redisContext field, this structure can be directly.

Next good point for solution like this is better support for a RESP3 protocol and handling of the async PUSH callback, because this callback haven't own private data and must rely only on the ac->data. The PUSH callback is used for delivering subscribed messages and KEY change tracking notifications which can be used in cobination with a websocket connection. This will too be helpfull for the websocket connectiong with command pipelining (at this moment don't works corrertly due bugs in ws_peek_data(...) and ws_execute(...) ).

nicolasff commented 2 years ago

Thanks again @majklik for this investigation. I like the changes you're proposing with last_cmd, things look good overall except for the early return in http_response_write. The struct http_response * that's passed in is allocated by the caller but since this all async the caller can't just wait for the write and then free the object. Instead, this happens either if we can't schedule the write:

        int ret = event_add(&r->ev, NULL);
        if (ret != 0) { /* could not schedule write */
            slog(r->w->s, WEBDIS_ERROR, "Could not schedule HTTP write", 0);
            http_response_cleanup(r, fd, 0);

or more often, once the write has completed at the end of http_can_write:

    if(ret <= 0 || r->out_sz - r->sent == 0) { /* error or done */
        http_response_cleanup(r, fd, (int)r->out_sz == r->sent ? 1 : 0);

So here returning early means that we never free this response object.

I noticed this while reading your proposed patch, but for any contributions I also run the test suite under Valgrind. This is easy to do if you're on Linux, but on macOS I use a dedicated Docker image for this.

This is the Dockerfile I use as Dockerfile.valgrind ```dockerfile FROM alpine:3.14.3 RUN apk update && apk add gcc valgrind make libevent-dev musl-dev bsd-compat-headers msgpack-c-dev git redis jq RUN mkdir /tmp/webdis COPY . /tmp/webdis/ RUN cd /tmp/webdis && find src -name '*.d' -delete && find src -name '*.o' -delete && make clean && make -j all install DEBUG=1 RUN echo "daemonize yes" >> /etc/redis.conf && \ sed -E -i 's/"daemonize":.*true/"daemonize": false, "websockets": true/g' /etc/webdis.prod.json && \ sed -E -i'' -e 's#"logfile":.*#"logfile": "/dev/stderr"#g' /etc/webdis.prod.json && \ sed -E -i'' 's/"verbosity": *[0-9]+/"verbosity": 8/g' /etc/webdis.prod.json && \ cat /etc/webdis.prod.json | jq . CMD /usr/bin/redis-server /etc/redis.conf >/dev/null 2>/dev/null && valgrind --tool=memcheck --leak-check=full --show-reachable=yes --show-possibly-lost=yes --track-origins=yes --leak-resolution=high /usr/local/bin/webdis /etc/webdis.prod.json EXPOSE 7379 ```

I build the image with

docker build -t webdis:valgrind -f Dockerfile.valgrind .

and run it with

docker run --rm -ti -p127.0.0.1:7379:7379 webdis:valgrind

Running Webdis with the patch as-is, I sent a curl -Ns http://127.0.0.1:7379/BRPOP/nonexistlist/10 (good suggestion by the way!) and killed it with ctrl-c.

A few seconds later, Valgrind showed a read into freed memory:

==1== Thread 2:
==1== Invalid read of size 8
==1==    at 0x10BA12: cmd_free (cmd.c:66)
==1==    by 0x119445: format_send_reply (common.c:129)
==1==    by 0x1170DB: json_reply (json.c:40)
==1==    by 0x12349B: __redisRunCallback (async.c:287)
==1==    by 0x123F87: redisProcessCallbacks (async.c:572)
==1==    by 0x1241EB: redisAsyncRead (async.c:635)
==1==    by 0x124248: redisAsyncHandleRead (async.c:654)
==1==    by 0x1149F8: redisLibeventHandler (libevent.h:69)
==1==    by 0x48CE1AF: ??? (in /usr/lib/libevent-2.1.so.7.0.1)
==1==    by 0x48CE914: event_base_loop (in /usr/lib/libevent-2.1.so.7.0.1)
==1==    by 0x10CF4F: worker_main (worker.c:179)
==1==    by 0x40531BA: ??? (in /lib/ld-musl-x86_64.so.1)
==1==  Address 0x490a7f8 is 392 bytes inside a block of size 408 free'd
==1==    at 0x48A4AF1: free (vg_replace_malloc.c:755)
==1==    by 0x112805: http_client_free (client.c:276)
==1==    by 0x11293C: http_client_read (client.c:310)
==1==    by 0x10C9F1: worker_can_read (worker.c:46)
==1==    by 0x48CE200: ??? (in /usr/lib/libevent-2.1.so.7.0.1)
==1==    by 0x48CE914: event_base_loop (in /usr/lib/libevent-2.1.so.7.0.1)
==1==    by 0x10CF4F: worker_main (worker.c:179)
==1==    by 0x40531BA: ??? (in /lib/ld-musl-x86_64.so.1)
==1==  Block was alloc'd at
==1==    at 0x48A6BD2: calloc (vg_replace_malloc.c:1117)
==1==    by 0x11253A: http_client_new (client.c:213)
==1==    by 0x10DDDA: server_can_accept (server.c:172)
==1==    by 0x48CE1AF: ??? (in /usr/lib/libevent-2.1.so.7.0.1)
==1==    by 0x48CE914: event_base_loop (in /usr/lib/libevent-2.1.so.7.0.1)
==1==    by 0x10E353: server_start (server.c:314)
==1==    by 0x10B89E: main (webdis.c:16)

So this is different issue, unrelated to the http_response leaking. This time the struct cmd tries to detach from its http_client in the block you suggested:

    if(c->http_client && c->http_client->last_cmd == c) {
        c->http_client->last_cmd = NULL;
    }

This can't happen if the http_client was freed, so it seems to me that detaching the http_client when the cmd's fd is set to -1 would help here:

        if (c->last_cmd) {
            c->last_cmd->fd = -1;
+           c->last_cmd->http_client = NULL;
            c->last_cmd = NULL;
        }

With this fix Valgrind no longer reports the read into freed memory after 10 seconds, but killing Webdis with ctrl-c produces the leak report which shows the expected "definitely lost: 216 bytes in 1 blocks", with the following details:

==1== 628 (216 direct, 412 indirect) bytes in 1 blocks are definitely lost in loss record 42 of 50
==1==    at 0x48A6BD2: calloc (vg_replace_malloc.c:1117)
==1==    by 0x1101C0: http_response_init (http.c:26)
==1==    by 0x119369: format_send_reply (common.c:110)
==1==    by 0x1170F1: json_reply (json.c:40)
==1==    by 0x1234B1: __redisRunCallback (async.c:287)
==1==    by 0x123F9D: redisProcessCallbacks (async.c:572)
==1==    by 0x124201: redisAsyncRead (async.c:635)
==1==    by 0x12425E: redisAsyncHandleRead (async.c:654)
==1==    by 0x114A0E: redisLibeventHandler (libevent.h:69)
==1==    by 0x48CE1AF: ??? (in /usr/lib/libevent-2.1.so.7.0.1)
==1==    by 0x48CE914: event_base_loop (in /usr/lib/libevent-2.1.so.7.0.1)
==1==    by 0x10CF4F: worker_main (worker.c:179)

Adding a call to http_response_cleanup before the return and avoiding the close(fd) in that function when it's negative addresses this issue.

I pushed this as d28dd3ec802849a6e22a06ef858c3d1771cbeb5b with credits to you. Thanks again for these suggestions! I always find it impressive when people dig into the Webdis code and manage to clearly understand how it works, and am always grateful for these contributions.

As for the original issue with WebSockets, I've been in touch with @jessie-murray who made large changes related to it in the past few months and we'll go over it later today.

majklik commented 2 years ago

Thank you for accepting and properly refining my crazy idea :-)

A slightly different solution will probably be needed for the websocket connection, as there may be a number of commands on the way, including the block: WATCH /.../ MULTI /.../ EXEC. So one c->last_cmd (respectively ws->cmd is used now) will not be enough.

A piece of JavaScript that tries to send several commands and waits for a response (and intentionally the first command is to wait a second for it to take full effect):

  <SCRIPT TYPE="text/javascript">

   function testSetGet() {
      var jsonSocket = new WebSocket("ws://127.0.0.1:7379/.json");
      jsonSocket.onclose = function(event) {
        console.log("JSON onclose:", event);
      };
      jsonSocket.onerror = function(event) {
        console.log("JSON onerror:", event);
      };
      jsonSocket.onmessage = function(messageEvent) {
        console.log("JSON received:", messageEvent.data);
      };
      jsonSocket.onopen = function() {
        console.log("JSON socket connected!");
        jsonSocket.send(JSON.stringify(["BZPOPMAX", "notexistkey", 1]));
        jsonSocket.send(JSON.stringify(["SET", "hello", "world"]));
        jsonSocket.send(JSON.stringify(["SET", "hello2", "world2"]));
    jsonSocket.send(JSON.stringify(["GET", "hello"]));
    jsonSocket.send(JSON.stringify(["GET", "helloxx"]));
    jsonSocket.send(JSON.stringify(["MGET", "hello", "hello2"]));
    jsonSocket.send(JSON.stringify(["MGET", "hellox", "helloy"]));
    jsonSocket.send(JSON.stringify(["XADD", "hello", "*", "A", "B"]));
    jsonSocket.send(JSON.stringify(["XADD", "hellos", "*", "A", "B"]));
    jsonSocket.send(JSON.stringify(["PING", "lastcmd"]));
      };

       setTimeout(() => {console.log("JSON socket closing!");jsonSocket.close();}, 5000);
  }
  testSetGet();
  </SCRIPT>

At this point, Webis usually executes about 2 commands from that series and does not terminate the websocket connection correctly:

JSON socket connected!
JSON received: {"MGET":null}
JSON received: {"MGET":["world",null]}
JSON socket closing!
JSON onerror: error { target: WebSocket, isTrusted: true, srcElement: WebSocket, currentTarget: WebSocket, eventPhase: 2, bubbles: false, cancelable: false, returnValue: true, defaultPrevented: false, composed: false, … }
JSON onclose: close { target: WebSocket, isTrusted: true, wasClean: false, code: 1006, reason: "", srcElement: WebSocket, currentTarget: WebSocket, eventPhase: 2, bubbles: false, cancelable: false, … }

There are two errors in the ws_peek_data(...) function, where the first causes that if more websocket frames are waiting in the ws->rbuf queue, only the command from the first frame in the queue is executed and the others are discarded. The second error is the assumption that the incoming websocket frame must be at least 8 bytes long, but the disconnect frame at the request of the javascript client is only 6 bytes long - jsonSocket.close() (8 bytes long disconnect frame comes if the browser window/tab is closed, when reason code is added to the frame). The shortest message can be only 2 bytes (ping/pong message without payload). Diff for websocket.c, which tries to handle these errors and also adds control that the close frame and data payload must be masked. If not, the connection is terminated as required by RFC:

--- websocket.c.orig    2022-01-10 12:50:20.331031836 +0100
+++ websocket.c 2022-01-10 22:12:26.179554640 +0100
@@ -234,6 +234,7 @@

 static void
 ws_log_cmd(struct ws_client *ws, struct cmd *cmd) {
+       int i;
        char log_msg[SLOG_MSG_MAX_LEN];
        char *p = log_msg, *eom = log_msg + sizeof(log_msg) - 1;
        if(!slog_enabled(ws->http_client->s, WEBDIS_DEBUG)) {
@@ -244,7 +245,7 @@
        memcpy(p, "WS: ", 4); /* WS prefix */
        p += 4;

-       for(int i = 0; p < eom && i < cmd->count; i++) {
+       for(i = 0; p < eom && i < cmd->count; i++) {
                *p++ = '/';
                char *arg = cmd->argv[i];
                size_t arg_sz = cmd->argv_len[i];
@@ -379,7 +380,7 @@

        /* parse frame and extract contents */
        size_t sz = evbuffer_get_length(ws->rbuf);
-       if(sz < 8) {
+       if(sz < 2) {
                return WS_READING; /* need more data */
        }
        /* copy into "frame" to process it */
@@ -399,6 +400,30 @@

        /* get payload length */
        len = frame[1] & 0x7f;  /* remove leftmost bit */
+
+       size_t minsz = has_mask ? 6:2;
+       if(len <= 125) {
+               minsz += len;
+       } else if(len == 126) {
+               minsz += sizeof(uint16_t);
+       } else {
+               minsz += sizeof(uint64_t);
+       }
+       if(sz < minsz) { /* not enough data */
+               int add_ret = evbuffer_prepend(ws->rbuf, frame, sz); /* put the whole frame back */
+               free(frame);
+               return add_ret < 0 ? WS_ERROR : WS_READING;
+       }
+
+       if((len || frame_type == WS_CONNECTION_CLOSE) && !has_mask) {
+               /* client MUST use masking for payload and close frame (RFC6455) */
+               ws->close_after_events = 1;
+               const char closereason[2]={0x03,0xEA}; /* 1002 - protocol error */
+               ws_frame_and_send_response(ws, WS_CONNECTION_CLOSE, closereason, 2);
+               free(frame);
+               return WS_ERROR;
+       }
+
        if(len <= 125) { /* data starts right after the mask */
                p = frame + 2 + (has_mask ? 4 : 0);
                if(has_mask) memcpy(&mask, frame + 2, sizeof(mask));
@@ -436,7 +461,7 @@

                /* create new ws_msg object holding what we read */
                int add_ret = ws_msg_add(msg, p, len, has_mask ? mask : NULL);
-               if(!add_ret) {
+               if(add_ret < 0) {
                        free(frame);
                        return WS_ERROR;
                }
@@ -444,7 +469,7 @@
                size_t processed_sz = len + (p - frame); /* length of data + header bytes between frame start and payload */
                msg->total_sz += processed_sz;

-               ev_copy = evbuffer_prepend(ws->rbuf, frame + len, sz - processed_sz); /* remove processed data */
+               ev_copy = evbuffer_prepend(ws->rbuf, frame + processed_sz, sz - processed_sz); /* remove processed data */
        } else { /* we're just peeking */
                ev_copy = evbuffer_prepend(ws->rbuf, frame, sz); /* put the whole frame back */
        }

With this modification, all sent commands are executed correctly and the connection is closed correctly. But there is another problem, all commands send back a response, as if they were PING commands:

JSON socket connected!
JSON received: {"PING":null}
JSON received: {"PING":[true,"OK"]}
JSON received: {"PING":"world"}
JSON received: {"PING":null}
JSON received: {"PING":["world","world2"]}
JSON received: {"PING":[null,null]}
JSON received: {"PING":[false,"WRONGTYPE Operation against a key holding the wrong kind of value"]}
JSON received: {"PING":"1641820001762-0"}
JSON received: {"PING":"lastcmd"}
JSON socket closing!
JSON onclose: close { target: WebSocket, isTrusted: true, wasClean: true, code: 1005, reason: "", srcElement: WebSocket, currentTarget: WebSocket, eventPhase: 2, bubbles: false, cancelable: false, … }

This is because the ws_execute(...) function reuses and overwrites ws->cmd with new data before the command is executed, and the response is returned from the redis callback. The code does not assume that multiple commands can be executed in the queue here. But this is for another fix, which is likely to affect the connection closure solution. And in the code for the websocket, it is necessary to properly handle the state when SUBSCRIBE / PSUBSCRIBE commands are used, because the hiredis library remembers the privdata used with the given redisAsyncCommand(...) and calls them back repeatedly when the message is received. For these 2 commands it is necessary to keep the cmd structures for the entire duration of the active subscribe (so delete only when the connection is closed or after all UNSUBSCRIBE / PUNSUBSCRIBE - it can be detected according to the state ac->c.flags & REDIS_SUBSCRIBED).

majklik commented 2 years ago

I looked at this issue again:

For example, even with your patch if you:

1. Load the test page and have the WS client connect

2. Restart Redis

3. Reload the page
   … you still get a crash (not always on the first restart, which is interesting too).

I'm afraid that not only is there a problem in closing a websocket connection, but even when opening it doesn't resolve possible errors. For example, if a Redis is not available (or immediately rejects a connection that there are too many clients, for example), then the Websocket connection opens, but the first command sent to the websocket connection usually ends in SIGSEGV. Because pool_connect(..., attach = 0) is called when opening a websocket connection from ws_client_new(...), it tries to open a connection to Redis via ac = redisAsyncConnect(...) and then is invoked callback pool_on_connect(. .., status = -1) with a signaled error, after which redisAsyncContext *ac is released. This is not captured in any way, so then when the command via websocket occurs, it calls ws_execute(...) => cmd_send(...) => redisAsyncCommandArgv(cmd->ac, ...) and here it falls because ac is already released. If Redis accepts the connection but terminates immediatly with an error (-ERR too many clients), the situation is similar: ws_client_new(...) => pool_connect(..., attach = 0), callback pool_on_connect(..., status = 0) and then immediately callback pool_on_disconnect(...) with a reported error and then release the context, so the subsequent ws_execue() will do badly.

By the way, the SUBSCRIBE command over HTTP seems to have a very similar problem. If I try to use curl -sND - http://127.0.0.1:7379/SUBSCRIBE/topic and Redis is not running (or the connection ends immediately), the TCP channel will open, but nothing will come back, no headers and it's on the client's timeout before TCP closes for inactivity. However, this does not fall to SIGSEGV. In this case, the sequence of the hiredis calls are a little different and it manages to send a SUBSCRIBE command using redisAsyncCommandArgv(cmd->ac, ...) before the callback for closing and releasing the context is activated. A callback from the SUBSCRIBE command is also called because it is already inserted in the queue. But the callback from the SUBSCRIBE command silently discards the error and the connection remains "stiff".

And if Redis disconnects later, when the SUBSBRICE via HTTP or the Websocket connection is open correctly, then this is probably not handled correctly too. SUBSCRIBE remains stiff because the error from hiredis is ignored in the callback in the format_send_error(...) function. The Websocket behaves similarly, a "Service Unavailable" error message is sent to the Websocket as plain text, but left open. Should the connection be closed in these cases?

Diff for the common.c file, which could possibly solve SUBSCRIBE problems over HTTP and partially Websocket (terminates the connection after a Redis connection error):

--- common.c.orig       2022-01-12 11:48:35.224619336 +0100
+++ common.c    2022-01-12 15:11:06.238854733 +0100
@@ -48,10 +48,18 @@
                http_response_write(resp, cmd->fd);
        } else if(cmd->is_websocket && !cmd->http_client->ws->close_after_events) {
                ws_frame_and_send_response(cmd->http_client->ws, WS_BINARY_FRAME, msg, strlen(msg));
+               const char close_status[2] = {0x03,0xf3}; /* 1011 - unexpected condition */
+               ws_frame_and_send_response(cmd->http_client->ws, WS_CONNECTION_CLOSE, close_status, 2);
+               cmd->http_client->ws->close_after_events = 1;
        }

        if (!cmd->is_websocket) { /* don't free or detach persistent cmd */
                if (cmd->pub_sub_client) { /* for pub/sub, remove command from client */
+                       if(cmd->started_responding == 0) {
+                               http_send_error(cmd->http_client, code, msg);
+                       } else {
+                               http_response_write_chunk(cmd->fd, cmd->w, NULL, 0); /* Closing chunk on Redis error */
+                       }
                        cmd->pub_sub_client->reused_cmd = NULL;
                } else {
                        cmd_free(cmd);

The question is whether in the event of an error during a SUBSCRIBE over HTTP or websocket connection, just silently terminate the connection or insert some formatted error message into the data stream. For a websocket connection, I would rather not insert it into the data (as is happening now), but insert it as a reason description in the close frame (where this diff now only inserts the status code).

nicolasff commented 2 years ago

Thanks again for all your comments. I haven't had much time to look into this in more detail until now, it's likely I won't be able to dedicate a large chunk of time for it until the weekend but be assured that I'm eager to get this fixed.

I went over some of the diffs you posted. First off, good call on the 2-byte PING frame that's currently blocked due to the 8-byte minimum! That said, there's a part I'm not too sure about in what you're proposing, specifically this condition:

       if((len || frame_type == WS_CONNECTION_CLOSE) && !has_mask) {
               /* client MUST use masking for payload and close frame (RFC6455) */

I agree that this applies for WS_CONNECTION_CLOSE, since according to RFC6455 section 5.5.1:

Close frames sent from client to server must be masked as per Section 5.3.

But I'm not sure this applies for just len as a condition. Right? if(len) would be a true predicate simply if the frame had a non-zero payload length, and this is allowed without a mask, isn't it?

Section 5.7 gives an example of an unmasked data frame:

A single-frame unmasked text message 0x81 0x05 0x48 0x65 0x6c 0x6c 0x6f (contains "Hello") Which here would have len = 0x05.

I sent an unmasked frame containing ["PING"] (so not a WS PING but a Redis PING command), like this:

8108 5b22 5049 4e47 225d                 ..["PING"]

And indeed it entered that block; I received this in response:

8802 03ea                                ....

Which decodes to: FIN=true, opcode="Connection Close", payload size=2, payload=0x03 0xea (1002 - protocol error).

I'm also not sure about this part:

    if(len <= 125) {
        minsz += len;
    } else if(len == 126) {
        minsz += sizeof(uint16_t);
    } else {
        minsz += sizeof(uint64_t);
    }

Specifically, the second and third conditions. minsz is initialized to either 2 or 6 depending on the mask flag, so if len is under 125 we just add len and that's how much we need to read. But if len is 126, we need to read (2 or 6) + the 2 bytes containing the actual length, plus the value in the payload length field, right? But also, all this is already done below, so why would this block be needed in addition to the existing code?

majklik commented 2 years ago

Regarding masking - similar to the requirement for the CLOSE frame, the RFC above states that the client must send all payloads with masking. I went through the RFC one more time and the primary requirement is in "5.1. Overview":

a client MUST mask all frames that it sends to the server. A server MUST NOT mask any frames that it sends to the client.

So in the following chapters, this is just repeated to emphasize this introductory part. This probably explains why there are examples of masked (from the client) and unmasked (from the server) data frames in section 5.7. I made an attempt to send a PING frame from webdis and the browser returns a response with a mask even if payload is not used. Similarly, the browser sends an empty data frame with a mask. Tested with Firefox and Edge. So the right check should be a lot harder:

        if(!has_mask) {
                /* a client MUST mask all frames that it sends to the server (RFC6455, 5.1. Overview) */

Regarding minsz - I should probably add more comments next time. :-) It only serves to check that the next part has enough data not to read beyond the data boundary. Maybe this is clearer:

        size_t minsz = has_mask ? 6:2;
        if(len == 126) {
                minsz += sizeof(uint16_t);
        } else if(len == 127) {
                minsz += sizeof(uint64_t);
        }

In the original code, there is a potential risk that the subsequent part will try to read data about the requeired frame length and the mask value behind the actually loaded buffer, this is just a check to make sure that the number of bytes actually passed (sz) may be less than needed for the next part (in my case there is a certainty that only 2 bytes will be available in the frame, 8 bytes in the original code, but depending on the flags in the header and mask the next part may need up to 14 bytes).

majklik commented 2 years ago

Maybe a bit better version of a patch for the ws_peek_data(...) function:

--- websocket.c.orig    2022-01-10 12:50:20.331031836 +0100
+++ websocket.c 2022-01-15 13:08:03.661929542 +0100
@@ -234,6 +234,7 @@

 static void
 ws_log_cmd(struct ws_client *ws, struct cmd *cmd) {
+       int i;
        char log_msg[SLOG_MSG_MAX_LEN];
        char *p = log_msg, *eom = log_msg + sizeof(log_msg) - 1;
        if(!slog_enabled(ws->http_client->s, WEBDIS_DEBUG)) {
@@ -244,7 +245,7 @@
        memcpy(p, "WS: ", 4); /* WS prefix */
        p += 4;

-       for(int i = 0; p < eom && i < cmd->count; i++) {
+       for(i = 0; p < eom && i < cmd->count; i++) {
                *p++ = '/';
                char *arg = cmd->argv[i];
                size_t arg_sz = cmd->argv_len[i];
@@ -379,7 +380,7 @@

        /* parse frame and extract contents */
        size_t sz = evbuffer_get_length(ws->rbuf);
-       if(sz < 8) {
+       if(sz < 2) {
                return WS_READING; /* need more data */
        }
        /* copy into "frame" to process it */
@@ -387,7 +388,7 @@
        if(!frame) {
                return WS_ERROR;
        }
-       int rem_ret = evbuffer_remove(ws->rbuf, frame, sz);
+       int rem_ret = evbuffer_copyout(ws->rbuf, frame, sz);
        if(rem_ret < 0) {
                free(frame);
                return WS_ERROR;
@@ -397,22 +398,45 @@
        frame_type = frame[0] & 0x0F;   /* lower 4 bits of first byte */
        has_mask = frame[1] & 0x80 ? 1:0;

+       if(!has_mask) {
+               /* a client MUST mask all frames that it sends to the server (RFC6455, 5.1. Overview) */
+               ws->close_after_events = 1;
+               const char *close_code_reason="\x03\xeaReceived a frame without a mask from the client (violates RFC6455, 5.1. Overview)."; /* 0x03,0xEA = 1002 - protocol error */
+               ws_frame_and_send_response(ws, WS_CONNECTION_CLOSE, close_code_reason, strlen(close_code_reason));
+               free(frame);
+               return WS_ERROR;
+       }
+
        /* get payload length */
        len = frame[1] & 0x7f;  /* remove leftmost bit */
+
+       /* checking that the copyout frame contains the minimum data needed to determine the true length and mask in next step */
+       size_t minsz = 6;
+       if(len == 126) {
+               minsz += sizeof(uint16_t);
+       } else if(len == 127) {
+               minsz += sizeof(uint64_t);
+       }
+       if(sz < minsz) { /* not enough data */
+               free(frame);
+               return WS_READING;
+       }
+
+       /* determine payload size */
        if(len <= 125) { /* data starts right after the mask */
-               p = frame + 2 + (has_mask ? 4 : 0);
-               if(has_mask) memcpy(&mask, frame + 2, sizeof(mask));
+               p = frame + 6;
+               memcpy(&mask, frame + 2, sizeof(mask));
        } else if(len == 126) {
                uint16_t sz16;
                memcpy(&sz16, frame + 2, sizeof(uint16_t));
                len = ntohs(sz16);
-               p = frame + 4 + (has_mask ? 4 : 0);
-               if(has_mask) memcpy(&mask, frame + 4, sizeof(mask));
+               p = frame + 8;
+               memcpy(&mask, frame + 4, sizeof(mask));
        } else if(len == 127) {
                uint64_t sz64 = *((uint64_t*)(frame+2));
                len = webdis_ntohll(sz64);
-               p = frame + 10 + (has_mask ? 4 : 0);
-               if(has_mask) memcpy(&mask, frame + 10, sizeof(mask));
+               p = frame + 14;
+               memcpy(&mask, frame + 10, sizeof(mask));
        } else {
                free(frame);
                return WS_ERROR;
@@ -420,9 +444,8 @@

        /* we now have the (possibly masked) data starting in p, and its length.  */
        if(len > sz - (p - frame)) { /* not enough data */
-               int add_ret = evbuffer_prepend(ws->rbuf, frame, sz); /* put the whole frame back */
                free(frame);
-               return add_ret < 0 ? WS_ERROR : WS_READING;
+               return WS_READING;
        }

        int ev_copy = 0;
@@ -435,8 +458,8 @@
                *out_msg = msg; /* attach for it to be freed by caller */

                /* create new ws_msg object holding what we read */
-               int add_ret = ws_msg_add(msg, p, len, has_mask ? mask : NULL);
-               if(!add_ret) {
+               int add_ret = ws_msg_add(msg, p, len, mask);
+               if(add_ret < 0) {
                        free(frame);
                        return WS_ERROR;
                }
@@ -444,9 +467,7 @@
                size_t processed_sz = len + (p - frame); /* length of data + header bytes between frame start and payload */
                msg->total_sz += processed_sz;

-               ev_copy = evbuffer_prepend(ws->rbuf, frame + len, sz - processed_sz); /* remove processed data */
-       } else { /* we're just peeking */
-               ev_copy = evbuffer_prepend(ws->rbuf, frame, sz); /* put the whole frame back */
+               ev_copy = evbuffer_drain(ws->rbuf, processed_sz); /* remove processed data from evbuffer */
        }
        free(frame);

As the previous version does, it handles multiple websockes frames in ws->rbuf. He guards the mask on all frames. It also handles short frames. And as an adjustment, it doesn't use evbuffer_remove(...) to retrieve data from ws->rbuf and then roll back with evbuffer_prepend(...) on error or process only part of the data, but it does make a copy of the data from ws->rbuf using evbuffer_copyout(...) and as soon as it is processed, it deletes them from ws->rbuf using evbuffer_drain(...). This saves moving back and forth, especially if the function ws_peek_data(...) is called twice in a row on the same data (first to check if the entire frame is available and second to take over).

By the way, the logic of websocket connection handling is not quite OK yet. I also encountered a problem in the logic with closing the connection and also the incoming data is not handled quite correctly for large data sets. But let's leave that to the next iteration.

nicolasff commented 2 years ago

Thanks @majklik, this looks good. I've integrated your proposed changes to ws_peek_data in 73f2905 and tested the logic here.

I haven't heard much from @jessie-murray after reaching out, but my understanding is that we won't be able to collaborate on this like on previous changes to WS code where I had heavily guided him (or at least not right now). As I've mentioned on other GitHub issues in the past, I'm not able to make changes to this code base at this time due to IP ownership. Integrating proposed changes like here leaves no doubt regarding who is the author – and you're duly credited in the commit title – but making the larger changes that are required here might take a little while.

I have some ideas that I'll explore on my side and will update this page once I figure out how to get this done.