Closed: mbopfNIH closed this issue 1 year ago
Some of the code changes necessary for switching from Prefect v1 to v2:
- `requirements.txt`: add an install of `prefect_dask` and change to `prefect==2.*`
- The `@flow` decorator on `def dm_flow()` is now used instead of the `Flow` class
  - This also allows for better control of exceptions
  - Change `raise signals.FAIL` to `raise <Exception>` and handle it in a more standard fashion
- Use `prefect.task_runners.ConcurrentTaskRunner` instead of `prefect.executors.LocalExecutor`
- In `config.py`, switch from `prefect.executors.DaskExecutor` to `prefect_dask.DaskTaskRunner`
- `SUCCESS` is now `prefect.state.StateType.COMPLETED`
- `prefect.triggers` have been removed. Need to handle this in different ways (Python code, filters, etc.)
- `max_retries` -> `retries`; `retry_delay_seconds=10`; no triggers
- `get_run_logger` is context dependent and will return a logger based on either the current flow or the current task. Need a way to create a log per input file
- Replace `upstream_tasks` with `wait_for`
- `prefect.runtime.flow_run` or `prefect.runtime.task_run`
Error Handling
Trying to get Prefect 2 to continue through the entire flow run if an error occurs. There seem to be a lot of error handling mechanisms, but so far none of them seem to work with our system.
Tried:
Next:
- `utils.filter_results()`
Using `utils.filter_results()` made it possible to get a "bad_tif" test to work. Changes made:
- `dm_conversion/flow.py`: wrapped the call to `convert_if_int16_tiff` with `utils.filter_results()`

However, extending this to later calls in the flow causes unexpected errors, even in non-error cases. Wrapping `convert_dms_to_mrc`, for example, causes an "Impossible state" error in `scale_jpgs` because not all "dm_mrcs" get converted to jpegs, for some reason.
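The real `utils.filter_results()` is not shown in this thread, so the following is only a guess at its general shape, written in plain Python against stand-in objects. `FakeState` and `filter_completed` are hypothetical names. The sketch keeps each input paired with its result, which may be worth checking if later steps assume that two lists still line up after failures are dropped.

```python
from dataclasses import dataclass
from typing import Any, List, Sequence, Tuple


@dataclass
class FakeState:
    """Stand-in exposing the two State methods the filter relies on."""
    value: Any
    ok: bool = True

    def is_completed(self) -> bool:
        return self.ok

    def result(self) -> Any:
        if not self.ok:
            raise RuntimeError("task failed")
        return self.value


def filter_completed(inputs: Sequence[Any], states: Sequence[Any]) -> List[Tuple[Any, Any]]:
    """Keep (input, result) pairs whose state completed, preserving order."""
    return [
        (item, state.result())
        for item, state in zip(inputs, states)
        if state.is_completed()  # never call result() on a failed state
    ]


pairs = filter_completed(
    ["a.dm4", "bad.dm4", "c.dm4"],
    [FakeState("a.mrc"), FakeState(None, ok=False), FakeState("c.mrc")],
)
```

Because the failed entry is removed from both sides at once, a downstream step that iterates over `pairs` cannot see an input without its matching output.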
Next:
- `convert_2d_mrc_to_tiff` isn't working for some reason

Fixed an issue where the flow in Prefect 2 wasn't providing the callback data to `utils.send_callback_body`, as it wasn't waiting for the `add_asset` calls to complete (or even start). So I added an extra `wait_for` on those tasks and it is now getting all the assets.
Tried changing the dm flow to use `utils.notify_api_completion` as a Prefect 2 `on_completion` state change hook, which is a similar concept to the Prefect 1 state handler. However, the flow is completely gone when this routine is called, so we no longer have values for "no_api" and "callback_url". I am going to try to change the flow to run this after everything completes, instead. If that doesn't work, I'll try using the "subflow" concept, which may also work for the `notify_api_running` state handler.
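One option that might be worth testing: Prefect 2 calls flow state change hooks with three arguments, `(flow, flow_run, state)`, and the `flow_run` object carries a `parameters` dictionary. The sketch below assumes "no_api" and "callback_url" are flow parameters and that they are available there in the Prefect version in use. `send_status` and `SENT` are hypothetical stand-ins for the real API call.

```python
SENT: list = []  # stand-in record of what would have been sent to the API


def send_status(callback_url: str, status: str) -> None:
    """Hypothetical stand-in for the real HTTP call."""
    SENT.append((callback_url, status))


def notify_on_completion(flow, flow_run, state) -> None:
    """State change hook; Prefect passes (flow, flow_run, state)."""
    # Assumption: the parameters the run was started with are exposed here.
    params = flow_run.parameters
    if params.get("no_api"):
        return  # API notifications disabled for this run
    send_status(params["callback_url"], state.name)
```

The hook would be registered with `@flow(on_completion=[notify_on_completion])`, and the same function could be listed under `on_failure`. Note that the values in `flow_run.parameters` are the serialized forms of the arguments, so complex objects may not round-trip exactly.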
Making `utils.notify_api_completion` a task and calling it at the end of the flow works for a "clean" run, but doesn't if there are errors in the flow. Could test using a Prefect 2 `on_failure` hook, but that will likely have the same issues as the `on_completion` hook. Will next try using a couple of sub-flows in order to send the API messages while maintaining a context.
The sub-flow solution may work. Tried working with Prefect engineers to get our current, single-flow system to work, but the whole flow fails if any error is encountered. This means that `notify_api_completion` never gets called.
Instead, prototyped a "main_flow" with three subflows. The first subflow simply does the `notify_api_running`, the second runs the current workflow as is (errors and all), and the third does the equivalent of `notify_api_completion`.
It is important to note how Prefect handles a State returned from a flow or task. It is useful to get a State back from a task (or flow) call like this:

    @flow
    def my_flow():
        state = my_task(return_state=True)

The result of the task in this case can then be retrieved by calling:

    my_result = state.result()

However, if the flow or task has thrown an exception, it will be re-raised immediately upon calling `.result()`. It will also be evaluated if passed as an argument to another flow. I think this is one of the main issues when trying to get a long chain of tasks to complete despite errors: it's hard to handle the different error scenarios while always avoiding evaluating the result. It is probably possible, but tricky, and the code becomes more complicated.
The "working" subflow-based code looks something like this:

    @flow(log_prints=True)
    def main_flow(input_dir: str, [more params...]):
        utils.notify_started_flow(no_api, token, callback_url)
        # return_state=True is needed to get a State back instead of a raised exception
        lrg2d_flow_result = lrg_2d_flow(input_dir, [etc], return_state=True)  # the original "flow"
        if lrg2d_flow_result.is_completed():
            print("**** lrg_2d_flow COMPLETED")
            utils.notify_completed_flow("Completed", token, callback_url, no_api)
        else:
            print("**** lrg_2d_flow did NOT complete")
            utils.notify_completed_flow("Failed", token, callback_url, no_api)

Where the flows in `utils.py` look like this:

    @flow
    def notify_started_flow(no_api, token, callback_url):
        notify_api_running(no_api, token, callback_url)  # Original call, effectively

    @flow
    def notify_completed_flow(status_msg, token, callback_url, no_api):
        notify_api_completion(status_msg, token, callback_url, no_api)  # Original call, effectively

Note that we can't just pass the `lrg2d_flow_result` directly to `notify_completed_flow`. The reason is that Prefect will evaluate the parameter before it passes it, and if the result contains an exception, it will be thrown immediately and the API will NOT be notified.
The `utils.notify` calls are wrapped in subflows for two different reasons. The goal of `notify_started_flow` is to wait until the HPC has actually kicked off the main_flow before telling the API. I haven't been able to test this yet, so it may or may not work. This simulates a Prefect 1 state handler, a mechanism that no longer exists in Prefect 2.
The reason for placing the `notify_api_completion` call in its own subflow is to try to guarantee that the primary flow (lrg_2d_flow) has actually finished, in either a completed or a failed state. Actually, in most error scenarios, the primary flow ends up in a PENDING state; I believe this is due to a "feature" in how `map` is handled in Prefect 2. In any case, this appears to be working for all of our test cases.
This prototype system is complete and has been pushed to the Prefect2Subflows branch.