To support offline model retraining in Spark, the storage backends need some additional functionality. In the initial design of Velox, we assumed that Spark would be able to read all training data directly from the storage backend. However, none of the storage backends currently in use support this with Spark 1.3. Instead, when an offline retrain is triggered, the storage backend needs to write all the training data to HDFS. Currently, this takes the form of dumping all training data to a CSV format that Spark can parse, and then having the retrain process write the CSV string to HDFS.
Additionally, when the models are retrained offline, we need to bulk-insert all of the newly retrained models back into the storage backend. If the storage backend supports an efficient bulk-insert operation, that should be used; otherwise the default is to call put() sequentially for each item.
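As a rough sketch, the sequential-put fallback could live as a default method on the key-value interface, so that only backends with a native batch API need to override it. The trait and put() signature below are assumptions for illustration, not Velox's actual interface:

```scala
// Hypothetical sketch of the bulk-insert fallback. A backend only has
// to provide put(); bulkInsert defaults to one put() call per item.
trait KeyValueStore[K, V] {
  def put(key: K, value: V): Unit

  // Backends with an efficient native batch write should override this.
  def bulkInsert(kvs: List[(K, V)]): Unit =
    kvs.foreach { case (k, v) => put(k, v) }
}
```

A backend backed by a store with a real batch API (e.g. a batched write path) would override bulkInsert to issue a single batched request instead of N round trips.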
In summary, the additional methods needed are:
dumpToCSV() returns a CSV string with the format userid, item, score
bulkInsert(kvs: List[(K, V)]) inserts all the newly retrained models at once
And these methods need to be implemented for each storage backend we support.
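The CSV dump could likewise be written once against a generic iterator over stored training data, leaving only that iterator backend-specific. The Observation type and allObservations method below are assumptions for illustration:

```scala
// Hypothetical sketch of dumpToCSV in terms of an assumed iterator
// over the backend's stored training observations.
case class Observation(userId: Long, item: Long, score: Double)

trait TrainingDataDump {
  // Backend-specific: iterate over all stored training data (assumed).
  def allObservations: Iterator[Observation]

  // Produce the "userid, item, score" CSV that the retrain process
  // writes to HDFS for Spark to parse.
  def dumpToCSV(): String =
    allObservations
      .map(o => s"${o.userId},${o.item},${o.score}")
      .mkString("\n")
}
```

Note that materializing the whole dump as a single string assumes the training data fits in memory; a backend with large datasets would likely stream rows to HDFS instead.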