databricks / spark-deep-learning

Deep Learning Pipelines for Apache Spark
https://databricks.github.io/spark-deep-learning
Apache License 2.0

Distributed Hyperparameter Tuning Demo not working #160

Open heng2j opened 5 years ago

heng2j commented 5 years ago

Hi Team,

I received the following error when trying to run the Distributed Hyperparameter Tuning demo:

ValueError: Error when checking target: expected predictions to have shape (1000,) but got array with shape (2,)

Distributed Hyperparameter Tuning

I suspect we should remove the final classification layers when importing InceptionV3, so that the model's 1000-way ImageNet output matches the 2-dimensional one-hot labels?

```python
# load InceptionV3 and remove the final classification layers
model = InceptionV3(weights='imagenet', include_top=False)
```
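If that's the right direction, a minimal sketch (untested with KerasImageFileEstimator) would drop the ImageNet head and attach a softmax sized to the label cardinality before saving:

```python
from keras.applications import InceptionV3
from keras.layers import GlobalAveragePooling2D, Dense
from keras.models import Model

# load InceptionV3 without its 1000-way ImageNet classifier
base = InceptionV3(weights='imagenet', include_top=False)
x = GlobalAveragePooling2D()(base.output)
# new head sized to the label cardinality (2 here: Table / Chair)
predictions = Dense(2, activation='softmax')(x)
model = Model(inputs=base.input, outputs=predictions)
model.save('/tmp/model-full.h5')
```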

The following code is what I used. I am fetching the image URLs from S3 rather than from HDFS.

```python
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import lit, col, asc
from sparkdl.image import imageIO
import pyspark.ml.linalg as spla
import pyspark.sql.types as sptyp
import numpy as np

def CreateTrainImageUriandLabels(image_uris, label, label_name, cardinality, isDefault):
    # build one Row per image URI: a one-hot label vector plus label metadata
    local_rows = []
    for uri in image_uris:
        label_inds = np.zeros(cardinality)
        label_inds[label] = 1.0
        one_hot_vec = spla.Vectors.dense(label_inds.tolist())
        _row_struct = {"uri": uri, "one_hot_label": one_hot_vec, "label": float(label),
                       "label_name": str(label_name), "isDefault": int(isDefault)}
        row = sptyp.Row(**_row_struct)
        local_rows.append(row)

    # relies on the global sqlContext from the PySpark session
    image_uri_df = sqlContext.createDataFrame(local_rows)
    return image_uri_df

label_list = ['Table', 'Chair']

label_cardinality = len(label_list)
label_nums = list(range(label_cardinality))

# seed the DataFrames with a dummy row (isDefault=1) that gets filtered out below
train_df = CreateTrainImageUriandLabels(['dummy'], 1, 'empty', 2, 1)
test_df = CreateTrainImageUriandLabels(['dummy'], 0, 'empty', 2, 1)

# get_images_urls (defined elsewhere, not shown here) gathers the S3 image
# URLs for each label and adds the real rows to train_df / test_df
get_images_urls(label_list)

train_df = train_df.filter(train_df.isDefault == 0)
test_df = test_df.filter(test_df.isDefault == 0)

train_df.show()
test_df.show()

# Under the hood, each partition is fully loaded in memory, which may be expensive.
# This ensures that each partition has a small size.
train_df = train_df.repartition(100)
test_df = test_df.repartition(100)

from keras.applications import InceptionV3

# NB: the stock model keeps the final 1000-way ImageNet softmax head
model = InceptionV3(weights="imagenet")
model.save('/tmp/model-full.h5')  # saves to the local filesystem

import requests
from io import BytesIO

import PIL.Image
import numpy as np
from keras.applications.imagenet_utils import preprocess_input

def load_image_from_uri(local_uri):
    # fetch the image over HTTP(S) (e.g. an S3 URL) and prep it for InceptionV3
    response = requests.get(local_uri)
    img = (PIL.Image.open(BytesIO(response.content))
           .convert('RGB')
           .resize((299, 299), PIL.Image.ANTIALIAS))
    # add a batch dimension and apply the ImageNet preprocessing
    img_arr = np.array(img).astype(np.float32)
    img_tnsr = preprocess_input(img_arr[np.newaxis, :])
    return img_tnsr
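# quick local sanity check of the loader (the URL is a placeholder):
#   load_image_from_uri('https://my-bucket.s3.amazonaws.com/Table/0001.jpg').shape
#   -> (1, 299, 299, 3)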

from sparkdl.estimators.keras_image_file_estimator import KerasImageFileEstimator

# the estimator loads the Keras model from modelFile, turns each uri into an
# input tensor via imageLoader, and trains against the one_hot_label column
estimator = KerasImageFileEstimator(inputCol="uri",
                                    outputCol="prediction",
                                    labelCol="one_hot_label",
                                    imageLoader=load_image_from_uri,
                                    kerasOptimizer='adam',
                                    kerasLoss='categorical_crossentropy',
                                    modelFile='/tmp/model-full.h5')

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = (
    ParamGridBuilder()
        # note: both entries below are identical, so the grid only explores
        # one configuration; vary batch_size to actually tune anything
        .addGrid(estimator.kerasFitParams, [{"batch_size": 16, "verbose": 0},
                                            {"batch_size": 16, "verbose": 0}])
        .build()
)
mc = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label")
cv = CrossValidator(estimator=estimator, estimatorParamMaps=paramGrid, evaluator=mc, numFolds=2)

cvModel = cv.fit(train_df)
```
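For completeness, this is roughly what my `get_images_urls` helper does (a sketch, not the exact code; the bucket name and key layout are placeholders, the objects are assumed publicly readable, and the train/test split is omitted):

```python
import boto3

def get_images_urls(label_list):
    # sketch: list the objects under s3://my-bucket/<label>/ and union the
    # resulting rows into the global train_df / test_df built above
    global train_df, test_df
    s3 = boto3.client('s3')
    for label_num, label in enumerate(label_list):
        resp = s3.list_objects_v2(Bucket='my-bucket', Prefix=label + '/')
        uris = ['https://my-bucket.s3.amazonaws.com/' + obj['Key']
                for obj in resp.get('Contents', [])]
        rows_df = CreateTrainImageUriandLabels(
            uris, label_num, label, len(label_list), 0)
        train_df = train_df.unionAll(rows_df)
```

And if the fix really is the missing `include_top=False`, a pre-flight check like this (assuming the two-class head sketched above) would catch the mismatch before `cv.fit` does:

```python
from keras.models import load_model

# the saved model's output width must match the one_hot_label cardinality,
# otherwise Keras raises the shape error quoted at the top of this issue
m = load_model('/tmp/model-full.h5')
assert m.output_shape[-1] == label_cardinality, (
    "model outputs %d classes but labels have cardinality %d"
    % (m.output_shape[-1], label_cardinality))
```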