feast-dev / feast

The Open Source Feature Store for Machine Learning
https://feast.dev
Apache License 2.0
5.48k stars 978 forks source link

OnDemandFeatureViews with RequestSource returns different columns depending on online/offline feature retrieval. #4479

Open job-almekinders opened 1 week ago

job-almekinders commented 1 week ago

Context

I'm using an ODFV with a PandasTransformation and a RequestSource input. Furthermore, I'm using Postgres offline and online store.

Expected Behavior

I would expect both the get_online_features_async method and the get_historical_features method to return the same set of features. However, when retrieving features with get_online_features_async, the input data from the RequestSource is not present in the output response. On the other hand, when retrieving features with get_historical_features, the input data from the RequestSource is present in the output response.

I'm not sure which behavior is to be expected. However, I think think that both approaches should return the same columns.

Current Behavior

When calling the get_online_features_async method I do not see the input from the RequestSource back in the output response, while I would expect it to be there.

One could also argue that it should not be in the output response. That would also be an option. However, I would assume that the online and offline feature retrieval would return the same output in terms of columns.

Steps to reproduce

docker-compose.yml

---
version: "3"
services:
  offline_store:
    image: postgres:16-alpine
    container_name: offline_store
    ports:
      - "6543:5432"
    environment:
      - POSTGRES_DB=offline_store
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - ./postgres_init:/docker-entrypoint-initdb.d
  online_store:
    image: postgres:16-alpine
    container_name: online_store
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=online_store
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres

feature_store.yml

project: feast_tryout
provider: local
registry:
  registry_type: sql
  path: postgresql+psycopg2://postgres:postgres@0.0.0.0:5432/online_store
  cache_ttl_seconds: 60
online_store:
  type: postgres
  host: 0.0.0.0
  port: 5432
  database: online_store
  db_schema: online
  user: postgres
  password: postgres
offline_store:
  type: postgres
  host: 0.0.0.0
  port: 6543
  database: offline_store
  db_schema: offline
  user: postgres
  password: postgres
entity_key_serialization_version: 2

Insert into offline store (postgres) postgres_init/create-offline-store-database.sql

CREATE SCHEMA offline;

CREATE TABLE offline.features (
  "ENTITY_ID"         VARCHAR,
  "EVENT_TIMESTAMP" TIMESTAMP,
  "ENTITY_FLOAT"      FLOAT,
);

INSERT INTO offline.features
SELECT *
FROM (
  VALUES ('11111111', '2024-01-01 13:00:00' :: TIMESTAMP, 1.1),
         ('11111111', '2024-01-01 14:00:00' :: TIMESTAMP, 1.11),
         ('11111111', '2024-01-01 15:00:00' :: TIMESTAMP, 1.111),
         ('22222222', '2024-01-01 13:00:00' :: TIMESTAMP, 2.2),
         ('22222222', '2024-01-01 14:00:00' :: TIMESTAMP, 2.22),
         ('33333333', '2024-01-01 13:00:00' :: TIMESTAMP, 3.3),
         ('44444444', '2024-01-02 22:00:00' :: TIMESTAMP, 4.4)
  )

bootstrap.py

from datetime import timedelta
from typing import Any

import pandas as pd
from feast import (
    Entity,
    FeatureService,
    FeatureStore,
    FeatureView,
    Field,
    RequestSource,
    ValueType,
)
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource as PostgresSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64

feature_store = FeatureStore()

features_entity = Entity(
    name="entity_id",
    join_keys=["ENTITY_ID"],
    value_type=ValueType.STRING,
)

features_source = PostgresSource(
    name="features",
    timestamp_field="EVENT_TIMESTAMP",
    table="offline.features",
)

features_feature_view = FeatureView(
    name="features_feature_view",
    entities=[features_entity],
    ttl=timedelta(days=0),
    schema=[Field(name="ENTITY_FLOAT", dtype=Float32)],
    online=True,
    source=features_source,
)

request_source = RequestSource(
    name="request_feature",
    schema=[Field(name="REQUEST_FLOAT", dtype=Float32)],
)

