feast-dev / feast

The Open Source Feature Store for Machine Learning
https://feast.dev
Apache License 2.0

Is it possible to run feast[spark] and put the offline store on S3? #3571

Closed MinfangTao closed 1 year ago

MinfangTao commented 1 year ago

Is your feature request related to a problem? Please describe. I simply tried creating an AWS EMR cluster, installing feast[spark], and pointing the data source to an S3 location: path=f"s3://my-bucket/spark_emr_test_data/driver_hourly_stats.parque

and got the error: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
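
Roughly, the data source definition looked like the sketch below (an illustrative reconstruction only; the exact parameter names may differ by Feast version, and the path is just an example):

from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource

# Illustrative sketch only: a SparkSource pointing at a parquet file on S3.
driver_hourly_stats_source = SparkSource(
    name="driver_hourly_stats",
    path="s3://my-bucket/spark_emr_test_data/driver_hourly_stats.parquet",  # example path
    file_format="parquet",
    timestamp_field="event_timestamp",
)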

We already have data in S3, and moving it into Redshift would add extra cost, so I want to ask: is it possible to run feast[spark] and keep the offline store on S3?

Describe the solution you'd like A clear and concise description of what you want to happen.

Describe alternatives you've considered A clear and concise description of any alternative solutions or features you've considered.

Additional context Add any other context or screenshots about the feature request here.

shuchu commented 1 year ago

To clarify: my reply is not a direct answer to the issue above, just something I'd like to share in the hope that it helps. For anything serious, please follow Feast's official documentation.

I ran into a similar error in a different application before. It comes down to the extra Java JARs you need so that Spark can read from and write to S3.

One tip: use "s3a://" instead of "s3://" when using Spark to access AWS S3 files.

Here is Feast's source code for finding/creating a PySpark session: https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

def get_spark_session_or_start_new_with_repoconfig(
    store_config: SparkOfflineStoreConfig,
) -> SparkSession:
    spark_session = SparkSession.getActiveSession()
    if not spark_session:
        spark_builder = SparkSession.builder
        spark_conf = store_config.spark_conf
        if spark_conf:
            spark_builder = spark_builder.config(
                conf=SparkConf().setAll([(k, v) for k, v in spark_conf.items()])
            )

        spark_session = spark_builder.getOrCreate()
    spark_session.conf.set("spark.sql.parser.quotedRegexColumnNames", "true")
    return spark_session

If I were you, I would try setting the right "store_config" values so that Spark can read and write to S3.
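
Note that the function above first checks SparkSession.getActiveSession(), so either a pre-configured active session or the spark_conf passed through the store config should work. As a sketch (my own example, not verified end to end with Feast; the package versions are illustrative), the spark_conf entries for S3 access could look like the dict below, which in practice would go under offline_store.spark_conf in feature_store.yaml:

# Sketch only: spark_conf entries that let Spark resolve s3a:// paths.
spark_conf = {
    # Pull in the Hadoop S3A connector and the AWS SDK (versions are examples).
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.2.2,"
                           "com.amazonaws:aws-java-sdk-bundle:1.12.180",
    # Use the S3A filesystem implementation for s3a:// URIs.
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    # Let the AWS SDK pick up credentials (env vars, instance profile, etc.).
    "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
}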

I worked on this problem in a different situation. Below is something that may help you:

1. You may need to tell PySpark to load the necessary JARs, for example (reference: https://mageswaran1989.medium.com/2022-no-filesystem-for-scheme-s3-cbd72c99a50c):

from pyspark.sql import SparkSession

def get_spark():
    spark = SparkSession.builder.master("local[4]").appName('SparkDelta') \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.jars.packages",
                "io.delta:delta-core_2.12:1.1.0,"
                "org.apache.hadoop:hadoop-aws:3.2.2,"
                "com.amazonaws:aws-java-sdk-bundle:1.12.180") \
        .getOrCreate()
    return spark

2. Configure the Spark session. Here is some example code from Stack Overflow for your reference:
import os
import configparser

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sql = SQLContext(sc)

hadoop_conf = sc._jsc.hadoopConfiguration()

# Read AWS credentials from the local AWS credentials file.
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))

access_key = config.get("<aws-account>", "aws_access_key_id")
secret_key = config.get("<aws-account>", "aws_secret_access_key")
session_key = config.get("<aws-account>", "aws_session_token")

# Point s3a at AWS and use the S3A filesystem implementation.
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Authenticate with temporary credentials (access key, secret key, session token).
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_key)
hadoop_conf.set("fs.s3a.secret.key", secret_key)
hadoop_conf.set("fs.s3a.session.token", session_key)

s3_path = "s3a://<s3-path>/"

sparkDF = sql.read.parquet(s3_path)

MinfangTao commented 1 year ago

@shuchu Thank you very much. I found another solution:

The problem is that "pip3 install feast[spark]" installs its own PySpark, which cannot resolve the "s3:" scheme. However, EMR already ships with its own PySpark, which does support "s3:".

So my solution is to let Feast use EMR's original PySpark:

  1. Install Python 3.9 (the latest EMR release ships with Python 3.7, which does not support the latest Feast): https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-python.html

  2. Find the path of EMR's original PySpark (run pyspark and print its path), then add it to PYTHONPATH: echo "export PYTHONPATH=$PYTHONPATH:/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip:/usr/lib/spark/python/" >> /home/hadoop/.bash_profile

  3. Install feast[spark].

  4. Done. (A quick check that the right PySpark is picked up is sketched below.)
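
For example, a quick way to confirm which PySpark installation Python resolves (assuming EMR's Spark lives under /usr/lib/spark, as in the PYTHONPATH above):

# Quick check: after updating PYTHONPATH, this should print a path under
# /usr/lib/spark (the EMR-provided Spark), not the pip-installed copy.
import pyspark

print(pyspark.__file__)
print(pyspark.__version__)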

MinfangTao commented 1 year ago

After configuring it properly, Feast does support this use case.