NOAA-OWP / t-route

Tree based hydrologic and hydraulic routing

Asynchronous File Load #282

Open jhrehanoaa opened 3 years ago

jhrehanoaa commented 3 years ago

I wanted to open an issue to organize our approach to this problem. Initially, we came up with the idea of using something like asyncio to asynchronously load CSV files as they are being generated by the model, to avoid long load times during a restart iteration. However, this has not been straightforward and has some potential pitfalls. I will talk through the design below.

Asyncio appears to be designed so that asynchronous calls cannot be nested inside regular functions. Instead, it looks like you define several coroutines and then gather them at the end in order to start them running.

import time, asyncio
A = []
B = []

async def count():
    print("Initial Compute of NHD V2")
    await asyncio.sleep(4)
    B[:] = ["NHDR1"]
    await asyncio.sleep(4)
    B[:] = ["NHDR2"]
    await asyncio.sleep(4)
    B[:] = ["NHDR3"]
async def count_further():
    await asyncio.sleep(4.1)
    A[:] = ["x"] + B
    print(A)
    await asyncio.sleep(4.1)
    A[:] = ["y"] + B
    print(A)
    await asyncio.sleep(4.1)
    A[:] = ["z"] + B
    print(A)
async def count_even_further():
    await asyncio.sleep(2)
    print("Operation 1")
    await asyncio.sleep(2)
    print("Operation 2")
    await asyncio.sleep(2)
    print("Operation 3")
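async def main():
    s = time.perf_counter()
    # Run the three coroutines concurrently on one event loop.
    await asyncio.gather(count(), count_further(), count_even_further())
    elapsed = time.perf_counter() - s
    print(f"Script executed in {elapsed:0.2f} seconds.")

# A bare top-level await only works in a notebook; a script needs asyncio.run().
asyncio.run(main())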

This returns

Initial Compute of NHD V2
Operation 1
Operation 2
['x', 'NHDR1']
Operation 3
['y', 'NHDR2']
['z', 'NHDR3']
Script executed in 12.30 seconds.

This is somewhat problematic for what we are trying to achieve, because the read_csv step we want to run asynchronously sits inside the main NHD computation. Furthermore, the first iteration of the main code block creates the initial q0 used for computation. Only after this iteration do we move on to using the CSV file loader. This can all be found in the self loop merged PR as well.

if ts_iterator == 0:
    q0 = nnu.build_channel_initial_state(
        restart_parameters, supernetwork_parameters, param_df.index
    )
else:
    q0_file_name = (
        restart_parameters["wrf_hydro_channel_restart_file"][:-15]
        + str(ts_iterator + 1)
        + ".csv"
    )
    q0 = nnu.restart_file_csv(q0_file_name)

My idea, if we were to proceed with this, was to asynchronously start the main code block and nnu.restart_file_csv(q0_file_name), with a lagging start for nnu.restart_file_csv(q0_file_name), so that each time the computation needs q0, it will already have been updated globally by this async process. This would require moving the call outside of the main code section and lining it up correctly, time-wise, with the main code section, which seems janky. Since it also requires restart_parameters and ts_iterator, I figured those could be taken from the initial run of the main code block by outputting them globally. Below is some pseudocode.

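# Pseudocode
q0 = None  # shared, module-level

def main():
    global q0
    if ts_iterator == 0:
        ...  # run normal operation (build the initial q0)
    else:
        ...  # use the q0 most recently loaded by calc_q0()

async def calc_q0():
    global q0
    # 40 is just an estimate of the seconds it takes to compute through the main code
    await asyncio.sleep(40)
    q0 = nnu.restart_file_csv(q0_file_name)
    return q0

# gather() returns a list of results, one per awaitable
[q0] = await asyncio.gather(calc_q0())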

I can try to clarify if that is confusing. Maybe I am misunderstanding how asyncio works or overcomplicating the problem, but I cannot see how else to use this feature to avoid slow read_csv times. This is more or less where I was at in brainstorming/experimenting with this process and was looking for feedback if anyone had a simpler approach or clarification.

hellkite500 commented 3 years ago

Where exactly is the code that is reading these? I'm having a hard time understanding the nested call you discussed in the example and how it applies.

jhrehanoaa commented 3 years ago

The code in the PR self-loop merged has the restart capability in it. It does not have the async implementation yet. Basically when the code gets to this point,

if ts_iterator == 0:
    q0 = nnu.build_channel_initial_state(
        restart_parameters, supernetwork_parameters, param_df.index
    )
else:
    q0_file_name = (
        restart_parameters["wrf_hydro_channel_restart_file"][:-15]
        + str(ts_iterator + 1)
        + ".csv"
    )
    q0 = nnu.restart_file_csv(q0_file_name)

it would call the async nested function in the else statement, but I don't think asyncio allows for that sort of functionality by design.

hellkite500 commented 3 years ago

Is the restart file a pure output? I.e., it isn't needed for any further executions?

jhrehanoaa commented 3 years ago

So the idea right now is that we create these restart files in order to run and restart the model periodically. Normally, if we ran a 1-year execution or something similarly large, it would take up too much memory and crash the model. To alleviate this, we are trying to run the model for a month or a week at a time, produce the results, and output a new restart file to continue from. From this point the model should restart from the previous run's q0 values. Essentially: run 1 month and output a new CSV, restart for the next month using this CSV, and so on until a year or whatever time frame is complete.
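The chunk-and-restart cycle described here can be sketched with the standard library alone. This is a minimal illustration, not t-route code: run_chunk, write_restart, read_restart, run_in_chunks, and the restart_N.csv naming are all hypothetical, and the "model" simply adds the number of steps to each value.

```python
import csv
import os
import tempfile


def run_chunk(q0, n_steps):
    """Hypothetical stand-in for one model run: advance each value n_steps times."""
    return {segment: flow + n_steps for segment, flow in q0.items()}


def write_restart(path, q0):
    """Write the end-of-chunk state as a two-column CSV."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["segment", "q0"])
        writer.writerows(q0.items())


def read_restart(path):
    """Read a restart CSV back into a {segment: flow} dict."""
    with open(path, newline="") as f:
        return {row["segment"]: float(row["q0"]) for row in csv.DictReader(f)}


def run_in_chunks(initial_q0, total_steps, chunk_steps, workdir):
    q0 = initial_q0
    for i, start in enumerate(range(0, total_steps, chunk_steps)):
        n_steps = min(chunk_steps, total_steps - start)
        q0 = run_chunk(q0, n_steps)
        path = os.path.join(workdir, f"restart_{i + 1}.csv")
        write_restart(path, q0)  # output a new restart file
        q0 = read_restart(path)  # the next chunk restarts from it
    return q0


with tempfile.TemporaryDirectory() as workdir:
    final_q0 = run_in_chunks({"101": 0.0, "102": 5.0}, 10, 4, workdir)
print(final_q0)
```

Only one chunk's worth of state is held at a time, which is the memory argument made above; the cost is one write and one read per chunk, which is the load time this issue is trying to hide.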

hellkite500 commented 3 years ago

My initial inclination is you are looking at something like this...


def restart_file_csv(name):

    to_csv(name)

async def some_control_func():

    if ts_iterator == 0:
        q0 = nnu.build_channel_initial_state(
            restart_parameters, supernetwork_parameters, param_df.index
        )
    else:
        q0_file_name = (
            restart_parameters["wrf_hydro_channel_restart_file"][:-15]
            + str(ts_iterator + 1)
            + ".csv"
        )
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, nnu.restart_file_csv, q0_file_name)

loop = asyncio.get_event_loop()
loop.run_until_complete(some_control_func())

jhrehanoaa commented 3 years ago

I will try something like this and see if I can get it working.

jhrehanoaa commented 3 years ago

Just an update: I got this code running inside the model. However, it does not run asynchronously, because execution still has to reach this part of the code before the ts_iterator == 0 check runs; only then does it go through the if/else statements and load accordingly. I believe this operation will need to run alongside the main() code, not within it, in order to use the async capability. The call at the end to get the results is where the async operation ends. For our purposes, we are hoping to load q0 ahead of time so that compute_nhd_routing_v02 in main() can easily access the appropriate q0 values when it gets to them each iteration.

async def some_control_func(ts_iterator, restart_parameters):

    if ts_iterator == 0:
        q0 = nnu.build_channel_initial_state(
            restart_parameters, supernetwork_parameters, param_df.index
        )

        return q0
    else:
        q0_file_name = (
            restart_parameters["wrf_hydro_channel_restart_file"][:-15]
            + str(ts_iterator + 1)
            + ".csv"
        )
        q0 = nnu.restart_file_csv(q0_file_name)

        print(q0)
        return q0
        loop = asyncio.get_event_loop()
        await loop.run_in_executor()

loop = asyncio.get_event_loop()
q0 = loop.run_until_complete(some_control_func(ts_iterator, restart_parameters))

hellkite500 commented 3 years ago

What do you expect to run asynchronously? Nothing actually gets run in await loop.run_in_executor(). You need to pass it a routine to run; that routine is run in an executor (a thread pool by default) and wrapped in a future that can be awaited inside the event loop.

hellkite500 commented 3 years ago

Here is the documentation for run_in_executor()
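As a minimal sketch of what passing a routine to run_in_executor() looks like: slow_read_csv and simulate_chunk below are hypothetical stand-ins, not t-route functions. The first blocks in time.sleep the way a large CSV read would; the second is a coroutine that yields regularly. The file name is also made up.

```python
import asyncio
import time


def slow_read_csv(path):
    """Hypothetical stand-in for a blocking loader such as nnu.restart_file_csv."""
    time.sleep(0.3)  # pretend this is a slow disk read
    return f"q0 from {path}"


async def simulate_chunk():
    """Hypothetical stand-in for other work that yields to the event loop."""
    for _ in range(3):
        await asyncio.sleep(0.1)
    return "chunk done"


async def control():
    loop = asyncio.get_running_loop()
    # Pass the callable and its arguments; None selects the default thread pool.
    load_future = loop.run_in_executor(None, slow_read_csv, "restart_2.csv")
    # The read runs in a worker thread while this coroutine keeps going.
    result = await simulate_chunk()
    q0 = await load_future
    return result, q0


start = time.perf_counter()
result, q0 = asyncio.run(control())
elapsed = time.perf_counter() - start
print(result, "|", q0, f"| {elapsed:0.2f} s")
```

Because the read is started before the other work is awaited, the elapsed time should be close to the longer of the two waits rather than their sum. Calling the loader directly inside the coroutine, as in the attempt above, blocks the event loop for the full read.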

groutr commented 3 years ago

@jhrehanoaa This might help clarify the relation between synchronous and asynchronous function calls: https://www.aeracode.org/2018/02/19/python-async-simplified/

My solution to slow csv read times is usually to not use csv. :) Perhaps look at parquet or feather.

hellkite500 commented 3 years ago

There is that: a simple pandas to_parquet, if you are serializing a data frame. It does require fastparquet or pyarrow as a dependency. I've found pyarrow to be more functional. It would still be beneficial to chunk the simulation time and asynchronously load/save restarts while a chunk of time is simulating.

jhrehanoaa commented 3 years ago

Would it just be simpler to parquet, feather, or pyarrow this and leave it serial? I'm struggling in this documentation to find a way to fit this into our code coherently.

groutr commented 3 years ago

I'm not entirely convinced that async is what you're wanting here. Async does not necessarily mean parallel. Doing a file read/write asynchronously still means the computation will need to be interrupted. Where would be a good point to pause the computation while the IO is working, and vice versa? The event loop is still going to be running one task at a time. A process pool could allow you to do some things in parallel, but would require serializing your data across processes.
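The "one task at a time" point can be seen in a small timing sketch. The task names here are made up for illustration: one task blocks with time.sleep, the other yields with asyncio.sleep, and two copies of each are gathered on the same event loop.

```python
import asyncio
import time


async def blocking_task():
    time.sleep(0.2)  # blocks the whole event loop; nothing else can run


async def cooperative_task():
    await asyncio.sleep(0.2)  # yields control while waiting


async def timed(task_factory):
    """Run two copies of the task together and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(task_factory(), task_factory())
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_task))
cooperative_elapsed = asyncio.run(timed(cooperative_task))
print(f"blocking:    {blocking_elapsed:0.2f} s")
print(f"cooperative: {cooperative_elapsed:0.2f} s")
```

The two blocking tasks run back to back, so their waits add up, while the two cooperative tasks wait at the same time. A plain pandas read_csv call behaves like the blocking case unless it is moved to an executor.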

jhrehanoaa commented 3 years ago

Right now it is looking like:

I'm just confused about how I can realistically nest this async inside the main loop. It needs to start at the end of a run and complete at the beginning of the next run.

groutr commented 3 years ago

Designing an async version would require a shift in thinking about how your tasks are defined. I doubt it would be possible to simply drop async inside the existing main loop. You are clearly still thinking about these tasks in synchronous terms. I reiterate that async doesn't imply parallel; it implies overlap. Async will still be single thread/single process by default.

A good way to think of asyncio is "cooperative multitasking". The idea is that if some function is waiting on a resource (like a server response or IO request, or even the result of another function), it can yield control to another part of the code that can execute. That other part of the code will then yield control back when it's done or when it also waits for some resource. That is the cooperative part. A task voluntarily yields control when it can. In this way, tasks can overlap their execution, but still, only one task is executing at a time.

Now, in some cases, you can run multiple tasks in parallel, assuming there are no inter-dependencies. In that case, you would schedule a task to write the csv restart file and another task to run the main code. When you get to the check for q0, you would simply wait until the last restart has finished writing to disk (there is no guarantee when that will finish though; it could be before or after the main code finishes). Futures would be used to check whether a task has completed or not. An example flow:

Task 1: Write a CSV file given data. The function call accepts a Future object and returns a new Future object.
Task 2: Execute main code. The function call accepts a Future object and returns a new Future object.
Task 3: Read CSV file. The function call accepts a Future object and returns a new Future object.

When the tasks are scheduled for the first time, some dummy Future objects would be passed in to simply start the functions. Task 3 would receive Future objects returned from Task 1 (i.e., don't read a file until Task 1 has completed successfully). Task 2 would receive Future objects returned from Task 3 (i.e., don't start computing until the csv file has been read and returned). Task 1 would receive Future objects returned from Task 2 (write after Task 2 has completed). The first thing each function does is await the future it receives, meaning it yields control until the Future has been realized. It then continues to perform its work, yielding when possible. Calling an async function returns a coroutine object; scheduling it with asyncio.create_task() (or asyncio.ensure_future()) gives a Task, which is a Future, and that is what we then pass to the next function in the flow.
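A minimal sketch of this flow, assuming in-memory stand-ins for the file and model work: write_csv, read_csv, run_main, and flow are hypothetical names, the "state" is a single integer, and asyncio.sleep stands in for the real IO and computation.

```python
import asyncio

log = []


async def write_csv(after):
    """Task 1: wait for the previous step, then 'write' the restart state."""
    state = await after
    await asyncio.sleep(0.01)  # stand-in for file output
    log.append(f"write {state}")
    return state


async def read_csv(after):
    """Task 3: wait for the write to finish, then 'read' the state back."""
    state = await after
    await asyncio.sleep(0.01)  # stand-in for file input
    log.append(f"read {state}")
    return state


async def run_main(after):
    """Task 2: wait for the read, then 'compute' the next state."""
    state = await after
    await asyncio.sleep(0.01)  # stand-in for the routing computation
    log.append(f"compute {state + 1}")
    return state + 1


async def flow(n_iterations):
    # Dummy, already-resolved Future that starts the chain.
    start = asyncio.get_running_loop().create_future()
    start.set_result(0)
    computed = start
    for _ in range(n_iterations):
        # Everything is scheduled up front; each task awaits its predecessor.
        written = asyncio.create_task(write_csv(computed))
        loaded = asyncio.create_task(read_csv(written))
        computed = asyncio.create_task(run_main(loaded))
    return await computed


final_state = asyncio.run(flow(2))
print(final_state, log)
```

With a strict chain like this, the order is fully determined by the futures and nothing runs side by side; overlap only appears for work that is left out of the chain, such as output that nothing later waits on.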

jameshalgren commented 2 years ago

@jhrehanoaa This [stale] branch and PR comments detail how we might accomplish asynchronously looped long-term simulations: https://github.com/jameshalgren/t-route/pull/22 The comments provide an example stand-alone test program with a few simple inputs which can show the general concept in operation, following on the concept laid out in discussion above by @groutr and @hellkite500 as follows:

        '''
        Asynchronous execution Pseudocode
        Sync1: Prepare first warmstate from files
        Sync1: Prepare first forcing from files

        For first forcing set
            Sync2a: run model
            Sync2b: begin preparing next forcing
            Sync3a - AFTER Sync2a, prepare next warmstate (last state of model run)
            Sync3b: write any output from Sync2a
            Loop has to wait for Sync2a+b+Sync3a, does not have to wait for Sync3b
                  if next forcing prepared
        '''

That pull request was made into a branch where looping capability was being developed. That capability, now more mature but basically identical in concept, is now merged into upstream master.
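The pseudocode above can be sketched as a small stand-alone loop. This is not the code from that pull request: run_model, prepare_forcing, write_output, and simulate are hypothetical, the blocking steps are time.sleep calls run in the default thread pool, and "output" is appended to a list instead of written to disk.

```python
import asyncio
import time


def run_model(warmstate, forcing):
    """Hypothetical blocking model run; returns (last_state, output)."""
    time.sleep(0.05)
    last_state = warmstate + forcing
    return last_state, f"output for forcing {forcing}"


def prepare_forcing(index):
    """Hypothetical blocking read of the forcing for chunk `index`."""
    time.sleep(0.05)
    return index


def write_output(output, written):
    """Hypothetical blocking writer; appends to a list instead of a file."""
    time.sleep(0.05)
    written.append(output)


async def simulate(n_chunks):
    loop = asyncio.get_running_loop()
    written, writers = [], []
    # Sync1: prepare the first warmstate and the first forcing.
    warmstate, forcing = 0, prepare_forcing(1)
    for i in range(1, n_chunks + 1):
        # Sync2a: run the model; Sync2b: begin preparing the next forcing.
        model = loop.run_in_executor(None, run_model, warmstate, forcing)
        next_forcing = (
            loop.run_in_executor(None, prepare_forcing, i + 1)
            if i < n_chunks
            else None
        )
        # Sync3a: after Sync2a, the last state becomes the next warmstate.
        warmstate, output = await model
        # Sync3b: start writing output, but do not wait for it here.
        writers.append(loop.run_in_executor(None, write_output, output, written))
        if next_forcing is not None:
            forcing = await next_forcing
    await asyncio.gather(*writers)  # flush pending writes before exiting
    return warmstate, written


final_state, written = asyncio.run(simulate(3))
print(final_state, written)
```

Each pass waits only for the model run and the next forcing, so output writing from one chunk can overlap with the next chunk's model run; the pending writes are collected once at the end.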
