nv-morpheus / Morpheus

Morpheus SDK
Apache License 2.0
342 stars 126 forks source link

[FEA]: Create REST Source/Sink Stages #864

Closed mdemoret-nv closed 1 year ago

mdemoret-nv commented 1 year ago

Is this a new feature, an improvement, or a change to existing functionality?

New Feature

How would you describe the priority of this feature request

High

Please provide a clear description of problem this feature solves

To make a Morpheus pipeline that can receive messages from a client requires making a custom stage. While the implementation of a REST source and sink will vary significantly, we can provide a base implementation which is extendable for users needs.

Describe your ideal solution

This new feature should include:

Describe any alternatives you have considered

No response

Additional context

No response

Code of Conduct

dagardner-nv commented 1 year ago

For the source I think we will want two versions: 1) A source that as you described runs an HTTP server and listens for incoming HTTP requests. 2) A polling REST client that periodically polls a remote endpoint.

Then for the sink, it would be as you describe a simple REST client that is able to POST to a remote endpoint.

For all three, we should be able to support both a Python and C++ impl. On the Python side, we currently have: Flask 2.3, Gunicorn 20.1 and Requests 2.28 being pulled in as transitive dependencies, so we can simply add these to our Conda yaml without too much trouble.

Flask+Gunicorn should be sufficient for the listening server and Requests for the polling source and the sink. Gunicorn can be configured with a dict allowing end users an easy way to supply any config overrides.

On the C++ side, the version of Boost we're using includes the Beast library which can be used to build an HTTP client and server (https://www.boost.org/doc/libs/1_74_0/libs/beast/doc/html/index.html).

For the sink stage, I think DFP's DFPMLFlowModelWriterStage is a good starting point where we allow the user to pass in model_name_formatter and experiment_name_formatter strings and we document what values we will support as named format strings:

class WriteToRestStage(SinglePortStage):
    """
    Write all messages to a Kafka cluster.

    Parameters
    ----------
    c : `morpheus.config.Config`
        Pipeline configuration instance.
    base_url : str
        Server base url, should include the intended protocol (e.g. http:// or https://) and port if necessary.
        This may or may not include a base path from which `endpoint` will be appended.
        examples:
            "https://nvcr.io/"
            "http://localhost:8080/base/path"
    endpoint : str
        Endpoint to which messages will be sent. This will be appended to `base_url` and may include a query string and
        named format strings. Named format strings will be replaced with the corresponding column value from the first
        row of the incoming dataframe, if no such column exists a `ValueError` will be raised.
        examples:
            "api/v1/endpoint"
            "api/v1/endpoint?time={timestamp}&id={id}"
            "/{model_name}/{user}?time={timestamp}"
    method : str, optional
        HTTP method to use when sending messages, by default "POST". Currently only "POST", "PUT" and "PATCH" are
        supported.
    headers : dict, optional
        Optional set of headers to include in the request.
    acceptable_status_codes : typing.Tuple[int], optional
        Tuple of acceptable status codes, by default (200, 201, 202).

    <Probably need some timeout and retry type args here>
    """

    def __init__(self,
                 c: Config,
                 base_url: str,
                 endpoint: str,
                 method: str = "POST",
                 headers: dict = None,
                 acceptable_status_codes: typing.Tuple[int] = (200, 201, 202)):
...

    def msg_to_url(self, x: MessageMeta) -> str:
        endpoint = self._endpoint.format(**x.df.iloc[0].to_dict())
        return f"{self._base_url}/{endpoint}"

    def msg_to_payload(self, x: MessageMeta) -> typing.List[str]:
        return serializers.df_to_json(x.df, strip_newlines=True)

This would allow for end-users to subclass the stage by overrriding the msg_to_url and msg_to_payload methods.

On the C++ side we could use something like the {fmt} lib (https://fmt.dev/latest/index.html) which is installable via Conda and supports Python style named format strings (unfortunately not something that is in the C++20 format lib).

As an optimization for users who have a static endpoint, we could disable casting the first row of the DF to dict. Something like:

    def msg_to_url(self, x: MessageMeta) -> str:
        if self._static_endpoint:
            endpoint = self._endpoint.format({'timestamp': int(time.mktime(time.localtime()))}) # some minimal set of args that are cheap to produce
        else:
            endpoint = self._endpoint.format(**x.df.iloc[0].to_dict())
        return f"{self._base_url}/{endpoint}"

Alternately we could allow users to pass in their own functions for these two tasks, but we would need to document that doing so either disabled C++ execution (or needs the GIL). This way we limit what we expose in Python to functionality we can easily reproduce on the C++ side. Presumably if someone were to subclass the Python stage to override these two methods they could also subclass the C++ stage as well.

For larger dataframes we could support three possible options: 1) Chunk ourselves by a fixed max_row_count 2) Use a streamed upload 3) Use a chunked-encoded request

The polling REST client will mirror this stage, with "GET" being the default HTTP method, and a poll interval.

dagardner-nv commented 1 year ago

For the HTTP source stage, the webserver (likely Gunicorn) will want to control its own worker threads, and we want users to be able to configure this as well.

I'm thinking we will want to source stage to be decoupled from the webserver, with a queue (FiberQueue) between the two.

gurpreets-nvidia commented 1 year ago

Do we have an update on this?

dagardner-nv commented 1 year ago

@gurpreets-nvidia there is an open PR (#977) which contains this code, it unfortunately was placed on the back-burner for a while, but hopefully should be merged soon.

dagardner-nv commented 1 year ago

@gurpreets-nvidia this just got merged into branch-23.11 which adds the following four new stages: