feast-dev / feast

The Open Source Feature Store for Machine Learning
https://feast.dev
Apache License 2.0

Add feature transformation support to FeatureView #4277

Open franciscojavierarceo opened 5 months ago

franciscojavierarceo commented 5 months ago

Is your feature request related to a problem? Please describe.

FeatureViews should support transformations as well.

Describe the solution you'd like

@batch_feature_view(
    sources=[credit_data_batch],
    entities=[user],
    mode="python",
    batch_schedule=timedelta(days=1),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("current_balance", Float64)],
)
def user_last_balance(transactions):
    return transactions[["user_id", "timestamp", "current_balance"]]

Describe alternatives you've considered

N/A

Additional context

Should behave similarly to ODFVs and Stream Feature Views, and support both Python and Pandas modes.
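For illustration, a Python-mode version of the transformation above could operate on a dict of column lists instead of a DataFrame. This is only a sketch: the function name `user_last_balance_python` and the sample data are hypothetical, and no Feast API is involved.

```python
from typing import Any, Dict, List

# Columns declared in the proposed schema.
SCHEMA_COLUMNS = ["user_id", "timestamp", "current_balance"]


def user_last_balance_python(transactions: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """Keep only the schema columns (dict-of-lists analogue of the pandas selection)."""
    return {column: transactions[column] for column in SCHEMA_COLUMNS}


# Hypothetical sample input with one extra column that should be dropped.
sample = {
    "user_id": ["u1", "u2"],
    "timestamp": [1700000000, 1700003600],
    "current_balance": [120.5, 43.0],
    "merchant": ["a", "b"],
}
result = user_last_balance_python(sample)
```

The pandas-mode body, `transactions[["user_id", "timestamp", "current_balance"]]`, expresses the same column selection on a DataFrame.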

tokoko commented 4 months ago

@franciscojavierarceo Just to make sure we're on the same page: do you think we should have a separate BatchFeatureView class for this, just like ODFVs and stream feature views?

Also, I think we should probably implement it without scheduled materialization at first, because it's not obvious to me which Feast service could do the scheduling (the flight server seems most logical, or maybe we need to add another service altogether).

franciscojavierarceo commented 4 months ago

I am starting to think that naming the decorator after the feature view type is weird.

Instead we could just do something like:

@transform(
    sources=[credit_data_batch],
    entities=[user],
    mode="python",
    batch_schedule=timedelta(days=1),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("current_balance", Float64)],
)
def user_last_balance(transactions):
    return some_computation(transactions[["user_id", "timestamp", "current_balance"]])

And maybe the decorator could take an argument that specifies when the computation happens?

My view is that we need to provide clarity about when transformations happen (on demand, during a write, or in a stream, i.e., before a write), and I'm not sure the current approach makes this as obvious. Maybe it does.
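One way to picture this is a single decorator that records the timing explicitly. The sketch below is plain Python, not Feast code: the `when` argument, the `TransformTiming` enum, and the `REGISTRY` dict are all hypothetical names introduced only for illustration.

```python
from enum import Enum
from typing import Any, Callable, Dict


class TransformTiming(Enum):
    """Hypothetical set of points at which a transformation may run."""
    ON_DEMAND = "on_demand"  # at read/request time
    ON_WRITE = "on_write"    # while writing to the store
    STREAM = "stream"        # in a stream, before the write


# Hypothetical registry mapping transformation name to its timing.
REGISTRY: Dict[str, TransformTiming] = {}


def transform(when: TransformTiming) -> Callable:
    """Record when a transformation runs; the function itself is left unchanged."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        REGISTRY[func.__name__] = when
        func.when = when  # expose the timing on the function for inspection
        return func
    return decorator


@transform(when=TransformTiming.ON_WRITE)
def user_last_balance(transactions: Dict[str, list]) -> Dict[str, list]:
    return {
        "user_id": transactions["user_id"],
        "current_balance": transactions["current_balance"],
    }
```

With something like this, the timing is visible at the definition site (`user_last_balance.when`) rather than implied by which decorator was used.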

franciscojavierarceo commented 3 months ago

This will be solved by https://github.com/feast-dev/feast/issues/4376

franciscojavierarceo commented 3 months ago

Actually, this issue is more general, and #4376 does not handle the batch transformation use case, so I'm keeping this open. The ODFV writes do help for a subset of these items.

Vishnu-Rangiah commented 3 weeks ago

Chaining transformations into a DAG that can be reused across FVs would be nice to have. This functionality would reduce code duplication across the feature repo. Here is an example adapted from Tecton's transformation API: https://docs.tecton.ai/docs/defining-features/feature-views/transformations#a-feature-view-that-calls-a-pyspark-transformation-passing-two-pyspark-transformation-outputs

@transformation(mode="big_query")
def last_balance_time(transactions, max_user_transaction):
    return f"""SELECT t.user_id, t.current_balance, last_t.last_transaction_date as timestamp
                      FROM {transactions} t
                      INNER JOIN {max_user_transaction} last_t
                      ON t.user_id = last_t.user_id AND t.timestamp = last_t.last_transaction_date;"""

@transformation(mode="big_query")
def user_last_transaction_time(transactions):
    return f"""SELECT user_id, MAX(timestamp) AS last_transaction_date
                      FROM {transactions}
                      GROUP BY user_id"""

@feature_view(
    sources=[credit_data_batch],
    entities=[user],
    mode="pipeline",  # creates a DAG from reusable transformation functions
    batch_schedule=timedelta(days=1),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("current_balance", Float64)],
)
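def user_last_balance(transactions):
    # Bind the intermediate result to a new name; reusing the function's own
    # name here would shadow it and raise UnboundLocalError.
    max_user_transaction = user_last_transaction_time(transactions)
    return last_balance_time(transactions, max_user_transaction)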
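As a rough, self-contained sketch of how such chaining could work for SQL-returning transformations: the `transformation` decorator below is a hypothetical stand-in (not Feast's or Tecton's API) that wraps each function's SQL as a parenthesized subquery, so one transformation's output can be interpolated into another. The table name `credit_data_batch` is used as a plain string for illustration.

```python
import functools
from typing import Callable


def transformation(mode: str) -> Callable:
    """Hypothetical decorator: return the function's SQL wrapped as a subquery."""
    def decorator(func: Callable[..., str]) -> Callable[..., str]:
        @functools.wraps(func)
        def wrapper(*inputs: str) -> str:
            sql = func(*inputs).strip().rstrip(";")
            return f"({sql})"
        wrapper.mode = mode  # keep the declared mode for inspection
        return wrapper
    return decorator


@transformation(mode="big_query")
def user_last_transaction_time(transactions: str) -> str:
    return (
        f"SELECT user_id, MAX(timestamp) AS last_transaction_date "
        f"FROM {transactions} GROUP BY user_id"
    )


@transformation(mode="big_query")
def last_balance_time(transactions: str, max_user_transaction: str) -> str:
    return (
        f"SELECT t.user_id, t.current_balance, last_t.last_transaction_date AS timestamp "
        f"FROM {transactions} t INNER JOIN {max_user_transaction} last_t "
        f"ON t.user_id = last_t.user_id AND t.timestamp = last_t.last_transaction_date"
    )


def user_last_balance(transactions: str) -> str:
    # The intermediate result gets its own name so the function is not shadowed.
    max_user_transaction = user_last_transaction_time(transactions)
    return last_balance_time(transactions, max_user_transaction)


query = user_last_balance("credit_data_batch")
```

Here the DAG is implicit in the call graph: `user_last_transaction_time` can be reused by any other feature view that needs the latest transaction date per user.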