Open fatenlouati opened 1 year ago
See this example https://github.com/intel-analytics/BigDL/blob/05553413a0a3a4bc48039077c8dd69db7ba590c0/python/orca/tutorial/NCF/tf_predict_xshards.py#L35 for the predictions.
This estimator can't predict on streams; it's not an MLlib estimator.
@qiuxin2012 Thank you for your response. Any idea how to predict on streams?
I think you can get the trained model (which is a standard Keras model) and use it directly in your PySpark code.
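A minimal sketch of that suggestion, under several assumptions: the model loads as a plain Keras model, every feature column is numeric and non-null, and each micro-batch is small enough to collect to the driver. The names `rows_to_matrix`, `make_batch_scorer`, `start_scoring` and `feature_cols` are hypothetical helpers for illustration, not part of BigDL or Spark.

```python
def rows_to_matrix(rows):
    """Convert collected rows (tuple-like) into a list of float lists.

    Assumes every selected column is numeric and non-null.
    """
    return [[float(value) for value in row] for row in rows]


def make_batch_scorer(model, feature_cols, on_result=print):
    """Build a foreachBatch handler that scores each micro-batch with `model`."""

    def score_batch(batch_df, batch_id):
        import numpy as np  # deferred: only needed once a batch arrives

        # Inside foreachBatch, batch_df is a regular (non-streaming) DataFrame,
        # so it can be collected to the driver. Assumes the batch fits in memory.
        rows = batch_df.select(*feature_cols).collect()
        if not rows:
            return
        features = np.asarray(rows_to_matrix(rows), dtype="float32")
        on_result(batch_id, model.predict(features))

    return score_batch


def start_scoring(stream_df, model_path, feature_cols):
    """Load the Keras model once, then attach the scorer to the stream."""
    import tensorflow as tf  # deferred so the helpers above import without TF

    model = tf.keras.models.load_model(model_path)
    return (
        stream_df.writeStream
        .outputMode("append")
        .foreachBatch(make_batch_scorer(model, feature_cols))
        .trigger(processingTime="60 seconds")
        .start()
    )
```

Loading the model once outside the handler avoids reloading it on every trigger. The key point is that a Keras model's `predict` expects array-like input, so the Spark DataFrame passed to the handler has to be converted first.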
@jason-dai, This is exactly what I did. This is the code:
def foreach_batch_function(dataset, epoch_id):
    model = tf.keras.models.load_model('model.h5')
    prediction = model.predict(dataset)
    print("model loaded")
    t1 = time.time()
    print("time", t1 - t0)
    predictionFinal = prediction.select("prediction", "label")
    predictionFinal.show(truncate=False, n=2)

q = dataset.writeStream.outputMode("append").foreachBatch(foreach_batch_function).queryName("ok").trigger(processingTime="60 seconds").start()
The model is trained using the bigdl-orca estimator and saved as .h5. dataset is the test set, read as a stream using:
dataset = spark \
    .readStream \
    .option("header", "false").option("inferschema", True) \
    .csv("test/")
but I got this error:
ERROR:py4j.java_gateway:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/spark/python/pyspark/sql/utils.py", line 207, in call
    raise e
  File "/opt/spark/python/pyspark/sql/utils.py", line 204, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/ipykernel_555534/452997054.py", line 11, in foreach_batch_function
    model = tf.keras.models.load_model('/home/faten/these/cofedmarlak-encrypt.h5')
  File "/home/faten/anaconda3/envs/pysparkenv/lib/python3.10/site-packages/keras/src/saving/saving_api.py", line 262, in load_model
    return legacy_sm_saving_lib.load_model(
  File "/home/faten/anaconda3/envs/pysparkenv/lib/python3.10/site-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "/home/faten/anaconda3/envs/pysparkenv/lib/python3.10/site-packages/h5py/_hl/files.py", line 567, in __init__
    fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
  File "/home/faten/anaconda3/envs/pysparkenv/lib/python3.10/site-packages/h5py/_hl/files.py", line 231, in make_fid
    fid = h5f.open(name, flags, fapl=fapl)
  File "h5py/_objects.pyx", line 54, in h5py._objects.with_phil.wrapper
  File "h5py/_objects.pyx", line 55, in h5py._objects.with_phil.wrapper
  File "h5py/h5f.pyx", line 106, in h5py.h5f.open
OSError: Unable to open file (file signature not found)
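The final `OSError` comes from h5py, not from Spark: it is raised when the file does not carry the HDF5 format signature, which generally means the file on disk is not valid HDF5 (for example it was written in another format, truncated, or transformed after saving). A quick way to check, sketched with the standard library only; `has_hdf5_signature` is a hypothetical helper name, and the signature bytes and candidate offsets follow the HDF5 file format specification.

```python
import os

# The 8-byte HDF5 format signature that starts the superblock.
HDF5_SIGNATURE = b"\x89HDF\r\n\x1a\n"


def has_hdf5_signature(path):
    """Return True if `path` carries the HDF5 signature.

    The superblock may start at byte 0, 512, 1024, 2048, ... (each
    offset doubling), so every candidate offset in the file is checked.
    """
    size = os.path.getsize(path)
    with open(path, "rb") as f:
        offset = 0
        while offset + len(HDF5_SIGNATURE) <= size:
            f.seek(offset)
            if f.read(len(HDF5_SIGNATURE)) == HDF5_SIGNATURE:
                return True
            offset = 512 if offset == 0 else offset * 2
    return False
```

If this returns `False` for the saved `.h5` file, `tf.keras.models.load_model` cannot read it as HDF5, and the way the file was written is the first thing to look at.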
Hello, I trained a model using:
est = Estimator.from_keras(model, workers_per_node=2)
est.fit(x,y)
Then I saved the model. Now I want to load it and use it to predict. How can I do that? And could I use it to predict on streams, as is done with Spark MLlib? Thank you.