dbt-labs / dbt-core

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
https://getdbt.com
Apache License 2.0

[CT-2958] [Feature] Cancel open connections on SIGTERM (in addition to SIGINT) #8356

Open rchui opened 1 year ago

rchui commented 1 year ago

Is this your first time submitting a feature request?

Describe the feature

When running with an orchestrator like Airflow, users can set DAG timeouts that will issue SIGTERM to running processes. Unlike SIGINT, SIGTERM does not signal DBT to cancel a running query, so the query continues running even after it has exceeded the Airflow-specified timeout.

This can cause a thundering-herd situation when you have an incremental model that runs very frequently: the task times out, killing the DBT process, then restarts and runs the query again while the old one is still running, on and on.

It would be useful to specify a timeout for a Postgres query so that it is killed if it exceeds a certain amount of time. This can be achieved by injecting a SQL header that sets statement_timeout, like:

"SET statement_timeout TO '15min' ; "

This could be configured globally in profiles.yml:

type: postgres
...
execution_timeout: 15min

Or as a model property:

{{
  config(
    execution_timeout='15min'
  )
}}
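As a sketch of something achievable today without a new config: dbt's existing sql_header model config can prepend arbitrary SQL before a model's compiled query, so a Postgres statement_timeout could be injected per model (the execution_timeout name above is only a proposal; the model body below is illustrative):

```sql
-- Sketch only: uses dbt's existing sql_header config to prepend a
-- Postgres-specific statement_timeout before this model's SQL.
{{ config(
    sql_header="SET statement_timeout TO '15min';"
) }}

select * from {{ ref('my_upstream_model') }}
```

Unlike a proper SIGTERM handler, this kills the query server-side after a fixed duration regardless of whether the orchestrator actually timed out.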

Describe alternatives you've considered

Who will this benefit?

Anyone who uses a Postgres database and an external orchestrator that allows setting timeouts external to DBT.

Are you interested in contributing this feature?

No response

Anything else?

No response

jtcohen6 commented 1 year ago

@rchui I prefer your alternative recommendation: teaching dbt-core how to handle SIGTERM the same way it does SIGINT, whereby it attempts to cancel all open connections. I haven't researched the feasibility of that, but it sounds possible.

Alternatively, is there any way you can configure Airflow to send SIGINT first, wait some number of seconds, and then send SIGTERM?

My biggest concern with the approach based on statement_timeout is that it's not standard across adapters / data platforms. I don't have such a strong opinion about whether it's more appropriate to configure these timeouts within dbt versus the orchestrator running it, but I think users are more accustomed to the latter.

rchui commented 1 year ago

@jtcohen6

I don't believe there is a way to have Airflow issue a SIGINT instead of a SIGTERM on task timeout, or at least not one that I am aware of.

I tried to remedy this on my own, along similar lines to your research. I first read this article on graceful signal handling in K8s: https://medium.com/@brianepohl/terminating-dbt-in-dagster-kubernetes-job-c53c3bc26012. It led me to another article about graceful shutdowns in ECS: https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/.

I implemented a variation of the python signal handler:

import signal
import sys

def shutdown(signum, frame):
    print('Caught SIGTERM, shutting down')
    # Finish any outstanding requests, then...
    sys.exit(0)

if __name__ == '__main__':
    # Register handler for SIGTERM
    signal.signal(signal.SIGTERM, shutdown)
    # Main logic goes here
    ...

On SIGTERM the handler would find all processes named "dbt" and issue a SIGINT to those processes. However, the DBT process never seemed to respect it properly, which I suspect is because we invoke DBT via subprocess.run(). Maybe the new DBT programmatic invocations would make this work better, but I haven't gotten around to trying it.
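One reason subprocess.run() makes this awkward is that it only returns after the child exits, so the parent never holds a handle it can signal directly. A minimal sketch of the alternative, assuming dbt is launched as a CLI child process (the ["dbt", "run"] command in the usage note is illustrative):

```python
import signal
import subprocess

def run_with_sigterm_forwarding(cmd):
    """Run cmd as a child process we keep a handle to (unlike
    subprocess.run, which only returns once the child has exited).
    On SIGTERM, forward a SIGINT to the child -- which dbt handles by
    cancelling its open connections -- then keep waiting for it."""
    proc = subprocess.Popen(cmd)

    def forward_sigterm(signum, frame):
        # Translate the orchestrator's SIGTERM into the SIGINT
        # the child already knows how to handle.
        proc.send_signal(signal.SIGINT)

    previous = signal.signal(signal.SIGTERM, forward_sigterm)
    try:
        return proc.wait()
    finally:
        # Restore the prior SIGTERM disposition.
        signal.signal(signal.SIGTERM, previous)
```

Calling run_with_sigterm_forwarding(["dbt", "run"]) from the wrapper's main thread would then let an Airflow-issued SIGTERM reach dbt as a SIGINT.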

Any guidance would be appreciated.

dbeatty10 commented 11 months ago

@rchui if you haven't already done so in the meantime, are you ready to try out programmatic invocations?

Your Python script, named runner.py, can be as simple as this:

# runner.py

from dbt.cli.main import dbtRunner, dbtRunnerResult

cli_args = ["ls", "--select", "my_model"]
res: dbtRunnerResult = dbtRunner().invoke(cli_args)

Then execute it within your shell like this:

python runner.py

To actually run the model (and then try out behavior of SIGTERM vs SIGINT), just change ls to run within the script above.

rchui commented 11 months ago

@dbeatty10 We are already on programmatic invocations (which are a huge leap forward IMO). Are you saying that a SIGTERM handler has gone into DBT? AFAIK the behavior is still the same, but we're a couple of versions behind.

jtcohen6 commented 11 months ago

@rchui dbt doesn't natively know how to handle SIGTERM, although I'm open to the idea. This would be a spot to start poking around within dbt-core.

I understand @dbeatty10's proposal to be: If you're programmatically invoking dbt, within your Python script/wrapper, you could try handling signal.SIGTERM by passing SIGINT into the dbt process before exiting. No dbt-core changes needed (in theory).
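A minimal sketch of that wrapper idea, assuming dbt's cancellation path is driven by the KeyboardInterrupt that Python raises in the main thread on SIGINT / Ctrl-C (worth verifying against the version in use); the dbtRunner lines are commented out and taken from the snippet earlier in this thread:

```python
import signal

def raise_keyboard_interrupt(signum, frame):
    # Make an orchestrator's SIGTERM look like Ctrl-C / SIGINT
    # to the rest of this process.
    raise KeyboardInterrupt

# Must be installed from the main thread, before invoking dbt.
signal.signal(signal.SIGTERM, raise_keyboard_interrupt)

# Hypothetical wrapper around the programmatic invocation shown above:
# from dbt.cli.main import dbtRunner
# res = dbtRunner().invoke(["run", "--select", "my_model"])
```

With this installed, a SIGTERM surfaces inside the invocation as the same exception a Ctrl-C would, with no dbt-core changes needed.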

rchui commented 9 months ago

@jtcohen6 I see the changes made in #8994. I'm not sure I 100% follow. Is it that on SIGTERM we would instead raise SystemExit? Or how would we use the signal to trigger the interrupt?

crabio commented 3 months ago

Hello there!

We are using DBT with Airflow and are stuck with the same issue: DBT does not cancel all queries on SIGTERM, which Airflow sends to all tasks on timeout.

After some investigation I found that bash does not pass SIGINT on to subprocesses; it is related to how bash manages subprocesses. You can try it yourself with this script:

#!/bin/bash

# Print process ID at start
echo "Process ID: $$"

# Function to handle SIGTERM in parent process
handle_sigterm() {
    echo "Parent process: Received SIGTERM, sending SIGINT to child process."
    kill -2 "$child_pid" 2>/dev/null
    echo "Sent signal to child $child_pid"
    wait "$child_pid"
}

# Start child process
(
    # Function to handle SIGINT in child process
    handle_sigint() {
        echo "Child process: Received SIGINT, exiting."
        exit 0
    }

    trap 'handle_sigint' SIGINT SIGTERM
    while true; do
        echo "Current time: $(date)"
        sleep 5
    done
) &
child_pid=$!

# Print process ID at start
echo "Child ID: $child_pid"

# Set up trap for SIGTERM in parent process
# trap 'handle_sigterm' SIGTERM

# Wait for child process to finish
wait "$child_pid"

But it works well in Python:

import os
import signal
import time
import sys
from multiprocessing import Process

def print_time():
    def signal_handler(signum, frame):
        print("Child process: Received SIGINT, exiting.")
        sys.exit(0)
    signal.signal(signal.SIGINT, signal_handler)

    print("Child PID: ", os.getpid())

    while True:
        print("Current time: ", time.ctime())
        time.sleep(5)

def main():
    p = Process(target=print_time)
    p.start()

    print("Parent PID: ", os.getpid())

    def signal_handler(signum, frame):
        print("Parent process: Received SIGTERM, sending SIGINT to child process.")
        os.kill(p.pid, signal.SIGINT)
        p.join()
        print("Child process finished.")
    signal.signal(signal.SIGTERM, signal_handler)

    p.join()

if __name__ == "__main__":
    main()

With these insights we decided to rewrite our tasks in Python. But it would be easier if DBT handled SIGTERM and cancelled all queries on exit...

gaoshihang commented 2 days ago

Hi @jtcohen6, I encountered the same issue on Databricks. Is there any plan to fix this? Thanks!