aviolante / pyspark_dl_pipeline

PicklingError: Could not serialize object: ValueError: substring not found #2

Open hanzigs opened 4 years ago

hanzigs commented 4 years ago

While fitting train_data, I am getting a PicklingError. Can you help with this? Thanks.

fit_dl_pipeline = dl_pipeline.fit(train_data)
/usr/lib/python3.7/pickle.py in save_tuple(self, obj)
    785         for element in obj:
--> 786             save(element)
    787 

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/databricks/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
    405             if klass is None or klass is not obj:
--> 406                 self.save_function_tuple(obj)
    407                 return

/databricks/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
    548             state['qualname'] = func.__qualname__
--> 549         save(state)
    550         write(pickle.TUPLE)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_dict(self, obj)
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 

/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_list(self, obj)
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 

/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
    842             elif n:
--> 843                 save(tmp[0])
    844                 write(APPEND)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/databricks/spark/python/pyspark/cloudpickle.py in save_instancemethod(self, obj)
    656             if PY3:
--> 657                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
    658             else:

/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_tuple(self, obj)
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 

/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_dict(self, obj)
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 

/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_list(self, obj)
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 

/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
    842             elif n:
--> 843                 save(tmp[0])
    844                 write(APPEND)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 

/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_dict(self, obj)
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 

/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_list(self, obj)
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 

/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
    839                 for x in tmp:
--> 840                     save(x)
    841                 write(APPENDS)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py in __reduce__(self)
    831         trainable=self.trainable,
--> 832         name=self._shared_name,
    833         dtype=self.dtype,

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/variables.py in _shared_name(self)
   1149     """
-> 1150     return self.name[:self.name.index(":")]
   1151 

ValueError: substring not found

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
<command-2475319972266768> in <module>
----> 1 fit_dl_pipeline = dl_pipeline.fit(train_data)
      2 pred_train = fit_dl_pipeline.transform(train_data)
      3 pred_test = fit_dl_pipeline.transform(test_data)

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
    107                     dataset = stage.transform(dataset)
    108                 else:  # must be an Estimator
--> 109                     model = stage.fit(dataset)
    110                     transformers.append(model)
    111                     if i < indexOfLastEstimator:

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/python/lib/python3.7/site-packages/elephas/ml_model.py in _fit(self, df)
     90                         batch_size=self.get_batch_size(),
     91                         verbose=self.get_verbosity(),
---> 92                         validation_split=self.get_validation_split())
     93 
     94         model_weights = spark_model.master_network.get_weights()

/databricks/python/lib/python3.7/site-packages/elephas/spark_model.py in fit(self, rdd, epochs, batch_size, verbose, validation_split)
    149 
    150         if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 151             self._fit(rdd, epochs, batch_size, verbose, validation_split)
    152         else:
    153             raise ValueError(

/databricks/python/lib/python3.7/site-packages/elephas/spark_model.py in _fit(self, rdd, epochs, batch_size, verbose, validation_split)
    186             worker = SparkWorker(yaml, parameters, train_config,
    187                                  optimizer, loss, metrics, custom)
--> 188             gradients = rdd.mapPartitions(worker.train).collect()
    189             new_parameters = self._master_network.get_weights()
    190             for grad in gradients:  # simply accumulate gradients one by one

/databricks/spark/python/pyspark/rdd.py in collect(self)
    829         # Default path used in OSS Spark / for non-credential passthrough clusters:
    830         with SCCallSiteSync(self.context) as css:
--> 831             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    832         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    833 

/databricks/spark/python/pyspark/rdd.py in _jrdd(self)
   2573 
   2574         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2575                                       self._jrdd_deserializer, profiler)
   2576         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
   2577                                              self.preservesPartitioning, self.is_barrier)

/databricks/spark/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
   2475     assert serializer, "serializer should not be empty"
   2476     command = (func, profiler, deserializer, serializer)
-> 2477     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   2478     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
   2479                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/databricks/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2461     # the serialized command will be compressed by broadcast
   2462     ser = CloudPickleSerializer()
-> 2463     pickled_command = ser.dumps(command)
   2464     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   2465         # The broadcast will have same life cycle as created PythonRDD

/databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
    713                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    714             cloudpickle.print_exec(sys.stderr)
--> 715             raise pickle.PicklingError(msg)
    716 
    717 

PicklingError: Could not serialize object: ValueError: substring not found
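
For reference, the inner ValueError is raised inside TensorFlow's ResourceVariable.__reduce__ (see the resource_variable_ops.py and variables.py frames above), where _shared_name does self.name[:self.name.index(":")]. str.index raises ValueError whenever the ":" separator is missing from the variable name, for example:

    name = "dense_1/kernel"     # a hypothetical variable name with no ":" suffix
    name[:name.index(":")]      # raises ValueError: substring not found
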
mikechen66 commented 4 years ago

It is a TensorFlow/Keras version conflict. It can be solved by downgrading as follows.

!pip install -q keras==2.2.4
!pip install -q tensorflow==1.14.0
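
After the downgrade (and a restart of the Python kernel), a quick sanity check of the installed versions should confirm it took effect; a minimal check could look like this:

    import tensorflow as tf
    import keras

    print(tf.__version__)     # expect 1.14.0 after the downgrade
    print(keras.__version__)  # expect 2.2.4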

However, it would be much better if Andre had free time to upgrade the code, since the industry has largely shifted to TensorFlow 2.0/Keras 2.3 (or tf.keras).

mikechen66 commented 4 years ago

The major issue is whether the evaluation is explicit or implicit in the context of TensorFlow 2.x/Keras 2.3. If the evaluation is explicit (direct) and does not involve other complex callbacks, the issue is easy to solve. However, the author has adopted an implicit (indirect), nested evaluation: the fitting and scoring are wrapped in def dl_pipeline_fit_score_results(...), which is then evaluated with the call below, so the Python interpreter gets confused while evaluating it.

dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline, 
                              train_data=train_data, 
                              test_data=test_data, 
                              label='label_index');
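
For comparison, the explicit (direct) form would simply inline the calls that the wrapper performs, reusing the notebook's own names; note that the traceback above shows even the direct dl_pipeline.fit(train_data) call hits the same PicklingError:

    fit_dl_pipeline = dl_pipeline.fit(train_data)
    pred_train = fit_dl_pipeline.transform(train_data)
    pred_test = fit_dl_pipeline.transform(test_data)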

The result is the same even if I wrap the call in a print statement with flush=True:

print(dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,train_data=train_data, \
        test_data=test_data, label='label_index'), flush=True)

The same issue also occurs when I remove the following eight statements from In [23]:

    pred_train = fit_dl_pipeline.transform(train_data)
    pred_test = fit_dl_pipeline.transform(test_data)

    pnl_train = pred_train.select(label, "prediction")
    pnl_test = pred_test.select(label, "prediction")

    pred_and_label_train = pnl_train.rdd.map(lambda row: (row[label], row['prediction']))
    pred_and_label_test = pnl_test.rdd.map(lambda row: (row[label], row['prediction']))

    metrics_train = MulticlassMetrics(pred_and_label_train)
    metrics_test = MulticlassMetrics(pred_and_label_test)

When I try to move the eight statements out of dl_pipeline_fit_score_results(...) to another place, I get errors such as "NameError: name 'pred_train' is not defined" or "NameError: name 'pnl_train' is not defined". That means these variables are local to the function, consistent with Python's LEGB scoping rules.
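
A minimal illustration of that scoping point (unrelated to Spark itself): a name assigned inside a function is local to it and is not visible after the call, per Python's LEGB lookup order.

    def score():
        pred_train = "local result"    # local to score()
        return pred_train

    score()
    print(pred_train)                  # NameError: name 'pred_train' is not defined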

Therefore, it is quite hard to fix the "PicklingError: Could not serialize object: ValueError: substring not found" issue in the context of TensorFlow 2.x/Keras 2.3.

It would be good if the author could spare some time to update the code for TensorFlow 2.x/Keras 2.3.