crystal-lang / crystal

The Crystal Programming Language
https://crystal-lang.org
Apache License 2.0

WebSockets should have blocking methods #5600

Open watzon opened 6 years ago

watzon commented 6 years ago

WebSockets in Crystal are very difficult to work with, not only because they are essentially undocumented, but also because their methods don't block, which is not very Crystal-like.

I believe, and @oprypin will back me up, that WebSockets should be handled in the following way:

ws = HTTP::WebSocket.new("echo.websocket.org", "/")

ws.send("This is a message")
message = ws.receive # proposed: blocks until the next message arrives

Here, ws.receive blocks until a response arrives. Anyone who wants this to happen asynchronously can always spawn a fiber:

ws = HTTP::WebSocket.new("echo.websocket.org", "/")
my_channel = Channel(String).new

spawn do
  ws.send("This is a message")
  message = ws.receive
  my_channel.send(message)
end

puts my_channel.receive
oprypin commented 6 years ago

Something needs to be said about the distinction between message/binary/ping https://crystal-lang.org/api/0.24.1/HTTP/WebSocket.html

This could be done by having .receive return Message | Binary | Ping, which the caller then handles with "pattern matching" (case/when) or narrows with .as(Message).
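A minimal sketch of that idea, assuming hypothetical Text, Binary and Ping record types (illustrative names, not the stdlib API) and a stand-in socket that replays pre-recorded frames instead of reading from the network:

```crystal
# Hypothetical frame types; not part of Crystal's standard library.
record Text, message : String
record Binary, message : Bytes
record Ping, message : String

alias Frame = Text | Binary | Ping

# Stand-in for a socket with a blocking `receive`: it replays recorded frames
# and returns nil once they run out (as a closed connection might).
class FakeSocket
  def initialize(@frames : Array(Frame))
  end

  def receive : Frame?
    @frames.shift?
  end
end

socket = FakeSocket.new([Ping.new("hb"), Text.new("hello"), Binary.new(Bytes[1, 2, 3])] of Frame)

texts = [] of String
bytes_seen = 0
pings = 0

# "Pattern matching" on the union type with case/when.
while frame = socket.receive
  case frame
  when Text
    texts << frame.message
  when Binary
    bytes_seen += frame.message.size
  when Ping
    pings += 1 # a real client would reply with a pong here
  end
end

# When the caller is sure of the frame type, .as narrows it
# (raising TypeCastError if the guess is wrong).
only_text = Text.new("ok").as(Frame)
puts only_text.as(Text).message
```

The case/when form forces the caller to decide what to do with every frame type, while .as is a shortcut for protocols where only one type is expected.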

watzon commented 6 years ago

I like that idea. It would be pretty easy to implement. As it is, the whole WebSocket implementation is pretty small.

asterite commented 6 years ago

As far as I know, @waj wanted WebSocket to work like that too.

ghost commented 6 years ago

Is there any work being done on this?

I am not sure I can follow or agree. WebSockets are asynchronous by nature, and with Crystal 0.24+ they currently work as expected, but I am interested in what the fastest way of communicating will be going forward.

So, at the moment, we can simply run a WebSocket and, if needed, use a collection (Array, Hash, ...) to register the fibers waiting for events; when a message arrives, we only have to wake the registered fiber through a Channel.

On the other hand, if on_message were blocking, we could never send from an arbitrary place, because that place would block the whole socket.

Scenario:

I am working on a cluster of several cloud instances, all connected via WebSocket. Currently, without blocking methods, I simply send a request wherever it is needed, push a Channel into the collection, and block that fiber on Channel.receive. When a message arrives, on_message pushes it to the matching Channel in the collection, and my Channel.receive is fulfilled.

This is completely asynchronous and works; it is limited only by scheduling.

So my question goes the other way: could a blocking implementation reduce scheduling overhead? I don't think so; it's just another way of switching fibers, and the blocking version seems less flexible, because you block the socket at the place where you use it?

After all, WebSockets are meant for near-real-time communication, so for me it ultimately comes down to performance and concurrency.

So, what would be cheaper for scheduling?

Another option is to use a pool of blocking clients instead of a collection of registered Channels. But how big should the pool be? How many events will we have? Sure, it is no problem to build a dynamic pool of WebSockets that can block wherever needed... Hmm... Could the pool be cheaper than the Channels? Running 10k fibers is certainly fine, but if every fiber needs its own blocking socket, is it cheap to build a pool of 10k WebSockets?

When using WebSockets on the server side, blocking is no problem, because each socket has only one endpoint. But on the client side things could be different, in scenarios other than the simple one of a single server with 10k chatters (where the server can certainly block).

I don't know, but I think blocking isn't good at all :)

RX14 commented 6 years ago

There's already a fiber used for websocket.run, and the callbacks run in that same fiber. So there are no more or fewer fibers either way; it's just about making WebSockets work like sockets. There's no on_read for normal sockets.
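For comparison, this is how plain IO is consumed in Crystal: a loop over a blocking read, with concurrency added only by spawning a fiber and handing results over a Channel. In this sketch IO::Memory stands in for a real socket so it runs without a network, and the nil sentinel marking the end of the stream is a choice made for the example:

```crystal
# IO::Memory stands in for a socket; on a real socket gets would block,
# here it simply returns buffered lines and nil at end of stream.
io = IO::Memory.new("first line\nsecond line\n")

lines = Channel(String?).new

# A reader fiber, playing the role of the fiber that WebSocket#run occupies.
spawn do
  while line = io.gets
    lines.send(line)
  end
  lines.send(nil) # end of stream, like EOF on a closed socket
end

# The consumer just blocks on the channel; no callbacks involved.
received = [] of String
while msg = lines.receive
  received << msg
end
puts received
```

The caller opts into concurrency with spawn instead of the IO type registering callbacks, which is the model RX14 suggests WebSockets should follow.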

asterite commented 6 years ago

I have this working with this patch:

diff --git a/src/http/web_socket.cr b/src/http/web_socket.cr
index 3e1d95d4f..8fba2012a 100644
--- a/src/http/web_socket.cr
+++ b/src/http/web_socket.cr
@@ -96,13 +96,18 @@ class HTTP::WebSocket
     @ws.close(message)
   end

-  def run
+  record Ping, message : String
+  record Pong, message : String
+  record Text, message : String
+  record Binary, message : Bytes
+  record Close, message : String
+
+  def receive : Ping | Pong | Text | Binary | Close
     loop do
       begin
         info = @ws.receive(@buffer)
       rescue IO::EOFError
-        @on_close.try &.call("")
-        break
+        return Close.new("")
       end

       case info.opcode
@@ -110,40 +115,60 @@ class HTTP::WebSocket
         @current_message.write @buffer[0, info.size]
         if info.final
           message = @current_message.to_s
-          @on_ping.try &.call(message)
-          pong(message) unless closed?
           @current_message.clear
+          return Ping.new(message)
         end
       when Protocol::Opcode::PONG
         @current_message.write @buffer[0, info.size]
         if info.final
-          @on_pong.try &.call(@current_message.to_s)
+          message = @current_message.to_s
           @current_message.clear
+          return Pong.new(message)
         end
       when Protocol::Opcode::TEXT
         @current_message.write @buffer[0, info.size]
         if info.final
-          @on_message.try &.call(@current_message.to_s)
+          message = @current_message.to_s
           @current_message.clear
+          return Text.new(message)
         end
       when Protocol::Opcode::BINARY
         @current_message.write @buffer[0, info.size]
         if info.final
-          @on_binary.try &.call(@current_message.to_slice)
+          message = @current_message.to_slice
           @current_message.clear
+          return Binary.new(message)
         end
       when Protocol::Opcode::CLOSE
         @current_message.write @buffer[0, info.size]
         if info.final
           message = @current_message.to_s
-          @on_close.try &.call(message)
-          close(message) unless closed?
           @current_message.clear
-          break
+          return Close.new(message)
         end
       end
     end
   end
+
+  def run
+    loop do
+      op = receive
+      case op
+      when Ping
+        @on_ping.try &.call(op.message)
+        pong(op.message) unless closed?
+      when Pong
+        @on_pong.try &.call(op.message)
+      when Text
+        @on_message.try &.call(op.message)
+      when Binary
+        @on_binary.try &.call(op.message)
+      when Close
+        @on_close.try &.call(op.message)
+        break
+      end
+    end
+  end
 end

 require "./web_socket/*"

Then you can do something like:

require "http/web_socket"

ws = HTTP::WebSocket.new("ws://localhost:3000")
ws.send("hello!")
ws.send("well!")

# op is one of HTTP::WebSocket::Ping, HTTP::WebSocket::Pong, HTTP::WebSocket::Text, etc.
while op = ws.receive
  p op
end

The only "problems" I see with this approach are:

Other than that, it would be nice if run could not be mixed with a blocking receive. That is, maybe have an HTTP::WebSocket::Listener that has all those on_message methods but no receive method.
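A rough sketch of such a split, assuming a hypothetical Listener class (not an existing stdlib type) that wraps a blocking frame source and exposes only callbacks plus run, so callers cannot mix it with direct receive calls. Frames come from an in-memory array here instead of a connection:

```crystal
# Hypothetical frame types; not the stdlib API.
record Text, message : String
record Close, message : String

alias Frame = Text | Close

class Listener
  @on_message : (String ->)?
  @on_close : (String ->)?

  # `source` blocks until the next frame is available.
  def initialize(@source : -> Frame)
  end

  def on_message(&@on_message : String ->)
  end

  def on_close(&@on_close : String ->)
  end

  # Mirrors the run loop from the patch above (minus ping handling):
  # dispatch each frame to its callback and stop after a close frame.
  def run
    loop do
      case frame = @source.call
      when Text
        @on_message.try &.call(frame.message)
      when Close
        @on_close.try &.call(frame.message)
        break
      end
    end
  end
end

frames = [Text.new("a"), Text.new("b"), Close.new("bye")] of Frame
listener = Listener.new(-> { frames.shift })

got = [] of String
closed_with = ""
listener.on_message { |m| got << m }
listener.on_close { |reason| closed_with = reason }
listener.run
puts got
```

Because Listener never exposes the underlying source, the callback style and the blocking style live in separate types instead of competing for the same frames.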

In any case, I'm not sure what's the advantage of having this blocking behaviour. Websockets are asynchronous by nature. It's not like a socket where you have a protocol where you send stuff and expect to receive stuff immediately (imagine a chatroom, any message can come at any moment). But if someone wants to go in this direction, PRs are welcome (they can base it on the diff above).

RX14 commented 6 years ago

It's not like a socket where you have a protocol where you send stuff and expect to receive stuff immediately

This also isn't the case with sockets. A websocket is just a normal socket but with atomic "messages". The way we handle their concurrency should be the same.

asterite commented 6 years ago

@RX14 With a socket against HTTP you send a request and immediately get a response. For a socket against a DB, you send a query and get a result. I think that's pretty different to a websocket...

RX14 commented 6 years ago

@asterite there are many synchronous protocols with websockets and there are many asynchronous protocols with sockets.

asterite commented 6 years ago

Well, feel free to send a PR :-)

ghost commented 6 years ago

I see absolutely no problem with using them asynchronously, as they are. When you get a response, you can dispatch whatever needs to be done from the central #on_message, for example reactivating a fiber via a Channel.

I am just confused about what would change if on_message were only available as a blocking method, as proposed above. For me, the current async behavior works as expected; everything is pretty fine :+1:

So knowing that this will not become a breaking change would feel better than not knowing :)

RX14 commented 6 years ago

If I added an on_read to Socket, everyone would complain that we're not doing CSP and are going back to Node.js and callbacks. I don't see why we aren't complaining here.

ghost commented 6 years ago

I am not sure I fully understand what you mean, but one thing sounds right: I have mostly used this kind of async style in Node.js with its callbacks :o So I need to understand the CSP way more precisely. It could be that I am wrong about this asynchronous approach, which is confusing when everything I have done in recent years went in the other direction :)

Not easy to figure out :)

RX14 commented 6 years ago

@BenDietze have you read https://crystal-lang.org/docs/guides/concurrency.html?

asterite commented 6 years ago

@RX14 The original snippet that the original poster wants is:

ws = HTTP::WebSocket.new("echo.websocket.org", "/")

ws.send("This is a message")
message = ws.receive

But that's wrong! There's no guarantee that ws.receive will get the proper message. It might be a ping. It might be a close. It can be anything, really. So in the end you'll always have to do this:

ws = HTTP::WebSocket.new("echo.websocket.org", "/")

# Send initial message, ok...
ws.send("This is a message")

while op = ws.receive
  case op
  when HTTP::WebSocket::Text
    # Ok, we got the message
  when HTTP::WebSocket::Ping
    # Required by the protocol: answer a ping with a pong
    ws.pong(op.message)
  when HTTP::WebSocket::Close
    break
  else
    # Ignore other messages, I guess
  end
end

I can't see how that can be an improvement, in any case, over this:

ws = HTTP::WebSocket.new("echo.websocket.org", "/")

# Send initial message, ok...
ws.send("This is a message")

ws.on_message do |message|
  # Ok, we got the message
end

# Nothing else needs to be done: ping will be replied with pong, close will exit run

ws.run
RX14 commented 6 years ago

If a socket is closed by the other side while you're reading, you get EOF: some methods on IO return nil, some raise. So ws.receive shouldn't return Close, just like IO#gets doesn't return a custom struct. Ping should also be handled automatically by a background fiber attached to the WebSocket. Then ws.receive returns either a String or a Bytes, for text or binary frames respectively. And yeah, that's annoying, but in most cases you should know whether your WebSocket is text or binary. So we can have ws.receive and ws.receive_binary, and then an unnamed method which can receive both. If you want more control than that, you can use WebSocket::Protocol, whose #receive will behave like you say because it's low-level.
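A minimal sketch of that proposal, under several assumptions: BlockingWebSocket and the frame record types are hypothetical names, frames come from an in-memory array instead of WebSocket::Protocol, and pings are answered inline inside receive rather than by a separate background fiber. Only the combined method is shown; a receive_binary variant would return Bytes only:

```crystal
# Hypothetical low-level frame types; not the stdlib API.
record Text, message : String
record Binary, message : Bytes
record Ping, message : String
record Close, message : String

alias Frame = Text | Binary | Ping | Close

# Callers only ever see String (text frames) or Bytes (binary frames);
# a close frame surfaces as nil, like IO#gets at EOF.
class BlockingWebSocket
  # Records pongs instead of writing them to a real connection.
  getter pongs_sent = [] of String

  def initialize(@frames : Array(Frame))
  end

  # Stand-in for the low-level protocol read; nil means the stream ended.
  private def next_frame : Frame?
    @frames.shift?
  end

  # The "unnamed method" that can receive both text and binary messages.
  def receive : String | Bytes | Nil
    while frame = next_frame
      case frame
      when Text, Binary
        return frame.message
      when Ping
        @pongs_sent << frame.message # a real implementation would send a pong here
      when Close
        return nil
      end
    end
    nil
  end
end

ws = BlockingWebSocket.new([Ping.new("p1"), Text.new("hi"), Binary.new(Bytes[7]), Close.new("")] of Frame)

received = Array(String | Bytes).new
while message = ws.receive
  received << message
end
p received, ws.pongs_sent
```

The ping never reaches the caller, and the loop ends on the close frame without a special Close type, which is the IO-like behaviour described above.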

Sija commented 6 years ago

Ping should also be handled automatically by a background fiber attached to the websocket.

There are still valid use cases for handling ws.on_ping manually, though (a heartbeat monitor, for example).

bararchy commented 6 years ago

Also, I think the client should never respond with a pong, only the server (RFC-wise).

straight-shoota commented 6 years ago

@Sija @bararchy These are merely details. It should be fairly easy to have a decent default behaviour and an easy way to customize.

In general, because of the asynchronous nature, applications working with websockets don't have logic expecting a specific sequence of messages, but handle any message depending on application state. Then, it doesn't make much difference if you write:

websocket.on_string do |message|
  # ...
end
websocket.on_binary do |message|
  # ...
end

Or:

# simplified example, based on above patch (not necessarily how it should look in the end):
case message = websocket.receive
when String
  # ...
when Binary
  # ...
end

The second approach has a simpler control flow model because there is no concurrent background fiber involved. I'd consider this the more idiomatic Crystal way. (Many people have come here fleeing from callback hell.)

ghost commented 6 years ago

My intention was the following: WebSockets (in theory) make it possible to build clusters, thanks to their low overhead and persistent connections. So I designed a concept for connecting cloud instances into a cluster via WebSocket; some machines would mostly do one kind of job, others other jobs, and together they form one system.

To start, I built a pseudo-server here (for holding data, manipulating it, whatever).

This would be done like this:

require "kemal"

Kemal.config.port = 1024

sockets = [] of HTTP::WebSocket

ws "/" do |socket|
  sockets << socket

  socket.on_message do |message|
    sockets.each do |socket|
      socket.send(message)
    end
  end

  socket.on_close do
    sockets.delete socket
  end
end

Kemal.run

It serves as the WebSocket server, and here it simply sends every incoming message back out to all connected sockets.

The other one was more of a frontend: it handles GET and POST requests and responds to a browser, talks to the server above over a WebSocket, and serves HTTP (here done with Kemal):

require "http/web_socket"
require "kemal"

channels = {} of String => Channel(String)

socket = HTTP::WebSocket.new(URI.parse("ws://127.0.0.1:1024"))

socket.on_message do |m|
  m = JSON.parse m
  channels[m["fiber"].as_s].send m["data"].as_s
end

spawn do
  socket.run
end

spawn do
  get "/" do
    fiber = Fiber.current.to_s
    channels[fiber] = Channel(String).new
    data = {"fiber" => fiber, "data" => "..."} of String => String
    socket.send data.to_json
    # 1
    channels[fiber].receive
    channels.delete fiber
    "SOME RESPONSE"
  end
end

Kemal.run

As you can see, when a GET request comes in, it asks the data server above for some data; the reply arrives in socket.on_message do |m|, which finally reactivates the waiting fiber through its Channel.

With this setup I can easily connect 10 instances with a manageable number of WebSockets and handle GET, POST, PUT and so on, because my socket.on_message is centralized.

My thought was that if sockets were blocking per se, this socket would already be blocked at # 1. So if producing the data takes a long time, say 1 second, I could not use this socket in the meantime, because it is already blocked, _because I have only ONE socket.on_message per socket._ This could not be spawned here. I would have to build a pool of WebSockets to do such things, but how big should it get? How cheap is it to handle this dynamically? Does that make sense? I don't think so.
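For comparison, the same request/reply correlation can be kept with a blocking receive: one reader fiber owns the socket and loops over receive, while each request fiber registers a reply Channel and waits on it. A minimal sketch, assuming two Channels simulate the single WebSocket connection and that every message carries an "id" field (playing the role of the Fiber.current.to_s key above); the request helper is hypothetical:

```crystal
require "json"

# Two channels simulate one WebSocket connection, one per direction.
to_server = Channel(String).new
from_server = Channel(String).new

# Fake remote peer: answers each request, echoing its "id" back.
spawn do
  loop do
    incoming = JSON.parse(to_server.receive)
    reply = {"id" => incoming["id"].as_s, "data" => "reply to #{incoming["data"].as_s}"}
    from_server.send(reply.to_json)
  end
end

# Requests waiting for a reply, keyed by a per-request id.
pending = {} of String => Channel(String)

# The single reader fiber: the only place that calls the blocking receive,
# so no per-request code ever has to replace a socket-wide callback.
spawn do
  loop do
    message = JSON.parse(from_server.receive)
    pending.delete(message["id"].as_s).try &.send(message["data"].as_s)
  end
end

# Hypothetical helper: registers a reply channel, sends the request, and
# blocks only the calling fiber until the matching reply arrives.
def request(id : String, data : String, pending : Hash(String, Channel(String)), to_server : Channel(String)) : String
  reply = Channel(String).new(1)
  pending[id] = reply
  to_server.send({"id" => id, "data" => data}.to_json)
  reply.receive
end

# Three concurrent "HTTP handlers" sharing one connection.
results = Channel(String).new
3.times do |i|
  spawn { results.send(request("req-#{i}", "job #{i}", pending, to_server)) }
end

answers = Array.new(3) { results.receive }.sort
puts answers
```

A slow reply only blocks the fiber waiting for it; other requests keep flowing over the same connection, so no pool of sockets is needed.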

So, this was just a proof of concept to test some things; at the time I had not dug deep enough to read and understand all of the WebSocket source.

What do you think would be the Crystal way of doing the above if WebSockets got such an option?

I think I have done this wrong; not badly, but ultimately wrong :-)

oprypin commented 6 years ago

@BenDietze, I think you have some misunderstanding. Please look at @asterite's code snippet. It shows that this "asynchronous" implementation (run) is just a simple wrapper around a fundamentally blocking¹ implementation (receive). So the resolution of this issue can be simply to expose this receive, which is currently not exposed. run can even stay in. Even if it doesn't stay in, you could simply replace the invocation of run with that code snippet.

¹: Not actually blocking, Crystal uses fibers for concurrent operation.

ghost commented 6 years ago

Thanks @oprypin @asterite @RX14, and to all: please forgive my misunderstanding. And yes, I will try to understand this big snippet, and the English, as well as I can over the next few days :-)

I am stuck on the problem of ws.receive when I have 1000 GET or POST requests, because I cannot override on_message in every request, since it is the single on_message of this socket. With my code above I have one socket.on_message, but with 1000 GET requests I could not override it a thousand times if it became the blocking version mentioned above? That was my thought :-)

I will take a look tomorrow, it's time to go :-)

straight-shoota commented 6 years ago

@BenDietze Maybe it's better to discuss your specific use case on gitter. Conversation in chat can be helpful to reduce misunderstanding. Feel free to ping me (I can also help you in German if you have problems with the language).

RX14 commented 6 years ago
require "kemal"

Kemal.config.port = 1024

sockets = [] of HTTP::WebSocket

ws "/" do |socket|
  sockets << socket

  while message = socket.receive
    sockets.each do |socket|
      socket.send(message)
    end
  end
ensure
  sockets.delete socket
end

Kemal.run
zenithlight commented 5 years ago

Hey, I'm interested in getting this implemented. I'd love some input on my draft PR #8024.