Azure / feast-azure

Azure plugins for Feast (FEAture STore)
MIT License

AzureNativeFileSystemStore StorageException (Public access is not permitted) #41

Open andrijaperovic opened 2 years ago

andrijaperovic commented 2 years ago

Running into the following issue on Azure Synapse Spark Pool when running "Historical-Retrieval":

22/01/20 09:09:18 ERROR AzureNativeFileSystemStore: Service returned StorageException when checking existence of container feasttest in account feastdrivingpoc.blob.core.windows.net
com.microsoft.azure.storage.StorageException: Public access is not permitted on this storage account.
    at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
    at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
    at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
    at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:769)
    at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:756)
    at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobContainerWrapperImpl.exists(StorageInterfaceImpl.java:233)

In stdout:

2022-01-20 09:09:18,463 [ERROR] An error occurred while calling o180.load.
: org.apache.hadoop.fs.azure.AzureException: org.apache.hadoop.fs.azure.AzureException: No credentials found for account feastdrivingpoc.blob.core.windows.net in the configuration, and its container feasttest is not accessible using anonymous credentials. Please check if the container exists first. If it is not publicly available, you have to provide account credentials.
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1123)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:566)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1423)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3316)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:137)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3365)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3333)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:492)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:377)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: No credentials found for account feastdrivingpoc.blob.core.windows.net in the configuration, and its container feasttest is not accessible using anonymous credentials. Please check if the container exists first. If it is not publicly available, you have to provide account credentials.
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:899)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1118)
    ... 26 more
Traceback (most recent call last):
  File "historical_feature_retrieval_job.py", line 896, in <module>
    start_job(
  File "historical_feature_retrieval_job.py", line 832, in start_job
    result = retrieve_historical_features(
  File "historical_feature_retrieval_job.py", line 784, in retrieve_historical_features
    entity_df = _read_and_verify_entity_df_from_source(spark, entity_source)
  File "historical_feature_retrieval_job.py", line 627, in _read_and_verify_entity_df_from_source
    spark.read.format(source.spark_format)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 204, in load
    return self._df(self._jreader.load(path))
  File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/trusted-service-user/cluster-env/env/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o180.load.

Public access is not permitted by the security policy on the Blob storage account. I'm using the Python notebook for reference: https://github.com/Azure/feast-azure/blob/main/cluster/samples/feature_store_azure.ipynb

andrijaperovic commented 2 years ago

@rramani is this something you might be able to provide some insight on? I've also reached out to bhgandhi@microsoftsupport.com for additional help. It looks like the account credentials need to be provided in the form of a Spark setting. Currently I am setting FEAST_AZURE_BLOB_ACCOUNT_NAME and FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY via os.environ, as per the Python notebook, but I am not sure if that is sufficient in this scenario.

rramani commented 2 years ago

@andrijaperovic - you can add me to the thread (raramani@microsoft.com) or email me directly. Thanks

andrijaperovic commented 2 years ago

Thanks @rramani. I also wanted to mention that the k8s launcher has a _get_azure_credentials method which returns the SparkSession config property based on account_key and account_name: https://github.com/Azure/feast-azure/blob/main/cluster/sdk/python/feast_spark/pyspark/launchers/k8s/k8s.py#L252

I'm wondering how this SparkSession config is being set in the case of the Synapse launcher, since this helper method is absent from the Synapse launcher implementation.

andrijaperovic commented 2 years ago

I see also that tenant_id and client_id are hardcoded in the SynapseJobLauncher: https://github.com/Azure/feast-azure/blob/a28ef51e16db87e740d265bdfe96539aa669e0a4/cluster/sdk/python/feast_spark/pyspark/launchers/synapse/synapse.py#L158

andrijaperovic commented 2 years ago

It looks like mitigating this problem requires setting the spark.hadoop.fs.azure.account.key.<storage-account-name>.blob.core.windows.net property instead of fs.azure.account.key.<storage-account-name>.blob.core.windows.net. Any idea how to do this via Feast? @rramani @samuel100
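Some background on why the prefix matters: Spark copies every property whose name starts with `spark.hadoop.` into the Hadoop configuration, with the prefix stripped. The sketch below only mimics that behavior in plain Python to illustrate the difference between the two property names; `hadoop_view` and the account name are made up for the example, and it is an assumption here that a bare `fs.azure...` key passed as a Spark job property never reaches the storage driver.

```python
SPARK_HADOOP_PREFIX = "spark.hadoop."


def hadoop_view(spark_conf):
    """Return the Hadoop settings Spark would derive from spark.hadoop.* properties.

    Illustrative stand-in only; this is not Spark's own code.
    """
    return {
        name[len(SPARK_HADOOP_PREFIX):]: value
        for name, value in spark_conf.items()
        if name.startswith(SPARK_HADOOP_PREFIX)
    }


account = "mystorageaccount"  # placeholder account name
bare = {f"fs.azure.account.key.{account}.blob.core.windows.net": "<key>"}
prefixed = {f"spark.hadoop.fs.azure.account.key.{account}.blob.core.windows.net": "<key>"}

print(hadoop_view(bare))      # {}
print(hadoop_view(prefixed))  # the fs.azure.account.key... entry, prefix removed
```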

rramani commented 2 years ago

@andrijaperovic - This is being investigated by @snowmanmsft and we will get back to you. Thanks.

andrijaperovic commented 2 years ago

Thanks @rramani - I was able to work around this issue by passing a Python dictionary containing spark.hadoop.fs.azure.account.key.<storage-account-name>.blob.core.windows.net as the configuration argument of the SparkBatchJobOptions constructor, in the create_spark_batch_job method in pyspark/launchers/synapse/synapse_utils.py.

I would assume this property would be derived from FEAST_AZURE_BLOB_ACCOUNT_NAME and FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY, unless you plan to introduce custom environment variables specific to the Synapse launcher. ++ @snowmanmsft
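A minimal sketch of the workaround described above, assuming the two FEAST_AZURE_BLOB_* environment variables are the intended source of the key. The helper name `with_blob_credentials` is hypothetical and not part of feast-azure; it only builds the dictionary that would then be passed as the configuration argument of SparkBatchJobOptions.

```python
import os


def with_blob_credentials(configuration=None, env=os.environ):
    """Return a copy of a Spark configuration dict with the storage key added.

    Hypothetical helper: nothing is added when either variable is missing,
    and an explicitly configured key is never overwritten.
    """
    merged = dict(configuration or {})
    account = env.get("FEAST_AZURE_BLOB_ACCOUNT_NAME")
    key = env.get("FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY")
    if account and key:
        prop = f"spark.hadoop.fs.azure.account.key.{account}.blob.core.windows.net"
        merged.setdefault(prop, key)
    return merged


# Example with an explicit environment mapping instead of os.environ.
example_env = {
    "FEAST_AZURE_BLOB_ACCOUNT_NAME": "mystorageaccount",
    "FEAST_AZURE_BLOB_ACCOUNT_ACCESS_KEY": "<key>",
}
print(sorted(with_blob_credentials({"spark.executor.cores": "2"}, example_env)))
```

Using setdefault keeps any value the caller already supplied, so the environment variables act as a fallback rather than an override.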

I am now facing an issue with read_parquet in the example when reading back the Parquet files from the FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION data lake directory:

Traceback (most recent call last):
  File "/test_get_features.py", line 104, in <module>
    read_parquet(output_file_uri)
  File "/test_get_features.py", line 46, in read_parquet
    data = file_client.download_file(0)
  File "/usr/local/lib/python3.8/site-packages/azure/storage/filedatalake/_data_lake_file_client.py", line 607, in download_file
    downloader = self._blob_client.download_blob(offset=offset, length=length, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 83, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_blob_client.py", line 848, in download_blob
    return StorageStreamDownloader(**options)
  File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_download.py", line 349, in __init__
    self._response = self._initial_request()
  File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_download.py", line 429, in _initial_request
    process_storage_error(error)
  File "/usr/local/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py", line 177, in process_storage_error
    exec("raise error from None")   # pylint: disable=exec-used # nosec
  File "<string>", line 1, in <module>
azure.core.exceptions.HttpResponseError: The range specified is invalid for the current size of the resource.
RequestId:7e4388cb-d01e-0044-2253-12c138000000
Time:2022-01-26T01:25:00.8929426Z
ErrorCode:InvalidRange

I've validated that there are in fact Parquet files in this directory, as the Spark job succeeds. Any pointers on how to proceed would be helpful. ++ @samuel100
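One plausible reading of the InvalidRange error, offered as an assumption rather than something confirmed in this thread: download_file(0) requests a byte range starting at offset 0, and a zero-length resource (the output directory itself, or an empty marker file such as _SUCCESS) has no such byte. The sketch below selects only non-empty part files before downloading. It works on any objects exposing name, is_directory and content_length, which mirrors the path entries returned by get_paths(); the function name and the sample listing are made up for illustration.

```python
from types import SimpleNamespace


def downloadable_part_files(paths):
    """Return the names of non-empty Spark part files from a path listing."""
    selected = []
    for entry in paths:
        if entry.is_directory or not entry.content_length:
            continue  # directories and zero-byte markers cannot serve a range read
        basename = entry.name.rsplit("/", 1)[-1]
        if basename.startswith("part-"):
            selected.append(entry.name)
    return selected


# Made-up listing shaped like a Spark output directory.
listing = [
    SimpleNamespace(name="out/job", is_directory=True, content_length=0),
    SimpleNamespace(name="out/job/_SUCCESS", is_directory=False, content_length=0),
    SimpleNamespace(name="out/job/part-00000.snappy.parquet", is_directory=False, content_length=2048),
]
print(downloadable_part_files(listing))
```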

andrijaperovic commented 2 years ago

I was finally able to work around the problem by calling file_system_client.get_paths() in read_parquet(uri) when parsed_uri.scheme is abfss, filtering the paths for those under the output_file_uri path, reading each matching path's Parquet table, and finally doing a concat_tables:

import io
import os
import re

import pyarrow.parquet as pq
from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient

def read_parquet(uri):
...
    elif parsed_uri.scheme == 'abfss':
        # Authenticate as a service principal configured through environment variables.
        credential = ClientSecretCredential(os.getenv('AZURE_TENANT_ID'), os.getenv('AZURE_CLIENT_ID'), os.getenv('AZURE_CLIENT_SECRET'))
        #credential = DefaultAzureCredential()
        # netloc has the form <file-system>@<account-host>.
        datalake = parsed_uri.netloc.split('@')
        service_client = DataLakeServiceClient(account_url="https://" + datalake[1], credential=credential)
        file_system_client = service_client.get_file_system_client(datalake[0])
        # List every path in the file system, then keep only this job's part files.
        path_list = [path.name for path in file_system_client.get_paths()]
        r = re.compile(r".*/b86b1f37-b719-4df4-94b8-e0ea84d1eb25/part")
        filtered_list = list(filter(r.match, path_list))
        print(filtered_list)
        table_list = []
        for part_path in filtered_list:
            file_client = file_system_client.get_file_client(part_path)
            #file_client = file_system_client.get_file_client(parsed_uri.path)
            data = file_client.download_file(0)
            with io.BytesIO() as b:
                data.readinto(b)
                table_list.append(pq.read_table(b))
        # One pyarrow Table per part file; the caller concatenates them.
        return table_list
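The job ID hard-coded in the regular expression could instead be derived from the output URI. The following is a sketch under stated assumptions: `split_abfss_uri` and `part_files_under` are hypothetical helpers, and the part files are assumed to sit directly under the directory named in the URI. The directory it returns could also be passed as the path argument of get_paths() so that only that directory is listed.

```python
from fnmatch import fnmatchcase
from urllib.parse import urlparse


def split_abfss_uri(uri):
    """Split abfss://<file-system>@<account-host>/<directory> into its three parts."""
    parsed = urlparse(uri)
    if parsed.scheme != "abfss":
        raise ValueError(f"not an abfss URI: {uri}")
    file_system, _, account_host = parsed.netloc.partition("@")
    return file_system, account_host, parsed.path.strip("/")


def part_files_under(names, directory):
    """Keep the path names that look like Spark part files under the directory."""
    pattern = f"{directory}/part-*"
    return [name for name in names if fnmatchcase(name, pattern)]


# Placeholder URI and listing, for illustration only.
fs_name, host, directory = split_abfss_uri(
    "abfss://feasttest@myaccount.dfs.core.windows.net/out/job1"
)
names = ["out/job1/_SUCCESS", "out/job1/part-00000.parquet", "out/job2/part-00000.parquet"]
print(part_files_under(names, directory))
```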

I'm not sure whether listing and filtering the paths like this is strictly necessary, or whether there is a way to glob the files with DataLakeServiceClient. To print the resulting pandas DataFrame:

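import pandas as pd
import pyarrow
# z is assumed to be the list of tables returned by read_parquet(output_file_uri)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None, 'display.max_columns', None)
print(pyarrow.concat_tables(z).to_pandas())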

This is my resulting pandas df:

   driver_id     event_timestamp  driver_statistics__avg_daily_trips  \
0     433013 2022-04-14 04:08:08                                 NaN
1     385807 2022-10-21 15:18:20                                 NaN
2     609960 2022-01-21 11:39:38                                 NaN
3     423296 2022-10-10 22:23:40                                 NaN
4     335201 2022-03-19 18:03:45                                 NaN
5     584821 2022-07-26 01:37:02                                 NaN
6      13259 2022-05-29 14:56:44                                 NaN
7      25947 2022-06-06 23:44:01                                 NaN
8     888471 2022-05-12 06:52:57                                 NaN
9     453796 2022-07-02 13:06:23                                 NaN

   driver_statistics__acc_rate  driver_statistics__conv_rate  \
0                          NaN                           NaN
1                          NaN                           NaN
2                          NaN                           NaN
3                          NaN                           NaN
4                          NaN                           NaN
5                          NaN                           NaN
6                          NaN                           NaN
7                          NaN                           NaN
8                          NaN                           NaN
9                          NaN                           NaN

   driver_trips__trips_today
0                        NaN
1                        NaN
2                        NaN
3                        NaN
4                        NaN
5                        NaN
6                        NaN
7                        NaN
8                        NaN
9                        NaN

I'm assuming the wasbs protocol might be more practical here as a FEAST_HISTORICAL_FEATURE_OUTPUT_LOCATION. ++ @rramani @snowmanmsft

EDIT: Do I need to specify the data types for the numerical columns in the conversion to pandas df?

xiaoyongzhumsft commented 2 years ago

Taking a look at this issue, there are two parts to it:

  1. Reading parquet files, especially multiple files, isn't that straightforward. I'm adding a function to make it easier for customers to read multiple files.
  2. There's an issue where the joined result might be empty (null values). This is caused by how Feast calculates the joined features. Extending the Duration parameter in the configuration should help.

From Feast docs:

Feast joins the features to the entities based on the following conditions:

1. Entity primary key(s) value matches.
2. Feature event timestamp is the closest match possible to the entity event timestamp,
but must not be more recent than the entity event timestamp, and the difference must
not be greater than the maximum age specified in the feature table, unless the maximum age is not specified.
3. If more than one feature table rows satisfy condition 1 and 2, feature row with the
most recent created timestamp will be chosen.
4. If none of the above conditions are satisfied, the feature rows will have null values.
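The four conditions above can be sketched in plain Python for a single entity row. This is an illustration of the quoted rules, not Feast's implementation; the function name, the dictionary layout, and the `created_timestamp` and `value` fields are assumptions made for the example.

```python
from datetime import datetime, timedelta


def point_in_time_value(entity_row, feature_rows, max_age=None):
    """Pick the feature value for one entity row, or None if nothing qualifies."""
    candidates = []
    for row in feature_rows:
        if row["driver_id"] != entity_row["driver_id"]:
            continue  # condition 1: entity key must match
        age = entity_row["event_timestamp"] - row["event_timestamp"]
        if age < timedelta(0):
            continue  # condition 2: feature must not be newer than the entity row
        if max_age is not None and age > max_age:
            continue  # condition 2: feature must not be older than the maximum age
        candidates.append(row)
    if not candidates:
        return None  # condition 4: no match yields a null value
    # Closest event timestamp wins; ties go to the latest created timestamp (condition 3).
    best = max(candidates, key=lambda r: (r["event_timestamp"], r["created_timestamp"]))
    return best["value"]


entity = {"driver_id": 433013, "event_timestamp": datetime(2022, 4, 14, 4, 8, 8)}
features = [{
    "driver_id": 433013,
    "event_timestamp": datetime(2022, 4, 13, 4, 0, 0),
    "created_timestamp": datetime(2022, 4, 13, 5, 0, 0),
    "value": 0.5,
}]
print(point_in_time_value(entity, features, max_age=timedelta(hours=1)))  # None
print(point_in_time_value(entity, features, max_age=timedelta(days=2)))   # 0.5
```

The two calls show why a larger maximum age can turn a null into a value: the only matching feature row is a little over a day older than the entity row.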

I just committed a fix for this, hope it helps: https://github.com/Azure/feast-azure/commit/413937d75fdbaec79579341690a4d9c868614bd1

andrijaperovic commented 2 years ago

Thanks @snowmanmsft! It definitely helps. Regarding point 2, I was receiving NaN because I had generate_entities in two separate Python files, one for feature ingestion and one for retrieving historical features, so the entities were not consistent between the two stages.

After consolidating into a single file, I see rows which have valid numerical values for avg_daily_trips, conv_rate, acc_rate and trips_today. Do you have any intention of packaging this read_parquet method as part of the azure-feast-provider Python package?
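If the ingestion and retrieval scripts ever need to stay in separate files, one way to keep their entities consistent is to generate the IDs from a fixed seed. This is only a sketch: `generate_entity_ids`, its parameters and the ID range are hypothetical and not taken from the sample's generate_entities.

```python
import random


def generate_entity_ids(count, seed=42, upper=1_000_000):
    """Return `count` distinct pseudo-random IDs that depend only on the seed."""
    rng = random.Random(seed)  # private generator, unaffected by global random state
    return sorted(rng.sample(range(upper), count))


# Both scripts would call the function with the same arguments.
ingestion_ids = generate_entity_ids(10)
retrieval_ids = generate_entity_ids(10)
print(ingestion_ids == retrieval_ids)  # True
```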

andrijaperovic commented 2 years ago

It looks like mitigating this problem requires setting the spark.hadoop.fs.azure.account.key.<storage-account-name>.blob.core.windows.net property instead of fs.azure.account.key.<storage-account-name>.blob.core.windows.net. Any idea how to do this via Feast? @rramani @samuel100

@snowmanmsft looks like this comment is still relevant. Should I work on a fix? It would be a change in synapse_utils.py.