Closed 8bit-pixies closed 4 years ago
@chappers thanks for the request. This is something on our to-do list.
Is there any quick hacky way to use featuretools with spark?
i too request a spark implementation, preferably with java api also
@ajing we've had success integrating with pyspark. The basic approach would be to partition your data using spark dataframes, then map each partition to an EntitySet and a calculate_feature_matrix(..) call. Then you can write out a feature matrix for each partition.
there is a basic outline of this approach using Dask in our Predict Next Purchase repo. See the end of the notebook here: https://github.com/Featuretools/predict-next-purchase/blob/master/Tutorial.ipynb
A more in depth tutorial on this will be coming soon too.
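To make the partition-then-map pattern above concrete without needing a Spark cluster, here is a minimal sketch in plain pandas. The table, column names, and the per-partition aggregation are all hypothetical stand-ins; in the real workflow each partition's data would be loaded into an EntitySet and passed to ft.calculate_feature_matrix(..):

```python
import pandas as pd

# Hypothetical transactions table; in the real workflow each partition
# would be built into an EntitySet for calculate_feature_matrix(..)
transactions = pd.DataFrame({
    "customer_id": [1, 1, 2, 2, 3],
    "amount": [10.0, 20.0, 5.0, 15.0, 30.0],
})

# Assign each customer to a partition (Spark would do this by
# repartitioning the dataframe across the cluster)
transactions["partition"] = transactions["customer_id"] % 2

def features_for_partition(df):
    # Stand-in for building an EntitySet from this partition's data
    # and calling calculate_feature_matrix on it
    return df.groupby("customer_id")["amount"].agg(["sum", "mean"])

# Map each partition independently, then concatenate the per-partition
# feature matrices into one result
feature_matrix = pd.concat(
    features_for_partition(df) for _, df in transactions.groupby("partition")
)
print(feature_matrix)
```

The key property is that each partition is processed in isolation, which is what lets Spark (or Dask) distribute the work across machines.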
I need that too, thanks
For applying learned features to a full data set, you can use Pandas UDFs which call calculate_feature_matrix(). With this approach, you group by a key, which distributes your Spark dataframe across nodes and passes portions of it as pandas dataframes to a user-defined function, before recombining the results into a Spark dataframe.
Are there any plans on making dfs() work in a distributed way? Right now I'm using toPandas() on my spark data frame to do feature generation on the driver node.
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
@bgweber thanks for outlining the approach with Pandas UDFs
yep, there are plans for a better integration. stay tuned!
There is now a Spark example notebook showing how to distribute feature engineering across a cluster of machines: Featuretools on Spark notebook. A write-up describing the approach is also available on the Feature Labs engineering blog: Featuretools on Spark article.
Hopefully this is helpful, and if anyone needs more clarification, we'd be glad to provide it!
@WillKoehrsen It certainly looks promising as a first pass. There does seem to be a fair amount of upfront work involved to ensure that each partition has the "full data" that is required to build the feature matrix. This might be tricky if the entity set has complex relationships (i.e. getting them partitioned in the first place might be a massive pain).
Happy to close this as "done" for now - might be worth testing it out there on our internal large datasets first and revisit if we have any questions.
@chappers You're right, the example there is relatively simple to partition. With a more complex EntitySet (more parent-child relationships), this could be tough to implement.
If you do test out this approach, we'd enjoy hearing about it.
Here's a gist of the approach I'm taking, the goal is to be able to deal directly with Spark data frames: https://gist.github.com/bgweber/6655508db34dffe7a63cfb95281fa700
There are a few things kept in memory on the driver node. I'm hoping I can share a write-up I did on this approach eventually, because it's a great way of mixing python code that relies on Pandas with PySpark.
@bgweber this looks like a great approach! please let us know how it works.
For the purpose of discussion, here's a version of @bgweber's approach using the Featuretools tutorial data:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
import featuretools as ft

spark = (SparkSession
         .builder
         .appName("myname")
         .getOrCreate())

data = ft.demo.load_mock_customer()
customer_df = data['customers']

# run dfs once on the driver to learn the feature definitions (and to
# get an output dataframe we can derive the Spark schema from)
es = ft.EntitySet('name', {'customers': (customer_df, 'customer_id')})
output_df, feats = ft.dfs(entityset=es, target_entity='customers', max_depth=2)

# now we can build this on spark...
customer_sp = spark.createDataFrame(customer_df)
target_schema = spark.createDataFrame(output_df.reset_index()).schema

@pandas_udf(target_schema, PandasUDFType.GROUPED_MAP)
def generate_features(df_sp):
    # each group arrives as a pandas dataframe; build it into its own
    # EntitySet and apply the learned feature definitions
    es = ft.EntitySet('name', {'customers': (df_sp, 'customer_id')})
    return ft.calculate_feature_matrix(feats, es).reset_index()

customer_ft = customer_sp.groupby("customer_id").apply(generate_features)
customer_ft.toPandas()
Comments: this approach will probably work well if you have a single dataset. We'd probably want something that works across the whole EntitySet object (as per the mock data).
In my eyes, the value that Featuretools brings is that the objects you pass are relational; having to denormalise first somewhat diminishes the value of Featuretools in the first place. I think if we could have Spark working over EntitySets, that would be really cool (not sure whether that means a change in the Spark internals etc.).
Of course this is a massive step in the right direction, and I'd be keen to hear how other people use and scale featuretools.
I love this thread. How much does Featuretools itself depend on pandas? I'm happy to help; the work of @chappers and @bgweber is an amazing step indeed
@FavioVazquez right now, the internals depend on the Pandas DataFrame a little too tightly. There are also a few places where we go down to pure python code, which isn't amenable to a spark dataframe integration.
If you'd like to contribute, it probably makes sense for you to review the EntitySet class and the calculate_feature_matrix method, to see how hard it'd be to adjust them to support both pandas dataframes and spark dataframes.
Then we can make a plan to refactor the internals towards the goal of supporting dataframes from other libraries such as spark or dask.
@kmax12 would it basically be possible to implement a "SparkBackend" and reference it in calculate_feature_matrix() here:
and then update EntitySet to use an abstract dataframe too (it could be a pandas dataframe or a Spark dataframe)?
It seems first step could be some refactoring to use an abstract dataframe, with implementation for pandas dataframe so then we can add later Spark dataframe implementation too.
Yep, that's the approach we had in mind. Although, much like we'd have a spark backend, we'd have a SparkEntitySet.
I'd also like to refactor functionality that can be shared across backends into a BaseBackend class. For example, we have logic that decides the order in which to calculate features to avoid repeating calculations. Then each backend could subclass BaseBackend and implement the relevant functions.
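A rough sketch of the refactor being described, with shared logic in a base class and one subclass per dataframe library. All class and method names here are hypothetical illustrations, not the actual featuretools internals; the ordering logic is reduced to a toy deduplication step:

```python
from abc import ABC, abstractmethod

class BaseBackend(ABC):
    """Logic shared across backends, independent of the dataframe library."""

    def calculate_all_features(self, features, entityset):
        # The dependency-aware ordering lives in the base class so
        # every backend inherits it
        results = []
        for feature in self.feature_order(features):
            results.append(self.calculate_feature(feature, entityset))
        return results

    def feature_order(self, features):
        # Toy stand-in for the real ordering logic: drop duplicate
        # features while preserving first-seen order, so shared
        # sub-features are only computed once
        seen, ordered = set(), []
        for f in features:
            if f not in seen:
                seen.add(f)
                ordered.append(f)
        return ordered

    @abstractmethod
    def calculate_feature(self, feature, entityset):
        """Each backend implements this for its own dataframe type."""

class PandasBackend(BaseBackend):
    def calculate_feature(self, feature, entityset):
        return f"pandas computed {feature}"

class SparkBackend(BaseBackend):
    def calculate_feature(self, feature, entityset):
        return f"spark computed {feature}"
```

The point of the split is that only `calculate_feature` touches a concrete dataframe API, so adding a new library means writing one subclass rather than forking the calculation logic.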
That makes sense.
Thank you @kmax12
Is this something that is planned to be part of a particular Featuretools release?
we're working on it, but no ETA for putting into Featuretools open source.
@kmax12 @WillKoehrsen @chappers Hi All,
Can you please provide code to partition data using Spark, and any link to a full implementation of Featuretools on Spark? We are thinking of using Featuretools for our fraud prediction system, so we just want to do a small POC of the Spark implementation.
https://medium.com/feature-labs-engineering/featuretools-on-spark-e5aa67eaf807
I'm following this link, but the data-partitioning code is not working:
for partition, grouped in members.groupby('partition'):
    # Open file for appending
    with open(file_dir + f'p{partition}/members.csv', 'a') as f:
        # Write a new line and then the contents of the dataframe
        f.write('\n')
        grouped.to_csv(f, header=False, index=False)
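One common reason that loop fails is that the per-partition directories don't exist before the files are opened for appending. Here is a self-contained variant that creates them first; the `members` dataframe and the temporary output directory are illustrative stand-ins for the article's data and paths:

```python
import os
import tempfile

import pandas as pd

# Illustrative output directory; the article uses a fixed path instead
file_dir = tempfile.mkdtemp() + "/"

# Illustrative stand-in for the members table with a precomputed
# 'partition' column
members = pd.DataFrame({
    "msno": ["a", "b", "c", "d"],
    "partition": [0, 0, 1, 1],
})

for partition, grouped in members.groupby("partition"):
    # open(..., 'a') fails if the directory doesn't exist yet,
    # so create it before appending
    os.makedirs(f"{file_dir}p{partition}", exist_ok=True)
    with open(f"{file_dir}p{partition}/members.csv", "a") as f:
        # Write a new line and then the contents of this partition
        f.write("\n")
        grouped.to_csv(f, header=False, index=False)
```

Each partition directory ends up with its own members.csv, which is the layout the downstream per-partition EntitySet construction expects.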
Thanks Sarvendra
@chappers hello, I see your code using pyspark's pandas_udf and featuretools to calculate the feature matrix. Good work! But I want to ask why you first used ft.dfs to calculate the matrix. You could have added the argument features_only=True, but you didn't set it, so in fact you calculate the matrix twice. My question is: how do I get the schema from the features if I set features_only=True?
@lvjiujin I'm not a featuretools developer - best to ask the team.
Because featuretools is currently based on pandas, maybe it would be easier to add support for Koalas (https://github.com/databricks/koalas), which provides a pandas-like API on top of Spark DataFrames. With minimal changes, we might be able to support Koalas DataFrames.
@candalfigomoro would be interesting to see if we can do this after we do #783
@kmax12 Yes, I think some of the changes performed to add Dask DataFrames compatibility could also be valid for Koalas DataFrames.
Closing in favor of #887
Bug/Feature Request Description
In notebooks such as here: https://github.com/Featuretools/predict-next-purchase/blob/master/Tutorial.ipynb and documentation: https://docs.featuretools.com/usage_tips/scaling.html
It mentions the ability to scale to Spark. Could an example be provided like it was for dask here: https://github.com/Featuretools/predict-next-purchase?
Issues created here on Github are for bugs or feature requests. For usage questions and questions about errors, please ask on Stack Overflow with the featuretools tag. Check the documentation for further guidance on where to ask your question.