christaina opened this issue 6 years ago
Hi Christina,
If I recall correctly, the thing with Spark iterators is that they use a generator to iterate through the data, so in a sense they do not keep all the data in memory (although, as the docs say, `tee` may require significant auxiliary storage). Meaning, when you call `tee`, you effectively copy the state of the generator, and not the data.
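The point about copying iterator state rather than data can be seen with plain `itertools.tee` (a minimal sketch, independent of Spark):

```python
from itertools import tee

# A generator yields records lazily -- nothing is materialized up front.
def records():
    for i in range(5):
        yield i

# tee() copies the *state* of the iterator, not the underlying data.
# It only buffers items that one copy has consumed but another has not,
# which is the "significant auxiliary storage" the docs warn about when
# the copies drift far apart.
a, b = tee(records())

print(list(a))  # [0, 1, 2, 3, 4]
print(list(b))  # [0, 1, 2, 3, 4] -- the copy replays the same stream
```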
Furthermore, the argument you mentioned regarding the fact that a worker process will always train on the same partition is correct. However, consider the following; we are training a centralized model in an asynchronous optimization process. Thus other workers will also commit gradients based on other mini-batches to this centralized model. Furthermore, a worker always pulls the parameterization from the PS whenever it starts computing the next update to commit. As a result, it is not an issue :)
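A toy sketch of the asynchronous scheme described above (all names here are illustrative, not the project's actual API): every worker pulls the latest central parameterization before computing its update, so gradients based on different partitions all mix into one centralized model.

```python
central = {"w": 0.0}  # stand-in for the parameter server's state

def worker_step(minibatch, lr=0.1):
    w = central["w"]                     # pull current parameterization
    grad = sum(x - w for x in minibatch) / len(minibatch)
    central["w"] = w + lr * grad         # commit the update asynchronously

# Three "workers" committing updates based on different mini-batches;
# each one sees the effect of the others' commits.
for batch in ([1.0, 2.0], [3.0, 4.0], [0.0, 1.0]):
    worker_step(batch)

print(central["w"])
```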
To conclude, the reason why `unionAll` is not really a good fit is that it requires Spark to keep all the unioned data in memory, serialize it to disk, or recompute it from the DAG, all of which are expensive. Using `tee` this way can minimize that overhead.
I hope this answers your question. Feel free to reopen.
Kind regards,
Joeri
Hey Joeri,
Thanks for your response. I agree that everything is okay when the number of partitions of the dataset is equal to the number of workers. I was thinking of the case where the variable `parallelism_factor` is greater than one. Then the number of workers is fixed, and the number of partitions of the data is equal to `parallelism_factor * num_workers`. What I understand happens is that `num_workers` partitions are processed at once, and the data in the other partitions will not be seen until training finishes on those currently occupying the workers. Thus, training examples in the first `num_workers` partitions are trained on `num_epoch` times before the other partitions of the data have been used at all.
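The ordering problem can be simulated in plain Python (a hypothetical illustration; the names and the per-partition scheduling are assumptions, not the project's actual code). Each partition's iterator is tee'd `num_epoch` times and the copies are consumed back to back, so a worker exhausts every epoch of one partition before moving on:

```python
from itertools import tee, chain

def training_order(partitions, num_epoch):
    """Order in which examples reach one worker under per-partition epochs."""
    order = []
    for part in partitions:
        # tee() replicates the partition's iterator num_epoch times;
        # chaining the copies means the partition is replayed in full
        # before the next partition is touched.
        copies = tee(iter(part), num_epoch)
        order.extend(chain(*copies))
    return order

# Two partitions queued on the same worker, two epochs.
print(training_order([[0, 1], [2, 3]], 2))
# [0, 1, 0, 1, 2, 3, 2, 3] -- partition 0 is trained on num_epoch
# times before any example from partition 1 is seen
```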
I don't know whether that actually matters for performance, but it is quite different from how epochs normally work. Let me know if I'm just missing something. Thanks.
Christina
Hi Christina,
Ohhhhh, yes. I see now. You are right; this needs to be addressed. I'll check in with the Expedia guys, because I think they are unaware of this.
Ping @CY-dev @jwang12345
Sadly I don't have access to a Spark cluster at the moment, otherwise I'd do it myself. But I can run some tests locally to see if we can come up with a better method.
Thanks again, I had totally forgotten about this. But it's important, especially in an increased-parallelism / large-dataset setting.
Joeri
Cool. I've been using this on a Spark cluster, so let me know if you have any ideas you'd like to test out/debug; I'd be happy to contribute. Likewise, I'll let you know if I find a good solution. Right now I've just been using the `unionAll` way without caching the whole thing, and it does the job without being crazy slow, though I'd love to find a way that's even more efficient.
Hi Christina,
Thanks, let me know if you come up with something and we can discuss it in this thread. I'll ask other people for their opinion. However, I think the Expedia guys are right when they say that `unionAll` takes ages (the datasets I tested weren't that big).
Joeri
I see that the method for managing epochs was changed to copy the iterator of the data `num_epoch` times using `tee`. The documentation for `tee` says it may require significant auxiliary storage, which makes it seem like this method is not very efficient. Also, if one wants to use a `parallelism_factor` > 1, then this way of managing the epochs would result in re-training on data in one partition `num_epoch` times before seeing any new data. Wouldn't it be better to keep the old way using `unionAll` (and cache the dataset before you copy it? not so sure about this part...)
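The auxiliary-storage concern is easy to demonstrate in plain Python (illustrative only, outside of Spark): when the tee'd copies are consumed one after another rather than in lockstep, `tee` ends up buffering the entire stream internally.

```python
from itertools import tee

src = iter(range(3))
first, second = tee(src)

# Exhaust the first copy before touching the second; tee must buffer
# every item so the second copy can still replay the full stream --
# the auxiliary-storage cost the itertools docs warn about.
list(first)
remaining = list(second)
print(remaining)  # [0, 1, 2]
```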