SeldonIO / tempo

MLOps Python Library
https://tempo.readthedocs.io/en/latest/
Apache License 2.0

Introduce Kafka streaming functionality for tempo #119

Open axsaucedo opened 3 years ago

axsaucedo commented 3 years ago

Currently tempo has clients that talk to remote models. In order to introduce support for stream processing such as through Kafka, we would need to extend the current interfaces to support asynchronous processing.

Models would be defined in the same way as today:

sklearn_model = Model(
    name="test-iris-sklearn",
    platform=ModelFramework.SKLearn,
    local_folder=f"{artifacts_folder}/{SKLearnFolder}",
    uri="s3://tempo/basic/sklearn",
)

xgboost_model = Model(
    name="test-iris-xgboost",
    platform=ModelFramework.XGBoost,
    local_folder=f"{artifacts_folder}/{XGBoostFolder}",
    uri="s3://tempo/basic/xgboost",
)

Option 1: AsyncIO

Each instance sent to the pipeline is processed sequentially, calling each subsequent model "synchronously": AsyncIO is used to release control while waiting, but execution does not continue until the respective model "returns" its output on the output topic.

Advantages

Disadvantages

@pipeline(
    name="classifier",
    uri="s3://tempo/basic/pipeline",
    local_folder=f"{artifacts_folder}/{PipelineFolder}",
    models=PipelineModels(sklearn=sklearn_model, xgboost=xgboost_model),
)
async def classifier(payload: np.ndarray) -> Tuple[np.ndarray, str]:
    res1 = await classifier.models.sklearn(input=payload)

    if res1[0] == 1:
        return res1, SKLearnTag
    else:
        return await classifier.models.xgboost(input=payload), XGBoostTag
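The request/reply mechanism Option 1 implies could be sketched roughly as follows. All names here (`AsyncKafkaModel`, the stub handlers) are hypothetical, and asyncio tasks stand in for Kafka producers and consumers so the sketch runs standalone; a real implementation would publish to the model's input topic and resolve the pending future when the correlated message arrives on the output topic.

```python
import asyncio
import uuid

class AsyncKafkaModel:
    """Hypothetical awaitable model client backed by request/reply over Kafka."""

    def __init__(self, name, handler):
        self.name = name
        self._handler = handler          # stands in for the remote model
        self._pending = {}               # request_id -> Future awaiting a reply

    async def __call__(self, input):
        request_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        # "Publish" to the model's input topic; a response consumer would
        # resolve the future when the correlated output-topic message arrives.
        asyncio.create_task(self._serve(request_id, input))
        return await future              # control is released here

    async def _serve(self, request_id, input):
        output = self._handler(input)    # remote inference happens here
        self._pending.pop(request_id).set_result(output)


async def classifier(payload):
    res1 = await sklearn_model(input=payload)
    if res1[0] == 1:
        return res1, "sklearn"
    return await xgboost_model(input=payload), "xgboost"


sklearn_model = AsyncKafkaModel("sklearn", lambda x: [1])
xgboost_model = AsyncKafkaModel("xgboost", lambda x: [0])

result = asyncio.run(classifier([5.1, 3.5]))
print(result)  # ([1], 'sklearn') since the sklearn stub predicts class 1
```

The key point is that the event loop is free to serve other pipeline instances while each `await` is pending, even though any single instance flows through the models one at a time.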

Option 2: Fully Asynchronous Streams

The second option is the traditional, fully asynchronous stream processing approach, where each independent data instance is processed as a single step across the streaming pipeline.

Advantages

Disadvantages

p = Pipeline(
    name="classifier",
    uri="s3://tempo/basic/pipeline",
    local_folder=f"{artifacts_folder}/{PipelineFolder}",
    models=PipelineModels(sklearn=sklearn_model, xgboost=xgboost_model),
)

@p.pipeline
def classifier(payload: np.ndarray) -> Tuple[np.ndarray, str]:
    classifier.models.sklearn.send(input=payload)

@p.models.sklearn
def sklearn(res1):
    if res1[0] == 1:
        return res1, SKLearnTag
    else:
        # note: the original payload would need to travel with the message
        p.models.xgboost.send(input=payload)

@p.models.xgboost
def xgboost(res):
    return res, XGBoostTag
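A runnable sketch of what Option 2 implies, with each step as an independent consumer loop and topics modelled as asyncio queues (a real implementation would consume from and produce to Kafka topics instead; all names and the threshold-based model stand-ins are hypothetical):

```python
import asyncio

topics = {}

async def classifier_step():
    while True:
        msg = await topics["classifier-in"].get()
        # Forward the message (carrying the original payload) downstream.
        await topics["sklearn-in"].put(msg)

async def sklearn_step():
    while True:
        msg = await topics["sklearn-in"].get()
        res1 = [1] if sum(msg["payload"]) > 8 else [0]  # model stand-in
        if res1[0] == 1:
            await topics["classifier-out"].put((res1, "sklearn"))
        else:
            # Branch: route the original payload to the next model's topic.
            await topics["xgboost-in"].put(msg)

async def xgboost_step():
    while True:
        msg = await topics["xgboost-in"].get()
        res = [0]                                       # model stand-in
        await topics["classifier-out"].put((res, "xgboost"))

async def main(payload):
    for name in ("classifier-in", "sklearn-in", "xgboost-in", "classifier-out"):
        topics[name] = asyncio.Queue()
    steps = [asyncio.create_task(s())
             for s in (classifier_step, sklearn_step, xgboost_step)]
    await topics["classifier-in"].put({"payload": payload})
    result = await topics["classifier-out"].get()
    for t in steps:
        t.cancel()
    return result

print(asyncio.run(main([5.1, 3.5])))  # ([1], 'sklearn')
```

Note that no step ever waits on a downstream result: each one only consumes a message, does its work, and produces the next message, which is what makes every stage independently scalable. The flip side, visible above, is that branching state (here, the original payload) must be carried inside the messages themselves.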
adriangonz commented 3 years ago

I think both options can probably be combined.

From my point of view, asyncio support is a must. Even without streaming, it's probably needed to optimise pipelines (where I imagine most of the time would be spent in IO, waiting for the model's answer). For the streaming scenario, asyncio can be leveraged to optimise the waiting time for Kafka to come back.

We could think about making it optional though. That is, let people define either async or "classic" methods, and let Tempo decide how to call a custom function (i.e. with await or synchronously). This is similar to how FastAPI handles this, without forcing the user to write full async code.