CamDavidsonPilon / lifetimes

Lifetime value in Python
MIT License

How to run bgf.fit in pyspark? #127

Closed: MingCong18 closed this issue 2 months ago

MingCong18 commented 7 years ago

How do I run the bgf.fit function in PySpark? Do I have to convert my Spark dataframe to pandas? Thanks

stochastic1 commented 7 years ago

Note: I would call this a port request rather than an issue.

The short version: I'd love to hear if any other collaborators are working on this. Right now we can only implement this by converting the PySpark DataFrame (or RDD) to a pandas DataFrame. I'd like to help with a PySpark implementation if there's interest.

The long version:

My work project team is trying to implement the BetaGeoFitter class in PySpark, and that's exactly as far as we've gotten: convert the PySpark DataFrame to a pandas DataFrame and run the methods on the pandas DataFrame. This means we're single-threading each iteration, because we haven't been able to get the NumPy methods underlying the pandas methods to parallelize.
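
For readers landing here later, a minimal sketch of that collect-to-pandas baseline might look like the following; summary_sdf is a hypothetical Spark DataFrame that already holds the frequency/recency/T summary columns, and everything runs on the driver:

```python
# Minimal sketch of the collect-to-pandas baseline described above.
# `summary_sdf` (a Spark DataFrame with frequency, recency, T columns)
# is an assumption for illustration.
from lifetimes import BetaGeoFitter

summary_pdf = summary_sdf.toPandas()  # single collect to the driver node

bgf = BetaGeoFitter(penalizer_coef=0.0)
bgf.fit(summary_pdf["frequency"], summary_pdf["recency"], summary_pdf["T"])
print(bgf)  # prints the fitted model and its parameters
```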

We have tried a UDF that imports lifetimes and its requirements on each node when we call the method, and we have tried recoding portions of the BetaGeoFitter class into PySpark-friendly syntax, but it always comes back to methods and functions that expect an ndarray or pandas Series and instead receive a PySpark Column object. We even took a bare-bones approach and focused on just the fitting algorithm, but the two required functions, log() and gammaln(), are problematic in PySpark: PySpark's log() expects a float, and gammaln() is not currently exposed to PySpark; it exists in Scala but lacks a JVM wrapper the PySpark interpreter can use.

I know of a related project, sparkit-learn, that shows some promise of porting these methods; however, it is not currently in our distro (Anaconda), and I'm not sure to what extent their latest release will work with our PySpark instance.

aprotopopov commented 7 years ago

We also convert the PySpark dataframe to pandas and then fit it with lifetimes. For predicting each user's probability of being alive we use a Spark UDF. I'm not sure about good implementations of minimization algorithms (like scipy.optimize.minimize) in Spark.
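
A rough sketch of that scoring pattern: fit on the driver, broadcast the fitted model (assuming it pickles cleanly), and score each row with a UDF. The names spark, summary_sdf, and bgf are illustrative, not from this thread:

```python
# Sketch: broadcast a driver-fitted model and score rows with a plain UDF.
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

bgf_bc = spark.sparkContext.broadcast(bgf)  # bgf = BetaGeoFitter fitted on the driver

@F.udf(returnType=DoubleType())
def p_alive(frequency, recency, T):
    # conditional_probability_alive accepts scalars as well as arrays
    return float(bgf_bc.value.conditional_probability_alive(frequency, recency, T))

scored_sdf = summary_sdf.withColumn("p_alive", p_alive("frequency", "recency", "T"))
```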

CamDavidsonPilon commented 7 years ago

> We also convert the PySpark dataframe to pandas and then fit it with lifetimes.

Similar to what we do at Shopify, except we do it for all shops: keyBy shop_id, groupByKey, convert the resulting lists to DataFrames, then apply lifetimes.
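
Roughly, that RDD pattern could be sketched as below; the transactions_sdf DataFrame, the column names, and the observation-period end date are assumptions for illustration, not Shopify's actual code:

```python
import pandas as pd
from lifetimes import BetaGeoFitter
from lifetimes.utils import summary_data_from_transaction_data

def fit_one_shop(shop_id, rows):
    # rows: all transaction Rows for a single shop, gathered on one executor
    pdf = pd.DataFrame([r.asDict() for r in rows])
    summary = summary_data_from_transaction_data(
        pdf, "customer_id", "order_date", observation_period_end="2017-01-01"
    )
    bgf = BetaGeoFitter(penalizer_coef=0.01)
    bgf.fit(summary["frequency"], summary["recency"], summary["T"])
    return shop_id, dict(bgf.params_)

fitted_params = (
    transactions_sdf.rdd
    .keyBy(lambda r: r.shop_id)
    .groupByKey()
    .map(lambda kv: fit_one_shop(kv[0], kv[1]))
)
```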

@stochastic1 why is subsampling not an option? I figure after tens of thousands of instances, estimation becomes quite precise.

stochastic1 commented 7 years ago

@CamDavidsonPilon, do you combine all models for each store or model each store separately? Do you find that conversion from PySpark lists is faster than conversion from PySpark dataframes? That's the bottleneck we'd like to eliminate: we find that the DF-to-DF conversion takes about as long for 1MM customers as for 10MM customers, and eliminating it would remove about an hour of processing time and a lot of memory consumption from the edge node.

Actually, one of our DBAs suggested a representative subsample of customers, because once the sample approaches ~45% of the population, the BGF fit parameters stabilize and are sufficiently close to the parameters fit on the entire population to be useful. That size still takes a little over an hour to fit, but it's more feasible than fitting the entire population. We'll look into this.

CamDavidsonPilon commented 7 years ago

We model each store separately. Actually, I recently changed the code to a PySpark DF + a Python UDF (to calculate the model per shop), and this seems to work well. We can process all 500k shops (about 500MM customers) in ~1 hour, though we're using large resources to do it.

The DF -> DF conversion taking an hour is surprising! I don't think PySpark RDDs would be any faster than PySpark DataFrames when converting to pandas: both need to do a Java -> Python dump and then a collect to the driver. Converting to summary data (recency, frequency, T) makes sense to do in Spark land before collecting.

stochastic1 commented 7 years ago

I appreciate seeing how you do it. We compute the summary in SQL from Hive, which is quite fast, and we've saved considerable time on the conversion to pandas by loading from Parquet, whose compression gives us a much leaner memory footprint. Still, the fitting has to be done entirely in pandas, so we're going to keep eyeing a PySpark solution so we can make better use of our hardware.

FWIW, our business model doesn't give us a clean store-based model as a natural subset, but there may be a different level of natural aggregation we can use.

lseffer commented 6 years ago

Bumping this since Spark 2.3 brought Arrow-backed pandas UDFs, which should be applicable here. I haven't tried them myself yet, but it would be interesting to hear whether you've seen any performance gains since the last exchange.

CamDavidsonPilon commented 6 years ago

Ooo, we're not on 2.3 yet, but I'll report back if we make the change.

sophia-wright-blue commented 5 years ago

Did you get a chance to use lifetimes with pandas UDFs, @CamDavidsonPilon? I would greatly appreciate any guidance on running the summary_data_from_transaction_data function on a Spark dataframe. Thanks,

CamDavidsonPilon commented 5 years ago

I did manage to run the fit with PySpark's Pandas UDFs, and saw a 2x performance increase.
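
The original code isn't in the thread, but under Spark 2.3/2.4 a grouped-map pandas UDF version of that fit could be sketched roughly as below; the output schema, column names, and summary_sdf are assumptions, not the code actually used:

```python
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from lifetimes import BetaGeoFitter

out_schema = StructType([
    StructField("shop_id", StringType()),
    StructField("r", DoubleType()),
    StructField("alpha", DoubleType()),
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
])

@F.pandas_udf(out_schema, F.PandasUDFType.GROUPED_MAP)
def fit_bgf(pdf):
    # pdf: the pandas summary rows for one shop_id group
    bgf = BetaGeoFitter(penalizer_coef=0.01)
    bgf.fit(pdf["frequency"], pdf["recency"], pdf["T"])
    p = dict(bgf.params_)
    return pd.DataFrame([{"shop_id": pdf["shop_id"].iloc[0],
                          "r": p["r"], "alpha": p["alpha"],
                          "a": p["a"], "b": p["b"]}])

fitted_params = summary_sdf.groupBy("shop_id").apply(fit_bgf)
```

Note that the UDF has to return a pandas DataFrame matching the declared schema, so it returns the fitted parameters (r, alpha, a, b) rather than the BetaGeoFitter object itself; on Spark 3.x the same idea is usually written with groupBy(...).applyInPandas(...).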

I think the overall goal of summary_data_from_transaction_data can be replicated in a Spark DataFrame: the output table is just a set of aggregations over the transaction data. So my advice: don't use summary_data_from_transaction_data in Spark; use Spark's native aggregators.
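
For what it's worth, a sketch of those aggregations at daily granularity might look like this; transactions_sdf, the column names, and the observation-period end date are all assumptions:

```python
from pyspark.sql import functions as F

obs_end = F.to_date(F.lit("2017-01-01"))

summary_sdf = (
    transactions_sdf
    .groupBy("customer_id")
    .agg(
        F.countDistinct("order_date").alias("purchase_days"),
        F.min("order_date").alias("first_purchase"),
        F.max("order_date").alias("last_purchase"),
    )
    .select(
        "customer_id",
        # frequency = number of repeat purchase days
        (F.col("purchase_days") - 1).alias("frequency"),
        # recency = days between first and last purchase
        F.datediff("last_purchase", "first_purchase").alias("recency"),
        # T = days between first purchase and the end of the observation period
        F.datediff(obs_end, "first_purchase").alias("T"),
    )
)
```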

sophia-wright-blue commented 5 years ago

Thanks for the quick reply, @CamDavidsonPilon; I'll try using Spark's native aggregators in place of summary_data_from_transaction_data.

I am curious about an error I'm getting, though. When I run summary_data_from_transaction_data in Spark inside a pandas_udf, I get the following error:

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

Any idea what might be causing this error?

I apologize if the question is too vague. Thanks,

CamDavidsonPilon commented 5 years ago

Without seeing more of the error & code, it's likely that one of your inputs into the function is of the wrong type. Ex: observation_period_end is not a string, or monetary_value_col is not a string, etc. I would check there first.
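
For reference, a correct call passes column names as strings rather than Series/Column objects; something like the illustrative snippet below (the names are made up) avoids the ambiguous-truth-value error, which can be triggered when a Series is passed where a name is expected:

```python
from lifetimes.utils import summary_data_from_transaction_data

summary = summary_data_from_transaction_data(
    transactions_pdf,                    # a pandas DataFrame
    customer_id_col="customer_id",
    datetime_col="order_date",
    monetary_value_col="order_value",    # a column *name*, not transactions_pdf["order_value"]
    observation_period_end="2019-12-31",
)
```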

sophia-wright-blue commented 5 years ago

You're right! I did not realize that monetary_value_col had to be a string! Thanks so much for your help. Since I have your attention, could I get some brief guidance from you on https://github.com/CamDavidsonPilon/lifetimes/issues/242?

It'd help me out greatly. Thanks so much!

sophia-wright-blue commented 5 years ago

Hi @CamDavidsonPilon, would it be possible to share the code you used to run the fit with PySpark's Pandas UDFs? It'd help out greatly. I'm unclear on the schema definition of the output from the pandas_udf, and on how you return a lifetimes.BetaGeoFitter. Thanks,

CamDavidsonPilon commented 5 years ago

Ah, it was at my last job and I don't have access to it anymore, sorry.