Open jiapeijia opened 3 years ago
@jiapeijia thanks for raising the issue! Could I please ask which version of MLeap you are using? Also, do you have a small example we could use to reproduce the issue? Thanks a lot!
The version of MLeap is 0.16.1; you can use the code below for testing.
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql import Row
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext

conf = SparkConf().setAppName('ctrModel').setMaster('local')
sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

l = [('Alice', 1), ('Bob', 2)]
rdd = sc.parallelize(l)
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df2 = spark.createDataFrame(person)
df2.collect()

string_indexer = StringIndexer(inputCol='name', outputCol='name_string_index')
feature_assembler = VectorAssembler(inputCols=[string_indexer.getOutputCol()], outputCol="features")
feature_pipeline = [string_indexer, feature_assembler]
featurePipeline = Pipeline(stages=feature_pipeline)
fittedPipeline = featurePipeline.fit(df2)
fittedPipeline.serializeToBundle("jar:file:pyspark.example.zip", fittedPipeline.transform(df2))
This error normally happens when the necessary jars haven't been provided to your Spark session. What does your Spark conf look like? Are you adding the required MLeap jars?
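For reference, a minimal sketch of what that could look like for the snippet above. The mleap-spark coordinate here is an assumption: its Scala suffix and version must match your Spark/Scala build and the mleap Python package you have installed (0.16.1 in this case).

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Sketch: spark.jars.packages tells Spark to resolve the MLeap Spark jar (and its
# dependencies) so that ml.combust.mleap.spark.SimpleSparkSerializer exists in the JVM.
# Adjust the _2.12 suffix and the version to your own Spark/Scala/mleap versions.
conf = (SparkConf()
        .setAppName('ctrModel')
        .setMaster('local')
        .set("spark.jars.packages", "ml.combust.mleap:mleap-spark_2.12:0.20.0"))
sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()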
I have the same error. This is my code. @talalryz
import sys
sys.path.append("/demo/mleap/python")
from pyspark.sql import SparkSession
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
from pyspark import SparkConf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.ml import Pipeline, PipelineModel
if __name__ == '__main__':
    conf = SparkConf().setAppName('collaborativeFiltering').setMaster('local')
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    file_path = 'file:///Users/xxx/work/demo/SparrowRecSys/src/main/resources'
    ratingResourcesPath = file_path + '/webroot/sampledata/ratings.csv'
    ratingSamples = spark.read.format('csv').option('header', 'true').load(ratingResourcesPath) \
        .withColumn("userIdInt", F.col("userId").cast(IntegerType())) \
        .withColumn("movieIdInt", F.col("movieId").cast(IntegerType())) \
        .withColumn("ratingFloat", F.col("rating").cast(FloatType()))
    training, test = ratingSamples.randomSplit((0.8, 0.2))
    # Build the recommendation model using ALS on the training data
    # Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
    als = ALS(regParam=0.01, maxIter=5, userCol='userIdInt', itemCol='movieIdInt', ratingCol='ratingFloat',
              coldStartStrategy='drop')
    model = als.fit(training)
    # Evaluate the model by computing the RMSE on the test data
    predictions = model.transform(test)
    model.itemFactors.show(10, truncate=False)
    model.userFactors.show(10, truncate=False)
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol='ratingFloat', metricName='rmse')
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = {}".format(rmse))
    # Generate top 10 movie recommendations for each user
    userRecs = model.recommendForAllUsers(10)
    # Generate top 10 user recommendations for each movie
    movieRecs = model.recommendForAllItems(10)
    # Generate top 10 movie recommendations for a specified set of users
    users = ratingSamples.select(als.getUserCol()).distinct().limit(3)
    userSubsetRecs = model.recommendForUserSubset(users, 10)
    # Generate top 10 user recommendations for a specified set of movies
    movies = ratingSamples.select(als.getItemCol()).distinct().limit(3)
    movieSubSetRecs = model.recommendForItemSubset(movies, 10)
    pipeline = Pipeline(stages=[als])
    pipelineModel = pipeline.fit(training)
    pipelineModel.serializeToBundle("jar:file:/Users/xxx/work/demo/SparrowRecSys/als.zip", pipelineModel.transform(training))
    # model.write().save("file:///Users/xxx/work/demo/SparrowRecSys/als")
    userRecs.show(5, False)
    movieRecs.show(5, False)
    userSubsetRecs.show(5, False)
    movieSubSetRecs.show(5, False)
    paramGrid = ParamGridBuilder().addGrid(als.regParam, [0.01]).build()
    cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)
    cvModel = cv.fit(test)
    avgMetrics = cvModel.avgMetrics
    spark.stop()
@aresa7796 on the conf = SparkConf().setAppName('collaborativeFiltering').setMaster('local')
line you need to add a .set("spark.jars.packages", "ml.combust.mleap.mleap-spark_2.12:0.20.0")
so that Spark will install the MLeap jar into its JVM. From your code I /think/ that is the only package you'll need, but if it gives you errors you might also need some others.
Small fix: this should be .set("spark.jars.packages", "ml.combust.mleap:mleap-spark_2.12:0.20.0"), i.e. with a ":" between the group ID and the artifact ID.
How would you troubleshoot this error in an AWS Glue job? Here's my script
from pyspark.sql import SparkSession
from mleap.pyspark.spark_support import SimpleSparkSerializer
spark = (SparkSession
.builder
.appName("PySparkApp")
.config("spark.jars.packages", "ml.combust.mleap:mleap-spark_2.12:0.22.0")
.getOrCreate())
print(f'Spark version: {spark.version}')
serializer = SimpleSparkSerializer()
Output:
Spark version: 3.3.0-amzn-1
TypeError: 'JavaPackage' object is not callable
I'm trying to serialize a PySpark model using MLeap in an AWS Glue job, but none of the documentation I've found has worked. For example, this notebook [1] relies on AWS-provided python.zip and mleap_spark_assembly.jar files, which are no longer publicly available.

[1] https://github.com/aws-samples/aws-ml-readmission-prediction/blob/c2120c93cc96f3d3e0c71f7e279d75dadaeca19a/notebooks/readmission-risk-inference-pipeline-evaluation.ipynb
In my experience, the TypeError: 'JavaPackage' object is not callable
error has always indicated that Spark's JVM didn't initialize correctly or doesn't have the correct packages installed.
If you add a .master("local")
to your builder, does that work? That would force Spark to run on the local machine (instead of a distributed cluster) and eliminate cluster management as a source of issues.
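For reference, the builder from the Glue script with that change applied would look roughly like this (a sketch; whether Glue actually lets you override the master is a separate question, but it is a quick way to isolate classpath problems when testing on a plain local Spark install):

from pyspark.sql import SparkSession

# Sketch: force local mode so only this machine's JVM needs to resolve the MLeap package.
spark = (SparkSession
         .builder
         .appName("PySparkApp")
         .master("local")
         .config("spark.jars.packages", "ml.combust.mleap:mleap-spark_2.12:0.22.0")
         .getOrCreate())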
File "E:\Anaconda\lib\site-packages\mleap\pyspark\spark_support.py", line 24, in serializeToBundle serializer = SimpleSparkSerializer() File "E:\Anaconda\lib\site-packages\mleap\pyspark\spark_support.py", line 39, in init self._java_obj = _jvm().ml.combust.mleap.spark.SimpleSparkSerializer() TypeError: 'JavaPackage' object is not callable