rstudio / httpuv

HTTP and WebSocket server package for R

Support for the equivalent of JavaScript's `setInterval` #230

Open: rundel opened this issue 5 years ago

rundel commented 5 years ago

Most of the uses I've seen of httpuv's websocket functionality follow a request/response pattern: the client sends a message, the server handles it, and the server then responds with its own message.

For a project I'm working on, a simpler approach is to have everything originate from the server, which sends messages out to the clients on a schedule, i.e. something like what JavaScript's setInterval() does.

I've been able to get something like this working by having a function reschedule itself with later::later() at the desired interval:

onWSOpen = function(ws) {
  interval = 2

  f = function() {
    msg = paste("Hello", Sys.time())
    ws$send(msg)
    later::later(f, interval)
  }

  f()
}

but this feels a bit hacky, and the later calls keep running even after the server is stopped. I think I can find a way to use the ws object and httpuv::listServers() to add a halt condition, but again that feels super hacky.
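The self-rescheduling loop above can be wrapped in a small helper with an explicit halt condition. This is a sketch under stated assumptions, not an httpuv or later API: set_interval(), keep_going, and make_fake_scheduler() are hypothetical names. In a real app, schedule would be later::later() and keep_going might check the handle returned by startServer() (for example s$isRunning()). Here a fake scheduler queues the callbacks instead, so the ticks can be stepped through in plain R.

```r
# Hypothetical helper (not exported by httpuv or later): runs f() now, then
# every `interval` seconds, until keep_going() returns FALSE.
# `schedule` must behave like later::later(func, delay). R evaluates default
# arguments lazily, so later is only needed if the default is actually used.
set_interval <- function(f, interval, keep_going = function() TRUE,
                         schedule = later::later) {
  tick <- function() {
    # Halt condition: checked before doing any work; stop rescheduling.
    if (!keep_going()) return(invisible(FALSE))
    f()
    schedule(tick, interval)
    invisible(TRUE)
  }
  tick()
}

# Fake scheduler: queues callbacks instead of running them on a timer,
# so each tick can be triggered by hand without an event loop.
make_fake_scheduler <- function() {
  queue <- list()
  list(
    schedule = function(func, delay) {
      queue[[length(queue) + 1]] <<- func
      invisible(NULL)
    },
    run_next = function() {
      func <- queue[[1]]
      queue[[1]] <<- NULL
      func()
    },
    pending = function() length(queue)
  )
}

# Usage: tick until a flag (standing in for s$isRunning()) is cleared.
sched <- make_fake_scheduler()
ticks <- 0
running <- TRUE
set_interval(function() ticks <<- ticks + 1, 2,
             keep_going = function() running,
             schedule = sched$schedule)   # first tick runs immediately
sched$run_next()                          # second tick, reschedules a third
running <- FALSE
sched$run_next()                          # halt: no work, nothing rescheduled
# ticks is now 2 and nothing is pending
```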

rundel commented 5 years ago

A full reprex of my example above:

library(httpuv)

host = "0.0.0.0"
port = 9454

html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list(
        'Content-Type' = 'text/html'
      ),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {
    f = function() {
      ws$send(paste("Hello", Sys.time()))
      later::later(f, 2)
    }
    f()
  }
)

# Start the server before opening the browser so the page can connect.
startServer(host, port, app)
browseURL("http://localhost:9454/")
wch commented 5 years ago

I think this should do it. The change here is that there's an is_open flag, which gets set to FALSE when the websocket is closed; when it's FALSE, the function doesn't reschedule itself.

library(httpuv)

host = "0.0.0.0"
port = 9454

html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list(
        'Content-Type' = 'text/html'
      ),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {
    is_open <- TRUE

    ws$onClose(function() {
      message("in onClose")
      is_open <<- FALSE
    })

    send_and_reschedule <- function() {
      message("in send_and_reschedule")
      if (is_open) {
        message("websocket is open")
        ws$send(paste("Hello", Sys.time()))
        later::later(send_and_reschedule, 2)
      } else {
        message("websocket is closed")
      }
    }
    send_and_reschedule()
  }
)

# Start the server before opening the browser so the page can connect.
startServer(host, port, app)
browseURL("http://localhost:9454/")

The only slightly weird thing is that there's one last call to send_and_reschedule() after the websocket is closed. That call sees that is_open is FALSE and returns without sending or rescheduling.

If you have the development version of later, that can also be addressed. The dev version of later supports cancelling a callback: later::later() returns a function that cancels the pending call. So you can have the onClose callback immediately cancel the pending call to send_and_reschedule() and drop the is_open flag entirely.

# Reuses html, host and port from the previous example.
app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list(
        'Content-Type' = 'text/html'
      ),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {
    # Placeholder until the first callback is scheduled; onClose calls it.
    cancel_send_and_reschedule <- function() {}

    ws$onClose(function() {
      message("in onClose")
      cancel_send_and_reschedule()
    })

    send_and_reschedule <- function() {
      message("in send_and_reschedule")
      message("sending")
      ws$send(paste("Hello", Sys.time()))
      # later::later() returns a function that cancels this pending call.
      cancel_send_and_reschedule <<- later::later(send_and_reschedule, 2)
    }
    send_and_reschedule()
  }
)
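The difference between the two versions can be checked without a browser. The sketch below runs both versions against hypothetical stand-ins. make_mock_ws() imitates only the send() and onClose() methods used above. make_fake_scheduler() replaces later::later() and returns a cancel function the way later's does. Each scenario sends one extra tick, closes the socket, and then drains whatever is still queued.

```r
# Fake scheduler standing in for later::later(): callbacks wait in a queue
# until run_next() is called. Like later::later(), schedule() returns a
# function that cancels the callback (TRUE if it was still pending).
make_fake_scheduler <- function() {
  queue <- list()
  next_id <- 0L
  list(
    schedule = function(func, delay) {
      next_id <<- next_id + 1L
      id <- as.character(next_id)
      queue[[id]] <<- func
      function() {
        if (is.null(queue[[id]])) return(FALSE)
        queue[[id]] <<- NULL
        TRUE
      }
    },
    run_next = function() {
      id <- names(queue)[1]
      func <- queue[[id]]
      queue[[id]] <<- NULL
      func()
    },
    pending = function() length(queue)
  )
}

# Mock of the two WebSocket methods used above, plus close() to fire the
# onClose handlers and sent() to inspect what went out.
make_mock_ws <- function() {
  sent <- character()
  handlers <- list()
  list(
    send = function(msg) sent <<- c(sent, msg),
    onClose = function(handler) handlers[[length(handlers) + 1]] <<- handler,
    close = function() for (h in handlers) h(),
    sent = function() sent
  )
}

# First version: an is_open flag checked on every tick.
open_with_flag <- function(ws, schedule) {
  is_open <- TRUE
  n_calls <- 0
  ws$onClose(function() is_open <<- FALSE)
  send_and_reschedule <- function() {
    n_calls <<- n_calls + 1
    if (is_open) {
      ws$send(paste("Hello", n_calls))
      schedule(send_and_reschedule, 2)
    }
  }
  send_and_reschedule()
  function() n_calls  # lets the caller read the call count
}

# Second version: onClose cancels the pending callback.
open_with_cancel <- function(ws, schedule) {
  n_calls <- 0
  cancel_pending <- function() {}
  ws$onClose(function() cancel_pending())
  send_and_reschedule <- function() {
    n_calls <<- n_calls + 1
    ws$send(paste("Hello", n_calls))
    cancel_pending <<- schedule(send_and_reschedule, 2)
  }
  send_and_reschedule()
  function() n_calls
}

run_scenario <- function(open_fn) {
  sched <- make_fake_scheduler()
  ws <- make_mock_ws()
  get_calls <- open_fn(ws, sched$schedule)
  sched$run_next()                              # one more tick while open
  ws$close()                                    # client disconnects
  while (sched$pending() > 0) sched$run_next()  # drain anything left
  c(sent = length(ws$sent()), calls = get_calls())
}

with_flag <- run_scenario(open_with_flag)
with_cancel <- run_scenario(open_with_cancel)
```

With the flag, send_and_reschedule() runs three times for two messages. The queued call still fires after close, and only then sees that is_open is FALSE. With cancellation, it runs exactly twice.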
rundel commented 5 years ago

That looks much cleaner than what I was able to come up with, and it gracefully handles both the server shutdown case and the client disconnect case (which I hadn't considered).

It seems like including something similar to this in the bundled demos might be useful for others like me who are looking for a more server-push-based approach.

wch commented 5 years ago

Good idea. I'll leave this issue open as a reminder to add a demo.

rundel commented 5 years ago

A tangential (but somewhat related) question: do you have any thoughts on how to structure something like this to handle a moderate number of concurrent users, where the task inside send_and_reschedule is moderately involved (e.g. reading a file)?

With the current setup, each new session triggers onWSOpen and gets its own chain of later-scheduled functions, and these chains won't necessarily be synchronized with one another. I could set up some kind of global cache that they all read from, but I suspect I'd still be wasting a lot of cycles unnecessarily.

wch commented 5 years ago

You can keep a registry of all currently-open websockets and iterate over it, sending a message to each one, with a single app-level callback that reschedules itself.
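Stripped of the server, this registry is just an environment keyed by connection ID, plus one tick function. The base-R sketch below is a hypothetical illustration: register_ws(), unregister_ws(), broadcast_tick(), and the mock sockets are made-up names. It also addresses the wasted-cycles concern raised in the question. The expensive step runs once per tick, no matter how many clients are connected.

```r
# Registry of open connections, keyed by zero-padded ID. The mock sockets
# only implement send(); in httpuv these would be the WebSocket objects
# passed to onWSOpen.
websockets <- new.env(parent = emptyenv())
next_ws_id <- 0

register_ws <- function(ws) {
  ws_id <- sprintf("%09d", next_ws_id)
  next_ws_id <<- next_ws_id + 1
  assign(ws_id, ws, envir = websockets)
  ws_id
}

# What a ws$onClose() handler would call.
unregister_ws <- function(ws_id) rm(list = ws_id, envir = websockets)

# One app-level tick: do the expensive work (e.g. reading a file) once,
# then send the same payload to every open connection. ls() sorts the
# names, so zero-padded IDs come back in connection order.
broadcast_tick <- function(make_payload) {
  payload <- make_payload()
  ids <- ls(websockets)
  for (id in ids) websockets[[id]]$send(payload)
  invisible(length(ids))
}

# Usage with mock sockets that record what they receive.
inbox <- list()
make_mock_ws <- function(label) {
  list(send = function(msg) inbox[[label]] <<- c(inbox[[label]], msg))
}

reads <- 0
read_status <- function() {   # stands in for reading a file
  reads <<- reads + 1
  paste("status", reads)
}

a <- register_ws(make_mock_ws("a"))
b <- register_ws(make_mock_ws("b"))
broadcast_tick(read_status)   # both sockets get "status 1"
unregister_ws(a)              # socket "a" disconnects
broadcast_tick(read_status)   # only "b" gets "status 2"
```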

library(httpuv)

host = "0.0.0.0"
port = 9454

html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

websockets <- new.env(parent = emptyenv())
next_ws_id <- 0

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list('Content-Type' = 'text/html'),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {
    # Create an ID for this websocket and add it to the registry
    ws_id <- sprintf("%09d", next_ws_id)
    next_ws_id <<- next_ws_id + 1

    websockets[[ws_id]] <- ws

    ws$onClose(function() {
      message("in onClose for ", ws_id)
      rm(list = ws_id, envir = websockets)
    })
  }
)

send_and_reschedule <- function() {
  message("send_and_reschedule")
  if (s$isRunning()) {
    later::later(send_and_reschedule, 2)
  } else {
    # If app has closed, clear out websocket registry and exit
    rm(list = ls(websockets, all.names = TRUE), envir = websockets)
    return()
  }

  # Send a message to each websocket
  for (id in names(websockets)) {
    message("Sending message to ", id)
    websockets[[id]]$send(paste0("Hello ID ", id, "  ",  Sys.time()))
  }
}

s <- startServer(host, port, app)
send_and_reschedule()

browseURL("http://localhost:9454/")

Some notes:

rundel commented 5 years ago

Thanks again for the help, I'm slowly wrapping my head around websockets and httpuv and your code has been invaluable in that process.

The approach I ended up with when trying to implement this myself looks like the following:

library(httpuv)

host = "0.0.0.0"
port = 9454

html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

httpuv:::AppWrapper$set(
  "public", "get_wsconns",
  function() {
    private$wsconns
  }
)

WebServer2 <- R6::R6Class(
  "WebServer2",
  cloneable = FALSE,
  inherit = httpuv:::WebServer,
  public = list(
    initialize = function(host, port, app, interval = 5) {
      super$initialize(host, port, app)

      ws = private$appWrapper$get_wsconns()

      send_and_reschedule <- function() {
        if (!self$isRunning())
          return()

        print(Sys.time())
        #print(names(ws))

        for(n in names(ws)) {
          if (is.null(ws[[n]]))
            next
          msg = paste("Hello", n, Sys.time())
          message("Sent: ", msg)
          ws[[n]]$send(msg)
        }

        later::later(send_and_reschedule, interval)
      }
      send_and_reschedule()
    }
  )
)

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list(
        'Content-Type' = 'text/html'
      ),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {

  }
)

z = WebServer2$new(host, port, app)
browseURL("http://localhost:9454/")
browseURL("http://localhost:9454/")
browseURL("http://localhost:9454/")

It involves a lot more messing with the internals of the R6 classes, but it avoids the need for global variables.

A couple of questions that came up as I was working through this:

wch commented 5 years ago

Instead of dealing with the internals of WebServer and AppWrapper, it would be better to just wrap your application object in a local environment. Here I've done it with local(), but you could also use a function to generate the app object -- that is, instead of app <- local({ ... }), you have create_app <- function() { ... }; app <- create_app().

library(httpuv)

app <- local({
  host = "0.0.0.0"
  port = 9454

  html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

  websockets <- new.env(parent = emptyenv())
  next_ws_id <- 0

  send_and_reschedule <- function() {
    message("send_and_reschedule")
    if (s$isRunning()) {
      later::later(send_and_reschedule, 2)
    } else {
      # If app has closed, clear out websocket registry and exit
      rm(list = ls(websockets, all.names = TRUE), envir = websockets)
      return()
    }

    # Send a message to each websocket
    for (id in names(websockets)) {
      message("Sending message to ", id)
      websockets[[id]]$send(paste0("Hello ID ", id, "  ",  Sys.time()))
    }
  }

  list(
    call = function(req) {
      list(
        status = 200L,
        headers = list('Content-Type' = 'text/html'),
        body = glue::glue(html, host = host, port = port)
      )
    },
    onWSOpen = function(ws) {
      # On the first websocket connection only, kick off the
      # send_and_reschedule() polling.
      if (next_ws_id == 0) {
        send_and_reschedule()
      }

      # Create an ID for this websocket and add it to the registry
      ws_id <- sprintf("%09d", next_ws_id)
      next_ws_id <<- next_ws_id + 1

      websockets[[ws_id]] <- ws

      ws$onClose(function() {
        message("in onClose for ", ws_id)
        rm(list = ws_id, envir = websockets)
      })
    }
  )
})

# host and port are defined inside local(), so pass them explicitly here.
s <- startServer("0.0.0.0", 9454, app)

browseURL("http://localhost:9454/")
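The function-factory alternative might look like the sketch below. It is reduced to the registry logic so it runs without a server. create_app() and open_ids() are hypothetical names. open_ids() exists only to inspect the state in this sketch and is not an httpuv app field. In a real app, the list would also carry call() and be passed to startServer(). Each call to create_app() gets its own websockets environment and ID counter, so two apps built this way never share state.

```r
# Each call creates fresh, private state; the returned closures are the
# only way to reach it.
create_app <- function() {
  websockets <- new.env(parent = emptyenv())
  next_ws_id <- 0

  list(
    onWSOpen = function(ws) {
      ws_id <- sprintf("%09d", next_ws_id)
      next_ws_id <<- next_ws_id + 1
      websockets[[ws_id]] <- ws
      ws$onClose(function() rm(list = ws_id, envir = websockets))
    },
    # Inspection helper for this sketch only; not part of httpuv's app API.
    open_ids = function() ls(websockets)
  )
}

# Mock sockets: just enough to capture onClose handlers.
close_handlers <- list()
mock_ws <- function() {
  list(
    send = function(msg) invisible(NULL),
    onClose = function(h) close_handlers[[length(close_handlers) + 1]] <<- h
  )
}

app1 <- create_app()
app2 <- create_app()
app1$onWSOpen(mock_ws())
app1$onWSOpen(mock_ws())
app2$onWSOpen(mock_ws())   # app2 counts from 0 independently
close_handlers[[1]]()      # the first app1 socket closes
app1$open_ids()            # "000000001"
app2$open_ids()            # "000000000"
```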
jcheng5 commented 5 years ago

@wch What, no fastmap? 😁

rundel commented 5 years ago

The local({...}) bit makes this a lot more palatable, but deep down in my soul I can't help feeling that using <<- is wrong. Still, this is definitely more than enough for what I'm trying to get working.

The only other bit I feel a bit dissatisfied with is that this seems a lot like reinventing the wheel since AppWrapper is already doing all this internally. Is there a reason why it wouldn't be possible to pass each WebSocket the equivalent of a context pointer back to the parent AppWrapper? Does this kind of circular dependency cause grief with R6 classes?

Thanks again for all the help, you've gone way above and beyond.

I'll submit an issue and a PR for cleaning up the wsconns entry.

jcheng5 commented 5 years ago

Don't feel bad at all about using <<- in this context; it's only wrong IMHO if you don't know with 100% certainty what scope is going to end up taking the assignment. In this case, it's totally well-defined, since next_ws_id is defined right there in the parent scope.

That said, if it makes you feel better, you could replace the line next_ws_id <- 0 with:

get_next_ws_id <- local({
  next_ws_id <- 0L
  function() {
    (next_ws_id <<- next_ws_id + 1L)
  }
})

And then whenever you need a new id, call get_next_ws_id(). This would at least put the variable and the super-assignment right next to each other, and wrap it behind a function that's impossible to use incorrectly.
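Here is a quick check of the generator above, as a sketch. The zero-padded ID format is borrowed from the earlier registry code. The function increments before returning, so the first ID it hands out is 1, not 0. A first-connection check such as if (next_ws_id == 0) would therefore need a different test after switching to it. The counter is also no longer visible outside the local() block.

```r
# next_ws_id lives in the local() environment, which is the nearest
# enclosing scope with that binding, so `<<-` updates it there and nothing
# is created in the global environment.
get_next_ws_id <- local({
  next_ws_id <- 0L
  function() {
    (next_ws_id <<- next_ws_id + 1L)
  }
})

ids <- vapply(1:3, function(i) sprintf("%09d", get_next_ws_id()), character(1))
ids   # "000000001" "000000002" "000000003"
```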