numaproj / numaflow-python

Numaflow Python SDK
Apache License 2.0

feat: Added servicer for MapStreamBatchFn #163

Closed magelisk closed 3 months ago

magelisk commented 5 months ago

Adds a Python servicer to address https://github.com/numaproj/numaflow/issues/1688.

The main issue with this implementation is that the servicer now needs to be aware of two functions, so the __invoke* function can't rely on __call__ alone to proxy to the handler. As a result, it does NOT support the pure function-based callback at this time. I'm not sure how the core team would like to address that.
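A minimal sketch of the dispatch problem described above, using plain Python and no SDK imports. The names `resolve_handlers`, `UpperHandler`, `upper_fn`, and `run_batch` are hypothetical and are not part of pynumaflow; the sketch only illustrates why a bare function gives the servicer one entry point while a class instance can supply two.

```python
import asyncio
import inspect
from collections.abc import AsyncIterable, AsyncIterator


async def upper_fn(keys: list[str], value: bytes) -> AsyncIterator[bytes]:
    """Function-style callback: a single entry point."""
    yield value.upper()


class UpperHandler:
    """Class-style handler (hypothetical): one method per entry point."""

    async def handler(self, keys: list[str], value: bytes) -> AsyncIterator[bytes]:
        yield value.upper()

    async def handler_stream(self, values: AsyncIterable[bytes]) -> AsyncIterator[bytes]:
        async for value in values:
            yield value.upper()


def resolve_handlers(user_handler):
    """Return (per_message, per_batch); per_batch is None when unavailable."""
    if inspect.isfunction(user_handler):
        # A plain function can only be proxied to one of the two paths.
        return user_handler, None
    return user_handler.handler, getattr(user_handler, "handler_stream", None)


async def _feed(items):
    # Turn a list into an async stream, standing in for the incoming batch.
    for item in items:
        yield item


async def run_batch(user_handler, items):
    """Run the batch path, failing clearly for function-only handlers."""
    _, per_batch = resolve_handlers(user_handler)
    if per_batch is None:
        raise TypeError("batch streaming needs a class-based handler")
    return [out async for out in per_batch(_feed(items))]
```

One possible way to support function-only callbacks later would be to accept two separate callables, but that is a design choice for the maintainers rather than something this sketch settles.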

I will add an example for this, but my current setup was a bit unusual and didn't immediately build with the same structure, so I'll update that shortly. Below are the code and pipeline used to test, in coordination with the updates in https://github.com/numaproj/numaflow-go/pull/129 and https://github.com/numaproj/numaflow/pull/1707

import asyncio
import json
import os
from collections.abc import AsyncIterable
from pynumaflow.mapstreamer import Message, Datum, MapStreamAsyncServer, MapStreamer

SLEEP_TIME = int(os.environ.get("SLEEP_TIME_SEC", "1"))

class MapperStreamer(MapStreamer):
    async def handler(self, keys: list[str], datum: Datum) -> AsyncIterable[Message]:
        # Per-message handler; left as a no-op because this demo only uses handler_stream.
        pass

    async def handler_stream(self, datum: AsyncIterable[Datum]) -> AsyncIterable[Message]:
        """
        Iterate over each item in the stream and emit one message per item,
        tagged even or odd, or dropped if the last digit is 0.

        Sleeps briefly first to simulate longer processing, so that messages
        back up and are then fetched and processed in batches.
        """
        all_grouped = []

        print(f"Simulate doing work for {SLEEP_TIME} sec")
        # Non-blocking sleep, so the event loop is not stalled inside an async handler
        await asyncio.sleep(SLEEP_TIME)

        # Treat each message individually, because we can
        async for msg in datum:
            parsed = json.loads(msg.value.decode())
            val = hash(parsed["Data"]["padding"])

            as_str = str(val)
            all_grouped.append(val)
            print(f"Computed message value = {as_str}")

            last_int = int(as_str[-1])
            if last_int == 0:
                print(f"Drop {as_str}")
                yield Message.to_drop()
                continue

            if last_int % 2 == 0:
                output_keys = ["even"]
                output_tags = ["even-tag"]
            else:
                output_keys = ["odd"]
                output_tags = ["odd-tag"]
            yield Message(value=as_str.encode("utf-8"), keys=output_keys, tags=output_tags)

        # Show that we can also emit a message separate from the per-item ones.
        # This demonstrates 'grouping' messages into fewer, but larger, messages
        grouped_val = json.dumps(all_grouped).encode("utf-8")
        yield Message(value=grouped_val, tags=["grouped"])

if __name__ == "__main__":
    # NOTE: the stream handler does not currently support a function-only handler
    handler = MapperStreamer()
    grpc_server = MapStreamAsyncServer(handler)
    grpc_server.start()
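To exercise the same batching logic without a cluster, a rough local driver can help. This is a sketch under stated assumptions: `FakeDatum` and `FakeMessage` are hypothetical stand-ins for the SDK's `Datum` and `Message`, the `"drop"` tag stands in for `Message.to_drop()`, and the payload field `number` replaces `hash(parsed["Data"]["padding"])`, because Python salts `str` hashes per process unless `PYTHONHASHSEED` is set, which makes the original values differ between runs.

```python
import asyncio
import json
from collections.abc import AsyncIterable, AsyncIterator
from dataclasses import dataclass, field


@dataclass
class FakeDatum:
    """Hypothetical stand-in for the SDK's Datum: only carries a payload."""
    value: bytes


@dataclass
class FakeMessage:
    """Hypothetical stand-in for the SDK's Message."""
    value: bytes
    keys: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)


async def even_odd_stream(data: AsyncIterable[FakeDatum]) -> AsyncIterator[FakeMessage]:
    """Emit one message per input, then one grouped message for the batch."""
    seen = []
    async for datum in data:
        number = json.loads(datum.value.decode())["number"]
        seen.append(number)
        last_digit = abs(number) % 10
        if last_digit == 0:
            # Stand-in for Message.to_drop() in the real handler.
            yield FakeMessage(value=b"", tags=["drop"])
            continue
        label = "even" if last_digit % 2 == 0 else "odd"
        yield FakeMessage(str(number).encode(), keys=[label], tags=[f"{label}-tag"])
    # One extra message summarising the whole batch.
    yield FakeMessage(json.dumps(seen).encode(), tags=["grouped"])


async def fake_batch(numbers):
    # Simulate a batch of datums arriving as an async stream.
    for number in numbers:
        yield FakeDatum(json.dumps({"number": number}).encode())


async def collect(numbers):
    return [msg async for msg in even_odd_stream(fake_batch(numbers))]


if __name__ == "__main__":
    for message in asyncio.run(collect([12, 7, 30])):
        print(message)
```

A batch of N inputs yields N + 1 outputs here: one per item (including drops) plus the trailing grouped message.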

Pipeline.yaml

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline
spec:
  vertices:
    - name: in
      scale:
        disabled: true
      source:
        generator:
          # How many messages to generate in the duration.
          rpu: 10
          duration: 10s
          # Optional, size of each generated message, defaults to 10.
          msgSize: 10
    - name: even-or-odd
      metadata:
        annotations:
          numaflow.numaproj.io/map-stream: "true"
      limits:
        readBatchSize: 20
      scale:
        disabled: true
      # limits:
      #   readBatchSize: 1
      udf:
        container:
          imagePullPolicy: Never
          image: docker.io/warner/demo-app:latest
          command: ["python"]
          args:
          - -m
          - demo_pipeline.stream_wait_udf
          env:
          - name: MAX_THREADS
            value: "1"
          - name: PYTHONUNBUFFERED
            value: "1"
          - name: SLEEP_TIME_SEC
            value: "10"
    - name: even-sink
      scale:
        min: 1
      sink:
        # A simple log printing sink
        log: {}
    - name: odd-sink
      scale:
        min: 1
      sink:
        log: {}
    - name: number-sink
      scale:
        min: 1
      sink:
        log: {}

  edges:
    - from: in
      to: even-or-odd
    - from: even-or-odd
      to: even-sink
      conditions:
        tags:
          values:
            - even-tag
    - from: even-or-odd
      to: odd-sink
      conditions:
        tags:
          operator: or
          values:
            - odd-tag
    - from: even-or-odd
      to: number-sink
      conditions:
        tags:
          values:
            - grouped

The resulting pipeline looks like this as it runs: [image]

magelisk commented 3 months ago

Closing in favor of https://github.com/numaproj/numaflow-python/pull/178