columbia-applied-data-science / rosetta

Tools, wrappers, etc... for data science with a concentration on text processing

MySQLStreamer data cache does not work with n_jobs>1 #34

Open dkrasner opened 10 years ago

dkrasner commented 10 years ago

When running:

# MySQLStreamer lives in rosetta.text.streamers (see the links below)
from rosetta.text.streamers import MySQLStreamer

streamer = MySQLStreamer(db_info, tokenizer_func=token_func)
cache_list = ['doc_id']  # ask the streamer to cache each document's doc_id
streamer.to_vw(open(vw_outfile, 'w'), n_jobs=n_jobs, cache_list=cache_list)

if n_jobs > 1, streamer.doc_id_cache remains empty.

ApproximateIdentity commented 8 years ago

I dealt with this issue today when using TextIterStreamer. I spent some time searching for bugs in my own code, and once I tracked the problem down I realized it was the same as this issue, so I figured I would add some info.

I don't know a very easy way to solve this while fitting in with the overall design, but I can point out the exact problem. The issue lies with parallel_apply. See here:

https://github.com/columbia-applied-data-science/rosetta/blob/master/rosetta/text/streamers.py#L112

and how it uses the passed-in func, which is defined here:

https://github.com/columbia-applied-data-science/rosetta/blob/master/rosetta/text/streamers.py#L109

If we look at _to_sstr(), we see that the cache_list is updated here:

https://github.com/columbia-applied-data-science/rosetta/blob/master/rosetta/text/streamers.py#L539

Now if we look at parallel_easy, we see that func comes into play here:

https://github.com/columbia-applied-data-science/rosetta/blob/master/rosetta/parallel/parallel_easy.py#L80

So here is what's actually going on. For each job, func is passed down to the subprocess, and func includes a copy of the streamer. That copy does get a populated cache_list, but that cache_list is never communicated back to the main process. The text results are sent back by way of the queues, but the cache_lists are not.
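To illustrate the mechanism with a standalone sketch (Python 3, not rosetta code): multiprocessing pickles the object into each worker, so appending to a list attribute inside the child never shows up on the parent's copy:

import multiprocessing

class Streamer(object):
    """Stand-in for a rosetta streamer with a cache attribute."""
    def __init__(self):
        self.doc_id_cache = []

    def process(self, doc_id):
        # Runs in the worker, mutating the worker's *copy* of the streamer.
        self.doc_id_cache.append(doc_id)
        return 'text for %s' % doc_id

if __name__ == '__main__':
    streamer = Streamer()
    pool = multiprocessing.Pool(2)
    results = pool.map(streamer.process, ['doc1', 'doc2', 'doc3'])
    pool.close()
    pool.join()
    print(results)                # text comes back through the pool's queues
    print(streamer.doc_id_cache)  # [] -- the parent's streamer was never touched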

Now it's actually a little unclear to me how this can be fixed. For one, because the cache_list by default does not record the corresponding doc_ids, you wouldn't know in the end which documents the cached values came from. Even if that were added, reattaching the values to the main process's streamer at the end would be a bit of a hack. It could be done, but it would certainly be a little confusing and would muddy up parallel_easy, which has the advantage of being a pretty clean parallelization implementation.

I think the right solution is probably to simply mention in the doc string that the cache_list just doesn't work for parallel jobs.

On a side note, the reason it works for n_jobs = 1 is that parallel_easy doesn't create a subprocess in that case:

https://github.com/columbia-applied-data-science/rosetta/blob/master/rosetta/parallel/parallel_easy.py#L63
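In other words, the single-job path behaves roughly like this (a simplified sketch of the branching, not the actual parallel_easy source):

import multiprocessing

def apply_serial_or_parallel(func, iterable, n_jobs):
    # Sketch only: with one job everything runs in the parent process,
    # so any state func mutates (e.g. a streamer's cache_list) survives.
    if n_jobs == 1:
        return [func(item) for item in iterable]
    # With more jobs, func is pickled into worker processes; the copies'
    # mutations are discarded and only the returned results come back.
    pool = multiprocessing.Pool(n_jobs)
    try:
        return pool.map(func, iterable)
    finally:
        pool.close()
        pool.join()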

ApproximateIdentity commented 8 years ago

The easiest way to solve this, if we wanted to keep the cache_list, is to change it to a cache_dict and store {doc_id: val} pairs. Then each subprocess could pass its dict back after everything is finished, and a final dict merge could be done and attached to the main streamer. There may be hidden issues, but that is probably the sanest method short of simply putting a warning in the doc string saying that n_jobs > 1 breaks the cache_list.
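Something along these lines (a sketch of the idea with made-up helper names, not a patch against streamers.py): each worker returns its text output together with its local cache_dict, and the parent merges the dicts at the end:

import multiprocessing

def process_chunk(chunk):
    """Worker side: build vw lines plus a local {doc_id: val} cache_dict."""
    lines, cache_dict = [], {}
    for doc_id, text in chunk:
        lines.append('| %s' % text)   # placeholder for the real vw formatting
        cache_dict[doc_id] = doc_id   # placeholder for whatever value gets cached
    return lines, cache_dict

def to_vw_parallel(chunks, n_jobs):
    pool = multiprocessing.Pool(n_jobs)
    try:
        results = pool.map(process_chunk, chunks)
    finally:
        pool.close()
        pool.join()
    all_lines, merged_cache = [], {}
    for lines, cache_dict in results:
        all_lines.extend(lines)
        merged_cache.update(cache_dict)  # final dict merge back in the parent
    return all_lines, merged_cache       # merged_cache can be attached to the streamer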