bob-cd / bob

This is what CI/CD should've been.
https://bob-cd.github.io
MIT License

Event streaming from api-server #119

Closed TimoKramer closed 1 year ago

TimoKramer commented 1 year ago

For a dynamic web interface for Bob, we need it to update based on some kind of event stream. The event stream should be provided by the apiserver, since that is the point we connect to from the outside. Updates on the status of pipelines should be streamed to all the consumers that are connected to the events endpoint.

We could use server-sent events (SSE) for this, or websockets. SSE is the more lightweight approach and provides one-way communication, which is what we need; there is a ring middleware for it that seems to cover everything we need. Websockets are also widely used and would be an adequate option as well.

We could also make the /events endpoint both static and dynamic. For streaming events, the consumer could send a GET request with a parameter like ?stream=true. For manual requests we could offer functionality similar to what Docker offers with its events endpoint, namely since, until and filter.
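
Roughly, that endpoint could look like this (just a sketch assuming ring's params middleware; query-events and stream-events are made-up helpers standing in for the two behaviours):

;; Sketch only: `query-events` and `stream-events` are hypothetical helpers,
;; not existing Bob functions.
(declare query-events stream-events)

(defn events-handler
  [{{:strs [stream since until]} :query-params :as request}]
  (if (= stream "true")
    (stream-events request)                                ; dynamic: keep the connection open and push events
    {:status 200
     :body   (query-events {:since since :until until})})) ; static: docker-style since/until filtering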

lispyclouds commented 1 year ago

Looks good! The one thing I'd also consider is ease of access from the command-line tooling, apart from the Web UI.

TimoKramer commented 1 year ago

Ok, I took a look at xtdb's listen and it doesn't look like it can stream :status changes for us. So basically we would have to implement our own events.
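
For reference, this is roughly what xtdb's listen gives us (a sketch against the xtdb 1.x API, assuming node is the running xtdb node): it fires once per indexed transaction with the raw tx ops, so deriving "the :status of pipeline X changed" would be entirely on us.

(require '[xtdb.api :as xt])

;; Sketch only: the listener receives whole transactions, not per-document
;; :status changes, so we'd have to diff/interpret the tx ops ourselves.
(def listener
  (xt/listen node
             {::xt/event-type ::xt/indexed-tx
              :with-tx-ops?   true}
             (fn [{:keys [committed?] ::xt/keys [tx-ops]}]
               (when committed?
                 (println "indexed tx ops:" tx-ops)))))

;; (.close listener) ; stop listening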

TimoKramer commented 1 year ago

Or we could use mulog?!
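
Emitting the events with mulog would be simple enough (a sketch of μ/log's basic API; the event keys here are illustrative, not Bob's schema), the open question being how to get them from a publisher to the connected clients:

(require '[com.brunobonacci.mulog :as u])

;; The console publisher is just for illustration; a real setup would need a
;; publisher that forwards events to the apiserver's connected clients.
(u/start-publisher! {:type :console})

;; Emitting a pipeline status change; the keys are illustrative.
(u/log ::pipeline-status :group "dev" :name "test" :run-id "r-1" :status :running)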

aldosolorzano commented 1 year ago

I found https://mercure.rocks/ as an interesting thing for this problem

TimoKramer commented 1 year ago

| Apiserver needs to consume events for an events-endpoint | static request with no updates | use postgres | use rabbitmq streams |
| --- | --- | --- | --- |
| deliver state | xt/q | next/execute! | environment.consumerBuilder() |
| event streaming | query repeatedly client-side | LISTEN events | environment.consumerBuilder() |
| consistency | always consistent | there might be lost events between SELECT and LISTEN | should be consistent |
| effort | lowest effort | configure, connect and read/write postgres; problem when xtdb-backend is switched :worried: | java interop with streams-client; install streams-plugin :neutral_face: |
| performance expectation | bad since whole datasets are queried repeatedly | supposedly good because PG :relaxed: | supposedly best because that's what it's made for :smiley: |
| quirks | none, but not really a solution :expressionless: | FIFO queueing and LISTEN are more esoteric PG features :expressionless: | seems to be the most straightforward solution |

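For completeness, the "use postgres" column would look roughly like this (a sketch using next.jdbc and the pgjdbc driver; the events channel name and the db-spec are made up):

(require '[next.jdbc :as jdbc])
(import '[org.postgresql PGConnection])

;; Sketch: LISTEN on a channel and poll the driver for notifications.
(def ds (jdbc/get-datasource {:dbtype "postgresql" :dbname "bob"})) ; illustrative db-spec

(with-open [conn (jdbc/get-connection ds)]
  (jdbc/execute! conn ["LISTEN events"])
  (let [pg-conn (.unwrap conn PGConnection)]
    (loop []
      ;; blocks for up to 5s waiting for NOTIFY payloads
      (doseq [n (.getNotifications pg-conn 5000)]
        (println "got event:" (.getParameter n)))
      (recur)))) ; runs forever; a real consumer would need a stop condition
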
lispyclouds commented 1 year ago

Another thing to keep in mind is that we should only start subscribing to the stream when a request comes into /events. We should not subscribe before that, as it would start accumulating state on the api servers, making them stateful. Also, we should check whether RabbitMQ supports consumer groups: all of the APIServers form a single consumer and the read state should be treated like that.

coro commented 1 year ago

Rabbit's equivalent of consumer groups is the Single Active Consumer pattern for streams. If I understand your use case correctly, you'd want all of your APIServer consumers to have this enabled so that only one of them consumes from the stream at a time.

lispyclouds commented 1 year ago

@TimoKramer Just to kick the tires, tried this piece of code:

(ns rabbit.main
  (:import
   [com.rabbitmq.stream ConfirmationHandler Environment MessageHandler]))

(def stream "my-events")

(def environment (.. Environment builder build)) ; defaults to localhost:5552 with guest/guest

(def producer (.. environment
                  producerBuilder
                  (stream stream)
                  (name "a-producer") ; naming the producer enables message deduplication
                  build))

(def consumer (.. environment
                  consumerBuilder
                  (stream stream)
                  (name "a-consumer") ; named consumers get server-side offset tracking
                  singleActiveConsumer ; only one consumer with this name is active at a time
                  (messageHandler (reify MessageHandler
                                    (handle [_ context message]
                                      (println "This is a context:" context)
                                      (println "This is a message:" (String. (.getBodyAsBinary message))))))
                  build))

(defn publish
  [producer ^String message]
  (let [message (.. producer
                    messageBuilder
                    (addData (.getBytes message))
                    build)]
    (.send producer message (reify ConfirmationHandler
                              (handle [_ _])))))

(comment
  (set! *warn-on-reflection* true)

  ;; create the stream
  (.. environment
      streamCreator
      (stream stream)
      create)

  (publish producer "bar")

  (publish producer "foo")

  (.close producer)

  (.close consumer))

Seems to do quite well and should serve our needs; leaving this here as an example. I used this to start a RabbitMQ instance for the test:

$ podman run -it --name rmq --rm -p 5552:5552 -p 5672:5672 -p 15672:15672 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' rabbitmq:management-alpine

$ podman exec rmq rabbitmq-plugins enable rabbitmq_stream
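
The stream client above is the Java one from com.rabbitmq/stream-client; something like this in deps.edn (the version is just illustrative):

;; deps.edn fragment — version is illustrative, use the current release
{:deps {com.rabbitmq/stream-client {:mvn/version "0.15.0"}}}
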
TimoKramer commented 1 year ago

> Rabbit's equivalent of consumer groups is the Single Active Consumer pattern for streams. If I understand your use case correctly, you'd want all of your APIServer consumers to have this enabled so that only one of them consumes from the stream at a time.

@coro I need all the apiservers to subscribe to a stream, and all of them should receive all messages. Single active consumer seems to only deliver to the 'active' one, so it does not seem to be the right fit as I see it. Will read the docs more closely now. Thanks! 👍
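
In that case each apiserver would simply create its own plain consumer, without singleActiveConsumer and without a shared name, so every instance receives every message (sketch, reusing the environment and stream from the snippet above):

;; Fan-out sketch: one of these per apiserver instance.
(def fan-out-consumer
  (.. environment
      consumerBuilder
      (stream stream)
      (messageHandler (reify MessageHandler
                        (handle [_ _ message]
                          (println "event:" (String. (.getBodyAsBinary message))))))
      build))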

lispyclouds commented 1 year ago

On further thought, I think we should NOT use the single active consumer: as @TimoKramer says, every apiserver needs to receive all the messages so it can forward them to its own connected clients.

Will think more, lemme know what y'all think of this?

lispyclouds commented 1 year ago

Here are two ring handlers that implement a producer and a consumer and send SSE:

(require '[clojure.java.io :as io]
         '[ring.core.protocols :as p])

(defn publish
  [{{{:keys [message]} :path} :parameters}]
  (let [msg (.. producer
                messageBuilder
                (addData (.getBytes message))
                build)]
    (.send producer msg (reify ConfirmationHandler
                          (handle [_ _])))
    {:status 200
     :body {:message "Ok"}}))

(defn events
  [_]
  {:status 200
   :headers {"content-type" "text/event-stream"
             "transfer-encoding" "chunked"}
   :body (reify p/StreamableResponseBody
           (write-body-to-stream [_ _ output-stream]
             (with-open [w (io/writer output-stream)]
               (let [complete (promise)
                     consumer (.. environment
                                  consumerBuilder
                                  (stream stream-name)
                                  (messageHandler (reify MessageHandler
                                                    (handle [_ _ message]
                                                      (try
                                                        (doto w
                                                          ;; SSE format: data: foo\n\n
                                                          (.write (str "data: " (String. (.getBodyAsBinary message)) "\n\n"))
                                                          (.flush))
                                                        (catch Exception _
                                                          (println "client disconnected")
                                                          (deliver complete :done)))))) ;; unblock
                                  build)]
                 @complete ;; block til done
                 (.close consumer)
                 (.close output-stream)))))})

The first handler handles POST /publish/foo, and the second one handles GET /events, e.g. curl http://localhost:7777/events.
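
For example, to try it out (ports and paths as in the snippet above):

$ curl -N http://localhost:7777/events              # subscribe; -N disables curl's buffering
$ curl -X POST http://localhost:7777/publish/hello  # in another terminal; the first one should print "data: hello"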

One or more clients can make the GET /events call and they all should start seeing the events from the beginning.

The consumer implementation is a bit convoluted due to the way Jetty/Ring streams work; open to better ideas! The weird promise is there to clean up the consumer: I couldn't find a way to check whether the stream is closed to know when the client has disconnected, so catching the exception when the write fails and signalling seems to be the best idea I can think of for now.
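
One possible refinement (an untested assumption, not part of the handler above): write a periodic SSE comment line as a heartbeat, so a dead connection makes a write fail quickly instead of lingering until the next real event:

;; Heartbeat sketch, using `w` and `complete` from the handler above. SSE
;; clients ignore lines starting with ":", but writing to a closed socket
;; throws, letting us deliver `complete` without waiting for a real event.
(future
  (try
    (loop []
      (Thread/sleep 15000)
      (doto w
        (.write ": keep-alive\n\n")
        (.flush))
      (recur))
    (catch Exception _
      (deliver complete :done))))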

lispyclouds commented 1 year ago

Another observation: most browsers limit the number of concurrent persistent HTTP connections to the same domain to some low number, 6 for HTTP/1.1 and around 100 for HTTP/2 and HTTP/3 afaik. We should keep this in mind if it starts being a problem with things like multiple tabs. This isn't an issue for non-browser clients.

TimoKramer commented 1 year ago

Rebuilt the PR with RabbitMQ streams and SSE.

lispyclouds commented 1 year ago

Implemented in https://github.com/bob-cd/bob/commit/efddf352b7101abf520b643c6d61aa8bfa92f955