numaproj / numaflow-python

Numaflow Python SDK
Apache License 2.0

feat: udf reduce #40

Closed: jy4096 closed this pull request 1 year ago.

jy4096 commented 1 year ago

Adds reduce support for user-defined functions (UDFs).

Test

Client

The client is the gRPC client from the numaflow-go SDK (`pkg/function/client`):

package server

import (
    "context"
    "fmt"
    "testing"
    "time"

    functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
    "github.com/numaproj/numaflow-go/pkg/function"
    "github.com/numaproj/numaflow-go/pkg/function/client"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    grpcmd "google.golang.org/grpc/metadata"
    "google.golang.org/protobuf/types/known/timestamppb"
)

// Test_server_reduce streams 10 datums to the Python reduce UDF and prints the response.
// The Python server must already be listening on /tmp/numaflow-test.sock.
func Test_server_reduce(t *testing.T) {
    ctx := context.Background()
    c, err := client.New(client.WithSockAddr("/tmp/numaflow-test.sock"))
    require.NoError(t, err)
    defer func() {
        assert.NoError(t, c.CloseConn(ctx))
    }()

    testDatum := &functionpb.Datum{
        Key:       "test_id",
        Value:     []byte(`message_success`),
        EventTime: &functionpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
        Watermark: &functionpb.Watermark{Watermark: timestamppb.New(time.Time{})},
    }

    // The reduce key and the window bounds (Unix seconds, 16:00:00 to 16:01:00 UTC
    // on 2022-09-12) travel as gRPC metadata on the outgoing context.
    ctx = grpcmd.NewOutgoingContext(ctx, grpcmd.New(map[string]string{
        function.DatumKey:     "client",
        function.WinStartTime: "1662998400",
        function.WinEndTime:   "1662998460",
    }))

    // Buffer 10 copies of the datum, then close the channel so the stream ends.
    reduceCh := make(chan *functionpb.Datum, 10)
    for i := 0; i < 10; i++ {
        reduceCh <- testDatum
    }
    close(reduceCh)

    response, err := c.ReduceFn(ctx, reduceCh)
    fmt.Println(response, err)
    require.NoError(t, err)
}
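The client sends the window bounds as strings of Unix seconds in the gRPC metadata (`function.WinStartTime`, `function.WinEndTime`). As a quick sanity check, the sketch below converts them to UTC datetimes using only the standard library. The helper name `parse_window` is hypothetical and not part of pynumaflow, and the sketch assumes the values are whole seconds, which is consistent with the `interval_window_start`/`interval_window_end` values in the client output below.

```python
from datetime import datetime, timezone
from typing import Tuple


def parse_window(start: str, end: str) -> Tuple[datetime, datetime]:
    """Convert window bounds given as Unix-second strings into aware UTC datetimes.

    Hypothetical helper for illustration only; assumes whole-second values.
    """
    start_dt = datetime.fromtimestamp(int(start), tz=timezone.utc)
    end_dt = datetime.fromtimestamp(int(end), tz=timezone.utc)
    if end_dt <= start_dt:
        raise ValueError("window end must be after window start")
    return start_dt, end_dt


# The same values the Go test puts into the outgoing metadata.
start, end = parse_window("1662998400", "1662998460")
print(start, end, end - start)
```

This prints `2022-09-12 16:00:00+00:00 2022-09-12 16:01:00+00:00 0:01:00`, i.e. a one-minute window.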

Server

The server is the counter example from the numaflow-python SDK's function examples:

from typing import Iterator

from pynumaflow.function import Messages, Message, Datum, Metadata, UserDefinedFunctionServicer


def reduce_handler(key: str, datums: Iterator[Datum], md: Metadata) -> Messages:
    # Count every datum streamed in for this key within the interval window.
    interval_window = md.interval_window
    counter = 0
    for datum in datums:
        print("User reduce got", key, datum.value)
        counter += 1
    msg = "counter:%s interval_window_start:%s interval_window_end:%s" % (
        counter,
        interval_window.start,
        interval_window.end,
    )
    # Emit a single message with the same key, carrying the count as bytes.
    messages = Messages()
    messages.append(Message.to_vtx(key, msg.encode()))
    return messages


if __name__ == "__main__":
    # Serve the reduce UDF over gRPC on the Unix socket the Go test client dials.
    grpc_server = UserDefinedFunctionServicer(
        reduce_handler=reduce_handler, sock_path="/tmp/numaflow-test.sock"
    )
    grpc_server.start()
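To exercise the counting logic without starting the gRPC server or the Go client, the handler's body can be run against plain stand-in objects. In this sketch `FakeDatum`, `FakeIntervalWindow`, `FakeMetadata`, `count_reduce`, and `stream` are hypothetical names, not pynumaflow classes: they expose only the attributes `reduce_handler` reads, and `count_reduce` returns a `(key, value)` tuple instead of `Messages`. It assumes the window bounds arrive as timezone-aware UTC datetimes, which the `+00:00` suffix in the client output suggests.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterator, Tuple


# Hypothetical stand-ins; NOT the pynumaflow Datum/Metadata classes.
@dataclass(frozen=True)
class FakeDatum:
    value: bytes


@dataclass(frozen=True)
class FakeIntervalWindow:
    start: datetime
    end: datetime


@dataclass(frozen=True)
class FakeMetadata:
    interval_window: FakeIntervalWindow


def count_reduce(key: str, datums: Iterator[FakeDatum], md: FakeMetadata) -> Tuple[str, bytes]:
    """Same counting logic as reduce_handler, returning (key, value) instead of Messages."""
    window = md.interval_window
    counter = sum(1 for _ in datums)  # consumes the stream once, like the for-loop
    msg = "counter:%s interval_window_start:%s interval_window_end:%s" % (
        counter,
        window.start,
        window.end,
    )
    return key, msg.encode()


def stream(n: int) -> Iterator[FakeDatum]:
    # Mimics the Go client: n identical datums, then the stream ends.
    for _ in range(n):
        yield FakeDatum(value=b"message_success")


md = FakeMetadata(
    FakeIntervalWindow(
        start=datetime.fromtimestamp(1662998400, tz=timezone.utc),
        end=datetime.fromtimestamp(1662998460, tz=timezone.utc),
    )
)
key, value = count_reduce("client", stream(10), md)
print(key, value.decode())
```

Running it prints `client counter:10 interval_window_start:2022-09-12 16:00:00+00:00 interval_window_end:2022-09-12 16:01:00+00:00`, the same key and value the Go test received.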

Result

Client test output:

=== RUN   Test_server_reduce
[key:"client" value:"counter:10 interval_window_start:2022-09-12 16:00:00+00:00 interval_window_end:2022-09-12 16:01:00+00:00"] <nil>
--- PASS: Test_server_reduce (0.00s)
PASS

Server (reduce UDF) output:

GRPC Server listening on: unix:///tmp/numaflow-test.sock with max threads: 64
User reduce got client b'message_success'
User reduce got client b'message_success'
User reduce got client b'message_success'
User reduce got client b'message_success'
User reduce got client b'message_success'
User reduce got client b'message_success'
User reduce got client b'message_success'
User reduce got client b'message_success'
User reduce got client b'message_success'
User reduce got client b'message_success'

TODO

After this PR is merged, build the counter example image against the new release and upload it.

codecov[bot] commented 1 year ago

Codecov Report

Merging #40 (3eb66d9) into main (801bc36) will increase coverage by 0.15%. The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main      #40      +/-   ##
==========================================
+ Coverage   98.38%   98.53%   +0.15%     
==========================================
  Files           9        9              
  Lines         247      273      +26     
  Branches       14       21       +7     
==========================================
+ Hits          243      269      +26     
  Misses          2        2              
  Partials        2        2              
Impacted Files                      Coverage Δ
pynumaflow/sink/server.py           95.12% <ø> (ø)
pynumaflow/_constants.py            100.00% <100.00%> (ø)
pynumaflow/function/__init__.py     100.00% <100.00%> (ø)
pynumaflow/function/server.py       97.18% <100.00%> (+1.43%) ↑
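The headline numbers can be reproduced from the coverage diff: coverage is hits divided by tracked lines, where tracked lines = hits + misses + partials (243 + 2 + 2 = 247 on `main`, 269 + 2 + 2 = 273 on #40), so partial lines count as uncovered. The function name `coverage_pct` is illustrative only; this is a back-of-the-envelope check, not Codecov's implementation.

```python
def coverage_pct(hits: int, misses: int, partials: int) -> float:
    """Percentage of tracked lines that are hits; misses and partials count as uncovered."""
    tracked = hits + misses + partials
    return 100.0 * hits / tracked


before = coverage_pct(243, 2, 2)  # main: 247 tracked lines
after = coverage_pct(269, 2, 2)   # #40: 273 tracked lines

print(f"{before:.2f}% -> {after:.2f}% ({after - before:+.2f}%)")
```

It prints `98.38% -> 98.53% (+0.15%)`, matching the report. All 26 added lines landed in Hits while Misses and Partials stayed at 2, which is consistent with the reported 100.00% diff coverage.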
