bentoml / BentoML

The easiest way to serve AI apps and models - Build Model Inference APIs, Job queues, LLM apps, Multi-model pipelines, and more!
https://bentoml.com
Apache License 2.0
7.14k stars 791 forks source link

Spark MLlib support #666

Closed parano closed 3 years ago

parano commented 4 years ago

Is your feature request related to a problem? Please describe. Add support for Spark MLlib models in BentoML

Describe the solution you'd like Add a new model artifact class PySparkModelArtifact, here is the example usage:

# pyspark_example_service.py
from bentoml import env, artifacts, api, BentoService
from bentoml.handlers import DataframeHandler
from bentoml.artifact import PySparkModelArtifact

@env(auto_pip_dependencies=True)
@artifacts([PySparkModelArtifact('pyspark_model', spark_version="2.4.0")])
class PySparkExampleService(BentoService):

    @api(DataframeHandler)
    def predict(self, df):
        model_input = df.to_numpy()
        return self.artifacts.pyspark_model.predict(model_input)
from pyspark_example_service import PySparkExampleService
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()
sc = spark.sparkContext

# training data
model = LogisticRegressionWithSGD.train(training_data, iterations=20)

svc = PySparkExampleService()
# the current SparkContext is required for packing the model
svc.pack('pyspark_model', model, spark_context=sc)
svc.save()

PySpark models can't be directly pickled so it does not work with PickleArtifact. PySparkModelArtifact uses SparkSession and pyspark_model's save and load under the hood. e.g.:

Save:

if isinstance(pyspark_model,
     pyspark.ml.pipeline.PipelineModel) or isinstance(
     pyspark_model, pyspark.ml.base.Model):
           pyspark_model.save(spark_model_save_loc)
else:
     pyspark_model.save(sc, spark_model_save_loc)

Load:

    spark = SparkSession.builder.appName('BentoService').getOrCreate()

    model_data_path = os.path.join(base_path, "pyspark_model_data")
    metadata_path = os.path.join(base_path, ''metadata.json")

    with open(metadata_path, "r") as metadata:
        metadata = json.load(metadata)
        if "model_class" not in metadata:
            raise BentoMLException("Malformed metadata file.")
        model_class = metadata["model_class"]

        logger.info("Loading %s model from %s" % (model_class, model_path))
        splits = model_class.split(".")
        module = ".".join(splits[:-1])
        class_name = splits[-1]
        ModelClass = getattr(importlib.import_module(module), class_name)
        if issubclass(ModelClass,
                 pyspark.ml.pipeline.PipelineModel) or issubclass(
                 ModelClass, pyspark.ml.base.Model):
            model = ModelClass.load(model_path)
        else:
            model = ModelClass.load(spark.sparkContext, model_path)

Sample code based on https://github.com/ucbrise/clipper/blob/develop/containers/python/pyspark_container.py#L27

Describe alternatives you've considered n/a

Additional context n/a

Sharathmk99 commented 4 years ago

@parano Do you want to consider https://github.com/combust/mleap for deploying spark models to production

parano commented 4 years ago

@Sharathmk99 mleap is probably not a good default here. It is possible to add a mleap option that lets BentoML use mleap for batch inferencing when deploying the model to a Spark cluster, although deploying BentoService to Spark cluster for batch inferencing is no yet supported

joshuacwnewton commented 4 years ago

Hi @parano, just commenting to indicate my interest in this as a larger project for the summer, once I am done with the integration test issues. :)

joshuacwnewton commented 4 years ago

I'm just doing some starting research to understand the scope of this feature a bit more. I'm a bit new to Spark/PySpark, so I'm learning as I go... I've really wanted to learn how to use it, though, so this is quite exciting. Apologies in advance for any newbie moments. :smile:

Anyways, here are some design points that have multiple options, and could be discussed further:

  1. Supported MLLib versions: e.g. 3.X/2.X/1.X
    • Spark's ML Migration Guide may be of help here for identifying breaking changes between versions.
  2. Supported API type: spark.mllib support (RDD-based) vs. spark.ml (Spark DataFrame-based)
  3. Supported model saving methods: model.save()/model.load() (as shown in issue description) vs. PMML export (seems to be spark.mllib-specific, not as well-supported)
  4. Adapter types: RDD, DataFrames
    • Adding e.g. a PySparkDataframeAdapter might be outside of scope... still, would complement a PySparkModelArtifact quite nicely :)
  5. Handling Spark JAR dependencies e.g. with testing/Travis:

A lot of these decisions seem like they can be simplified by prioritising the currently-recommended spark.ml (DataFrame) API to start. (This seems analogous to prioritising TF2 support, I think.) Then, later exploring backwards compatibility using tests (like what was done with TF1 tests.)

But, supporting RDDs/spark.mllib might be more crucial than I realize... thoughts?

1e0ng commented 4 years ago

Wow, this issue has a lucky number. 😄 Any progress on this?

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.