Closed: eisber closed this issue 4 years ago
Hi @eisber, this sounds very close to what we're trying to do (run MLeap in a streaming Apache Flink job, processing row by row). What do you think is the bottleneck in your case?
Also, what model are you using? We found some performance issues using xgboost, see issue https://github.com/combust/mleap/issues/631
Initially our code was substantially faster because we didn't call transform() for every row; each call has to perform some initial setup (e.g. inspecting input types, allocating the output dataframe, ...).
@voganrc just to confirm that I'm reading your table correctly: to get the ML examples processed per second, I need to multiply each cell by the batch size, right? If so, it's surprising that only xgboost4j sees a speed-up (e.g. 10 * 10.135).
I was considering implementing batching in our integration, but the numbers above don't look promising. Also, the complexity would increase...
@eisber I will take a closer look this week, but it seems to me like this would be a good use case for the RowTransformer that mleap has; you can see more details about how to use it here: https://github.com/combust/mleap/blob/master/mleap-runtime/src/test/scala/ml/combust/mleap/runtime/frame/RowTransformerSpec.scala#L52. You'd be able to provide the schema upfront, from your bundled zip file, and thus skip the cost of creating the leap frame.
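For readers following along, the RowTransformer pattern from that spec looks roughly like this. This is a sketch, not the actual masc code: the bundle path and the field name/type are placeholders, and it assumes a model with a single double-tensor input.

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.core.types.{BasicType, StructField, StructType, TensorType}
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.frame.{Row, RowTransformer}
import ml.combust.mleap.tensor.Tensor
import resource._

// Load the transformer once from the serialized bundle (placeholder path).
val transformer = (for (bundle <- managed(BundleFile("jar:file:/tmp/model.zip")))
  yield bundle.loadMleapBundle().get.root).tried.get

// Declare the input schema upfront instead of letting transform() derive it
// from a leap frame on every call (field name and type are illustrative).
val schema = StructType(StructField("features", TensorType(BasicType.Double))).get

// Specializing the transformer pays the setup cost once; afterwards rows are
// transformed without building a leap frame per record.
val rowTransformer = transformer.transform(RowTransformer(schema)).get

// Hot path: one Row in, one Row out.
val outRow = rowTransformer.transform(Row(Tensor.denseVector(Array(1.0, 2.0, 3.0))))
```

The linked spec also shows a variant that returns an Option, which is useful when individual rows may fail to transform.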
@ancasarb thanks for the awesome hint. I changed our code: https://github.com/microsoft/masc/blob/marcozo/rowtransformer/connector/iterator/src/main/java/com/microsoft/accumulo/spark/processors/AvroRowMLeap.java#L299 and got a 40% speed improvement.
Any other hints like that?
Markus
@eisber Yes, you are reading my table correctly. xgboost4j was the only library I found that actually implemented batching. This PR is open though (https://github.com/combust/mleap/pull/600).
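Spelling out the table arithmetic being confirmed here (the 10.135 figure and batch size 10 are the values quoted earlier in the thread):

```scala
// examples/second = (value in the table cell) * (batch size of that column)
val batchesPerSecond = 10.135   // xgboost4j cell quoted above
val batchSize = 10
val examplesPerSecond = batchesPerSecond * batchSize  // ~101 examples/second
```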
@eisber the other thing I can think of is that mleap has support for avro (https://mvnrepository.com/artifact/ml.combust.mleap/mleap-avro); perhaps you could use the DefaultRowReader/DefaultRowWriter there to simplify your code somewhat.
@eisber I am going to close this for now, but please feel free to re-open if you have further questions. Thank you!
Hi,
We are using MLeap to perform model inference within Apache Accumulo. Since the Accumulo iterator framework exposes a streaming API (i.e. it processes row by row rather than in batches), we'd like to re-use as many of the objects required by MLeap as possible.
We managed to create a single "infinite" input dataframe and then produce a single result dataframe from which we pull the data iteratively. It works, but unfortunately this results in a memory leak. I was looking through the stack, but wasn't able to figure out at which point things were being kept in memory.
The code works, but isn't performing that well, as we have to call transformer.transform(this.mleapDataFrame) for every single row.
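That per-row pattern looks roughly like this (a sketch, assuming an already-loaded `transformer` and a single-field schema; the names are illustrative, not the actual masc code):

```scala
import ml.combust.mleap.core.types.{BasicType, StructField, StructType, TensorType}
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}

// Illustrative schema with one tensor column.
val schema = StructType(StructField("features", TensorType(BasicType.Double))).get

// Called once per Accumulo row: builds a fresh one-row frame, runs the whole
// pipeline (type inspection, output frame allocation), then unwraps the row.
def scoreOne(row: Row): Row = {
  val frame = DefaultLeapFrame(schema, Seq(row))
  transformer.transform(frame).get.collect().head
}
```

The per-frame setup inside scoreOne is exactly the cost that the RowTransformer suggestion elsewhere in this thread avoids.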
Integration code can be found here: https://github.com/microsoft/masc/blob/master/connector/iterator/src/main/java/com/microsoft/accumulo/spark/processors/AvroRowMLeap.java#L337
Any advice appreciated.
Markus