oliyh / oxbow

A Server Sent Events (SSE) client for Clojurescript based on js/fetch

on-error default opt not used #1

Closed martinklepsch closed 11 months ago

martinklepsch commented 2 years ago

This destructures on-error from opts but opts isn't rebound after the merge.

https://github.com/oliyh/oxbow/blob/054ba77099c2e4cc543a79ec69f397cce6b1775b/src/oxbow/core.cljs#L62-L68
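The linked lines are not reproduced here, so the following is only a minimal sketch of the pattern described above, not the library's actual code. The function names `broken-client` and `fixed-client` are hypothetical, and `default-opts` is a simplified stand-in for the library's defaults.

```clojure
(ns example.merge-opts)

;; Simplified stand-in for the library's defaults (assumption).
(def default-opts
  {:on-error (fn [e] (str "default handler saw: " e))})

(defn broken-client
  "on-error is destructured from the caller's opts, so the default
  supplied by the merge is never seen."
  [{:keys [on-error] :as opts}]
  (let [merged (merge default-opts opts)]
    ;; `merged` contains :on-error, but the local `on-error` was bound
    ;; from the unmerged `opts` and is nil when the caller omits it.
    (when (and merged on-error)
      (on-error "boom"))))

(defn fixed-client
  "Destructure from the merged map so that defaults apply."
  [opts]
  (let [{:keys [on-error]} (merge default-opts opts)]
    (when on-error
      (on-error "boom"))))
```

With an empty options map, `(broken-client {})` never reaches the default handler, while `(fixed-client {})` does.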

PS: Hi, I'm playing around with this library as it seems to be a good way to work with SSE endpoints behind header-based authentication. I might open some more issues/PRs once I have it all figured out, but in the meantime, thanks for making it! 🙂

martinklepsch commented 2 years ago

I went on a bit of an educational yak shave trying to figure out why things weren't quite working for me, and ended up rewriting oxbow with missionary, using stateful transducers so that I don't have to deal with the buffer state myself.

One of the main issues I ran into was that the chunks were not always complete (i.e. the trailing \n\n had not arrived yet) but the library had already tried to parse the event.
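To make the incomplete-chunk problem concrete, here is a minimal sketch of one way to hold back partial events until their terminating blank line arrives. It is not the code from oxbow or from the rewrite below. The name `complete-events` is hypothetical, and the sketch assumes events are separated by exactly `\n\n` (no `\r\n` line endings).

```clojure
(ns example.sse-buffer
  (:require [clojure.string :as str]))

(defn complete-events
  "Hypothetical stateful transducer: takes decoded text chunks and emits
  only events that have been terminated by \\n\\n. Any trailing partial
  event is kept in a buffer and prepended to the next chunk."
  []
  (fn [rf]
    (let [buffer (volatile! "")]
      (fn
        ([] (rf))
        ;; At end of input, an unterminated event in the buffer is dropped.
        ([result] (rf result))
        ([result chunk]
         ;; Limit -1 keeps trailing empty strings, so the last element is
         ;; always the (possibly empty) unterminated remainder.
         (let [parts (str/split (str @buffer chunk) #"\n\n" -1)]
           (vreset! buffer (peek parts))
           (reduce (fn [acc event]
                     (let [r (rf acc event)]
                       ;; Re-wrap so early termination is passed upstream.
                       (if (reduced? r) (reduced r) r)))
                   result
                   (remove str/blank? (pop parts)))))))))
```

For example, `(into [] (complete-events) ["data: a\n" "\ndata: b\n\ndata: c"])` emits the first two events and holds back `data: c`, because its terminator has not arrived.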

Below is my reimplementation. I think it should be mostly API-compatible, and if you're interested I might try to open a PR. Note that it does add a dependency, which may not be what you want, but it felt empowering in my situation.

I also dropped the reconnect since I felt like it could easily be handled at the call site.

(ns oxbow.core
  (:require [clojure.string :as str]
            [missionary.core :as m]))

(defn parse-single-event
  [event data-parser]
  (reduce (fn [acc kv]
            (let [[_ k v] (re-find #"(\w*):\s?(.*)" kv)
                  k (when-not (str/blank? k) (keyword (str/trim k)))]
              (if k
                (assoc acc k (when-not (str/blank? v)
                               (if (= :data k)
                                 (data-parser v)
                                 v)))
                (throw (ex-info "Something is off" {:event event})))))
          {}
          (str/split-lines (str/trim event))))

(defn read-chunks
  "Read from `reader` and parse chunks into SSE events (string form)"
  [reader]
  (->> (m/ap
        (let [decoder (js/TextDecoder.)]
          (loop []
            (let [result (m/? (fn [s f] (.then (.read reader) s f) #()))]
              (if (.-done result)
                (do (.cancel reader) ; not sure if this works
                    {:done? (.-done result)})
                (m/amb>
                 {:chunk (.decode decoder (.-value result))}
                 (recur)))))))
       (m/eduction (comp
                    (take-while (comp not :done?))
                    (map :chunk)
                    ;; below handles/normalizes two cases:
                    ;; 1. the chunk does not contain a \n\n
                    ;; 2. the chunk contains multiple \n\n
                    (mapcat #(if (str/includes? % "\n\n") [% :split] [%]))
                    (mapcat #(if-not (= :split %)
                               (interpose :split (str/split % #"\n\n"))
                               [%]))
                    (partition-by (partial = :split))
                    (remove #{[:split]})
                    (map #(str/join %))))))

(def default-opts
  {:on-open  #(js/console.log "Stream connected" %)
   :on-close #(js/console.log "Stream ended")
   :on-event #(js/console.log "Message received: " %)
   :on-error #(js/console.warn "Error: " %)
   :data-parser identity
   ;; the two reconnect options below are not read by this rewrite
   :auto-reconnect? true
   :reconnect-timeout 2000})

(defn sse-client
  [opts]
  (let [abort-state (or (::abort-state opts)
                        (let [controller (js/AbortController.)
                              signal (.-signal controller)]
                          (atom {:controller controller
                                 :signal signal
                                 :aborted? false})))
        {:keys [uri fetch-options on-open on-event on-error on-close data-parser]} (merge default-opts opts)
        on-error (fn [e]
                   (when (and on-error (not (:aborted? @abort-state)))
                     (on-error e)))
        abort-fetch! #(do (js/console.log "Aborting connection to" uri)
                          (swap! abort-state assoc :aborted? true)
                          (.abort (:controller @abort-state)))]
    (-> (js/fetch uri (clj->js (assoc fetch-options :signal (:signal @abort-state))))
        (.then (fn [response]
                 (when on-open (on-open response))
                 ((->> (read-chunks (.. response -body getReader))
                       (m/eduction (map #(parse-single-event % data-parser)))
                       (m/reduce #(on-event %2)))
                  (fn done
                    []
                    ;; when the flow above reaches an end (i.e. done)
                    ;; this function is called
                    (when on-close (on-close)))
                  (fn err
                    [e]
                    ;; when the task created here gets cancelled by its consumer
                    ;; this function is called
                    (when on-error (on-error e))
                    (abort-fetch!)))))
        (.catch (fn [e]
                  (when on-error (on-error e))
                  (when on-close (on-close)))))))
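As a small illustration of the field-parsing step in `parse-single-event`, the sketch below applies the same regular expression to single lines. The helper name `split-field` is hypothetical and is not part of the code above.

```clojure
(ns example.sse-field)

;; Same pattern as used in parse-single-event above.
(def field-pattern #"(\w*):\s?(.*)")

(defn split-field
  "Hypothetical helper: returns [field value] for one line of an event,
  or nil when the line contains no colon."
  [line]
  (when-let [[_ k v] (re-find field-pattern line)]
    [k v]))
```

Only the first colon separates field from value, so `(split-field "data: {\"a\":1}")` keeps the JSON payload intact as the value.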

oliyh commented 2 years ago

Hello,

To be honest this library was thrown together rather hastily so I'm not surprised there is the odd mistake, thanks for reporting!

On the on-error case: yes, I agree. Happy to accept a PR, or I can fix it soon. I see you've opened a PR for the rewrite, so I'll discuss it in that thread.

Cheers

oliyh commented 2 years ago

Regarding SSE with headers, this is exactly why I needed to make this library!

oliyh commented 11 months ago

The original issue was fixed in b0f004fc0459bdf76cbacf71a8def79b297b783b and the other issues were solved by #3 so am closing this one, thanks @martinklepsch