ReactiveX / RxGo

Reactive Extensions for the Go language.
MIT License
4.9k stars 338 forks source link

RxGo Rest api example #405

Open DennisMuchiri opened 7 months ago

DennisMuchiri commented 7 months ago

I'm requesting an example of how to use this library to emit items from a REST API GET endpoint.

Alfex4936 commented 2 months ago

I think the simplest API would be something like this:

image

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"

    "github.com/reactivex/rxgo/v2"
)

// WebFramework is a minimal HTTP routing layer that associates request paths
// with rxgo observables.
type WebFramework struct {
    // router maps a request URL path to the observable registered for it.
    router map[string]rxgo.Observable
}

// NewWebFramework returns a WebFramework whose route table is allocated and
// empty, ready for Handle to register paths on.
func NewWebFramework() *WebFramework {
    wf := new(WebFramework)
    wf.router = map[string]rxgo.Observable{}
    return wf
}

// Handle registers handler as the observable stored under path in the route
// table. Registering the same path again replaces the earlier observable.
func (wf *WebFramework) Handle(path string, handler rxgo.Observable) {
    // A WebFramework that was not built by NewWebFramework has a nil map, and
    // assigning into a nil map panics; allocate it on first use instead.
    if wf.router == nil {
        wf.router = make(map[string]rxgo.Observable)
    }
    wf.router[path] = handler
}

// Start serves HTTP on the given port and returns the error reported by
// http.ListenAndServe.
//
// Every request is matched by its exact URL path against the route table. On
// a match the registered observable is observed and each emitted value is
// written to the response on its own line; paths without a registration get
// a 404 response.
func (wf *WebFramework) Start(port int) error {
    log.Printf("💕 Starting server at %d", port)

    // Use a private mux rather than http.DefaultServeMux: registering "/" on
    // the global mux panics if Start is ever called a second time, and it
    // would also expose any handler other packages registered globally.
    mux := http.NewServeMux()
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        observable, ok := wf.router[r.URL.Path]
        if !ok {
            http.NotFound(w, r)
            return
        }

        wrote := false
        for item := range observable.Observe() {
            if item.Error() {
                if wrote {
                    // The status line and headers went out with the first
                    // body write, so a 500 can no longer be sent; record the
                    // failure server-side and stop the response here.
                    log.Printf("stream for %q aborted: %v", r.URL.Path, item.E)
                    return
                }
                http.Error(w, item.E.Error(), http.StatusInternalServerError)
                return
            }
            fmt.Fprintln(w, item.V)
            wrote = true
        }
    })

    return http.ListenAndServe(fmt.Sprintf(":%d", port), mux)
}

// main builds a sample observable pipeline, registers it under /data, and
// serves it on port 8080.
func main() {
    wf := NewWebFramework()

    // Emit 1..5, square each value and format it as a decimal string, then
    // keep only the strings whose first character is '1' (here "1" and "16").
    observable := rxgo.Just(1, 2, 3, 4, 5)().
        Map(func(_ context.Context, item interface{}) (interface{}, error) {
            num, ok := item.(int)
            if !ok {
                // Report the problem through the operator's error return
                // instead of panicking on a failed type assertion.
                return nil, fmt.Errorf("unexpected item type %T", item)
            }
            return fmt.Sprintf("%d", num*num), nil
        }).
        Filter(func(item interface{}) bool {
            str, ok := item.(string)
            return ok && len(str) > 0 && str[0] == '1'
        })

    // Register an HTTP path with the observable.
    wf.Handle("/data", observable)

    // Start only returns when the server could not start or has stopped;
    // log to stderr and exit non-zero so the failure is visible to callers.
    if err := wf.Start(8080); err != nil {
        log.Fatalf("Failed to start server: %v", err)
    }
}