vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io

Unable to use the filtering when forking multiple processes #596

Open · anubhav-nd opened this issue 4 years ago

anubhav-nd commented 4 years ago

Hi,

The problem I am facing is quite strange (at least to me). I am spawning child processes (which do vx.open and then filter a dataframe) from a single parent process. The first child completes its processing successfully, but subsequent child processes can only open the dataframe (a separate copy of the original arrow file); they are unable to filter on it. I am also unable to catch any exception: the child processes just hang when I call some property on the resulting dataframe after filtering, say df.length().

I am in dire need of some help. Appreciate any feedback.

A minimal reproducible example should be as follows:

worker.py

import vaex as vx
import traceback
import shutil
import uuid
import time

def filter_vaex_df(input_df, search_query):
    output_df = eval('input_df.filter(%s)' % search_query)
    print('I am in actual filter')
    try:
        print('Length after filtering %d' % output_df.length())  # Every worker after the 1st gets stuck here
    except Exception as e:
        print('I am in exception')
        print(e)
    return output_df

def save_filtered_dataset(parent_filename, filter_query):
    temp_uuid = str(uuid.uuid4())
    shutil.copy(parent_filename, '%s.arrow' % temp_uuid)
    try:
        time.sleep(5)  # Give the copy above some time to finish flushing to disk
        objects_df = vx.open('%s.arrow' % temp_uuid)
        print('The length of objects df is %d' % objects_df.length())
        filtered_objects_df = filter_vaex_df(objects_df, filter_query)
        objects_df.close_files()
        print('The length of filtered_objects_df after filtering %d' % filtered_objects_df.length())
    except Exception as e:
        traceback.print_exc()
        raise e
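As an aside (an assumption about the vaex API, not something from the original report): vaex expressions are plain strings internally, so if search_query is an expression string such as 'x > 5', it may be possible to pass it to filter directly and drop the eval call:

import vaex as vx

df = vx.open('data.arrow')     # hypothetical file
filtered = df.filter('x > 5')  # pass the expression string directly
print(filtered.length())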

main.py

import vaex as vx
from multiprocessing import Pool
import worker
import time

def spawn_children(parent_filename, filter_query):
    pool = Pool(processes=1)
    input_data = vx.open(parent_filename)
    # Fire-and-forget: the AsyncResult is never collected
    pool.apply_async(worker.save_filtered_dataset, args=(parent_filename, filter_query))
    print('Input length: %d' % input_data.length())

if __name__ == '__main__':
    file_name = <specify your input file address> # Let me know if I need to share my files
    search_query = <specify your query> # Let me know if this is needed as well
    spawn_children(file_name, search_query)
    time.sleep(10)
    spawn_children(file_name, search_query)
anubhav-nd commented 4 years ago

Hi @maartenbreddels ,

Update: the minimal reproducible example above does not actually reproduce the problem; the process does not hang there.

Actually, in the original code the spawning happens from inside a dash callback. Could the process getting stuck be because of that?

Apologies for not reporting the correct reproduction in the first place.

maartenbreddels commented 4 years ago

Hi,

really appreciate you giving a reproducible example, but indeed, it seems to work here. Do you happen to open the dataframe from the worker and pass it down? Why do you use forking, by the way? Vaex releases the GIL a lot, so you may be able to get away with not forking, possibly using asyncio.

cheers,

Maarten
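
A minimal sketch of this asyncio suggestion (illustrative, not from the thread; the file name is a placeholder). Since vaex releases the GIL during heavy computation, the blocking call can run on asyncio's default thread pool instead of a forked process:

import asyncio
import vaex as vx

def blocking_job(path):
    df = vx.open(path)
    return df.count()  # any heavy vaex computation; the GIL is released internally

async def main():
    loop = asyncio.get_event_loop()
    # None selects asyncio's default ThreadPoolExecutor
    result = await loop.run_in_executor(None, blocking_job, 'data.arrow')
    print(result)

asyncio.run(main())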

anubhav-nd commented 4 years ago

Yes, the workers open the dataframe and pass it down. I used forking because the processing in the workers is fairly intensive and I don't want to block the parent (a dash callback updating the front-end, in this case). Let me see if I can get by with asyncio.

If not, what should I do to investigate this blocking? The block happens when I read the filtered results.

maartenbreddels commented 4 years ago

Apart from this possibly being a real issue: have you considered using a thread? Although I can imagine that also being an issue. In vaex's server we have similar issues; there we use a thread pool with 4 threads that handles each request.

anubhav-nd commented 4 years ago

I have not really explored the thread option. Could you point me to the code where you handle similar issues in the vaex server?

maartenbreddels commented 4 years ago

This is not well documented (it's all internal), but you can get some idea from it:
https://github.com/vaexio/vaex/blob/54d5d30e28360cbc0d35de9cba0c09c3c6ab016e/packages/vaex-server/vaex/server/tornado_server.py#L111
https://github.com/vaexio/vaex/blob/54d5d30e28360cbc0d35de9cba0c09c3c6ab016e/packages/vaex-server/vaex/server/tornado_server.py#L176
https://github.com/vaexio/vaex/blob/54d5d30e28360cbc0d35de9cba0c09c3c6ab016e/packages/vaex-server/vaex/server/service.py#L60

The main point is that each dataframe has an executor, and we set a new one for each thread: https://github.com/vaexio/vaex/blob/54d5d30e28360cbc0d35de9cba0c09c3c6ab016e/packages/vaex-server/vaex/server/service.py#L84

Feel free to ask questions if you have more!

maartenbreddels commented 4 years ago

I think it would be good to have some reusable code for this, since there are edge cases to think about. E.g. executors are non-reentrant (by design), and a dataframe has one executor, so if you reassign the executor, you do so for all threads; but making your own private copy (df = df.copy()) is super cheap. Let me know if you run into issues. I've thought about this for quite a while, but nobody has used it this way yet (apart from vaex-server).
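
A hedged sketch of the private-copy pattern described above (illustrative, not the vaex-server code; threading.local and the file name are assumptions). Each thread works on its own cheap df.copy(); the per-thread executor reassignment shown in the linked service.py would go alongside it:

import threading
import vaex

df = vaex.open('data.hdf5')  # opened once in the main thread
_local = threading.local()

def thread_df():
    # Give each worker thread a private copy so the non-reentrant
    # executor machinery is never entered concurrently
    if not hasattr(_local, 'df'):
        _local.df = df.copy()
    return _local.df

# inside a worker thread:
# result = thread_df().count()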

anubhav-nd commented 4 years ago

Thanks @maartenbreddels for your responses. I am thinking about the best way to solve this. I'll get back to you if I face issues; if not, I will update this thread with the solution. Thanks again, really appreciate it.

anubhav-nd commented 4 years ago

Solved my use case in the following manner; the processing doesn't hang anymore.

parent.py

import concurrent.futures as conc_fut
import asyncio
import time  # needed for the sleep between spawns below
import worker

def spawn_children(filename, search_query):
    executor = conc_fut.ThreadPoolExecutor(max_workers=1)
    asyncio.run(worker.run_save_filtered_dataset(executor, filename, search_query))

if __name__ == '__main__':
    file_name = <specify your input file address> # Let me know if I need to share my files
    search_query = <specify your query> # Let me know if this is needed as well
    spawn_children(file_name, search_query)
    time.sleep(5) # Otherwise I am getting nested execute calls in vaex
    spawn_children(file_name, search_query)

worker.py

import vaex as vx
import traceback
import asyncio

def filter_vaex_df(input_df, search_query):
    output_df = eval('input_df.filter(%s)' % search_query)
    print('I am in actual filter')
    try:
        print('Length after filtering %d' % output_df.length())  # Workers don't get stuck now
    except Exception as e:
        print('I am in exception')
        print(e)
    return output_df

def save_filtered_dataset(parent_filename, filter_query):
    try:
        objects_df = vx.open(parent_filename)
        objects_df_copy = objects_df.copy()
        print('The length of objects df is %d' % objects_df_copy.length())
        filtered_objects_df = filter_vaex_df(objects_df_copy, filter_query)
        objects_df.close_files()
        print('The length of filtered_objects_df after filtering %d' % filtered_objects_df.length())
    except Exception as e:
        traceback.print_exc()
        raise e

async def run_save_filtered_dataset(executor, filename, search_query):
    loop = asyncio.get_event_loop()
    # Await the executor future, otherwise asyncio.run() tears the loop down
    # before the worker has finished
    await loop.run_in_executor(executor, save_filtered_dataset, filename, search_query)

@maartenbreddels I don't fully understand how to avoid nested execute calls if I spawn the next child immediately after the previous one. Any idea what I could be missing?

anubhav-nd commented 4 years ago

hi,

I really need to use processes now, to make use of all CPU cores. I have tried multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor, but each time my forked functions reach a vaex evaluation, they just hang. Things work fine with concurrent.futures.ThreadPoolExecutor, but that uses just a single core. The functions reopen the arrow files from disk, so there should not be any sharing of variables.

I am not very experienced with multiprocessing. Is there something I am missing? Why would the functions just hang as processes? Does it have something to do with vaex running its own executors?

anubhav-nd commented 4 years ago

Solved the issue temporarily by spawning subprocesses (with shell=True) under multiprocessing.Pool. Not the best approach, though. It would really help if functions using vaex could be spawned as processes.
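
A rough sketch of that workaround (hypothetical script names; each job runs in a fresh Python interpreter, so no vaex state crosses a fork):

import subprocess
from multiprocessing import Pool

def run_job(command):
    # shell=True as described above; each script reopens the arrow file itself
    return subprocess.run(command, shell=True, check=True)

if __name__ == '__main__':
    jobs = [
        'python filter_job.py data.arrow "x > 1"',  # hypothetical worker script
        'python filter_job.py data.arrow "y < 0"',
    ]
    with Pool(processes=2) as pool:
        pool.map(run_job, jobs)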

maartenbreddels commented 4 years ago

FYI, #670 makes vaex thread safe, which is what we needed to support dash (you mentioned that). I have rebased #517 on master, so you should be able to get that working.

But I don't think it will solve your problem if you really need multiprocessing. I have thought about supporting multiprocessing to make apply work better; I guess that's your use case?

anubhav-nd commented 4 years ago

Sorry, forgot to mention: I actually saw https://github.com/vaexio/vaex/pull/670 and tried the build from master. It indeed makes vaex thread safe now (I got the ThreadPool working with this branch).

You're right that it doesn't solve my problem, as my use case needs to exploit all available CPU power (not only for apply, though). I want to launch multiple computations on the same data file in parallel, and each computation takes considerable CPU time. My current workaround turns all these computations into separate python files and launches them as subprocesses. Launching them as functions through multiprocessing just gave a deadlock.

Do you think I should raise a separate issue for this one?

maartenbreddels commented 4 years ago

I assume it is still the same issue. Could you put together a minimal example so I can test it out?

maartenbreddels commented 4 years ago

Did you make progress on using the ProcessPoolExecutor with vaex? I'm running into the same issue, where it seems to just hang and not do anything.

anubhav-nd commented 4 years ago

Hi, nope, I couldn't get it working through ProcessPoolExecutor; it just hangs for me too. ThreadPoolExecutor, though, works fine. I resorted to launching subprocesses for my use case, an approach I would like to get rid of.

maartenbreddels commented 4 years ago

I have something working: use ProcessPoolExecutor, but do not trigger any computation from the parent process beforehand. Forking after vaex's worker threads have been created seems to be what causes the issue.

Something like this seems to work:



import vaex
df = vaex.open('./nyc_taxi_2012d.hdf5')
# calling df.x.max() will cause a hang later on

def do():
    df.groupby([df.pickup_zone, df.dropoff_zone, df.pickup_day], agg={'count_trips': vaex.agg.count()})
def main():
    # vaex.multithreading.thread_count_default = 16

    import concurrent.futures
    N = 4  # 4*64
    if 0:  # set to 1 to run the jobs serially for comparison
        for i in range(N):
            do()
    else:
        pool = concurrent.futures.ProcessPoolExecutor(max_workers=16)
        # pool = concurrent.futures.ThreadPoolExecutor(64)
        futures = [pool.submit(do) for i in range(N)]
        concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION)
        for k in futures:
            k.result()
if __name__ == '__main__':
    main()