taoensso / sente

Realtime web comms library for Clojure/Script
https://www.taoensso.com/sente
Eclipse Public License 1.0

Segment message into chunks - optional "onProgress" callback/event #178

Closed Frozenlock closed 8 years ago

Frozenlock commented 8 years ago

This has two purposes.


Firstly, like #117, I can't send anything bigger than a few hundred KB: http-kit will fail catastrophically and Sente will lose the connection.

In this case I know it's an http-kit bug (there appear to be some PRs to fix it, but the project looks somewhat abandoned...).

Anyhow, segmenting the messages into smaller chunks would make it possible to bypass this limitation.


Secondly, if the messages are segmented into chunks, it gives the opportunity to run an optional callback or launch an event at every chunk. One could then use this to closely follow and control the state of longer communications.

Here is an example callback function:


(fn chunk-completed-callback
  [{:keys [completed-chunk total-chunks]}] ;; completed-chunk: 10, total-chunks: 35
  (if (= @instruction :stop)
    :drop-event ;; returning a certain keyword could cancel the download/upload
    (swap! download-state
           assoc
           :done completed-chunk))) ;; let the user know how far along they are.
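To make the proposed callback contract concrete, here is a minimal, Sente-free sketch of a chunked send that calls such a callback after every chunk. Everything in it is hypothetical and not part of Sente's API: `send-in-chunks!` is an illustrative name, `send-fn` stands in for the real transport, and the `:drop-event` return value follows the convention suggested in the example above.

```clojure
(defn send-in-chunks!
  "Hypothetical helper (not part of Sente). Splits the EDN encoding of
  `event` into chunks of at most `chunk-size` characters and passes each
  chunk to `send-fn`, a stand-in for the real transport. After each chunk
  it calls `on-progress` with {:completed-chunk n :total-chunks t}; if that
  returns :drop-event, the remaining chunks are skipped.
  Returns the number of chunks actually sent."
  [event chunk-size send-fn on-progress]
  (let [chunks (->> (pr-str event)
                    (partition-all chunk-size)
                    (map #(apply str %))
                    vec)
        total  (count chunks)]
    (loop [i 0]
      (if (< i total)
        (do (send-fn (nth chunks i))
            (if (= :drop-event (on-progress {:completed-chunk (inc i)
                                             :total-chunks    total}))
              (inc i)          ;; canceled: report how many chunks went out
              (recur (inc i))))
        total))))
```

A callback like the one above (which `swap!`s a progress atom and returns `:drop-event` when the user asks to stop) can be passed directly as `on-progress`; concatenating the chunks that were sent and reading them back yields the original event.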
danielcompton commented 8 years ago

This would increase the complexity of both the client- and server-side Sente code. It's certainly one solution, but I wonder if there is a more elegant way of achieving this? Fixing the bug in http-kit would be another way.

In your example callback function, is that running on the client or server?

Not saying no, just opening a discussion :)

Frozenlock commented 8 years ago

Fixing the bug in http-kit should be done, but that still leaves the second feature lacking (chunks callback).

The example is client-side (with some Reagent hints, e.g. download-state), though it would be nice to have it server-side too.

The other possibility I see is to wrap the sending function and add a receiving event handler that re-dispatches once the message is reconstructed. The downside is that each user needs to re-implement it. The packing/unpacking will also have to happen twice (the user needs to pack before the message can be segmented).
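A minimal sketch of that wrap-and-re-dispatch approach, in plain Clojure with no Sente dependency. The `:app/segment` event id and both function names are hypothetical. It shows where the double packing comes from: the event is packed to EDN once before splitting, and each wrapper event would be packed again by the transport. Like the prototype below, it assumes segments arrive in order.

```clojure
(require '[clojure.edn :as edn])

(defn segment-events
  "Hypothetical sender-side wrapper: packs `event` to EDN once, splits the
  string, and wraps every piece in its own [:app/segment {...}] event,
  followed by a final :end marker. The transport then packs each wrapper
  event again (the double packing)."
  [msg-id event chunk-size]
  (let [pieces (map #(apply str %) (partition-all chunk-size (pr-str event)))]
    (concat
     (map-indexed (fn [n piece] [:app/segment {:msg-id msg-id :s-n n :s-v piece}])
                  pieces)
     [[:app/segment {:msg-id msg-id :action :end}]])))

(defn receive-segment!
  "Hypothetical receiving handler. Appends each piece to `store` (an atom of
  msg-id -> partial EDN string). On :end it removes the entry and returns the
  reconstructed event so the caller can re-dispatch it; :cancel just drops
  the entry. Returns nil for everything except :end."
  [store [_ {:keys [msg-id s-v action]}]]
  (case action
    :end    (let [[old _] (swap-vals! store dissoc msg-id)] ; atomic read + remove
              (edn/read-string (get old msg-id)))
    :cancel (do (swap! store dissoc msg-id) nil)
    (do (swap! store update msg-id str s-v) nil)))
```

The caller would feed the non-nil return value of `receive-segment!` back into its normal event dispatch, which is the per-user re-implementation cost mentioned above.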

Frozenlock commented 8 years ago

Note: http-kit installed locally from the cloned GitHub repo doesn't have the previously mentioned error. Maybe @ptaoussanis could push the new version to Clojars, if he has the rights to do so.

Back to the segmentation feature.

I've made a little prototype wrapper to send segmented messages via Sente. The sending function is client-side (cljs) and the receiving method is for the server (clj).

The receiving atom could certainly benefit from some kind of timeout, but it's good enough for a demo. Here's the code:


(ns cea.sente.segments
  (:require [taoensso.encore :as enc]
            [taoensso.sente     :as sente]
            #?(:cljs [cljs.core.async :as async :refer [<! >! chan put! timeout]])
            [cea.sente :as s :refer [event-msg-handler]])
  #?(:cljs
       (:require-macros [devcards.core :as dc :refer [defcard-rg]]
                        [cljs.core.async.macros :refer [go]]
                        [taoensso.timbre :refer [debugf]])))

(let [segments-store (atom {})]

  #?(:cljs
     (defn chsk-send-segmented!
       " `segment-cb' is a function that will be called after every segment sent.
        It should expect a map with the following keys:
        - :edn-reply :: the edn-reply sent by the server
        - :s-n :: the segment number (0-based)
        - :s-v :: the segment value (encoded, mostly for debug)
        - :s-t :: the total number of segments
        - :msg-id :: a UUID for this collection of segmented messages
        - :canceled? :: true if the transfer has been canceled
        - :cancel-fn :: a function that will stop the transfer of segments.

       The receiving end of segmented messages should expect these
       keys in the ?data field:
       - :s-n, :s-v, :s-t, :msg-id and :action.
       :action is sent when a particular action is required on the
       receiving end (such as :end or :cancel)."
       ([event timeout] (chsk-send-segmented! event timeout nil))
       ([event timeout reply-fn] (chsk-send-segmented! event timeout reply-fn nil))
       ([event timeout reply-fn segment-cb]
        (let [msg-id (enc/uuid-str)
              segments (->> (enc/pr-edn event)
                            (partition-all 1000) ;; 1000 characters per segment
                            (map #(apply str %))
                            (map-indexed vector))
               total-n (count segments)
              cancel? (atom nil)
              c (chan)]
          (go
            (doseq [[s-n s-value] segments
                    :let [message-map {:msg-id msg-id :s-v s-value
                                       :s-t total-n   :s-n s-n}]
                    :while (not @cancel?)]
              ;; send a segment to the other party
              (s/chsk-send! [:chsk/segmented message-map]
                            timeout
                            (fn [edn-reply]
                              (if (sente/cb-error? edn-reply)
                                (reset! cancel? true))
                              (put! c edn-reply)))

              ;; always wait for the reply before sending the next segment
              (let [edn-reply (<! c)]
                (when segment-cb ;; segment callback
                  (segment-cb (assoc message-map
                                     :canceled? @cancel?
                                     :cancel-fn #(reset! cancel? true)
                                     :edn-reply edn-reply)))))

            ;; all the segments have been sent (or we canceled).
            (s/chsk-send! [:chsk/segmented
                           {:msg-id msg-id :action (if @cancel? :cancel :end)}]
                          timeout
                          reply-fn))))))

  (defmethod event-msg-handler :chsk/segmented
    [{:as ev-msg :keys [event id ?data ?reply-fn]}]
    (let [{:keys [msg-id s-v s-n s-t action]} ?data]
      (case action
        :cancel (swap! segments-store dissoc msg-id)
        :end (do
                 (let [result (->> (get-in @segments-store [msg-id :s-v])
                                   (enc/read-edn))
                       ;; Sente events are [id ?data] vectors
                       [id ?data] result
                       event result]
                   (swap! segments-store dissoc msg-id)
                   (event-msg-handler (assoc ev-msg
                                             :id id
                                             :event event
                                             :?data ?data))))
        ;; default
        (do (swap! segments-store update-in [msg-id :s-v] str s-v)
            (when ?reply-fn
              (?reply-fn {})))))))

#?(:cljs
   (do
     (defn echo []
       [:div [:button.btn.btn-default
              {:on-click #(s/chsk-send!
                           [:chsk/echo {:msg "hello"}] 3000
                           (fn [edn-reply]
                             (js/console.log (str edn-reply))))}
              "Echo"]])

     (defn segmented-echo []
       [:div [:button.btn.btn-default
              {:on-click #(chsk-send-segmented!
                           [:chsk/echo {:msg (take 100 (cycle ["This is a test with more than 10 characters"]))}]
                           3000
                           (fn [edn-reply]
                             (js/console.log (str edn-reply))))}
              "Segmented echo"]])

     (defcard-rg send-segments
       [echo])

     (defcard-rg send-segmented-segments
       [segmented-echo])))
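The receiving atom's missing timeout could be handled with two pure update functions, sketched below under the assumption that each partial message keeps the prototype's `[msg-id :s-v]` shape plus a hypothetical `:updated-at` timestamp in milliseconds. Neither function exists in Sente; the names and the eviction policy (drop anything idle longer than `ttl-ms`) are illustrative.

```clojure
(defn append-segment
  "Hypothetical: appends `s-v` to the partial message `msg-id` and records
  `now-ms` as the time of its most recent segment."
  [store msg-id s-v now-ms]
  (-> store
      (update-in [msg-id :s-v] str s-v)
      (assoc-in [msg-id :updated-at] now-ms)))

(defn evict-stale
  "Hypothetical: drops partial messages whose last segment arrived more than
  `ttl-ms` milliseconds before `now-ms`."
  [store now-ms ttl-ms]
  (into {}
        (remove (fn [[_ {:keys [updated-at]}]]
                  (> (- now-ms updated-at) ttl-ms)))
        store))
```

Both are meant to be used with `swap!`, e.g. `(swap! segments-store evict-stale (System/currentTimeMillis) 60000)`, either at the start of every `:chsk/segmented` message or from a periodic task; keeping them pure makes the timeout logic testable without a socket.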
ptaoussanis commented 8 years ago

Hey there, sorry for the delay responding - have been travelling.

Will try to look into this w/in the next few days + get back to you!

ptaoussanis commented 8 years ago

Hi there,

> Firstly, like #117, I can't send anything bigger than a few hundred KB.

Could you possibly try again using [com.taoensso.forks/http-kit "2.1.20"]?

> Secondly, if the messages are segmented into chunks, it gives the opportunity to run an optional callback or launch an event at every chunk. One could then use this to closely follow and control the state of longer communications.

Re: adding this as a feature (vs. just working around the http-kit bug), I'm not sure I see the utility, to be honest (at least relative to the complexity cost).

Generally speaking, any data transfers large enough to benefit from chunking should probably be made via Ajax anyway (to benefit from HTTP caching, etc.). Sente's ideal data transfers are usually pretty small.

To clarify: you can still benefit from Sente's features as a coordination mechanism for larger transfers. E.g. the server does an async push to the client to signal the client to initiate an Ajax request for a large payload, etc.
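A minimal sketch of that coordination pattern, with every name hypothetical (the `:app/payload-ready` event id and the `/payloads/` route are assumptions): the pushed event carries only an id, and the client turns it into an ordinary HTTP GET that can use caching. On a real server the vector would be pushed with the `send-fn` returned by Sente's `make-channel-socket-server!`, addressed to the user's uid.

```clojure
(defn payload-ready-event
  "Server side (hypothetical): the small message pushed over Sente. It
  carries only an id, never the large payload itself."
  [payload-id]
  [:app/payload-ready {:payload-id payload-id}])

(defn ajax-request-for
  "Client side (hypothetical): turns the pushed event into a plain HTTP GET
  description that any Ajax library could execute. Returns nil for other
  events."
  [[ev-id {:keys [payload-id]}]]
  (when (= ev-id :app/payload-ready)
    {:method :get
     :uri    (str "/payloads/" payload-id)}))
```

The split keeps Sente's messages small while the large transfer goes over plain HTTP, which is the division of labour described here.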

HTTP's well designed for large transfers like this and starting to try to reimplement things like HTTP caching (or chunking) over WebSockets sounds to me like a bit of a bad (and unnecessary) path to start heading down.

Sente works great together with standard HTTP features; you can use each for its strengths. Does that make sense / address your use case?

Frozenlock commented 8 years ago

> Generally speaking, any data transfers large enough to benefit from chunking should probably be made via Ajax anyway (to benefit from HTTP caching, etc.).

In this case it was an upload. I'm not sure there's much benefit in HTTP for this.

> Sente works great together with standard HTTP features; you can use each for its strengths. Does that make sense / address your use case?

Yes. I was hoping to use a single endpoint (Sente's), but all in all it looks like adding a POST will be easier to maintain.

ptaoussanis commented 8 years ago

> I'm not sure there's much benefit in HTTP for this.

Well, HTTP uploads will get you fully-baked upload progress for one :-) May also be more reliable.

> Yes. I was hoping to use a single endpoint (Sente's), but all in all it looks like adding a POST will be easier to maintain.

Just to clarify: the updated http-kit should work for your large uploads now if you prefer, just minus the HTTP-style progress.