feast-dev / feast

The Open Source Feature Store for Machine Learning
https://feast.dev
Apache License 2.0

Feast API: Adding a new historical store #482

Closed woop closed 3 years ago

woop commented 4 years ago

1. Introduction

We've had a lot of demand for either open source or AWS batch stores (#367, #259). Folks from the community have asked us how they can contribute code to add their own store types.

In this issue I will walk through how batch stores are currently being used and how a new batch store type can be added.

2. Overview

Feast interacts with a batch store in two places: data ingestion and feature serving.

3. Data ingestion

Feast creates and manages population jobs that stream in data from upstream data sources. Currently Feast only supports Kafka as a data source, meaning these jobs are all long running. Batch ingestion pushes data to Kafka topics, after which it is picked up by these "population" jobs.

In order for the ingestion + population flow to complete, the destination store must be writable. This means that Feast must be able to create the appropriate tables/schemas in the store and also write data from the population job into the store.

Currently Feast Core starts and manages these population jobs that ingest data into stores, although we are planning to move this responsibility to the serving layer. The Apache Beam job that Feast Core starts synchronously runs migrations on the destination store and subsequently starts consuming from Kafka and publishing records.

Below is a "happy-path" example of a batch ingestion process (sequence diagram omitted).

In order to accommodate a new store type, the Apache Beam job needs to be updated to support it, i.e. to run schema migrations against the new store and to write ingested records into it.
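As a rough illustration of the write side, supporting a new store would mean adding a Beam transform along these lines to the ingestion job. Everything here other than the Beam primitives, including the class name and the FeatureRow import path, is assumed for illustration only:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

import feast.types.FeatureRowProto.FeatureRow;

// Hypothetical write transform for a new batch store.
public class WriteFeatureRowsToMyStore extends PTransform<PCollection<FeatureRow>, PDone> {

  @Override
  public PDone expand(PCollection<FeatureRow> input) {
    input.apply("WriteRowsToMyStore", ParDo.of(new DoFn<FeatureRow, Void>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        FeatureRow row = context.element();
        // Map the FeatureRow to the store's native record format and write it
        // using the store's client library (omitted in this sketch).
      }
    }));
    return PDone.in(input.getPipeline());
  }
}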

4. Feature serving (batch)

Feast Serving is a web service that allows for the retrieval of feature data from a batch feature store. Below is a sequence diagram for a typical feature request from a batch store.

(sequence diagram omitted)

Currently we only have support for BigQuery as a batch store. The entry point for this implementation is the BigQueryServingService, which implements the ServingService interface.

public interface ServingService {
  GetFeastServingInfoResponse getFeastServingInfo(GetFeastServingInfoRequest getFeastServingInfoRequest);
  GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest getFeaturesRequest);
  GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest);
  GetJobResponse getJob(GetJobRequest getJobRequest);
}

The ServingService interface is called from the wrapping gRPC service (also named ServingService in the serving proto), where the functionality is documented in more detail.

The interface defines the following methods:

  getFeastServingInfo: returns metadata about the serving deployment, most importantly the type of store backing it (online or batch).
  getOnlineFeatures: retrieves feature values from an online store.
  getBatchFeatures: starts an asynchronous retrieval job against the batch store and returns a reference to that job.
  getJob: returns the status of a previously started batch retrieval job; a completed job points at the files exported to the staging location.
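To make the current pattern concrete, a new batch store would add a class roughly like the following skeleton, mirroring BigQueryServingService. This is a non-functional sketch; the class name and the proto import paths are assumptions, not existing code:

import feast.serving.ServingAPIProto.GetBatchFeaturesRequest;
import feast.serving.ServingAPIProto.GetBatchFeaturesResponse;
import feast.serving.ServingAPIProto.GetFeastServingInfoRequest;
import feast.serving.ServingAPIProto.GetFeastServingInfoResponse;
import feast.serving.ServingAPIProto.GetJobRequest;
import feast.serving.ServingAPIProto.GetJobResponse;
import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest;
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;

public class MyStoreServingService implements ServingService {

  @Override
  public GetFeastServingInfoResponse getFeastServingInfo(GetFeastServingInfoRequest request) {
    // Report the deployment's version and that this is a batch store.
    throw new UnsupportedOperationException("not implemented in this sketch");
  }

  @Override
  public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request) {
    // A batch-only store does not serve online requests.
    throw new UnsupportedOperationException("online retrieval not supported");
  }

  @Override
  public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest request) {
    // Start an asynchronous point-in-time retrieval in the store, register a
    // job for it, and return the job id to the client.
    throw new UnsupportedOperationException("not implemented in this sketch");
  }

  @Override
  public GetJobResponse getJob(GetJobRequest request) {
    // Return the job's status; a completed job points at the exported files
    // in the staging location.
    throw new UnsupportedOperationException("not implemented in this sketch");
  }
}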

Notes on the current design: Although the actual functionality will be retained, the structure of these interfaces will probably change away from extending a service interface and towards having a store interface. There are various problems with the current implementation:

  1. Batch and online stores share a single interface. I believe the intention here was to allow some stores to support both online and historical/batch storage, but for most stores this isn't the case. There is also no reason why we can't have two interfaces here. Ideally this should be split in two.
  2. The current approach is to extend services for each new store type, but this seems to be a poor abstraction. Ideally we would have both a batch and online store interface (not a service interface), which is called from a single serving implementation. This approach would give a clearer separation of concerns and would prevent things like job management happening within a service implementation (a sketch of such a split follows below).
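Purely as illustration, the split could look something like this; the names are made up, and the request/response types are the same protos used by ServingService above:

// Illustrative only: separate per-store interfaces called by a single serving
// implementation, which would own job management itself.
public interface OnlineStore {
  GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request);
}

public interface BatchStore {
  // Runs the point-in-time retrieval and returns where the result was staged;
  // the serving implementation handles job bookkeeping around this call.
  GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest request);
}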
nfplay commented 4 years ago

Hi,

I see that feature serving presupposes the download of data to a staging location. Is this by design? How does this scale to terabyte and petabyte scale? Shouldn't training be allowed directly over the batch store?

Thank you! NF

woop commented 4 years ago

Hi,

I see that feature serving presupposes the download of data to a staging location. Is this by design? How does this scale to terabyte and petabyte scale? Shouldn't training be allowed directly over the batch store?

Thank you! NF

Well that depends. The reason we implemented that is exactly so that we could process larger amounts of data. In the case of BigQuery, if you directly consume through its APIs it is (or used to be) slower than exporting from BigQuery to GCS and then downloading from GCS (staging location).

Right now the staging location is a convenient technical debt, but I think it's logical to expose an API that allows for direct retrieval (possibly using Apache Arrow Flight or something similar).

nfplay commented 4 years ago

When you talk about direct retrieval are you talking about retrieval of the dataset from the store directly to the client (without a middle staging area)?

What do you think about skipping that step and training directly over the store (without any download)? Like you can do with Spark consuming data directly from a Data/Delta Lake and training in a distributed fashion (as you can do in Databricks, for instance). Does this make sense?

Additionally, does creating a Feature Store over a Data Lake make sense to you?

Thank you!

ches commented 4 years ago

In the case of BigQuery, if you directly consume through its APIs it is (or used to be) slower than exporting from BigQuery to GCS and then downloading from GCS (staging location).

We're limited though, at least in what Feast SDKs currently offer, to a data set that fits in memory on the machine where you load that export into pandas. Useful for development, limiting for taking a model to production scale training or batch inference.

@nfplay FWIW we're doing exactly what you're asking—Spark is de rigueur for training beyond prototyping in my organization, so we're implementing an extended Feast SDK with offline retrieval to Spark dataframes where the Python SDK gives pandas. We sidestep the RPC service layer, which encodes knowledge of the warehouse data model in the SDK and may preclude future Feast access control integration, but these tradeoffs are more than acceptable for our needs.

This is a work in progress as we speak; I'm hopeful we can release something functional as open source in the next month. This will be Scala initially; PySpark could come along much later or would be a welcome contribution.

This SDK is based on Hive storage though, not sure if that would apply to you, and it will likely take us longer to get that into mainline Feast. (In theory it could support any storage Spark can, if the data model matches).

Right now the staging location is a convenient technical debt, but I think it's logical to expose an API that allows for direct retrieval (possibly using Apache Arrow Flight or something similar).

Not to say this wouldn't be beneficial for some (especially since the implementation coupled to GCS isn't going to make sense for a new batch store), but speaking from the perspective of a stack where compute can have data locality, I'm not convinced that we will see data moved over the network beating that anytime soon for us, despite what some big players who are doing it are reporting :bowtie: Unless new layers come into the picture, that is, like Alluxio or MinIO with its Select API.

If Feast wishes to be agnostic to the data locality and processing engine questions, though, a possibility that vaguely crossed my mind was similarly providing offline retrieval SDK with a Beam PCollection API, which users could use in their own Beam jobs.

  1. Batch and online stores share a single interface. I believe the intention here was to allow some stores to support both online and historical/batch storage, but for most stores this isn't the case. There is also no reason why we can't have two interfaces here. Ideally this should be split in two.
  2. The current approach is to extend services for each new store type, but this seems to be a poor abstraction. Ideally we would have both a batch and online store interface (not a service interface), which is called from a single serving implementation. This approach would give a clearer separation of concerns and would prevent things like job management happening within a service implementation.

These are important points for the design of the batch implementation for Ingestion and Serving that I hope we'll discuss here, and while this all has some implications for the SDK too as above, I don't want to divert this issue thread to the SDK any more than I already have… Perhaps we should open a new issue for the SDK outlook? Java and Go don't have offline support; that's perhaps an open matter already.

nfplay commented 4 years ago

We're limited though, at least in what Feast SDKs currently offer, to a data set that fits in memory on the machine where you load that export into pandas. Useful for development, limiting for taking a model to production scale training or batch inference.

This is exactly my point.

FWIW we're doing exactly what you're asking—Spark is de rigueur for training beyond prototyping in my organization, so we're implementing an extended Feast SDK with offline retrieval to Spark dataframes where the Python SDK gives pandas. We sidestep the RPC service layer, which encodes knowledge of the warehouse data model in the SDK and may preclude future Feast access control integration, but these tradeoffs are more than acceptable for our needs.

@ches I would love to know more about how you're envisioning this solution. We're exploring Databricks (that is, Spark) plus Delta Lake over Azure Data Lake Gen2 for storage, but Hive is just another alternative.

woop commented 4 years ago

In the case of BigQuery, if you directly consume through its APIs it is (or used to be) slower than exporting from BigQuery to GCS and then downloading from GCS (staging location).

We're limited though, at least in what Feast SDKs currently offer, to a data set that fits in memory on the machine where you load that export into pandas. Useful for development, limiting for taking a model to production scale training or batch inference.

The Feast Serving API produces .avro files in a bucket. The Python SDK is a wrapper on top of that. Loading the Avro files into memory as a dataframe is a convenient way to access the data in a bucket, but downloading the files is also an option if the training datasets are large.
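For example, when an export is too large to load into pandas, the staged files can be read directly with any Avro reader instead. A minimal Java sketch, where the local path stands in for a part file already downloaded from the staging bucket:

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadStagedExport {

  public static void main(String[] args) throws IOException {
    // Placeholder path for a part file downloaded from the staging bucket.
    File exportFile = new File("/tmp/feast-export/part-00000.avro");

    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(exportFile, new GenericDatumReader<GenericRecord>())) {
      // Each record is one row of the exported training dataset.
      for (GenericRecord record : reader) {
        System.out.println(record);
      }
    }
  }
}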

That being said, I would love to improve the batch retrieval API, especially in terms of exported types and how the data is produced in the first place.

@nfplay FWIW we're doing exactly what you're asking—Spark is de rigueur for training beyond prototyping in my organization, so we're implementing an extended Feast SDK with offline retrieval to Spark dataframes where the Python SDK gives pandas. We sidestep the RPC service layer, which encodes knowledge of the warehouse data model in the SDK and may preclude future Feast access control integration, but these tradeoffs are more than acceptable for our needs.

This is a work in progress as we speak; I'm hopeful we can release something functional as open source in the next month. This will be Scala initially; PySpark could come along much later or would be a welcome contribution.

This sounds awesome, can't wait to see what you have in mind!

This SDK is based on Hive storage though, not sure if that would apply to you, and it will likely take us longer to get that into mainline Feast. (In theory it could support any storage Spark can, if the data model matches).

Right now the staging location is a convenient technical debt, but I think it's logical to expose an API that allows for direct retrieval (possibly using Apache Arrow Flight or something similar).

Not to say this wouldn't be beneficial for some (especially since the implementation coupled to GCS isn't going to make sense for a new batch store), but speaking from the perspective of a stack where compute can have data locality, I'm not convinced that we will see data moved over the network beating that anytime soon for us, despite what some big players who are doing it are reporting :bowtie: Unless new layers come into the picture, that is, like Alluxio or MinIO with its Select API.

If Feast wishes to be agnostic to the data locality and processing engine questions, though, a possibility that vaguely crossed my mind was similarly providing offline retrieval SDK with a Beam PCollection API, which users could use in their own Beam jobs.

  1. Batch and online stores share a single interface. I believe the intention here was to allow some stores to support both online and historical/batch storage, but for most stores this isn't the case. There is also no reason why we can't have two interfaces here. Ideally this should be split in two.
  2. The current approach is to extend services for each new store type, but this seems to be a poor abstraction. Ideally we would have both a batch and online store interface (not a service interface), which is called from a single serving implementation. This approach would give a clearer separation of concerns and would prevent things like job management happening within a service implementation.

These are important points for the design of the batch implementation for Ingestion and Serving that I hope we'll discuss here, and while this all has some implications for the SDK too as above, I don't want to divert this issue thread to the SDK any more than I already have… Perhaps we should open a new issue for the SDK outlook? Java and Go don't have offline support; that's perhaps an open matter already.

There are aspects of the SDKs that have to be discussed here if it's in the context of adding a new store type.

Are you looking for a broader SDK discussion? I can create one if that helps us narrow down next steps with regards to feature and entity discovery (which we intend to focus on a bit more for 0.6)

woop commented 4 years ago

When you talk about direct retrieval are you talking about retrieval of the dataset from the store directly to the client (without a middle staging area)?

Without a staging area.

What do you think about skipping that step and training directly over the store (without any download)? Like you can do with Spark consuming data directly from a Data/Delta Lake and training in a distributed fashion (as you can do in Databricks for instance) Does this make sense?

Ideally that layer would be Feast. The question is mostly around where execution of training happens. For the time being we are assuming data movement towards where training happens. The reverse could happen in the future, but I don't see this as our biggest issue right now.

Additionally, does creating a Feature Store over a Data Lake make sense to you?

I think it makes a lot of sense to be honest. We are doing that internally. The parts that I think should be improved are how we are exposing data from the data lake to users of Feast.

groodt commented 4 years ago

👋 Another person throwing their AWS interest into the ring.

I know you're not looking into alternative stores until at least Feast 0.6.

I wonder if a relatively sensible stack for AWS might be:

I understand you already support Kafka and Redis.

I don't exactly know if the other technologies are entirely suitable for the Feast use-cases though.

It is possible to stream with reasonably low-latency into Delta tables with Spark Structured Streaming (Long running EMR cluster).

I think a similar pattern to what you do with GCS where a batch API could export Delta datasets would work as well. It would probably require either another long-running EMR cluster purely for the batch exports, or dynamically starting and stopping an EMR cluster for every job or potentially running it as an AWS Glue job. Smaller installations could probably share the single long running EMR cluster.

ches commented 4 years ago

Note: some of the conversation here relates to #444, interested parties may want to subscribe/comment there.

@groodt There is a reference on #444 also to #362 about Spark. FWIW as of today the most mature, functional Spark runner for Beam is one based on the older DStream API rather than Structured Streaming. Until the newer runner is more complete, that may complicate integration with Delta Lake for anyone looking down that path.

nfplay commented 4 years ago

@woop, @ches et al.:

One thing that is not clear to me about ingestion: when using the Feast client.ingest API, where you pass a DataFrame, you're effectively ingesting only the data that is in that specific DataFrame.

Let's say that feature engineering occurred over known entities - let's assume Customer and Transactions - in a certain project, and we want to upload new features - based on these 2 entities - that were built in this project to the Feast store. We can do that using a DataFrame that was engineered in the scope of the project, but this DataFrame is static. We want the feature engineering logic to be applied recurrently in an online fashion, as soon as new Customer and Transactions data is available, so as to keep features as fresh as possible.

My question is: since every call to client.ingest is static, we need to create a workflow - using Airflow or something similar - to apply the feature engineering logic to the base entities - here, Customer and Transaction - and then call client.ingest periodically to keep the features updated. Otherwise, a single client.ingest Python invocation will not be sufficient for feature engineering over base data that is always changing.

Am I thinking straight? How do we create these workflows and keep features updated in batch store?

Thank you!

woop commented 4 years ago

Am I thinking straight? How do we create these workflows and keep features updated in batch store?

Your understanding is accurate. I do think we will implement feature transformations and I hope external data sources eventually, but the flow you have described is how we currently do it. For batch sources we have jobs that process features and ingest into Feast (where we don't care whether they are out of date by 1 day or so). For features that we need to keep highly up to date, we use stream processors like Flink/Beam to create FeatureRows and push them to Kafka topics. Feast can consume from Kafka so this works well for us right now.
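For the streaming path, the final step is just publishing serialized FeatureRow protos to the Kafka topic that the feature set's source is configured with. A minimal sketch; the broker address and topic name are placeholders, the FeatureRow import path is assumed, and building the actual row (entity keys, feature values, event timestamp) is omitted:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import feast.types.FeatureRowProto.FeatureRow;

public class FeatureRowPublisher {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    // In a real pipeline this row is built by the stream processor.
    FeatureRow row = FeatureRow.getDefaultInstance();

    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
      // Placeholder topic name; use the topic configured as the feature set's source.
      producer.send(new ProducerRecord<>("feast-features", row.toByteArray()));
    }
  }
}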

nfplay commented 4 years ago

Your understanding is accurate. I do think we will implement feature transformations and I hope external data sources eventually, but the flow you have described is how we currently do it. For batch sources we have jobs that process features and ingest into Feast (where we don't care whether they are out of date by 1 day or so). For features that we need to keep highly up to date, we use stream processors like Flink/Beam to create FeatureRows and push them to Kafka topics. Feast can consume from Kafka so this works well for us right now.

In this case, how does a Data Scientist participate in the creation of these jobs? If for instance you have Airflow jobs that apply feature engineering logic to base entities, how do you govern the creation of these jobs? Are they created by data engineers - experts in the data sources - or by data scientists - experts in the feature engineering logic - or by both?

woop commented 4 years ago

Your understanding is accurate. I do think we will implement feature transformations and I hope external data sources eventually, but the flow you have described is how we currently do it. For batch sources we have jobs that process features and ingest into Feast (where we don't care whether they are out of date by 1 day or so). For features that we need to keep highly up to date, we use stream processors like Flink/Beam to create FeatureRows and push them to Kafka topics. Feast can consume from Kafka so this works well for us right now.

In this case, how does a Data Scientist participate in the creation of these jobs? If for instance you have Airflow jobs that apply feature engineering logic to base entities, how do you govern the creation of these jobs? Are they created by data engineers - experts in the data sources - or by data scientists - experts in the feature engineering logic - or by both?

In our case we have data scientists and data engineers developing these pipelines, but we try to abstract as much of the infra/boilerplate away from this process as possible. So mostly they are querying data from a data warehouse or lake, processing it in the pipeline, then publishing it to Feast.

I do think that we are straying from the topic of this issue though, so maybe we should create a new issue?

nfplay commented 4 years ago

Thank you for your answers! For me it's clearer now. A version of Feast on Azure is something I'm currently looking into. The stack could be very similar to @groodt's:

  Kafka (MSK)
  Spark Structured Streaming (EMR) - Processing
  Delta Lake over ADLS Gen2 (open source, delta.io) - Batch Serving
  Redis HA (ElastiCache) - Online Serving

Don't know whether Delta Lake is the most appropriate technology, but I'm considering trying to do training directly over Feast, without dataset export: for instance, making the getfeature API return a Spark DataFrame, something in line with @ches's scenario. If anyone is looking at this use case, please let me know!

Thanks

Yanson commented 4 years ago

@nfplay I am looking at Azure support, please keep in touch.

Jeffwan commented 4 years ago

I will implement AWS support based on this. Looking forward to the new API

Jeffwan commented 4 years ago

@woop Seems there are no pending tasks for this story? Any other changes needed? If not, can we add new store implementations?

woop commented 4 years ago

@woop Seems there are no pending tasks for this story? Any other changes needed? If not, can we add new store implementations?

I think the biggest part is done, that being #567. I don't expect any major changes going forward, but we might still make some small tweaks or extensions to the storage API, especially as it relates to configuration (relevant #525).

Adding a store should now be possible.

woop commented 4 years ago

Added documentation to make store addition a bit clearer: https://docs.feast.dev/contributing/adding-new-storage-connectors

dr3s commented 4 years ago

@woop just catching up on your design. Is there a reason that the store didn't utilize PCollection or other Beam APIs and make the staging aspect orthogonal to the store API?

woop commented 4 years ago

Hey @dr3s,

Is there a reason that the store didn't utilize PCollection or other Beam APIs.

It's not clear what you mean by this. The storage API uses PCollections.

and make the staging aspect orthogonal to the store API?

What do you mean by orthogonal? The staging is a part of both the GetBatchFeatures RPC and the storage API.

There are questions as to whether we need to have the staging location in the first place. This is very specific to our BigQuery implementation. I'm happy to relook at these interfaces once more use cases come up or we start adding new stores.

Would you mind shining some light on the storage type that you would like to implement?

dr3s commented 4 years ago

Sorry @woop. I wasn't looking at the sink. I guess I need to spend more time with it before asking dumb questions.

I was looking at the retriever APIs. I guess it's not clear to me how returning the staging locations in the result is the most extensible approach for storage implementations. What I meant about staging being orthogonal is that I could see many storage implementations providing a retrieval job API that returns a PCollection, which Feast then makes available via a staging location. I'm not sure that is feasible though. I imagine that there are optimizations in BigQuery to stage data without exposing some stream to the data.

In fact, I'm looking at a Snowflake batch store and it also has an efficient staging to S3 pipeline. I think it probably lines up fairly cleanly with the BigQuery implementation. The first decision I need to make is how much to just stick with plain JDBC vs using the proprietary Snowflake functionality. I expect that I will do the latter to take advantage of the efficient staging of data like has been done with BigQuery.

The use of staging at all seems like an implementation detail that shouldn't be in the API though. That's sort of where my question came from. I haven't digested enough of the client code to grok how it's retrieved but should the client care if the data is finally being streamed from a staging location or the store directly?

All of the above is focused on batch. I don't know how it applies to online retrieval, which should probably be more of a request/response pattern with a smallish set of data. One note though about the online retrieval API: it might be nice to have a proper response type for the return of the retrieval method to make it more extensible in the future.

dr3s commented 4 years ago

Just one other thought after reading the docs you wrote on retrieval. If I understand correctly, the staging bucket is basically the method of data transfer between client and server. Can Beam (or some other API) help abstract away this interaction, so it's less coupled to buckets or other specific implementations?

Maybe the staging area could be a Sink that is passed to the retriever to write retrieved results to in its implementation-specific way?
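Something along those lines might look like this; all of the names here are hypothetical, and the request type is just the existing GetBatchFeaturesRequest proto:

// Hypothetical shapes only, to illustrate the idea above.
public interface RetrievalSink {
  // Implementation-specific destination: a bucket, a table, a stream, ...
  void write(Iterable<byte[]> serializedRecords);

  // Where a client can later read the result from (a URI, a table reference, ...).
  String resultLocation();
}

public interface BatchRetriever {
  // The retriever runs the query against its store and hands results to the sink.
  void retrieveBatchFeatures(GetBatchFeaturesRequest request, RetrievalSink sink);
}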

woop commented 4 years ago

The use of staging at all seems like an implementation detail that shouldn't be in the API though

I agree on this point. It's an area of tech debt that we could factor out.

I haven't digested enough of the client code to grok how it's retrieved but should the client care if the data is finally being streamed from a staging location or the store directly?

I don't think the client should care, but the question is how you want to use the Feast client abstraction. I can see three methods here:

  1. Object store reference is returned
  2. Table reference is returned
  3. SQL query is returned

We have (1), but possibly we can have (2) or (3) as options. (3) would make it easy for a lazy system (Spark) to operate on the data, and (2) is quite generic and doesn't require a staging location. The user can stream the data out however they see fit.
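To illustrate, a hypothetical result type that could carry any of the three options (names are illustrative only, not a proposal for the actual proto):

import java.util.List;

// Illustrative only: a retrieval result that points at staged files, a table
// in the batch store, or a query the client can run itself.
public class BatchRetrievalResult {

  public enum Type {
    FILE_URIS,  // option 1: exported files in an object store
    TABLE_REF,  // option 2: a table created in the batch store
    SQL_QUERY   // option 3: a query the client executes directly
  }

  private final Type type;
  private final List<String> fileUris;
  private final String tableRef;
  private final String sqlQuery;

  private BatchRetrievalResult(Type type, List<String> fileUris, String tableRef, String sqlQuery) {
    this.type = type;
    this.fileUris = fileUris;
    this.tableRef = tableRef;
    this.sqlQuery = sqlQuery;
  }

  public static BatchRetrievalResult ofFileUris(List<String> uris) {
    return new BatchRetrievalResult(Type.FILE_URIS, uris, null, null);
  }

  public static BatchRetrievalResult ofTableRef(String tableRef) {
    return new BatchRetrievalResult(Type.TABLE_REF, null, tableRef, null);
  }

  public static BatchRetrievalResult ofSqlQuery(String sqlQuery) {
    return new BatchRetrievalResult(Type.SQL_QUERY, null, null, sqlQuery);
  }

  public Type getType() { return type; }
  public List<String> getFileUris() { return fileUris; }
  public String getTableRef() { return tableRef; }
  public String getSqlQuery() { return sqlQuery; }
}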

woop commented 4 years ago

it might be nice to have a proper response type for the return of the retrieval method to make it more extensible in the future.

We went back and forth on this point during the API definition. I think there is a valid argument for a proper response type, but it would have to be driven by a use case I think. Happy to make that change in the future.

Can Beam (or some other API) help abstract away this interaction, so it's less coupled to buckets or other specific implementations?

I kind of like this idea, but I am not sure about the ways in which it would fail. We would be extending the system boundaries of Feast. I think for the time being I would just want to provide an escape hatch, a means for clients to be able to retrieve data easily and use it, and an easy way to add new stores. Over time as access patterns emerge we can think of nicer ways to retrieve data.

Plus I like the idea of the user interacting directly with stores when working with batch data, as opposed to through an API. Feast's API should just be a small part of the workflow. The more we impose there from the Feast side the more likely we are to frustrate users I think.

kennydataml commented 4 years ago

Interested in implementing this on Azure. From my reading so far, the only areas I need to modify for Feast to work correctly are:

Can someone confirm that this train of thought is correct?

If so, can someone point me in the right direction on where/how to modify the code to accommodate this? i.e. which classes need to be modified before I compile and create the Docker image.

Edit 1: after further reading, I'm a little more confused. The docker-compose.yml spins up a Redis pod and a Kafka + ZooKeeper pod. Is there no way to swap out Redis and Kafka in k8s with cloud managed services? i.e. configure Kafka to use EventHub, configure Redis to use Azure Redis.

Edit 2: in the documentation there's a quote: "Data is persisted in historical stores like BigQuery in log format. Repeated ingestions will duplicate the data that is persisted in the store. Feast will automatically deduplicate data during retrieval, but it doesn't currently remove data from the stores themselves." So that means Feast doesn't have "versioning" of data like Hopsworks? If that's the case, what needs to be done to get us there? And how is the data being deduplicated?

woop commented 4 years ago

if so, can someone point me in the right direction on where/how to modify the code to accommodate this?

Have you had a look at https://docs.feast.dev/contributing/adding-a-new-store?

Can someone confirm that this train of thought is correct?

Yes, looks right.

If that's the case, what needs to be done to get us there? And how is the data being deduplicated?

Data is deduplicated during retrieval based on unique keys (entities and event time). It returns the latest ingested row (by processing time).

Is there no way to swap out redis and kakfa in k8s with cloud managed services?

Can be done. For Redis it should work. For EventHub you would need to extend ingestion to support that. I haven't used their API.

So that means Feast doesn't have "versioning" of data like Hopsworks?

This is currently not implemented. We support consistent ingestion into both store types, but versioning itself isn't supported yet.

Yanson commented 4 years ago

@kenny-bui-slalom we are working on bringing Feast to Azure by:

  1. Re-writing ingestion as a Spark Structured Streaming job
  2. Running the job through the Databricks Jobs API
  3. Using ADLS/Delta for storage
  4. Adding a SparkSQL batch connector exporting to ADLS
  5. Updating the client to read/write ADLS

This is just a preview, we will have the full proposal in a couple of days.

We will also be using managed services, you can turn off deployment of Postgres/Kafka etc in the Helm chart.

/cc @algattik @woop @ches

SHARANTANGEDA commented 4 years ago

@kenny-bui-slalom we are working on bringing Feast to Azure by:

  1. Re-writing ingestion as a Spark Structured Streaming job
  2. Running the job through the Databricks Jobs API
  3. Using ADLS/Delta for storage
  4. Adding a SparkSQL batch connector exporting to ADLS
  5. Updating the client to read/write ADLS

This is just a preview, we will have the full proposal in a couple of days.

We will also be using managed services, you can turn off deployment of Postgres/Kafka etc in the Helm chart.

/cc @algattik @woop @ches

Hey @Yanson, this is great work you guys are doing. Can you please give an approximate duration for the Azure support? We have been waiting on this for a while. Thanks

Yanson commented 4 years ago

Hi @SHARANTANGEDA,

We will have an internal build within 4-6 weeks and create a PR in that timeframe too. I can't comment on how long it will take to be accepted into the feast main branch.

You can check more details and discussion here: https://github.com/feast-dev/feast/issues/764

dr3s commented 4 years ago

@woop ok to close this?

woop commented 4 years ago

@dr3s I'm happy to keep this issue open since the discussion will still evolve, if that is what you were referring to?

dr3s commented 4 years ago

Just the opposite. I was suggesting we close it.

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.