Alxe1 opened this issue 1 year ago
Hi! Could you please provide your code so that we can reproduce the issue? It would help us locate the problem sooner. By the way, where did you run the program, in Spark local mode?
request device '/job:localhost/replica:0/task:0/device:CPU:0' but All available devices [ /job:worker/replica:0/task:0/device:CPU:0, /job:worker/replica:0/task:0/device:XLA_CPU:0].
def train_test():
    from bigdl.orca.learn.tf2 import Estimator
    from bigdl.orca import init_orca_context
    from bigdl.orca import OrcaContext

    sc = init_orca_context(cluster_mode='local', cores=8, memory="10g", num_nodes=3)
    conf = SparkConf().setAppName("test")
    conf.set("spark.sql.execution.arrow.enabled", True)
    conf.set("spark.sql.execution.arrow.fallback.enabled", True)
    spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

    MODEL_PATH = "/mytest/model"
    data_transform = DataTransform(MODEL_PATH, spark)
    uid_num, vid_num, sparse_num, data_count, sdf = data_transform.process()
    config = {"uid_num": int(uid_num), "vid_num": int(vid_num), "sparse_num": int(sparse_num)}
    est = Estimator.from_keras(model_creator=model_creator,
                               config=config,
                               backend="spark",
                               model_dir="hdfs://ip:port/ckpt")
    train_data, test_data = sdf.randomSplit([0.8, 0.2], 100)
    stats = est.fit(train_data,
                    epochs=2,
                    batch_size=512,
                    feature_cols=["embed"],
                    label_cols=["label"],
                    steps_per_epoch=data_count // 512)
    print("stats: {}".format(stats))
    est.save("/mytest/model.h5", save_format="h5")
    res = est.predict(data=sdf, feature_cols=["embed"])
    res.show()
Hi, I have reviewed the error report, and I think it may be related to your model code or to TensorFlow rather than being a bug in BigDL.
From the error report you provided:
File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 360, in transform_func
    return SparkRunner(**init_param).predict(**param)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 438, in predict
    local_model = self.model_creator(self.config)
File "/mnt1/liulei/mytest/test.py", line 233, in model_creator
    cross_num=4)
File "/mnt1/liulei/mytest/test.py", line 113, in deepcross2
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))
it shows that the error happened while creating the model from the model_creator function (before prediction starts). Could you provide more of the code, including the DeepCross model, so that we can look into how this error was caused?
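You could also try building the model locally on the driver, outside of Spark, to see whether the same TensorFlow error shows up there. A minimal sketch, assuming model_creator from your script is available in the same session and using made-up config numbers:

import numpy as np
import tensorflow as tf

# Hypothetical config for a quick local check; substitute the real
# uid_num/vid_num/sparse_num values printed by DataTransform.process().
config = {"uid_num": 100, "vid_num": 200, "sparse_num": 10}

model = model_creator(config)  # the same function passed to Estimator.from_keras
model.summary()

# A small random batch shaped like the "embed" feature column.
dummy = np.random.randint(0, 10, size=(4, config["sparse_num"])).astype("float32")
print(model.predict(dummy))

If this runs cleanly, the model itself is fine and the failure is specific to the worker-side environment.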
This is the complete code:
import json
import os
import datetime
import numpy as np
import tensorflow as tf
from pyspark import Row, SparkContext, SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from sklearn.metrics import accuracy_score, precision_score, recall_score
from tensorflow.keras import Model
from tensorflow.keras.layers import Embedding, Dense
from tensorflow.python.keras.layers import Concatenate
from tensorflow.python.keras.regularizers import l2
from base_class.spark_base import SparkBase


def deepcross(user_num, item_num, user_item_dim, sparse_num,
              embed_norm, dnn_hidden_units, dnn_activation, dnn_dropout, cross_num):
    inputs = tf.keras.layers.Input(shape=(sparse_num,), name="input")
    user_embed = Embedding(input_dim=user_num,
                           output_dim=user_item_dim,
                           embeddings_initializer="random_normal",
                           embeddings_regularizer=l2(embed_norm),
                           input_length=1)
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))
    item_embed = Embedding(input_dim=item_num,
                           output_dim=user_item_dim,
                           embeddings_initializer="random_normal",
                           embeddings_regularizer=l2(embed_norm),
                           input_length=1)
    item_embed_ = item_embed(tf.cast(inputs[:, 1], dtype=tf.int64))
    sparse_feature = inputs[:, 2:]
    input_features = Concatenate(axis=1)([user_embed_, item_embed_, sparse_feature])
    dnn_output_1 = Dense(units=64, activation="relu")(input_features)
    dnn_output = Dense(units=64, activation="relu")(dnn_output_1)
    # cross_output = CrossLayer(cross_num=cross_num)(input_features)
    # output_1 = Concatenate(axis=-1)([dnn_output, cross_output])
    output_2 = Dense(1)(dnn_output)
    output = tf.nn.sigmoid(output_2)
    model = Model(inputs, output, name="my_model")
    return model


class DataTransform(object):
    def __init__(self, model_path, spark):
        self.MODEL_PATH = model_path
        self.spark = spark

    def read_data(self):
        data_date = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y%m%d")
        # spark_base = SparkBase()
        sdf = self.spark.sql(
            "select uid,vr_id,gender,age,is_click from db.my_table "
            "where data_date >= {}".format(data_date))
        sampled_sdf = sdf.sampleBy("is_click", fractions={0: 0.1, 1: 0.2}, seed=42)
        return sampled_sdf

    def transform_data(self, sdf):
        sdf = sdf.fillna(0)

        @pandas_udf(IntegerType())
        def bucket_age(v):
            def _bucket_age(row):
                if row < 13:
                    return 0
                elif 13 <= row < 18:
                    return 1
                elif 18 <= row < 24:
                    return 2
                elif 24 <= row < 30:
                    return 3
                elif 30 <= row < 40:
                    return 4
                elif 40 <= row < 50:
                    return 5
                else:
                    return 6
            return v.map(_bucket_age)

        sdf = sdf.withColumn("age_index", bucket_age("age"))
        string_index_uid = StringIndexer(inputCol="uid", outputCol="uid_index", handleInvalid="keep")
        string_index_vid = StringIndexer(inputCol="vr_id", outputCol="vid_index", handleInvalid="keep")
        string_index_gender = StringIndexer(inputCol="gender", outputCol="gender_index", handleInvalid="keep")
        one_hot_encoder = OneHotEncoderEstimator(inputCols=["gender_index", "age_index"],
                                                 outputCols=["o_gender_index", "o_age_index"])
        pipeline = Pipeline(stages=[string_index_uid, string_index_vid, string_index_gender, one_hot_encoder])
        transform_model = pipeline.fit(sdf)
        transformed_df = transform_model.transform(sdf)
        vector_assembler = VectorAssembler(inputCols=["uid_index", "vid_index", "o_gender_index", "o_age_index"],
                                           outputCol="embedded_vector")
        va_df = vector_assembler.transform(transformed_df)
        df = va_df.select("embedded_vector", "is_click").rdd \
            .map(lambda x: Row(embed=Vectors.dense(x.embedded_vector).toArray().tolist(), label=x.is_click)) \
            .toDF()
        uid_num = int(va_df.select("uid_index").rdd.max()[0])
        vid_num = int(va_df.select("vid_index").rdd.max()[0])
        sparse_num = len(df.select("embed").rdd.take(1)[0].embed)
        data_count = df.count()
        return uid_num, vid_num, sparse_num, data_count, df

    def process(self):
        data = self.read_data()
        uid_num, vid_num, sparse_num, data_count, df = self.transform_data(data)
        print(f"uid_num: {uid_num}, vid_num: {vid_num}, sparse_num: {sparse_num}, data_count: {data_count}")
        df.show()
        return uid_num, vid_num, sparse_num, data_count, df


def model_creator(config):
    deep_cross = deepcross(user_num=config["uid_num"] + 1,
                           item_num=config["vid_num"] + 1,
                           user_item_dim=16,
                           sparse_num=config["sparse_num"],
                           embed_norm=0.001,
                           dnn_hidden_units=[int(e) for e in [128, 64, 32]],
                           dnn_activation="relu",
                           dnn_dropout=0.2,
                           cross_num=4)
    loss = tf.keras.losses.BinaryCrossentropy()
    optimizer = tf.keras.optimizers.Adam()
    deep_cross.compile(optimizer=optimizer, loss=loss, metrics=[tf.keras.metrics.AUC()])
    return deep_cross


def train_test():
    from bigdl.orca.learn.tf2 import Estimator
    from bigdl.orca import init_orca_context
    from bigdl.orca import OrcaContext

    sc = init_orca_context(cluster_mode='yarn-client', cores=8, memory="10g", num_nodes=3)
    conf = SparkConf().setAppName("test")
    conf.set("spark.sql.execution.arrow.enabled", True)
    conf.set("spark.sql.execution.arrow.fallback.enabled", True)
    spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

    MODEL_PATH = "/mytest/model"
    data_transform = DataTransform(MODEL_PATH, spark)
    uid_num, vid_num, sparse_num, data_count, sdf = data_transform.process()
    config = {"uid_num": int(uid_num), "vid_num": int(vid_num), "sparse_num": int(sparse_num)}
    est = Estimator.from_keras(model_creator=model_creator,
                               config=config,
                               backend="spark",
                               model_dir="hdfs://ip:port/ckpt")
    train_data, test_data = sdf.randomSplit([0.8, 0.2], 100)
    stats = est.fit(train_data,
                    epochs=2,
                    batch_size=512,
                    feature_cols=["embed"],
                    label_cols=["label"],
                    steps_per_epoch=data_count // 512)
    print("stats: {}".format(stats))
    est.save("/mytest/model.h5", save_format="h5")
    # stats = est.evaluate(sdf,
    #                      feature_cols=["embed"],
    #                      label_cols=["label"],
    #                      num_steps=512)
    # print("stats: {}".format(stats))
    print(f"=========================data_count: {data_count}")
    res = est.predict(data=sdf, feature_cols=["embed"])
    res.show()
    # est.shutdown()


if __name__ == '__main__':
    train_test()
Training completed successfully, but prediction raised the error.
Got it! We will try to reproduce it and check it out.
Hey, would you mind providing the dataset, or letting me know where I could get it?
Sorry, the dataset belongs to my company. There are only four int fields, so you can fabricate it.
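For anyone trying to reproduce this, a minimal sketch of fabricating such data, using the column names from DataTransform.read_data() above (all values are invented):

import random
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("fake-data").getOrCreate()

# Fabricated rows for the schema uid, vr_id, gender, age, is_click.
rows = [(random.randint(0, 99),    # uid
         random.randint(0, 199),   # vr_id
         random.randint(0, 1),     # gender
         random.randint(10, 60),   # age
         random.randint(0, 1))     # is_click
        for _ in range(1000)]
sdf = spark.createDataFrame(rows, ["uid", "vr_id", "gender", "age", "is_click"])
sdf.show(5)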
Hi! I just fabricated the dataset and ran est.predict in your program in yarn-client mode. I got the prediction result below:
+--------------------+-----+--------------------+
| embed|label| prediction|
+--------------------+-----+--------------------+
|[0.0, 1.0, 1.0, 0...| 1|[0.5107923746109009]|
|[1.0, 2.0, 0.0, 1...| 0|[0.5187397003173828]|
|[2.0, 0.0, 1.0, 0...| 1|[0.5161656141281128]|
+--------------------+-----+--------------------+
I suspect the error reported by TensorFlow may be caused by your environment. Maybe you could try another cluster or check the cluster configuration carefully 😀.
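For what it's worth, the mismatch between the requested /job:localhost device and the available /job:worker devices in the error usually points at how TensorFlow was initialized on the workers rather than at missing hardware. A quick way to inspect what each process sees, sketched with standard TF 2.x config APIs:

import tensorflow as tf

# Print the devices TensorFlow registered in this process; in the error
# above, the only devices carry the /job:worker prefix while the op
# requested /job:localhost.
for dev in tf.config.list_logical_devices():
    print(dev.name, dev.device_type)

# Possible workaround to try (not a guaranteed fix): let TF fall back to
# a compatible device instead of failing on an exact /job:... match.
tf.config.set_soft_device_placement(True)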
OK, I'll check it.