taoensso / sente

Realtime web comms library for Clojure/Script
https://www.taoensso.com/sente
Eclipse Public License 1.0

[mod] #424 reintroduce jetty9 support #426

Open TimoKramer opened 1 year ago

TimoKramer commented 1 year ago

Hi! With this PR I am proposing a change to reintroduce jetty9 support. I tried my best to understand both the libs sente and jetty9-adapter. The changes are small but two things are not clear to me:

  1. Here @sunng87 says ajax-cbs is not correct but I don't understand what the problem is. Can someone clarify if further changes are necessary?
  2. I had to add commons-io which is excluded from jetty9-adapter. I was getting a CompilerException when it was missing. Is there a problem with adding it?

I need this change to continue my work on Bob-CD. Thanks for your work on this lib.

TimoKramer commented 1 year ago

Once you are happy with this PR, I will revert the changes to the example project's server.clj so that it uses http-kit again.

ptaoussanis commented 1 year ago

@TimoKramer Hi Timo, thanks a lot for this - looks promising!

And nice idea to mod the reference example (at least temporarily), that'll make reviewing this much easier 🙏

Haven't had an opportunity to look in detail yet, but will do before the upcoming v1.18 release.

A couple quick questions in the meantime:

Cheers!

TimoKramer commented 1 year ago

It appears to me that websocket works fine. The odd thing with ajax is that the client is sending out a hell of a lot of requests, though all 200s. (screenshot: sente-ajax-cbs)

Here is the compile error as a text file because it is long.

ptaoussanis commented 1 year ago

Quick update to say that I'm taking a look at this now, thanks for the patience Timo 👍

ptaoussanis commented 1 year ago

@TimoKramer Hi Timo, thanks again for all your work on this.

To update from my side:

Current status of Ajax mode:

I unfortunately won't be able to properly debug this myself, and I'm really not familiar with Jetty or its adapter, but I can offer some high-level thoughts:

Since it's relevant to the work you're doing here, I'm going to risk being overly blunt about something that I'm probably insufficiently familiar with: I'm skeptical about the long-term viability of the async API that the Jetty adapter currently offers for HTTP responses.

It's fully in accordance with the official async Ring spec, so it's actually the async Ring spec that I'm skeptical about.

The spec itself isn't bad per se, it's a reasonable design for the problem. But it has two disadvantages that interact:

  1. The proposed API ~necessarily breaks compatibility with sync handlers + middleware.
  2. For very justifiable reasons, the spec took quite some time to become official - and two things happened in the meantime:
     2a. Lots of async-incompatible handlers + middleware were written.
     2b. Libraries that needed an API sooner ended up coming up with their own divergent solutions (e.g. http-kit, Immutant, Undertow, Aleph, etc.).
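
To make point 1 concrete, here is a minimal sketch in plain Clojure (no Ring dependency needed). It assumes the calling conventions of the Ring spec: a sync handler takes one argument, an async handler takes three (`request`, `respond`, `raise`). The names `hello-handler` and `wrap-sync-only` are hypothetical and exist only for illustration.

```clojure
(defn hello-handler
  "Hypothetical handler supporting both the sync (1-arity) and
  async (3-arity) calling conventions."
  ([request]
   {:status 200 :headers {} :body "hello"})
  ([request respond raise]
   (try
     (respond (hello-handler request))
     (catch Throwable t (raise t)))))

(defn wrap-sync-only
  "Hypothetical middleware written with only sync handlers in mind:
  the function it returns accepts a single argument."
  [handler]
  (fn [request]
    (assoc-in (handler request) [:headers "X-Demo"] "1")))

;; Sync call through the middleware works as expected.
(def sync-result
  ((wrap-sync-only hello-handler) {:uri "/"}))

;; Async call straight to the handler also works.
(def async-result
  (let [result (promise)]
    (hello-handler {:uri "/"} #(deliver result %) #(deliver result %))
    @result))

;; Async call through the sync-only middleware fails, because the
;; wrapped function has no 3-arity.
(def mixed-result
  (try
    ((wrap-sync-only hello-handler) {:uri "/"} identity identity)
    (catch clojure.lang.ArityException _ :arity-mismatch)))
```

Every middleware in a stack has to offer the 3-arity for the async path to work end to end, which is why existing sync-only middleware matters here.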

The official async Ring API indeed has some advantages that I appreciate. But for better-and-worse, it was an extension to an API originally designed to be sync-first, and it was an extension that came quite late.

The official API would have been great if Ring had been async-first from day 1.

But my honest impression is that in practice it may basically be a non-starter at this point. Given the weight of much of the community more or less adopting other approaches (2b), and the tendency for those approaches to be incompatible (1, 2a) - my expectation is that there's a decent chance that its remaining use will fade out in time.

This is of course speculation, and I may very well be missing something obvious - but just sharing my current + fallible impression in the hope that it may be helpful.

To clarify: I'd still be super happy to get working Jetty9 adapter support back in. And zero judgement on folks working on and/or happily using servers based on the async Ring API. It's certainly good to have a variety of approaches available that offer different tradeoffs 👍

Hope that might be of some help. Cheers

TimoKramer commented 1 year ago

Thanks @ptaoussanis for the valuable insights. You certainly have much more knowledge of this topic. I will try to get to it and find a solution to this problem.

TimoKramer commented 1 year ago

Sorry, I am a little lost. I don't know what I can add as a ring-async-resp-fn. Tried a few things and tried to understand it but it did not really work out.

jahson commented 11 months ago

Hi! I've tried to make it work by using the Undertow adapter as an example. It seems to be working for me, but I'm not sure I've tested everything.

Ring async handlers are not handled for now, but I think the main change for async handler support would be using respond (ring-async-resp-fn) in JettyServerChanAdapter, like

(deftype JettyServerChanAdapter [adapter-opts]
  i/IServerChanAdapter
  (ring-req->server-ch-resp [_ req callbacks-map]
    (let [respond (or (:ring-async-resp-fn callbacks-map) identity)]
      (if (jetty9.websocket/ws-upgrade-request? req)
        (ws-ch callbacks-map) ; Not sure if we need to use `respond` here
        (respond (ring-response/response (ajax-ch callbacks-map adapter-opts)))))))

We should probably also wrap the writing of the body in a future; see the extend-protocol ring-protocols/StreamableResponseBody SenteJettyAjaxChannel form below.

Adapter code:

(ns taoensso.sente.server-adapters.jetty9
  "Sente adapter for ring-jetty9-adapter,
  (https://github.com/sunng87/ring-jetty9-adapter)."
  {:author "Thomas Crowley (@wavejumper), Jian Zhang (@surferHalo)"}
  (:require [clojure.java.io :as io]
            [ring.adapter.jetty9.websocket :as jetty9.websocket]
            [ring.core.protocols :as ring-protocols]
            [ring.util.response :as ring-response]
            [taoensso.sente.interfaces :as i]))

;; WebSocket
(extend-type org.eclipse.jetty.websocket.api.WebSocketAdapter
  i/IServerChan
  (sch-open?  [sch] (jetty9.websocket/connected? sch))
  (sch-close! [sch] (jetty9.websocket/close!     sch))
  (sch-send!  [sch _websocket? msg] (jetty9.websocket/send! sch msg)))

(defn- ws-ch
  [{:keys [on-open on-close on-msg on-error]}]
  (jetty9.websocket/ws-upgrade-response
   {:on-connect          (fn [sch] (on-open  sch true))
    :on-text             (fn [sch msg] (on-msg   sch true msg))
    :on-error            (fn [sch error] (on-error sch true error))
    :on-close            (fn [sch status _] (on-close sch true status))}))

;; Ajax
(defprotocol ISenteJettyAjaxChannel
  (ajax-read! [sch]))

(deftype SenteJettyAjaxChannel [resp-promise_ open?_ on-close adapter-opts]
  i/IServerChan
  (sch-send!  [sch _websocket? msg] (deliver resp-promise_ msg) (i/sch-close! sch))
  (sch-open?  [_sch] @open?_)
  (sch-close! [sch]
    (when (compare-and-set! open?_ true false)
      (deliver resp-promise_ nil)
      (when on-close (on-close sch false nil))
      true))

  ISenteJettyAjaxChannel
  (ajax-read! [_sch]
    (let [{:keys [ajax-resp-timeout-ms ajax-resp-timeout-val]}
          adapter-opts]

      (if ajax-resp-timeout-ms
        (deref resp-promise_ ajax-resp-timeout-ms ajax-resp-timeout-val)
        (deref resp-promise_)))))

(defn- ajax-ch [{:keys [on-open on-close]} adapter-opts]
  (let [open?_ (atom true)
        sch (SenteJettyAjaxChannel. (promise) open?_ on-close adapter-opts)]

    (when on-open (on-open sch false))
    sch))

(extend-protocol ring-protocols/StreamableResponseBody
  SenteJettyAjaxChannel
  (write-body-to-stream [body _response ^java.io.OutputStream output-stream]
    (let [writer (io/writer output-stream)]
      ;; FIXME: we should, probably, wrap `try` with `future`, because `output-stream` might block thread
      ;; See https://github.com/ring-clojure/ring/issues/254#issuecomment-236048380
      (try
        (.write writer (ajax-read! body))
        (.flush writer)
        (finally
          (.close writer))))))

(deftype JettyServerChanAdapter [adapter-opts]
  i/IServerChanAdapter
  (ring-req->server-ch-resp [_ req callbacks-map]
    (if (jetty9.websocket/ws-upgrade-request? req)
      (ws-ch callbacks-map)
      (ring-response/response (ajax-ch callbacks-map adapter-opts)))))

(defn get-sch-adapter
  "Returns an Jetty ServerChanAdapter. Options:
     :ajax-resp-timeout-ms  ; Max msecs to wait for Ajax responses (default 60 secs)
     :ajax-resp-timeout-val ; Value returned in case of above timeout
                            ; (default `:undertow/ajax-resp-timeout`)"
  ([] (get-sch-adapter nil))
  ([{:as   opts
     :keys [ajax-resp-timeout-ms
            ajax-resp-timeout-val]

     :or   {ajax-resp-timeout-ms (* 60 1000)
            ajax-resp-timeout-val :undertow/ajax-resp-timeout}}]

   (JettyServerChanAdapter.
    (assoc opts
           :ajax-resp-timeout-ms  ajax-resp-timeout-ms
           :ajax-resp-timeout-val ajax-resp-timeout-val))))

I can make it a separate PR if you want.

ptaoussanis commented 11 months ago

@jahson Hi Oleg, thanks for the effort on this!

Just to confirm before I take a closer look: have you tried your adapter with the reference example and checked that both WebSocket and AJAX connections appear to work for at least basic functionality (client->server chsk-send, server->client broadcast, etc.)?

Ring async handlers are not handled for now

That's okay, none of the other adapters have specific support for this - the previous discussion touched on Ring async handlers only to the extent that it seemed like they might be a hard requirement to get AJAX long-polling working with ring-jetty9-adapter.

jahson commented 11 months ago

@ptaoussanis Yeah, I've tried it with the reference example. The only thing I missed is that broadcast doesn't seem to work for ws.

jahson commented 11 months ago

Although ws connects successfully.

jahson commented 11 months ago

After adding debug logs I've found that sch-send! is not being called for any broadcast events. The main difference with all other handlers is that broadcast handlers use chsk-send!, whereas other handlers use ?reply-fn. Probably it is something with buffering?

alexandergunnarson commented 1 month ago

Here's a working Jetty 11 adapter we use in production. Happy to PR it.

(ns taoensso.sente.server-adapters.jetty
  "Sente adapter for `ring-jetty-adapter`
   (https://github.com/ring-clojure/ring/tree/master/ring-jetty-adapter).

   Adapted from https://github.com/taoensso/sente/pull/426#issuecomment-1647231979."
  (:require
    [ring.core.protocols :as ring-protocols]
    [ring.util.response :as ring-response]
    [ring.websocket :as ws]
    [ring.websocket.protocols :as ws.protocols]
    [taoensso.sente.interfaces :as i]
    [taoensso.timbre :as log])
  (:import
    [org.eclipse.jetty.websocket.api WebSocketListener]
    [ring.websocket.protocols Socket]))

;; ===== WebSocket ===== ;;

(extend-type WebSocketListener
  i/IServerChan
    (sch-open? [sch] (.isOpen sch))
    (sch-close! [sch] (.close sch))
    (sch-send! [sch-listener _websocket? msg]
      (ws.protocols/-send sch-listener msg)
      true))

(extend-type Socket
  i/IServerChan
    (sch-open? [sch] (ws.protocols/-open? sch))
    (sch-close! [sch] (ws.protocols/-close sch nil nil))
    (sch-send! [sch-socket _websocket? msg]
      (ws.protocols/-send sch-socket msg)
      true))

(defn- respond-ws
  [{:keys [websocket-subprotocols]}
   {:keys [on-close on-error on-msg on-open]}]
  {:ring.websocket/listener (reify
                              ws.protocols/Listener
                                (on-close [_ sch status _] (on-close sch true status))
                                (on-error [_ sch error] (on-error sch true error))
                                (on-message [_ sch msg] (on-msg sch true msg))
                                (on-open [_ sch] (on-open sch true))
                                (on-pong [_ sch data]))
   :ring.websocket/protocol (first websocket-subprotocols)})

;; ===== AJAX ===== ;;

(defprotocol ISenteJettyAjaxChannel
  (ajax-read! [sch]))

(deftype SenteJettyAjaxChannel [resp-promise_ open?_ on-close adapter-opts]
  i/IServerChan
    (sch-send! [sch _websocket? msg]
      (deliver resp-promise_ msg)
      (i/sch-close! sch))
    (sch-open? [_sch]
      @open?_)
    (sch-close! [sch]
      (when (compare-and-set! open?_ true false)
        (deliver resp-promise_ nil)
        (when on-close
          (on-close sch false nil))
        true))

  ISenteJettyAjaxChannel
    (ajax-read! [_sch]
      (let [{:keys [ajax-resp-timeout-ms ajax-resp-timeout-val]} adapter-opts]
        (if ajax-resp-timeout-ms
            (deref resp-promise_ ajax-resp-timeout-ms ajax-resp-timeout-val)
            (deref resp-promise_)))))

(defn- ajax-ch
  [{:keys [on-open on-close]} adapter-opts]
  (let [open?_ (atom true)
        sch    (SenteJettyAjaxChannel. (promise) open?_ on-close adapter-opts)]
    (when on-open
      (on-open sch false))
    sch))

(extend-protocol ring-protocols/StreamableResponseBody
  SenteJettyAjaxChannel
    (write-body-to-stream [body _response ^java.io.OutputStream output-stream]
      ;; NOTE The write is wrapped in a `future` because `output-stream` might block the
      ;; thread (https://github.com/ring-clojure/ring/issues/254#issuecomment-236048380)
      (future
        (try
          (.write output-stream (.getBytes ^String (ajax-read! body) "UTF-8"))
          (.flush output-stream)
          (catch Throwable ex
            (log/error ex))
          (finally
            (.close output-stream))))))

;; ===== Overall ===== ;;

(deftype JettyServerChanAdapter [adapter-opts]
  i/IServerChanAdapter
    (ring-req->server-ch-resp [_ request callbacks-map]
      (if (ws/upgrade-request? request)
          (respond-ws request callbacks-map)
          (ring-response/response (ajax-ch callbacks-map adapter-opts)))))

(defn get-sch-adapter
  "Returns an Jetty ServerChanAdapter. Options:
     :ajax-resp-timeout-ms  ; Max msecs to wait for Ajax responses (default 60 secs)
     :ajax-resp-timeout-val ; Value returned in case of above timeout
                            ; (default `:ajax-resp-timeout`)"
  ([] (get-sch-adapter nil))
  ([{:as   opts
     :keys [ajax-resp-timeout-ms
            ajax-resp-timeout-val]
     :or   {ajax-resp-timeout-ms  (* 60 1000)
            ajax-resp-timeout-val :ajax-resp-timeout}}]
   (JettyServerChanAdapter.
     (assoc opts
       :ajax-resp-timeout-ms  ajax-resp-timeout-ms
       :ajax-resp-timeout-val ajax-resp-timeout-val))))

ptaoussanis commented 1 month ago

@alexandergunnarson PR would be great, thank you Alex! 🙏

Just skimmed your implementation, will look closer later. But in the meantime- what happens when :ajax-resp-timeout is returned as a timeout val?

I ask because of this recent discussion and PR. I'm guessing you may have based your implementation on the Undertow adapter?
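
On the timeout question, here is a minimal sketch in plain Clojure of what `deref` with a timeout hands back when the promise is never delivered. In the posted adapter the result of `ajax-read!` is passed to `.getBytes` with a `^String` hint, so a keyword (timeout) or `nil` (channel closed without a message) would not be a valid argument there. `read-with-timeout` and `body->bytes` are hypothetical helpers, not part of Sente, and mapping non-String values to an empty body is only an assumption about one possible handling.

```clojure
(defn read-with-timeout
  "Hypothetical helper: waits up to `timeout-ms` for promise `p`,
  returning `timeout-val` if nothing was delivered in time."
  [p timeout-ms timeout-val]
  (deref p timeout-ms timeout-val))

(def delivered (doto (promise) (deliver "payload")))
(def pending   (promise))

;; A delivered promise yields the message String.
(def delivered-val (read-with-timeout delivered 10 :ajax-resp-timeout))

;; An undelivered promise yields the timeout value, here a keyword.
(def pending-val (read-with-timeout pending 10 :ajax-resp-timeout))

(defn body->bytes
  "Hypothetical coercion: encodes Strings as UTF-8 and treats anything
  else (nil, timeout keyword) as an empty body. This is an assumption,
  not Sente's documented behaviour."
  ^bytes [x]
  (.getBytes ^String (if (string? x) x "") "UTF-8"))
```

With a guard like this, `(body->bytes pending-val)` returns an empty byte array rather than failing on a non-String value. Whether an empty body is the right response for a timed-out long-poll is a separate design question for the adapter.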

alexandergunnarson commented 1 month ago

@ptaoussanis — I based it on the implementation in this PR's discussion thread (https://github.com/taoensso/sente/pull/426#issuecomment-1647231979), which yes, is based on the Undertow adapter. It works well for our 10Ks of users — both WebSocket and AJAX — but I'm sure there are things I've missed, including what you pointed out. Let me know what else you find!

To answer your question directly, I have no idea as to :ajax-resp-timeout. Presumably those opts need to be removed?

ptaoussanis commented 1 month ago

@alexandergunnarson "I based it on the implementation in this PR's discussion thread"

Ah, gotcha! Then I'm guessing that timeout-val opt can be removed, though I'll take a closer look when reviewing the PR.

Thanks again for the work on this, will be nice to have a working Jetty adapter in Sente 👍

alexandergunnarson commented 1 month ago

Likely so. No problem — will make the PR now!

alexandergunnarson commented 1 month ago

PR here — https://github.com/taoensso/sente/pull/447