dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

[pipes] forwarding termination #18570

Open alangenfeld opened 7 months ago

alangenfeld commented 7 months ago

It should probably be standard in our clients to at least offer an option to attempt to cancel the external process if the orchestrating process is interrupted.

Docker (https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-docker/dagster_docker/pipes.py#L189-L190) and K8s (https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-k8s/dagster_k8s/pipes.py#L273-L274) already handle this by stopping their containers in `finally` blocks.
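
For anyone looking for the shape of that pattern outside Docker/K8s, here is a minimal sketch using a plain subprocess in place of a container (the helper name `run_external` is hypothetical, not Dagster API):

import subprocess

def run_external(cmd):
    """Run cmd and forward termination if interrupted while waiting."""
    proc = subprocess.Popen(cmd)
    try:
        return proc.wait()  # a SIGINT/SIGTERM of this process raises out of here
    finally:
        if proc.poll() is None:  # still running: the wait above was interrupted
            proc.terminate()     # forward the termination to the external process
            proc.wait()          # reap it so it doesn't linger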

geoHeil commented 7 months ago

Based on this discussion: https://github.com/dagster-io/dagster/discussions/18459

geoHeil commented 7 months ago

I am looking into implementing this and thought that a try/except in the external asset for `DagsterExecutionInterruptedError` would be a good idea.

However, this exception is only available from:

from dagster import DagsterExecutionInterruptedError

Am I misunderstanding this? I.e., should this be handled in the Dagster-native/asset code, or should the exception be made available to `dagster_pipes`?

geoHeil commented 7 months ago

I have explored:

import os
import random
import signal
import time

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes

def handle_interrupt(signum, frame):
    """
    Handle the interrupt signal to perform cleanup actions.
    """
    print("***********")
    print("Interrupted, cleaning up resources...")
    print(f"Signal: {signum}, Frame: {frame}")
    print("***********")
    # Perform your cleanup here
    # For example, terminating subprocesses, closing files, etc.
    exit(1)

def main2():
    # Register the signal handler for interruption
    signal.signal(signal.SIGINT, handle_interrupt)
    signal.signal(signal.SIGTERM, handle_interrupt)
    # TODO This is not yet properly cleaning up the canceled run - dagster buffers are not flushed. However, open_dagster_pipes is actually in use.
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    context = PipesContext.get()
    print(context.extras)
    print(context.get_extra("foo"))
    sample_rate = context.get_extra("sample_rate")
    context.log.info(f"Using sample rate: {sample_rate}")
    print(os.environ["MY_ENV_VAR_IN_SUBPROCESS"])

    context.log.info("sleeping for 90 seconds")
    time.sleep(90)

    context.report_asset_materialization(
        metadata={
            "some_spark_metric": random.choice(["scranton", "new york", "tallahassee"])
        },
    )

if __name__ == "__main__":
    with open_dagster_pipes():
        main2()

However, in the logs I observe:

[pipes] did not receive closed message from external process. Buffered messages may have been discarded without being delivered. Use `open_dagster_pipes` as a context manager (a with block) to ensure that cleanup is successfully completed. If that is not possible, manually call `PipesContext.close()` before process exit.

I would expect Dagster to close, terminate, and flush correctly via `open_dagster_pipes` when the run is terminated from the Dagit UI.

alangenfeld commented 7 months ago

Here's the implementation for the subprocess client: https://github.com/dagster-io/dagster/pull/18685

> thought that a try/except in the external asset for `DagsterExecutionInterruptedError` would be a good idea.

`DagsterExecutionInterruptedError` is going to get raised in the orchestration process, i.e. in the `@op` / `@asset`. You need to handle it there and then forward it by whatever means apply: in the subprocess case that's terminating the subprocess; for Databricks it's making the API call to delete the job.
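
For illustration, handling it in the orchestration process could look roughly like this (a sketch; `start_external_job`, `wait_for_external_job`, and `cancel_external_job` are hypothetical stand-ins for whatever API drives the external system):

from dagster import DagsterExecutionInterruptedError, asset

@asset
def my_external_asset(context):
    job_id = start_external_job()  # hypothetical: launch the external workload
    try:
        return wait_for_external_job(job_id)  # hypothetical: block until done
    except DagsterExecutionInterruptedError:
        # raised here, in the orchestration process, when the run is terminated;
        # forward the cancellation to the external system, then re-raise
        cancel_external_job(job_id)
        raise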

> I have explored

If you were using the subprocess client: it currently has no `except DagsterExecutionInterruptedError` or `finally` block, so the interrupt exception will immediately exit the `open_pipes_session` context manager without waiting for the subprocess to write out its close message.
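
Roughly, the fix amounts to wrapping the wait in a try/finally inside the client. A sketch of the idea only, not the PR's actual code (`cmd`, `context`, `context_injector`, and `message_reader` are assumed to be in scope):

import os
import signal
import subprocess

from dagster import open_pipes_session

with open_pipes_session(
    context=context,
    context_injector=context_injector,
    message_reader=message_reader,
) as session:
    proc = subprocess.Popen(cmd, env={**os.environ, **session.get_bootstrap_env_vars()})
    try:
        proc.wait()
    finally:
        if proc.poll() is None:
            # forward the interrupt rather than abandoning the subprocess,
            # then wait so its close message can still be written and read
            proc.send_signal(signal.SIGINT)
            proc.wait()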

geoHeil commented 7 months ago

Simply terminating the Dagster side of the pipes setup was never a problem.

However, forwarding the termination request to the externally launched resource and terminating it gracefully was.

I (locally) already explored something similar to:

https://github.com/dagster-io/dagster/pull/18685/files#diff-97d6a06feb66c1cbac29016496e8ec8cac0093cda0cd2f3bdb4f572e2ac2f0d0R117-R119

But unless my code (as shown above) installs the signal handlers itself:

signal.signal(signal.SIGINT, handle_interrupt)
signal.signal(signal.SIGTERM, handle_interrupt)

the resources do not get closed.

Secondly, when experimenting with the additional try/except, the warning

[pipes] did not receive closed message from external process. Buffered messages may have been discarded without being delivered. Use `open_dagster_pipes` as a context manager (a with block) to ensure that cleanup is successfully completed. If that is not possible, manually call `PipesContext.close()` before process exit.

was still showing up in the logs, which is confusing. How can the buffers still be fully flushed, even in the case of termination?

Can you share an end-to-end example (i.e., including the client-side code)?

alangenfeld commented 7 months ago

> How can the buffers still be fully flushed, even in the case of termination?

So there are two pieces to ensuring the final messages get delivered: the external process has to flush its buffered messages and write the closed message (which `open_dagster_pipes` does when its `with` block exits, or which `PipesContext.close()` does manually), and the orchestration process has to wait for that closed message instead of tearing the session down immediately on interrupt.
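
On the external side, the first piece looks like this (a minimal sketch):

from dagster_pipes import PipesContext, open_dagster_pipes

if __name__ == "__main__":
    with open_dagster_pipes() as pipes:
        # leaving this block -- normally or via KeyboardInterrupt/SystemExit --
        # flushes buffered messages and writes the closed message
        pipes.log.info("doing work")
    # if a with block is impossible, the warning's fallback applies:
    # PipesContext.get().close()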

> Can you share an end-to-end example (i.e., including the client-side code)?

Sample external-process-side code is under test here: https://github.com/dagster-io/dagster/pull/18685/files#r1425711132. In the current form of the PR I switched to SIGINT to avoid having the external process register a SIGTERM handler.
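
For background on that choice: Python's default SIGINT handler raises KeyboardInterrupt, which unwinds `with` blocks, while the default SIGTERM disposition kills the process without any unwinding. Without the switch to SIGINT, the external process would need something like this sketch:

import signal

def raise_keyboard_interrupt(signum, frame):
    # make SIGTERM behave like SIGINT so `with` blocks still unwind
    raise KeyboardInterrupt

signal.signal(signal.SIGTERM, raise_keyboard_interrupt)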

geoHeil commented 7 months ago

In the provided code, the exit is not executing properly; otherwise I would not get this error message. From looking at the code I would, like you, expect it to work, though. (Tested on Dagster 1.5.12.)

geoHeil commented 7 months ago

I will explore it again locally with your PR changes and sample tests, then report back. Thanks!

geoHeil commented 7 months ago

I have modified the program a little bit:

import signal
import time

from dagster_pipes import PipesContext, open_dagster_pipes

def handle_interrupt(signum, frame):
    print("Interrupted, cleaning up resources...")
    exit(0)

def main():
    signal.signal(signal.SIGINT, handle_interrupt)
    context = PipesContext.get()  # `context` was previously undefined here
    context.log.info("sleeping for 90 seconds")
    time.sleep(90)

if __name__ == "__main__":
    with open_dagster_pipes():
        main()

This leaves me with some questions (quoted in the reply below).

alangenfeld commented 7 months ago

> Do I still manually need to register the `handle_interrupt` function for SIGINT? I guess the answer is yes? Or would it be possible to have the open pipes session set this up out of the box?

You should not need to set a custom handler for SIGINT. By default it will raise a `KeyboardInterrupt`, which should propagate through and invoke the context manager's `__exit__`. Your custom handler's `exit()` raises `SystemExit`, which should have effectively the same behavior.

geoHeil commented 7 months ago

I would prefer not to require a custom handler. But when I need to terminate custom resources such as Databricks, Dataproc, or others, I think this will be required.

EDIT: At least for this mini example, a try/except for `KeyboardInterrupt` seems to be fine.
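
For the mini example above, that ends up looking roughly like this (a sketch; `shut_down_cluster` is a hypothetical stand-in for tearing down a Databricks/Dataproc-style resource):

import time

from dagster_pipes import open_dagster_pipes

def shut_down_cluster():
    # hypothetical cleanup for an externally launched resource
    print("tearing down external resources...")

if __name__ == "__main__":
    with open_dagster_pipes() as pipes:
        try:
            pipes.log.info("sleeping for 90 seconds")
            time.sleep(90)
        except KeyboardInterrupt:
            # default SIGINT behavior: no custom signal handler required
            shut_down_cluster()
            raise  # re-raise so open_dagster_pipes still exits and flushes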