@on_demand_feature_view(
    sources=[features_feature_view, request_source],
    schema=[
        Field(name="ENTITY_FLOAT_TRANSFORMED_PANDAS", dtype=Float64),
        Field(name="ENTITY_FLOAT_PLUS_REQUEST_SOURCE", dtype=Float64),
    ],
    mode="pandas",
)
def odfv_pandas(input: pd.DataFrame) -> pd.DataFrame:
    output = pd.DataFrame()
    output["ENTITY_FLOAT_TRANSFORMED_PANDAS"] = input["ENTITY_FLOAT"] * 2
    output["ENTITY_FLOAT_PLUS_REQUEST_SOURCE"] = (
        input["ENTITY_FLOAT"] * input["REQUEST_FLOAT"]
    )
    return output

@on_demand_feature_view(
    sources=[features_feature_view, request_source],
    schema=[Field(name="ENTITY_FLOAT_TRANSFORMED_PYTHON", dtype=Float64)],
    mode="python",
)
def odfv_python(input: dict[str, Any]) -> dict[str, Any]:
    output = {}
    output["ENTITY_FLOAT_TRANSFORMED_PYTHON"] = [
        value * 2 for value in input["ENTITY_FLOAT"]
    ]
    output["ENTITY_FLOAT_PLUS_REQUEST_SOURCE_PYTHON"] = [
        e + r for e, r in zip(input["ENTITY_FLOAT"], input["REQUEST_FLOAT"])
    ]

    return output

features_feature_service_pandas = FeatureService(
    name="features_feature_service_pandas",
    features=[features_feature_view, odfv_pandas],
)

features_feature_service_python = FeatureService(
    name="features_feature_service_python",
    features=[features_feature_view, odfv_python],
)

feature_store.apply(
    [
        features_entity,
        features_source,
        features_feature_view,
        odfv_pandas,
        odfv_python,
        features_feature_service_pandas,
        features_feature_service_python,
    ]
)

materialize.py

from datetime import datetime

from feast import FeatureStore

feature_store = FeatureStore()
feature_store.materialize(
    start_date=datetime(1900, 1, 1),
    end_date=datetime(9999, 1, 1),
    feature_views=["features_feature_view"],
)

inference.py

"""Inference example."""

import pandas as pd
from feast import FeatureStore

feature_store = FeatureStore()
feature_service_pandas = feature_store.get_feature_service(
    name="features_feature_service_pandas"
)
feature_service_python = feature_store.get_feature_service(
    name="features_feature_service_python"
)

entity_rows = [
    {"ENTITY_ID": "11111111", "REQUEST_FLOAT": 1.0},
    {"ENTITY_ID": "22222222", "REQUEST_FLOAT": 1.0},
]
entity_df = pd.DataFrame(entity_rows)
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)

print("offline with pandas")
offline_features = feature_store.get_historical_features(
    entity_df=entity_df,
    features=feature_service_pandas,
).to_df()
print(list(offline_features.to_dict().keys()))

print("online with pandas")
online_features = feature_store.get_online_features(
    entity_rows=entity_rows,
    features=feature_service_pandas,
).to_dict()
print(list(online_features.keys()))

print("online with python")
online_features = feature_store.get_online_features(
    entity_rows=entity_rows,
    features=feature_service_python,
).to_dict()
print(list(online_features.keys()))

## OUTPUT:
# offline with pandas
# ['ENTITY_ID', 'REQUEST_FLOAT', 'event_timestamp', 'ENTITY_FLOAT', 'ENTITY_FLOAT_TRANSFORMED_PANDAS', 'ENTITY_FLOAT_PLUS_REQUEST_SOURCE']
# online with pandas
# ['ENTITY_ID', 'ENTITY_FLOAT', 'ENTITY_FLOAT_TRANSFORMED_PANDAS', 'ENTITY_FLOAT_PLUS_REQUEST_SOURCE']
# online with python
# ['ENTITY_ID', 'ENTITY_FLOAT', 'ENTITY_FLOAT_TRANSFORMED_PYTHON']

# not possible to do offline transformation with python mode.

Specifications

Version: 0.36.0 Platform: macOS - M1 Subsystem: Sonoma 14.1.1

Possible Solution

franciscojavierarceo commented 1 week ago

Oh this is not ideal. Can you post your client usage? Just to make sure I understand it.

I don't think it should be included unless explicitly requested though, so probably the get_historical_features is the one that's wrong.

job-almekinders commented 1 week ago

@franciscojavierarceo Yes, I've just posted a minimal example in the description!

As you can see, the offline transformation with pandas returns the additional columns: 'REQUEST_FLOAT', 'event_timestamp'