
NotImplementedError raised when executing "feast apply" #13

Closed · fura95 closed 1 year ago

fura95 commented 1 year ago

Hi, I'm using ParquetSource to read a file from a Hadoop cluster:

from yummy import ParquetSource

v_aggapp_card_source = ParquetSource(
    name="v_aggapp_card_source",
    path="hdfs://data/data_parquet/v_aggapp_card_source.parquet",
    timestamp_field="event_timestamp",
)

When I execute feast apply or call apply_total(repo_config, repo, True), I get the following error:

Traceback (most recent call last):
  File "/home/feast/feast_plugins/yummy/ryndin_example/feature_repo/feast_apply.py", line 9, in <module>
    apply_total(repo_config, repo, True)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/usage.py", line 276, in wrapper
    return func(*args, **kwargs)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/repo_operations.py", line 304, in apply_total
    project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/repo_operations.py", line 202, in _prepare_registry_and_repo
    repo = parse_repo(repo_path)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/repo_operations.py", line 119, in parse_repo
    module = importlib.import_module(module_path)
  File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/feast/feast_plugins/yummy/ryndin_example/feature_repo/feast_apply.py", line 9, in <module>
    apply_total(repo_config, repo, True)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/usage.py", line 276, in wrapper
    return func(*args, **kwargs)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/repo_operations.py", line 305, in apply_total
    apply_total_with_repo_instance(
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/repo_operations.py", line 265, in apply_total_with_repo_instance
    registry_diff, infra_diff, new_infra = store.plan(repo)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/usage.py", line 287, in wrapper
    raise exc.with_traceback(traceback)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/usage.py", line 276, in wrapper
    return func(*args, **kwargs)
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/feature_store.py", line 690, in plan
    self._make_inferences(
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/feature_store.py", line 569, in _make_inferences
    update_feature_views_with_inferred_features_and_entities(
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/inference.py", line 168, in update_feature_views_with_inferred_features_and_entities
    _infer_features_and_entities(
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/feast/inference.py", line 200, in _infer_features_and_entities
    table_column_names_and_types = fv.batch_source.get_table_column_names_and_types(
  File "/home/feast/feast_plugins/yummy/env/lib/python3.9/site-packages/yummy/sources/source.py", line 79, in get_table_column_names_and_types
    raise NotImplementedError

My feature_store.yaml:

project: large_foal
provider: local
registry:
    ...
offline_store:
    type: yummy.YummyOfflineStore
    backend: spark
    config:
        spark.master: "yarn"
        spark.ui.enabled: "true"
        spark.eventLog.enabled: "false"
        spark.sql.catalogImplementation: "hive"
        spark.sql.parser.quotedRegexColumnNames: "true"
        spark.sql.session.timeZone: "UTC"
online_store:
  ...
fura95 commented 1 year ago

Are there any examples of how to read a parquet file from Hadoop?

fura95 commented 1 year ago

I also tried DeltaSource and hit the same issue:

from yummy import DeltaSource

v_aggapp_card_source = DeltaSource(
    name="v_aggapp_card_source",
    path="/data/data_parquet/v_aggapp_card_source.parquet",
    timestamp_field="event_timestamp",
)
qooba commented 1 year ago

@fura95 - I have to add an implementation of get_table_column_names_and_types, which feast apply uses for schema inference when no schema is specified for a feature view. As a workaround, you can specify the schema explicitly.
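
For context, schema inference here boils down to the offline source reporting its column names and types. A minimal, hypothetical sketch of what such an implementation could look like for a parquet-backed source (yummy's actual fix may differ; pyarrow is assumed to be available, since feast depends on it):

from typing import Iterable, Tuple

import pyarrow.parquet as pq


def get_table_column_names_and_types(self, config) -> Iterable[Tuple[str, str]]:
    # Read only the parquet footer to obtain the schema; no data is loaded.
    # For hdfs:// paths this assumes a pyarrow-compatible HDFS filesystem
    # is configured in the environment.
    schema = pq.read_schema(self.path)
    return list(zip(schema.names, (str(t) for t in schema.types)))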

fura95 commented 1 year ago

@qooba I specified the schema for the feature views, but that didn't fix the problem:

from datetime import timedelta

from feast import FeatureView, Field
from feast.types import Int64

# Feature Views (entities and sources are defined elsewhere in the repo)
v_aggapp_card_fv = FeatureView(
    name="v_aggapp_card",
    entities=[v_aggapp_card_entity],
    ttl=timedelta(weeks=52),
    schema=[
        Field(name="cnt_mcc_br5_cat4_6", dtype=Int64),
    ],
    source=v_aggapp_card_source,
    tags={"test_tag": "cards"}
)
v_aggapp_credit_fv = FeatureView(
    name="v_aggapp_credit",
    entities=[v_aggapp_credit_entity],
    ttl=timedelta(weeks=52),
    schema=[
        Field(name="loan_age_mortg_min", dtype=Int64),
        Field(name="delinq_share_30p_ext_lifo", dtype=Int64),
        Field(name="length_ext", dtype=Int64),
        Field(name="max_util_card_act", dtype=Int64),
        Field(name="pmt_delays_1_29_24m_sum_mnth_lifo", dtype=Int64),
    ],
    source=v_aggapp_credit_source,
    tags={"test_tag": "credits"}
)
qooba commented 1 year ago

@fura95 - I will try to reproduce. Can you tell me which feast version you use?

fura95 commented 1 year ago

@qooba
feast==0.22.4. Try creating a FeatureView without listing the Entity as a Field in the schema:

from datetime import timedelta

from feast import Entity, FeatureView, Field
from feast.types import Float32

my_entity = Entity(name="entity_id", description="entity id")

mystats_view_parquet = FeatureView(
    name="my_statistics_parquet",
    entities=[my_entity],
    ttl=timedelta(seconds=3600*24*20),
    schema=[
        #Field(name="entity_id", dtype=Float32),
        Field(name="p0", dtype=Float32),
        Field(name="p1", dtype=Float32),
        Field(name="p2", dtype=Float32),
        Field(name="p3", dtype=Float32),
        Field(name="p4", dtype=Float32),
        Field(name="p5", dtype=Float32),
        Field(name="p6", dtype=Float32),
        Field(name="p7", dtype=Float32),
        Field(name="p8", dtype=Float32),
        Field(name="p9", dtype=Float32),
        Field(name="y", dtype=Float32),
    ],
    online=True,
    source=my_stats_parquet,
    tags={},
)
qooba commented 1 year ago

@fura95 - thanks a lot for this hint :) Now I'm able to reproduce. It seems I have to implement get_table_column_names_and_types for all sources. A temporary workaround is to specify the whole schema (including the entity columns), as sketched below.
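
A minimal sketch of that workaround, adapting the snippet above (the only change is that the entity column is listed in the schema too, so feast apply has nothing left to infer from the source):

from datetime import timedelta

from feast import Entity, FeatureView, Field
from feast.types import Float32

my_entity = Entity(name="entity_id", description="entity id")

mystats_view_parquet = FeatureView(
    name="my_statistics_parquet",
    entities=[my_entity],
    ttl=timedelta(seconds=3600 * 24 * 20),
    schema=[
        # Listing the entity column explicitly keeps feast apply from
        # calling get_table_column_names_and_types for inference.
        Field(name="entity_id", dtype=Float32),
        Field(name="p0", dtype=Float32),
        Field(name="p1", dtype=Float32),
        # ... remaining feature columns as in the snippet above ...
        Field(name="y", dtype=Float32),
    ],
    online=True,
    source=my_stats_parquet,
    tags={},
)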