databricks-demos / dbdemos

Demos to implement your Databricks Lakehouse

dbdemos_lakehouse_churn_init_<your_name> failed with Notebook Exception - "com.databricks.WorkflowException: com.databricks.NotebookExecutionException: FAILED" #46

Closed · JavinDolia closed this issue 11 months ago

JavinDolia commented 11 months ago

While running the lakehouse churn job, the job failed with a notebook exception: com.databricks.WorkflowException: com.databricks.NotebookExecutionException: FAILED


The error from the DLT pipeline is pasted below for reference.

Update 0507a1 is FAILED.

java.lang.RuntimeException: Failed to execute python command for notebook '/Users/javin@bizanalytica.com/databricks_demo/lakehouse-retail-c360/01-Data-ingestion/01.2-DLT-churn-Python-UDF' with id RunnableCommandId(4793992086333503899) and error AnsiResult(
---------------------------------------------------------------------------
RestException                             Traceback (most recent call last)
File <command>:6
      2 import mlflow
      3 #                                                     Stage/version
      4 #                     Model name                          |
      5 #                         |                               |
----> 6 predict_churn_udf = mlflow.pyfunc.spark_udf(spark, "models:/dbdemos_customer_churn/Production")
      7 spark.udf.register("predict_churn", predict_churn_udf)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py:996, in spark_udf(spark, model_uri, result_type, env_manager)
    989 if not any(isinstance(elem_type, x) for x in supported_types):
    990     raise MlflowException(
    991         message="Invalid result_type '{}'. Result type can only be one of or an array of one "
    992         "of the following types: {}".format(str(elem_type), str(supported_types)),
    993         error_code=INVALID_PARAMETER_VALUE,
    994     )
--> 996 local_model_path = _download_artifact_from_uri(
    997     artifact_uri=model_uri,
    998     output_path=_create_model_downloading_tmp_dir(should_use_nfs),
    999 )
   1001 if env_manager == _EnvManager.LOCAL:
   1002     # Assume spark executor python environment is the same with spark driver side.
   1003     _warn_dependency_requirement_mismatches(local_model_path)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/tracking/artifact_utils.py:100, in _download_artifact_from_uri(artifact_uri, output_path)
     94 """
     95 :param artifact_uri: The absolute URI of the artifact to download.
     96 :param output_path: The local filesystem path to which to download the artifact. If unspecified,
     97                    a local output path will be created.
     98 """
     99 root_uri, artifact_path = _get_root_uri_and_artifact_path(artifact_uri)
--> 100 return get_artifact_repository(artifact_uri=root_uri).download_artifacts(
    101     artifact_path=artifact_path, dst_path=output_path
    102 )

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/artifact_repository_registry.py:114, in get_artifact_repository(artifact_uri)
    104 def get_artifact_repository(artifact_uri):
    105     """Get an artifact repository from the registry based on the scheme of artifact_uri
    106
    107     :param artifact_uri: The artifact store URI. This URI is used to select which artifact
   (...)
    112     requirements.
    113     """
--> 114     return _artifact_repository_registry.get_artifact_repository(artifact_uri)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/artifact_repository_registry.py:72, in ArtifactRepositoryRegistry.get_artifact_repository(self, artifact_uri)
     65 if repository is None:
     66     raise MlflowException(
     67         "Could not find a registered artifact repository for: {}. "
     68         "Currently registered schemes are: {}".format(
     69             artifact_uri, list(self._registry.keys())
     70         )
     71     )
---> 72 return repository(artifact_uri)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/models_artifact_repo.py:42, in ModelsArtifactRepository.__init__(self, artifact_uri)
     37     self.repo = UnityCatalogModelsArtifactRepository(
     38         artifact_uri=artifact_uri, registry_uri=registry_uri
     39     )
     40 elif is_using_databricks_registry(artifact_uri):
     41     # Use the DatabricksModelsArtifactRepository if a databricks profile is being used.
---> 42     self.repo = DatabricksModelsArtifactRepository(artifact_uri)
     43 else:
     44     uri = ModelsArtifactRepository.get_underlying_uri(artifact_uri)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/databricks_models_artifact_repo.py:63, in DatabricksModelsArtifactRepository.__init__(self, artifact_uri)
     59 self.databricks_profile_uri = (
     60     get_databricks_profile_uri_from_artifact_uri(artifact_uri) or mlflow.get_registry_uri()
     61 )
     62 client = MlflowClient(registry_uri=self.databricks_profile_uri)
---> 63 self.model_name, self.model_version = get_model_name_and_version(client, artifact_uri)
     64 # Use an isolated thread pool executor for chunk uploads/downloads to avoid a deadlock
     65 # caused by waiting for a chunk-upload/download task within a file-upload/download task.
     66 # See https://superfastpython.com/threadpoolexecutor-deadlock/#Deadlock_1_Submit_and_Wait_for_a_Task_Within_a_Task
     67 # for more details
     68 self.chunk_thread_pool = self._create_thread_pool()

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/utils/models.py:94, in get_model_name_and_version(client, models_uri)
     92 if model_alias is not None:
     93     return model_name, client.get_model_version_by_alias(model_name, model_alias).version
---> 94 return model_name, str(_get_latest_model_version(client, model_name, model_stage))

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/artifact/utils/models.py:32, in _get_latest_model_version(client, name, stage)
     27 def _get_latest_model_version(client, name, stage):
     28     """
     29     Returns the latest version of the stage if stage is not None. Otherwise return the latest of all
     30     versions.
     31     """
---> 32     latest = client.get_latest_versions(name, None if stage is None else [stage])
     33     if len(latest) == 0:
     34         stage_str = "" if stage is None else f" and stage '{stage}'"

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/tracking/client.py:2425, in MlflowClient.get_latest_versions(self, name, stages)
   2353 def get_latest_versions(self, name: str, stages: List[str] = None) -> List[ModelVersion]:
   2354     """
   2355     Latest version models for each requests stage. If no stages provided, returns the
   2356     latest version for each stage.
   (...)
   2423     current_stage: None
   2424     """
-> 2425     return self._get_registry_client().get_latest_versions(name, stages)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/tracking/_model_registry/client.py:140, in ModelRegistryClient.get_latest_versions(self, name, stages)
    130 def get_latest_versions(self, name, stages=None):
    131     """
    132     Latest version models for each requests stage. If no stages provided, returns the
    133     latest version for each stage.
   (...)
    138     :return: List of :py:class:`mlflow.entities.model_registry.ModelVersion` objects.
    139     """
--> 140     return self.store.get_latest_versions(name, stages)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/model_registry/rest_store.py:169, in RestStore.get_latest_versions(self, name, stages)
    159 """
    160 Latest version models for each requested stage. If no stages argument is provided,
    161 returns the latest version for each stage.
   (...)
    166 :return: List of :py:class:`mlflow.entities.model_registry.ModelVersion` objects.
    167 """
    168 req_body = message_to_json(GetLatestVersions(name=name, stages=stages))
--> 169 response_proto = self._call_endpoint(GetLatestVersions, req_body, call_all_endpoints=True)
    170 return [
    171     ModelVersion.from_proto(model_version)
    172     for model_version in response_proto.model_versions
    173 ]

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/store/model_registry/base_rest_store.py:39, in BaseRestStore._call_endpoint(self, api, json_body, call_all_endpoints, extra_headers)
     37 if call_all_endpoints:
     38     endpoints = self._get_all_endpoints_from_method(api)
---> 39     return call_endpoints(
     40         self.get_host_creds(), endpoints, json_body, response_proto, extra_headers
     41     )
     42 else:
     43     endpoint, method = self._get_endpoint_from_method(api)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/utils/rest_utils.py:217, in call_endpoints(host_creds, endpoints, json_body, response_proto, extra_headers)
    215 except RestException as e:
    216     if e.error_code != ErrorCode.Name(ENDPOINT_NOT_FOUND) or i == len(endpoints) - 1:
--> 217         raise e

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/utils/rest_utils.py:212, in call_endpoints(host_creds, endpoints, json_body, response_proto, extra_headers)
    210 for i, (endpoint, method) in enumerate(endpoints):
    211     try:
--> 212         return call_endpoint(
    213             host_creds, endpoint, method, json_body, response_proto, extra_headers
    214         )
    215     except RestException as e:
    216         if e.error_code != ErrorCode.Name(ENDPOINT_NOT_FOUND) or i == len(endpoints) - 1:

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/utils/rest_utils.py:201, in call_endpoint(host_creds, endpoint, method, json_body, response_proto, extra_headers)
    199     call_kwargs["json"] = json_body
    200 response = http_request(**call_kwargs)
--> 201 response = verify_rest_response(response, endpoint)
    202 js_dict = json.loads(response.text)
    203 parse_dict(js_dict=js_dict, message=response_proto)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-1810d376-b464-4920-b311-abfa3f12b7f8/lib/python3.9/site-packages/mlflow/utils/rest_utils.py:133, in verify_rest_response(response, endpoint)
    131 if response.status_code != 200:
    132     if _can_parse_as_json_object(response.text):
--> 133         raise RestException(json.loads(response.text))
    134     else:
    135         base_msg = "API request to endpoint {} failed with error code {} != 200".format(
    136             endpoint,
    137             response.status_code,
    138         )

RestException: RESOURCE_DOES_NOT_EXIST: RegisteredModel 'dbdemos_customer_churn' does not exist. It might have been deleted.
,None,Map(),Map(),List(),List(),Map())
  at com.databricks.pipelines.execution.core.languages.PythonRepl.runCmd(PythonRepl.scala:335)
  at com.databricks.pipelines.execution.service.PipelineRunnable$.$anonfun$loadPythonGraph$8(PipelineGraphLoader.scala:597)
  at com.databricks.pipelines.execution.service.PipelineRunnable$.$anonfun$loadPythonGraph$8$adapted(PipelineGraphLoader.scala:595)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at com.databricks.pipelines.execution.service.PipelineRunnable$.$anonfun$loadPythonGraph$7(PipelineGraphLoader.scala:595)
  at com.databricks.pipelines.execution.service.PipelineRunnable$.$anonfun$loadPythonGraph$7$adapted(PipelineGraphLoader.scala:572)
  at scala.collection.immutable.Map$Map1.foreach(Map.scala:193)
  at com.databricks.pipelines.execution.service.PipelineRunnable$.loadPythonGraph(PipelineGraphLoader.scala:572)
  at com.databricks.pipelines.execution.service.PipelineGraphLoader.loadGraph(PipelineGraphLoader.scala:324)
  at com.databricks.pipelines.execution.service.PipelineGraphLoader.loadGraph(PipelineGraphLoader.scala:205)
  at com.databricks.pipelines.execution.service.DLTComputeRunnableContext.loadGraph(DLTComputeRunnableContext.scala:96)
  at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$initializeAndLoadGraph$1(UpdateExecution.scala:364)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$3(DeltaPipelinesUsageLogging.scala:118)
  at com.databricks.pipelines.common.monitoring.OperationStatusReporter.executeWithPeriodicReporting(OperationStatusReporter.scala:120)
  at com.databricks.pipelines.common.monitoring.OperationStatusReporter$.executeWithPeriodicReporting(OperationStatusReporter.scala:160)
  at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$6(DeltaPipelinesUsageLogging.scala:137)
  at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:555)
  at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:650)
  at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:671)
  at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:412)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
  at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:158)
  at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:410)
  at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:407)
  at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionContext(DeltaPipelinesUsageLogging.scala:25)
  at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:455)
  at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:440)
  at com.databricks.pipelines.execution.core.monitoring.PublicLogging.withAttributionTags(DeltaPipelinesUsageLogging.scala:25)
  at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:645)
  at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:564)
  at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperationWithResultTags(DeltaPipelinesUsageLogging.scala:25)
  at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:555)
  at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:525)
  at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperation(DeltaPipelinesUsageLogging.scala:25)
  at com.databricks.pipelines.execution.core.monitoring.PublicLogging.recordOperation0(DeltaPipelinesUsageLogging.scala:60)
  at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.$anonfun$recordPipelinesOperation$1(DeltaPipelinesUsageLogging.scala:130)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
  at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.recordPipelinesOperation(DeltaPipelinesUsageLogging.scala:107)
  at com.databricks.pipelines.execution.core.monitoring.DeltaPipelinesUsageLogging.recordPipelinesOperation$(DeltaPipelinesUsageLogging.scala:102)
  at com.databricks.pipelines.execution.core.UpdateExecution.recordPipelinesOperation(UpdateExecution.scala:55)
  at com.databricks.pipelines.execution.core.UpdateExecution.executeStage(UpdateExecution.scala:257)
  at com.databricks.pipelines.execution.core.UpdateExecution.initializeAndLoadGraph(UpdateExecution.scala:360)
  at com.databricks.pipelines.execution.core.UpdateExecution.executeUpdate(UpdateExecution.scala:344)
  at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$start$3(UpdateExecution.scala:126)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.pipelines.execution.core.WorkloadAttributionContextUtils$.runWithDLTWorkloadTags(WorkloadAttributionContextUtils_DBR_12_Minus.scala:6)
  at com.databricks.pipelines.execution.core.UpdateExecution.$anonfun$start$1(UpdateExecution.scala:122)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.pipelines.execution.core.UCContextCompanion$OptionUCContextHelper.runWithNewUCSIfAvailable(BaseUCContext.scala:283)
  at com.databricks.pipelines.execution.core.UpdateExecution.start(UpdateExecution.scala:119)
  at com.databricks.pipelines.execution.service.ExecutionBackend$$anon$2.$anonfun$run$2(ExecutionBackend.scala:670)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.pipelines.execution.core.CommandContextUtils$.withCommandContext(CommandContextUtils.scala:47)
  at com.databricks.pipelines.execution.service.ExecutionBackend$$anon$2.run(ExecutionBackend.scala:670)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:114)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
  at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:77)
  at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
  at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:76)
  at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:62)
  at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:111)
  at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:114)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
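Per the last RestException line, the registry lookup for 'dbdemos_customer_churn' is what fails. The failing call can be reproduced outside DLT with a minimal sketch like the one below (run it in a notebook on the same workspace; nothing here is dbdemos-specific, and the model name comes straight from the trace above):

from mlflow.tracking import MlflowClient

# Reproduce the lookup that spark_udf performs when resolving
# "models:/dbdemos_customer_churn/Production": fetch the latest
# version of the registered model in the Production stage.
client = MlflowClient()
versions = client.get_latest_versions("dbdemos_customer_churn", stages=["Production"])
# Raises RestException RESOURCE_DOES_NOT_EXIST if the model was never registered.
print([v.version for v in versions])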

@QuentinAmbard - Please let me know if you could help with this issue; I'm happy to answer any questions.

QuentinAmbard commented 11 months ago

Hi @JavinDolia, thanks. Could you add the following line to the _resources/01-load-data notebook and let me know if it fixes the issue:

init_experiment_for_batch("lakehouse-retail-c360", "customer_churn_mock")

JavinDolia commented 11 months ago

Hi @QuentinAmbard, thanks.

No luck! It's still throwing the same exception.

QuentinAmbard commented 11 months ago

Yeah, I forgot that the function definition itself isn't included there.

You also need to add it above, in a separate cell:

# Force the experiment into the shared dbdemos folder. Required to launch the notebook as a batch job.
def init_experiment_for_batch(demo_name, experiment_name):
  import mlflow
  import requests
  # You can programmatically get a PAT token with the following:
  pat_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
  url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
  #current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
  xp_root_path = f"/dbdemos/experiments/{demo_name}"
  # Create the experiment folder if it doesn't exist yet
  requests.post(f"{url}/api/2.0/workspace/mkdirs", headers = {"Accept": "application/json", "Authorization": f"Bearer {pat_token}"}, json={ "path": xp_root_path})
  xp = f"{xp_root_path}/{experiment_name}"
  print(f"Using common experiment under {xp}")
  mlflow.set_experiment(xp)
  return mlflow.get_experiment_by_name(xp)

It's working for me with that. I'm releasing a new version with the fix in a few hours. If you could also test adding the function at the beginning of the 02-create-churn-table notebook and try again, it would help confirm the fix works for your setup too.
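For reference, a minimal usage sketch of the helper (the run name and logged parameter below are illustrative, not part of the demo code):

import mlflow

# Point MLflow at the shared /dbdemos experiment folder; this also works
# when the notebook is executed as a job rather than interactively.
init_experiment_for_batch("lakehouse-retail-c360", "customer_churn_mock")

# Any run started afterwards is logged under that experiment.
with mlflow.start_run(run_name="churn_training_example"):
    mlflow.log_param("demo", "lakehouse-retail-c360")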

QuentinAmbard commented 11 months ago

@JavinDolia I released a new version with the fix. Can you try re-installing it and let me know?

JavinDolia commented 11 months ago

@QuentinAmbard - The code you sent above fixed the notebook exception - thanks!

However, there's a new issue in the recent release; attaching screenshots for reference (the confusion matrix for the validation dataset and the corresponding code block).

Edited: added one more screenshot.
QuentinAmbard commented 11 months ago

I think this issue comes from having installed the demo multiple times while the previous error was present; sorry about that.

If you open the 04.1-automl-churn-prediction notebook and set force_refresh = True, it should fix the issue 🤞
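For anyone hitting the same state, a sketch of the idea behind the flag (variable and helper names here are illustrative, not the exact notebook code): force_refresh makes the notebook retrain and re-register the model even if a previous partial install left artifacts behind.

from mlflow.exceptions import MlflowException
from mlflow.tracking import MlflowClient

force_refresh = True  # set True to retrain and re-register even if artifacts exist
model_name = "dbdemos_customer_churn"

def registered_model_exists(client: MlflowClient, name: str) -> bool:
    # The registry raises RESOURCE_DOES_NOT_EXIST (a MlflowException) for unknown names.
    try:
        client.get_registered_model(name)
        return True
    except MlflowException:
        return False

client = MlflowClient()
if force_refresh or not registered_model_exists(client, model_name):
    print(f"(Re)training and re-registering {model_name}...")
    # ...AutoML training + mlflow.register_model(...) happen here in the demo notebook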


JavinDolia commented 11 months ago

Thanks @QuentinAmbard