nv-morpheus / Morpheus

Morpheus SDK
Apache License 2.0
310 stars 119 forks source link

[FEA]: Support passing a custom parser to HttpServerSourceStage #1703

Closed dagardner-nv closed 1 month ago

dagardner-nv commented 1 month ago

Is this a new feature, an improvement, or a change to existing functionality?

New Feature

How would you describe the priority of this feature request

Medium

Please provide a clear description of problem this feature solves

Not all JSON payloads translate naturally into DataFrames

Describe your ideal solution

Pass in an optional function.

Currently we do:

    def _parse_payload(self, payload: str) -> HttpParseResponse:
        try:
            df = cudf.read_json(payload, lines=self._lines, engine='cudf')
        except Exception as e:
        ...

Instead we could do:

    def _parse_payload(self, payload: str) -> HttpParseResponse:
        try:
            if self._custom_parse_fn is not None:
                df = self._custom_parse_fn(payload)
            else:
                df = cudf.read_json(payload, lines=self._lines, engine='cudf')
        except Exception as e:
        ...

Additional context

No response

Code of Conduct