feast-dev / feast

The Open Source Feature Store for Machine Learning
https://feast.dev

Databricks Spark runner #764

Closed: algattik closed this issue 3 years ago

algattik commented 4 years ago

Is your feature request related to a problem? Please describe.

Running on non-GCP clouds is a common request (#367), but the lack of a fully managed Beam service comparable to Google Dataflow makes this hard, and the Feast code is tied to GCP in some places.

Describe the solution you'd like

Databricks is a popular managed service available on Azure and AWS. It offers fully managed and optimized Spark environments with a REST API on top. Databricks is the main committer on Apache Spark, and on Azure the service is offered and supported directly by Microsoft, making it available to all Azure customers without needing a separate contract. The Spark clusters run in the customer's environment as fully managed VMs; the customer pays only while clusters are running, and storage at rest costs very little.

The Databricks runtime includes the open-source Delta Lake storage layer, which allows cloud storage to be used efficiently as a repository for historical serving.
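As an illustration, here is a minimal PySpark sketch of the kind of Delta Lake writes and reads a historical store involves; the table path and column names are placeholders, not part of this proposal:

    from pyspark.sql import SparkSession

    # Assumes a Databricks runtime, or a local Spark session with the
    # delta-core package on the classpath.
    spark = SparkSession.builder.appName("feast-historical").getOrCreate()

    # Append newly ingested feature rows to a Delta table on cloud storage.
    df = spark.createDataFrame(
        [("driver_1", "2020-06-01T00:00:00", 0.95)],
        ["entity_id", "event_timestamp", "avg_rating"],
    )
    df.write.format("delta").mode("append").save("/mnt/feast/driver_features")

    # Historical retrieval reads the same files back, with schema enforcement
    # and ACID guarantees handled by the Delta layer.
    history = spark.read.format("delta").load("/mnt/feast/driver_features")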

We're starting work on a Databricks runner that we would like to submit as a PR. This issue is a place to discuss and align with the community upfront, to ensure this PR will be accepted.

Describe alternatives you've considered

Additional context

Baseline

We will baseline the work on the 0.5.0 release.

The proposal covers the following work areas:

- Ingestion - Feast Core
- Ingestion - Spark job
- Historical Retriever - API change
- Historical Retriever - Feast Serving
- Historical Retriever - Spark job
- Historical Retriever - Python SDK
- Secrets management
- Integration testing

dr3s commented 4 years ago

It would be nice to know whether Feast could target the existing Beam ingestion jobs to run using Spark DStreams, or whether they use functionality that is incompatible. Using the Beam API is a big advantage IMHO. I'm not sure about the deficiencies in Structured Streaming, but I hope they get addressed, which would make for an easier transition.

woop commented 4 years ago

Super excited that you folks are picking this up. Adding this functionality would serve as a good base for non-GCP adoption of Feast, and loads of folks have already asked us for Azure/Databricks support.

I had a look at the proposal. Overall it makes sense. I think you have identified the right abstractions to implement the functionality.

A class (feast.core.job.databricks.DatabricksJobManager) implementing a Databricks job manager to create, monitor and control jobs.

I just want to call out development that will probably happen quite soon around managing job life cycles (#761). We are trying to allow ingestion jobs to be long-running, with the only updates being those forced by code-level changes, not schema changes. This differs from the status quo, where the JobCoordinator keeps submitting new jobs every time a feature set is registered, with each job being relatively static.

Technically that happens one level above the JobManager, but it affects the ingestion job itself.
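To make that surface concrete, here is a sketch of the three Databricks Jobs REST API (2.0) calls such a job manager would wrap, shown in Python for brevity even though the class itself would live in Feast Core; the host, token, cluster settings and main class name are all placeholders, not part of the proposal:

    import requests

    HOST = "https://<databricks-instance>"  # placeholder workspace URL
    HEADERS = {"Authorization": "Bearer <personal-access-token>"}

    def submit_ingestion_job(jar_params):
        """Start a one-time run (Jobs API 2.0: runs/submit)."""
        payload = {
            "run_name": "feast-ingestion",
            "new_cluster": {
                "spark_version": "6.4.x-scala2.11",
                "node_type_id": "Standard_DS3_v2",  # Azure example node type
                "num_workers": 2,
            },
            "spark_jar_task": {
                # Hypothetical entry point for the ingestion jar.
                "main_class_name": "feast.ingestion.IngestionJob",
                "parameters": jar_params,
            },
        }
        r = requests.post(HOST + "/api/2.0/jobs/runs/submit",
                          headers=HEADERS, json=payload)
        r.raise_for_status()
        return r.json()["run_id"]

    def get_run_state(run_id):
        """Poll a run's status (runs/get), to be mapped onto Feast job states."""
        r = requests.get(HOST + "/api/2.0/jobs/runs/get",
                         headers=HEADERS, params={"run_id": run_id})
        r.raise_for_status()
        return r.json()["state"]

    def cancel_run(run_id):
        """Stop a run (runs/cancel); Databricks cannot update a live run,
        so 'updating' a job means cancelling and resubmitting."""
        r = requests.post(HOST + "/api/2.0/jobs/runs/cancel",
                          headers=HEADERS, json={"run_id": run_id})
        r.raise_for_status()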

Scala is usually the preferred programming language for concision, but Java can be used as well if required.

Personally I am quite ambivalent about the language choice here. Eventually we want this to be supported by the contributors and I think we have enough experience here to support Scala. I am more concerned about the tooling around the language than the language itself, but since you are suggesting Maven I don't think it will be a problem.

The Spark job will connect directly to Redis to populate data for online serving, using the Redis Spark connector.

Do you plan to still have one job running per read/write destination? Meaning a job does not write to two destinations at the same time.

Depending on the complexity of this, we might propose an initial version that is not yet fully featured (e.g. working with only a single dataset).

Do you mean a single source table (feature set)? Normally we use the term dataset at retrieval to mean the resulting dataset, so it's not clear how this reduced-scope implementation differs from the current functionality.

This will be replaced with Apache Libcloud to make the SDK compatible with Azure/AWS without adding complexity

I like this idea, but I am not sure about the maturity and support of Libcloud; personally I haven't used it. With Python we can have conditional imports, so we could remove all cloud-specific dependencies from the default installation and assume users already have these dependencies if they are on a certain cloud provider. My concern with the Libcloud approach is that users would probably need to install a new dependency, but the benefit is potential abstraction of cloud providers' object stores.

Other than that I found myself nodding along with the rest of the proposal, although I have only highlighted the things that stood out.

One more open question I have is how we would be able to run end-to-end tests without having Databricks or Azure infrastructure, or, better yet, how much this contribution would need to be extended to run on a local Spark environment.

algattik commented 4 years ago

Thanks a lot @woop !

I just want to call out development that will probably happen quite soon around managing job life cycles (#761).

Since Databricks can only stop and start jobs, not update running ones, is it safe to assume that the proposal will just result in jobs getting stopped and started, and will not change the DatabricksJobManager class?

Personally I am quite ambivalent about the language choice here. Eventually we want this to be supported by the contributors and I think we have enough experience here to support Scala. I am more concerned about the tooling around the language than the language itself, but since you are suggesting Maven I don't think it will be a problem.

Yes, we will integrate into the Feast build; nothing fancy here.

The Spark job will connect directly to Redis to populate data for online serving, using the Redis Spark connector.

Do you plan to still have one job running per read/write destination? Meaning a job does not write to two destinations at the same time.

Yes, the logic will be the same. This may result in more clusters running than desirable (and ballooning costs) if we start having many jobs; especially if those jobs process little data, this would be suboptimal. At that point, we could run multiple jobs per cluster rather than spinning up a new cluster per job. That improvement can be built later on the basis of the initial scope proposed here. (We didn't want that to be the default, as it can easily lead to resource exhaustion.)
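For reference, a minimal PySpark sketch of what a per-destination write through the spark-redis connector looks like; the host, table name and key column are illustrative, and the actual ingestion job would be written in Scala or Java:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("feast-redis-sink")
        # Assumes the connector is on the classpath, e.g. submitted with
        # --packages com.redislabs:spark-redis:2.4.0
        .config("spark.redis.host", "redis.example.com")
        .config("spark.redis.port", "6379")
        .getOrCreate()
    )

    features = spark.createDataFrame(
        [("driver_1", 0.95), ("driver_2", 0.87)],
        ["entity_id", "avg_rating"],
    )

    (features.write
        .format("org.apache.spark.sql.redis")
        .option("table", "feature_rows")    # Redis key prefix
        .option("key.column", "entity_id")  # column used as the hash key
        .mode("append")
        .save())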

Depending on the complexity of this, we might propose an initial version that is not yet fully featured (e.g. working with only a single dataset).

Do you mean a single source table (feature set)? Normally we use the term dataset at retrieval to mean the resulting dataset, so it's not clear how this reduced-scope implementation differs from the current functionality.

Sorry, corrected the text, I meant feature set.

This will be replaced with Apache Libcloud to make the SDK compatible with Azure/AWS without adding complexity

I like this idea, but I am not sure about the maturity and support of Libcloud; personally I haven't used it. With Python we can have conditional imports, so we could remove all cloud-specific dependencies from the default installation and assume users already have these dependencies if they are on a certain cloud provider. My concern with the Libcloud approach is that users would probably need to install a new dependency, but the benefit is potential abstraction of cloud providers' object stores.

This is the entirety of the code I found in the Python SDK that accesses GCP storage (though I might have missed some):

    # TODO: abstract away GCP dependency
    self.gcs_client = storage.Client(project=None)

    [...]

    uris = self.get_avro_files(timeout_sec)
    for file_uri in uris:
        if file_uri.scheme == "gs":
            file_obj = tempfile.TemporaryFile()
            self.gcs_client.download_blob_to_file(file_uri.geturl(), file_obj)

    [...]

    storage_client = storage.Client(project=None)
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(remote_path)
    blob.upload_from_filename(local_path)

We have a libcloud committer on my team, who confirmed that at least these very simple API operations map seamlessly to libcloud. My team (Microsoft Commercial Software Engineering) has been using libcloud successfully with tier-1 industrial customers.
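To illustrate, here is a sketch of how the GCS snippets above might map onto libcloud's storage API; the credentials, bucket and object names are placeholders:

    from libcloud.storage.providers import get_driver
    from libcloud.storage.types import Provider

    # Swapping Provider.GOOGLE_STORAGE for Provider.AZURE_BLOBS or
    # Provider.S3 is the point: the calls below stay the same.
    driver = get_driver(Provider.GOOGLE_STORAGE)("<key>", "<secret>")

    # Equivalent of blob.upload_from_filename(local_path) above.
    container = driver.get_container("feast-bucket")
    driver.upload_object("/tmp/out.avro", container, "exports/out.avro")

    # Equivalent of gcs_client.download_blob_to_file(...) above.
    obj = driver.get_object("feast-bucket", "exports/out.avro")
    driver.download_object(obj, "/tmp/in.avro", overwrite_existing=True)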

I think the main alternative to libcloud, since we probably don't want to pepper the code with if switches, would be to introduce "storage connector" modules in the Python SDK, similar to what is done in Feast Java. That would be very easy given the small API surface covered, but I realized that would essentially duplicate what libcloud does, hence the proposal.

One more open question I have is how we would be able to run end-to-end tests without having Databricks or Azure infrastructure, or, better yet, how much this contribution would need to be extended to run on a local Spark environment.

I've added an Integration testing section in the proposal to cover this.

woop commented 4 years ago

Since Databricks can only stop and start jobs, not update running ones, is it safe to assume that the proposal will just result in jobs getting stopped and started, and will not change the DatabricksJobManager class?

Yes. Correct me if I'm wrong, @pyalex.

At that point, we could run multiple jobs per cluster rather than spinning up a new cluster per job. That improvement can be built later on the basis of the initial scope proposed here.

Alright, makes sense.

Sorry, corrected the text, I meant feature set.

Retrieval from a single feature set would be a pretty severe limitation, I think. Most of our users retrieve data from multiple feature sets, so it's something I would consider MVP functionality. That being said, we would still welcome the contribution, but I would strongly recommend supporting joins across multiple feature sets.

I think the main alternative to libcloud, since we probably don't want to pepper the code with if switches, would be to introduce "storage connector" modules in the Python SDK, similar to what is done in Feast Java. That would be very easy given the small API surface covered, but I realized that would essentially duplicate what libcloud does, hence the proposal.

To me the question is more about what we give up. Will Libcloud seamlessly pull in the existing service accounts and environment variables that the GCloud client picks up when running in a GCP environment? Is it wrapping the GCloud SDK, or is it built from scratch?

I've added an Integration testing section in the proposal to cover this.

Thanks for the context; an emulator would be great. Would I be correct in saying that Apache Livy or Spark Job Server could be open-source alternatives that we could extend towards in the future?

dr3s commented 4 years ago

FYI, the AWS support we are working on takes an incremental approach to client storage protocol handling: https://docs.google.com/document/d/14ouhFlFiw2OXW5m_esoW0fI9iR7N0TfMd_szi7O2aCk/edit#

We could always use libcloud for the implementation in the future but didn't see much immediate value and didn't want to introduce a large dependency that could conflict with other client dependencies. The API surface is minimal right now and I like retaining the option to optimize for a specific cloud provider without being dependent upon an upstream project that may have more generic needs.

dr3s commented 4 years ago

We are also a big user of Databricks, but it would be nice to detail why Beam's Spark runners aren't suitable for the existing jobs. I haven't verified Beam compatibility against the jobs in Feast, but I'm curious whether anyone has.

dr3s commented 4 years ago

FYI, https://github.com/feast-dev/feast/pull/769 adds support for S3. Adding an Azure storage implementation would be trivial.
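As a rough sketch of why, the equivalent Azure Blob operations with the azure-storage-blob client would look something like this; the connection string, container and paths are placeholders, and this is not the connector interface from that PR:

    from azure.storage.blob import BlobServiceClient

    service = BlobServiceClient.from_connection_string("<connection-string>")
    blob = service.get_blob_client(container="feast", blob="exports/out.avro")

    # Upload a local staging file.
    with open("/tmp/out.avro", "rb") as f:
        blob.upload_blob(f, overwrite=True)

    # Download it back for the client to read.
    with open("/tmp/in.avro", "wb") as f:
        f.write(blob.download_blob().readall())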

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

woop commented 3 years ago

Closing for now. The submitted code has been merged through #1076, but hasn't been released yet.