lfbraz / Feathr-on-AzureML

In this repo, we demonstrate the use of Feathr SDK to create a Feature Store in an integrated Azure Machine Learning Environment using Azure Databricks as a Spark engine.

client.get_offline_features not working, getting different errors #1

Open mansoorsyed11 opened 1 year ago

mansoorsyed11 commented 1 year ago

Hey Team @lfbraz, I'm using the Feathr-on-AzureML script. When I try to get offline features with the code below (following your code), I get the error shown.

My Code:

client.get_offline_features(observation_settings=settings,
                            feature_query=feature_query,
                            output_path=output_path,
                            execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "delta", "spark.feathr.outputFormat": "delta"}))
client.wait_job_to_finish(timeout_sec=500)

My Error: TypeError: FeathrClient.get_offline_features() got an unexpected keyword argument 'execution_configuratons'
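For reference, a minimal sketch of the same call with the keyword spelled execution_configurations, which is the name recent Feathr releases appear to accept (this spelling is an assumption, please check the signature in your installed feathr version):

client.get_offline_features(observation_settings=settings,
                            feature_query=feature_query,
                            output_path=output_path,
                            # assumed spelling in current Feathr; older samples used "execution_configuratons"
                            execution_configurations=SparkExecutionConfiguration({"spark.feathr.inputFormat": "delta",
                                                                                  "spark.feathr.outputFormat": "delta"}))
client.wait_job_to_finish(timeout_sec=500)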

Error 2: When I remove the execution_configuratons line, I get an error asking me to change the file to Avro format. After converting to Avro I get a new NullPointerException. Can you please check this? Thanks.

NullPointerException: at shaded.databricks.org.apache.hadoop.fs.azure.NativeAzureFileSystem.deleteWithInstrumentation(NativeAzureFileSystem.java:2104) at shaded.databricks.org.apache.hadoop.fs.azure.NativeAzureFileSystem.delete(NativeAzureFileSystem.java:1960) at shaded.databricks.org.apache.hadoop.fs.azure.NativeAzureFileSystem.delete(NativeAzureFileSystem.java:1933) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.$anonfun$delete$3(DatabricksFileSystemV2.scala:766) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at com.databricks.s3a.S3AExceptionUtils$.convertAWSExceptionToJavaIOException(DatabricksStreamUtils.scala:66) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.$anonfun$delete$2(DatabricksFileSystemV2.scala:764) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.$anonfun$withUserContextRecorded$2(DatabricksFileSystemV2.scala:1013) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.withAttributionContext(DatabricksFileSystemV2.scala:510) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.withAttributionTags(DatabricksFileSystemV2.scala:510) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.withUserContextRecorded(DatabricksFileSystemV2.scala:986) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.$anonfun$delete$1(DatabricksFileSystemV2.scala:762) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:395) at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:484) at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:504) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.withAttributionContext(DatabricksFileSystemV2.scala:510) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.withAttributionTags(DatabricksFileSystemV2.scala:510) at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:479) at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:404) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.recordOperationWithResultTags(DatabricksFileSystemV2.scala:510) at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:395) at 
com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:367) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.recordOperation(DatabricksFileSystemV2.scala:510) at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.delete(DatabricksFileSystemV2.scala:762) at com.databricks.backend.daemon.data.client.DatabricksFileSystem.delete(DatabricksFileSystem.scala:142) at com.linkedin.feathr.offline.util.HdfsUtils$.deletePath(HdfsUtils.scala:428) at com.linkedin.feathr.offline.job.FeatureJoinJob$.prepareSparkSession(FeatureJoinJob.scala:371) at com.linkedin.feathr.offline.job.FeatureJoinJob$.main(FeatureJoinJob.scala:347) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command--1:1) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read$$iw$$iw$$iw$$iw$$iw.(command--1:43) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read$$iw$$iw$$iw$$iw.(command--1:45) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read$$iw$$iw$$iw.(command--1:47) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read$$iw$$iw.(command--1:49) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read$$iw.(command--1:51) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read.(command--1:53) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read$.(command--1:57) at $lineae3ee5c9a35e49839c7c8022585785eb25.$read$.(command--1) at $lineae3ee5c9a35e49839c7c8022585785eb25.$eval$.$print$lzycompute(:7) at $lineae3ee5c9a35e49839c7c8022585785eb25.$eval$.$print(:6) at $lineae3ee5c9a35e49839c7c8022585785eb25.$eval.$print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745) at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021) at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574) at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41) at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37) at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41) at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573) at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600) at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570) at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:219) at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:235) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:908) at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:861) at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:235) at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$11(DriverLocal.scala:547) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258) at 
com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49) at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297) at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49) at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:524) at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:611) at scala.util.Try$.apply(Try.scala:213) at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:603) at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:522) at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:557) at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427) at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370) at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221) at java.lang.Thread.run(Thread.java:750)


RuntimeError                              Traceback (most recent call last)
Input In [10], in <cell line: 5>()
      1 client.get_offline_features(observation_settings=settings,
      2                             feature_query=feature_query,
      3                             output_path=output_path)
      4                             #execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "delta", "spark.feathr.outputFormat": "delta"}))
----> 5 client.wait_job_to_finish(timeout_sec=5000)

File /anaconda/envs/azureml_py310_sdkv2/lib/python3.10/site-packages/feathr/client.py:736, in FeathrClient.wait_job_to_finish(self, timeout_sec)
    734     return
    735 else:
--> 736     raise RuntimeError('Spark job failed.')

RuntimeError: Spark job failed.

mansoorsyed11 commented 1 year ago

@lfbraz For this project I used the code below.

import config

from feathr import FeathrClient
client = FeathrClient(config_path='./config.yaml')
import os

# Service Principal credentials

os.environ['AZURE_CLIENT_ID'] = 'AZURE_CLIENT_ID'
os.environ['AZURE_TENANT_ID'] = 'AZURE_TENANT_ID'
os.environ['AZURE_CLIENT_SECRET'] = 'AZURE_CLIENT_SECRET'

# Databricks Workspace

os.environ['DATABRICKS_WORKSPACE_TOKEN_VALUE'] = 'DATABRICKS_WORKSPACE_TOKEN_VALUE'

# ADLS credentials

os.environ['ADLS_ACCOUNT'] = 'strgmlopseus2dev01'
os.environ['ADLS_KEY'] = 'ADLS_KEY'

# REDIS password

os.environ['REDIS_PASSWORD'] = 'REDIS_PASSWORD'

from pyspark.sql import SparkSession, DataFrame
from feathr import HdfsSource

batch_source = HdfsSource(name="Calls", path="dbfs:/mnt/feathr-container/Calls/")
# https://strgmlopseus2dev01.blob.core.windows.net/feathr-container/Customer/part-00000-181e3763-53d9-4073-8c15-809c9d732701-c000.snappy.parquet

from feathr import TypedKey, ValueType, Feature, FeatureAnchor, INT32, INPUT_CONTEXT

customer_id = TypedKey(key_column="CustomerId", key_column_type=ValueType.INT32, description="CustomerId", full_name="CustomerId")

features = [
    Feature(name="f_NumberOfCalls", feature_type=INT32, key=customer_id, transform="NumberOfCalls"),
    Feature(name="f_AverageCallDuration", feature_type=INT32, key=customer_id, transform="AverageCallDuration"),
]

request_anchor = FeatureAnchor(name="request_features", source=batch_source, features=features)

client.build_features(anchor_list=[request_anchor])

from feathr import FeatureQuery, ObservationSettings
from feathr import TypedKey, ValueType, INT32
from feathr import SparkExecutionConfiguration

output_path = 'dbfs:/mnt/feathr-container'

feature_query = FeatureQuery( feature_list=["f_NumberOfCalls", "f_AverageCallDuration"], key=customer_id)

settings = ObservationSettings( observation_path="dbfs:/mnt/feathr-container/Customer/")

client.get_offline_features(observation_settings=settings,
                            feature_query=feature_query,
                            output_path=output_path,
                            execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "delta", "spark.feathr.outputFormat": "delta"}))

client.wait_job_to_finish(timeout_sec=5000)
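A side note on the NullPointerException above: the stack trace fails in HdfsUtils.deletePath, which FeatureJoinJob calls while preparing the Spark session, presumably to clear the output location before writing. Since output_path here is the mount root dbfs:/mnt/feathr-container, one thing worth trying (a guess, not a confirmed fix) is pointing the output at a dedicated sub-folder so the job does not attempt to delete the whole mount:

# hypothetical sub-folder for the join output; any path that is not the mount root should behave the same way
output_path = 'dbfs:/mnt/feathr-container/feathr_output'

client.get_offline_features(observation_settings=settings,
                            feature_query=feature_query,
                            output_path=output_path)
client.wait_job_to_finish(timeout_sec=5000)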

mansoorsyed11 commented 1 year ago

This is my YAML file:

api_version: 1
project_config:
  project_name: 'feathr_getting_started'
  required_environment_variables: