databricks-demos / dbdemos

Demos to implement your Databricks Lakehouse

C360 demo fails to run the DLT pipeline (Azure FieldEng WS) #121

Open · guidooswaldDB opened this issue 2 months ago

guidooswaldDB commented 2 months ago

Getting this error when running the DLT pipeline (it apparently fails while deploying the UDF, but I have no idea why):

```
java.lang.RuntimeException: Failed to execute python command for notebook '/Users/guido.oswald@databricks.com/lakehouse-retail-c360/01-Data-ingestion/01.2-DLT-churn-Python-UDF' with id RunnableCommandId(4985314217257097561) and error AnsiResult(
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File <command-4985314217257097561>:6
      2 mlflow.set_registry_uri('databricks-uc')
      3 #                                                                                            Stage/version
      4 #                                                           Model name                       |
      5 #                                                           |                                |
----> 6 predict_churn_udf = mlflow.pyfunc.spark_udf(spark, "models:/guido.c360.dbdemos_customer_churn@prod", "int")
      7 spark.udf.register("predict_churn", predict_churn_udf)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-8dd45107-5894-4639-ad07-ca02d63a5488/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py:1026, in spark_udf(spark, model_uri, result_type, env_manager)
   1016 if not sys.platform.startswith("linux"):
   1017     # TODO: support killing mlflow server launched in UDF task when spark job canceled
   1018     #  for non-linux system.
   1019     #  https://stackoverflow.com/questions/53208/how-do-i-automatically-destroy-child-processes-in-windows
   1020     _logger.warning(
   1021         "In order to run inference code in restored python environment, PySpark UDF "
   1022         "processes spawn MLflow Model servers as child processes. Due to system "
   1023         "limitations with handling SIGKILL signals, these MLflow Model server child "
   1024         "processes cannot be cleaned up if the Spark Job is canceled."
   1025     )
-> 1026 pyfunc_backend = get_flavor_backend(
   1027     local_model_path,
   1028     env_manager=env_manager,
   1029     install_mlflow=os.environ.get("MLFLOW_HOME") is not None,
   1030     create_env_root_dir=True,
   1031 )
   1032 if not should_use_spark_to_broadcast_file:
   1033     # Prepare restored environment in driver side if possible.
   1034     # Note: In databricks runtime, because databricks notebook cell output cannot capture
   (...)
   1040     # to wait conda command fail and suddenly get all output printed (included in error
   1041     # message).
   1042     if env_manager != _EnvManager.LOCAL:

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-8dd45107-5894-4639-ad07-ca02d63a5488/lib/python3.9/site-packages/mlflow/models/flavor_backend_registry.py:48, in get_flavor_backend(model_uri, **kwargs)
     44     underlying_model_uri = model_uri
     45     local_path = _download_artifact_from_uri(
     46         append_to_uri_path(underlying_model_uri, MLMODEL_FILE_NAME), output_path=tmp.path()
     47     )
---> 48     model = Model.load(local_path)
     49 else:
     50     model = None

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-8dd45107-5894-4639-ad07-ca02d63a5488/lib/python3.9/site-packages/mlflow/models/model.py:483, in Model.load(cls, path)
    481     path = os.path.join(path, MLMODEL_FILE_NAME)
    482 with open(path) as f:
--> 483     return cls.from_dict(yaml.safe_load(f.read()))

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-8dd45107-5894-4639-ad07-ca02d63a5488/lib/python3.9/site-packages/mlflow/models/model.py:493, in Model.from_dict(cls, model_dict)
    491 model_dict = model_dict.copy()
    492 if "signature" in model_dict and isinstance(model_dict["signature"], dict):
--> 493     model_dict["signature"] = ModelSignature.from_dict(model_dict["signature"])
    495 if "model_uuid" not in model_dict:
    496     model_dict["model_uuid"] = None

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-8dd45107-5894-4639-ad07-ca02d63a5488/lib/python3.9/site-packages/mlflow/models/signature.py:90, in ModelSignature.from_dict(cls, signature_dict)
     79 @classmethod
     80 def from_dict(cls, signature_dict: Dict[str, Any]):
     81     """
     82     Deserialize from dictionary representation.
   (...)
     88     :return: ModelSignature populated with the data form the dictionary.
     89     """
---> 90     inputs = Schema.from_json(signature_dict["inputs"])
     91     if "outputs" in signature_dict and signature_dict["outputs"] is not None:
     92         outputs = Schema.from_json(signature_dict["outputs"])

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-8dd45107-5894-4639-ad07-ca02d63a5488/lib/python3.9/site-packages/mlflow/types/schema.py:397, in Schema.from_json(cls, json_str)
    394 def read_input(x: dict):
    395     return TensorSpec.from_json_dict(**x) if x["type"] == "tensor" else ColSpec(**x)
--> 397 return cls([read_input(x) for x in json.loads(json_str)])

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-8dd45107-5894-4639-ad07-ca02d63a5488/lib/python3.9/site-packages/mlflow/types/schema.py:397, in <listcomp>(.0)
    394 def read_input(x: dict):
    395     return TensorSpec.from_json_dict(**x) if x["type"] == "tensor" else ColSpec(**x)
--> 397 return cls([read_input(x) for x in json.loads(json_str)])

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-8dd45107-5894-4639-ad07-ca02d63a5488/lib/python3.9/site-packages/mlflow/types/schema.py:395, in Schema.from_json.<locals>.read_input(x)
    394 def read_input(x: dict):
--> 395     return TensorSpec.from_json_dict(**x) if x["type"] == "tensor" else ColSpec(**x)

TypeError: __init__() got an unexpected keyword argument 'required'
,None,Map(),Map(),List(),List(),Map())
    at com.databricks.pipelines.execution.core.languages.PythonRepl.$anonfun$runCmd$1(PythonRepl.scala:325)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:265)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:263)
    at com.databricks.pipelines.execution.core.languages.PythonRepl.recordFrameProfile(PythonRepl.scala:48)
    at com.databricks.pipelines.execution.core.languages.PythonRepl.runCmd(PythonRepl.scala:293)
    at com.databricks.pipelines.execution.extensions.workspace.WorkspacePythonPipelineGraphLoader.$anonfun$loadPythonGraph$12(WorkspacePythonPipelineGraphLoader.scala:159)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at com.databricks.pipelines.execution.extensions.workspace.WorkspacePythonPipelineGraphLoader.$anonfun$loadPythonGraph$10(WorkspacePythonPipelineGraphLoader.scala:143)
    at com.databricks.pipelines.execution.extensions.workspace.WorkspacePythonPipelineGraphLoader.$anonfun$loadPythonGraph$10$adapted(WorkspacePythonPipelineGraphLoader.scala:105)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:193)
    at com.databricks.pipelines.execution.extensions.workspace.WorkspacePythonPipelineGraphLoader.loadPythonGraph(WorkspacePythonPipelineGraphLoader.scala:105)
    at com.databricks.pipelines.execution.extensions.workspace.WorkspacePipelineGraphLoader.loadGraph(WorkspacePipelineGraphLoader.scala:161)
    at com.databricks.pipelines.execution.extensions.workspace.WorkspacePipelineGraphLoader.loadGraph(WorkspacePipelineGraphLoader.scala:54)
    at com.databricks.pipelines.execution.extensions.workspace.WorkspacePipelineExecutionExtension$.loadGraph(WorkspacePipelineExecutionExtension.scala:18)
    at com.databricks.pipelines.execution.service.DLTComputeRunnableContext.loadGraph(DLTComputeRunnableContext.scala:100)
    at com.databricks.pipelines.execution.core.UpdateExecution.initializationForUpdates(UpdateExecution.scala:555)
    at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$initializeAndLoadGraphForRegularUpdate$1(UpdateExecution.scala:642)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$3(DeltaPipelinesUsageLogging.scala:115)
    at com.databricks.pipelines.common.monitoring.OperationStatusReporter.executeWithPeriodicReporting(OperationStatusReporter.scala:120)
    at com.databricks.pipelines.common.monitoring.OperationStatusReporter$.executeWithPeriodicReporting(OperationStatusReporter.scala:160)
    at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$6(DeltaPipelinesUsageLogging.scala:135)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:560)
    at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:657)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:678)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:414)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:158)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:412)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:409)
    at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionContext(DeltaPipelinesUsageLogging.scala:24)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:457)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:442)
    at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionTags(DeltaPipelinesUsageLogging.scala:24)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:652)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:569)
    at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperationWithResultTags(DeltaPipelinesUsageLogging.scala:24)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:560)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:528)
    at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperation(DeltaPipelinesUsageLogging.scala:24)
    at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperation0(DeltaPipelinesUsageLogging.scala:59)
    at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$1(DeltaPipelinesUsageLogging.scala:127)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.recordPipelinesOperation(DeltaPipelinesUsageLogging.scala:105)
    at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.recordPipelinesOperation$(DeltaPipelinesUsageLogging.scala:101)
    at com.databricks.pipelines.execution.core.UpdateExecution.recordPipelinesOperation(UpdateExecution.scala:68)
    at com.databricks.pipelines.execution.core.UpdateExecution.executeStage(UpdateExecution.scala:412)
    at com.databricks.pipelines.execution.core.UpdateExecution.initializeAndLoadGraphForRegularUpdate(UpdateExecution.scala:642)
    at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$executeUpdate$1(UpdateExecution.scala:524)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at com.databricks.pipelines.execution.core.UpdateExecution.executeUpdate(UpdateExecution.scala:523)
    at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$start$3(UpdateExecution.scala:232)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:414)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:158)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:412)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:409)
    at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionContext(DeltaPipelinesUsageLogging.scala:24)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:457)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:442)
    at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionTags(DeltaPipelinesUsageLogging.scala:24)
    at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging$$anon$1.runWithAttributionTags(DeltaPipelinesUsageLogging.scala:77)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.withDbAttributionTags(DeltaPipelinesUsageLogging.scala:84)
    at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.withDbAttributionTags$(DeltaPipelinesUsageLogging.scala:83)
    at com.databricks.pipelines.execution.core.UpdateExecution.withDbAttributionTags(UpdateExecution.scala:68)
    at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$start$1(UpdateExecution.scala:208)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.pipelines.execution.core.BaseUCContext.$anonfun$runWithNewUCS$1(BaseUCContext.scala:471)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:100)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:105)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:104)
    at com.databricks.pipelines.execution.core.BaseUCContext.runWithNewUCS(BaseUCContext.scala:465)
    at com.databricks.pipelines.execution.core.UCContextCompanion$OptionUCContextHelper.runWithNewUCSIfAvailable(BaseUCContext.scala:1079)
    at com.databricks.pipelines.execution.core.UpdateExecution.start(UpdateExecution.scala:195)
    at com.databricks.pipelines.execution.service.ExecutionBackend$$anon$2.$anonfun$run$2(ExecutionBackend.scala:710)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.pipelines.execution.core.CommandContextUtils$.withCommandContext(CommandContextUtils.scala:66)
    at com.databricks.pipelines.execution.service.ExecutionBackend$$anon$2.run(ExecutionBackend.scala:706)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:116)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:79)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:78)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:64)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:113)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:116)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```
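For context, the final `TypeError` suggests an MLflow version skew: the model's `MLmodel` signature contains a `required` field, which newer MLflow releases (around 2.6 and later, if I recall correctly) write when logging a model, while the older MLflow bundled in the pipeline environment has a `ColSpec.__init__` that doesn't accept that keyword. A minimal sketch of a possible workaround, assuming version skew really is the cause (the `>=2.6` pin is illustrative, not confirmed), would be pinning a newer MLflow at the top of the DLT notebook:

```python
# Possible workaround sketch (assumption: the MLflow in the DLT environment is
# older than the MLflow that logged the model). %pip must run in its own cell,
# before any other code in the notebook.
%pip install "mlflow>=2.6"
```

Then the UDF registration from 01.2-DLT-churn-Python-UDF stays unchanged:

```python
import mlflow

# Point the registry at Unity Catalog, then wrap the registered churn model
# as a Spark UDF returning an int prediction (spark is the ambient session
# provided by the Databricks notebook runtime).
mlflow.set_registry_uri("databricks-uc")
predict_churn_udf = mlflow.pyfunc.spark_udf(
    spark, "models:/guido.c360.dbdemos_customer_churn@prod", "int"
)
spark.udf.register("predict_churn", predict_churn_udf)
```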

QuentinAmbard commented 2 months ago

Hey Guido, I released a new version yesterday that forces the DLT pipeline to use the CURRENT channel instead of PREVIEW (it seems to be an MLflow issue with the preview channel). Try switching the channel and rerunning; hopefully it'll work.
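For anyone hitting this on an already-deployed pipeline, the channel can also be switched by hand in the pipeline settings (DLT UI > Settings > JSON, or via the Pipelines API). A minimal sketch of the relevant settings fragment; the pipeline name is illustrative and all other settings are omitted:

```json
{
  "name": "dbdemos-lakehouse-retail-c360",
  "edition": "ADVANCED",
  "channel": "CURRENT"
}
```

After saving, a fresh update of the pipeline should run on the CURRENT runtime rather than PREVIEW.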