rsocket / rsocket-go

rsocket-go implementation
Apache License 2.0

Flux switchOnFirst operator support #12

Closed linux-china closed 5 years ago

linux-china commented 5 years ago

Please add support for the switchOnFirst operator, so that a bidirectional channel handler can react to the first item of the incoming stream.

public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)

https://projectreactor.io/docs/core/snapshot/api/reactor/core/publisher/Flux.html#switchOnFirst-java.util.function.BiFunction-

The corresponding Java code in RSocket:

public interface ResponderRSocket extends RSocket {
  /**
   * Implement this method to peek at the first payload of the incoming request stream without
   * having to subscribe to Publisher&lt;Payload&gt; payloads.
   *
   * @param payload First payload in the stream - this is the same payload as the first payload in
   *     Publisher&lt;Payload&gt; payloads
   * @param payloads Stream of request payloads.
   * @return Stream of response payloads.
   */
  default Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
    return requestChannel(payloads);
  }
}

jjeffcaii commented 5 years ago

This feature has been added on the master branch.

Here is a code snippet:

package flux_test

import (
    "context"
    "fmt"
    "strconv"
    "testing"

    "github.com/rsocket/rsocket-go/payload"
    "github.com/rsocket/rsocket-go/rx"
    "github.com/rsocket/rsocket-go/rx/flux"
)

func TestSwitchOnFirst(t *testing.T) {
    flux.Create(func(ctx context.Context, s flux.Sink) {
        // The first element ("5") acts as the threshold for the rest of the stream.
        s.Next(payload.NewString("5", ""))
        for i := 0; i < 10; i++ {
            s.Next(payload.NewString(fmt.Sprintf("%d", i), ""))
        }
        s.Complete()
    }).SwitchOnFirst(func(s flux.Signal, f flux.Flux) flux.Flux {
        // Inspect the first signal; if it carries no value, pass the stream through unchanged.
        v, ok := s.Value()
        if !ok {
            return f
        }
        first, _ := strconv.Atoi(v.DataUTF8())
        // Keep only the elements greater than the first one.
        return f.Filter(func(input payload.Payload) bool {
            n, _ := strconv.Atoi(input.DataUTF8())
            return n > first
        })
    }).Subscribe(context.Background(), rx.OnNext(func(input payload.Payload) {
        fmt.Println("next:", input.DataUTF8())
    }))
}

Sorry this took so long: my earlier implementation was wrong, and I had to re-implement the whole rx feature set.