sparklyr / sparklyr

R interface for Apache Spark
https://spark.rstudio.com/
Apache License 2.0

Support for ml_als with recommendations and pipelines #1578

Closed javierluraschi closed 6 years ago

javierluraschi commented 6 years ago
library(sparklyr)
sc <- spark_connect(master = "local")

# A toy ratings dataset: three users scoring three products
products <- data.frame(
  UserID    = c("Bob", "Charlie", "Alice", "Bob", "Charlie", "Alice"),
  ProductID = c("Books", "Books", "Books", "Candy", "Candy", "Apples"),
  Score     = c(3, 1, 2, 4, 5, 4)
)
reviews_tbl <- sdf_copy_to(sc, products)

# Index the two string columns, then fit ALS on the resulting integer indexes
pipeline <- ml_pipeline(sc) %>%
  ft_string_indexer(input_col = "ProductID", output_col = "product_index") %>%
  ft_string_indexer(input_col = "UserID", output_col = "user_index") %>%
  ml_als(rating_col = "Score", user_col = "user_index", item_col = "product_index", max_iter = 10)

fitted_pipeline <- ml_fit(pipeline, reviews_tbl)
ml_predict(fitted_pipeline, reviews_tbl)
# Source:   table<sparklyr_tmp_bd4e5ef76649> [?? x 6]
# Database: spark_connection
  UserID  ProductID Score product_index user_index prediction
  <chr>   <chr>     <dbl>         <dbl>      <dbl>      <dbl>
1 Charlie Candy         5             1          1       4.86
2 Bob     Candy         4             1          2       3.97
3 Alice   Apples        4             2          0       3.89
4 Charlie Books         1             0          1       1.08
5 Bob     Books         3             0          2       2.80
6 Alice   Books         2             0          0       2.00

So far so good, however:

ml_recommend(pipeline, "items", 1)
 Error in eval(lhs, parent, parent) : attempt to apply non-function 
4. eval(lhs, parent, parent) 
3. eval(lhs, parent, parent) 
2. (switch(type, items = model$recommend_for_all_users, users = model$recommend_for_all_items))(n) %>% 
    mutate(recommendations = explode(!!as.name("recommendations"))) %>% 
    sdf_separate_column("recommendations") at ml_recommendation_als.R#265
1. ml_recommend(pipeline, "items", 1) 
kevinykuo commented 6 years ago

ml_recommend() takes an ALS model object, not a pipeline model, so you'd have to extract the appropriate stage from the fitted pipeline if you want to use ml_recommend():

fitted_pipeline %>%
  ml_stage("als") %>%
  ml_recommend()
# Source:   table<sparklyr_tmp_cedb5c8c865e> [?? x 4]
# Database: spark_connection
#   user_index recommendations product_index rating
#        <int> <list>                  <int>  <dbl>
# 1          1 <list [2]>                  1   4.86
# 2          2 <list [2]>                  1   3.97
# 3          0 <list [2]>                  2   3.89

We can do better with that error message, though!
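For reference, a minimal sketch (hypothetical, not sparklyr's actual code; the helper name is made up) of the kind of class check that could replace the cryptic "attempt to apply non-function" with a readable error:

validate_als_model <- function(model) {
  # Fail early and clearly when given a pipeline (or anything else)
  # instead of a fitted ALS model
  if (!inherits(model, "ml_als_model")) {
    stop("`ml_recommend()` requires a fitted ALS model (class `ml_als_model`), ",
         "not a ", class(model)[1], ". Use `ml_stage()` to extract it first.",
         call. = FALSE)
  }
  invisible(model)
}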

campanell commented 6 years ago

Thanks for the clarification on ml_recommend and the ml_als pipeline example. It's helpful to someone like me who is inexperienced with Spark ML pipelines.

campanell commented 6 years ago

I am not able to get ml_fit to execute. My pipeline looks like this:

pipeline <- ml_pipeline(sc) %>%
  ft_string_indexer(input_col = "ProductID", output_col = "product_index") %>%
  ft_string_indexer(input_col = "UserID", output_col = "user_index") %>%
  ml_als(rating_col = "Score", user_col = "user_index", item_col = "product_index", max_iter = 10)
pipeline
fitted_pipeline <- ml_fit(pipeline, reviews)

My data looks like this:

# Source:   lazy query [?? x 4]
# Database: spark_connection
  Id ProductId  UserId         Score
1    B001E4KFG0 A3SGXH7AUHU8GW     5
2    B00813GRG4 A1D87F6ZCVE5NK     1
3    B000LQOCH0 ABXLMWJIXXAIN      4
4    B000UA0QIQ A395BORC6FGVXV     2
5    B006K2ZZ7K A1UQRSCLF8GW1T     5
6    B006K2ZZ7K ADT0SRK1MGOEU      4

The ml_fit is spitting out this mess:

Error: java.lang.IllegalArgumentException: Field "ProductID" does not exist.
    at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
    at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at org.apache.spark.sql.types.StructType.apply(StructType.scala:266)
    at org.apache.spark.ml.feature.StringIndexerBase$class.validateAndTransformSchema(StringIndexer.scala:85)
    at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:109)
    at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:152)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:184)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:184)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:184)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
    at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:136)
    at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:96)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:137)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

Traceback:
 1. ml_fit(pipeline, reviews)
 2. spark_jobj(x) %>% invoke("fit", spark_dataframe(dataset)) %>%
        ml_constructor_dispatch()
 3. withVisible(eval(quote(`_fseq`(`_lhs`)), env, env))
 4. eval(quote(`_fseq`(`_lhs`)), env, env)
 5. eval(quote(`_fseq`(`_lhs`)), env, env)
 6. `_fseq`(`_lhs`)
 7. freduce(value, `_function_list`)
 8. function_list[[i]](value)
 9. invoke(., "fit", spark_dataframe(dataset))
10. invoke.shell_jobj(., "fit", spark_dataframe(dataset))
11. invoke_method(spark_connection(jobj), FALSE, jobj, method, ...)
12. invoke_method.spark_shell_connection(spark_connection(jobj),
        FALSE, jobj, method, ...)
13. core_invoke_method(sc, static, object, method, ...)
14. withr::with_options(list(warning.length = 8000), {
        if (nzchar(msg)) {
            core_handle_known_errors(sc, msg)
            stop(msg, call. = FALSE)
        }
        else {
            msg <- core_read_spark_log_error(sc)
            stop(msg, call. = FALSE)
        }
    })
15. force(code)
16. stop(msg, call. = FALSE)

I am getting the same error on both AWS and IBM Data Platform cloud services.
kevinykuo commented 6 years ago

@campanell check the spelling of your column names, e.g. ProductId vs. ProductID.
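If it helps, a quick way to check (assuming `reviews` is the Spark DataFrame from the previous comment):

# Print the column names and types Spark actually registered; schema
# lookups are case sensitive, so "ProductId" will not match the
# "ProductID" requested by the string indexer.
sdf_schema(reviews)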

javierluraschi commented 6 years ago

@kevinykuo good to know! I wonder if you'd consider adding an S3 method to ml_recommend that takes a pipeline and automatically extracts the stage; that said, I'm not sure this would be desirable for all ml_recommend operations. Otherwise, it looks like we can close this one.
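A rough sketch of what that might look like (hypothetical, not in sparklyr; it assumes the fitted pipeline contains exactly one ALS stage):

ml_recommend.ml_pipeline_model <- function(x, type = c("items", "users"), n = 1) {
  # Find the fitted ALS stage(s) among the pipeline stages
  als_stages <- Filter(function(s) inherits(s, "ml_als_model"), ml_stages(x))
  if (length(als_stages) != 1)
    stop("Expected exactly one ALS stage in the pipeline model.", call. = FALSE)
  # Delegate to the existing ALS implementation
  ml_recommend(als_stages[[1]], type = type, n = n)
}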

kevinykuo commented 6 years ago

I'd like to stay away from generalizing specialized helper functions to pipeline objects, because it would require inspecting the pipeline and making assumptions about which stage to extract (e.g. the ALS stage could sit at any position in the pipeline, and there could be more than one), which in turn could lead to unexpected behavior.

Closing this, but @campanell feel free to let me know if you run into further issues!

campanell commented 6 years ago

Thanks for the instructions. So embarrassed about the typos. I should not be afraid of the boogeyman (i.e., Scala error messages), but I still am.

I was able to get to ml_recommend. However, where in the pipeline do I use ft_index_to_string to convert the product_index and user_index back to ProductId and UserId?

campanell commented 6 years ago

I was able to get the values from ft_index_to_string back into the recommendations data frame.

# Pull the label mappings learned by the two string indexers
prod_index <- fitted_pipeline %>%
  ml_stage(1) %>%
  ml_labels()

user_index <- fitted_pipeline %>%
  ml_stage(2) %>%
  ml_labels()

# Map the numeric indexes back to the original ids
# (`recommend` is the recommendations table returned by ml_recommend())
recommend_1 <- ft_index_to_string(recommend, input_col = "product_index",
                                  output_col = "ProductId_", labels = prod_index)

recommend_2 <- ft_index_to_string(recommend_1, input_col = "user_index",
                                  output_col = "UserId_", labels = user_index)

head(recommend_2)

Thanks so much for all your help. I am now able to do collaborative filtering with sparklyr.