GoogleCloudPlatform / composer-local-dev


composer-dev restart causes "Variable does not exist" and "database is locked" errors #63

Open pokeshun96 opened 1 week ago

pokeshun96 commented 1 week ago

First of all, thank you for developing and maintaining such a great tool! I’ve been using composer-local-dev and really appreciate the effort put into making local development with Airflow easier.

However, I encountered the following errors:

  1. After starting the container and setting Airflow variables via the DAG airflow_variable_set, running composer-dev restart produces the following error:
    Traceback (most recent call last):
      File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/dagbag.py", line 342, in parse
        loader.exec_module(new_module)
    ...
    KeyError: 'Variable ENV does not exist'
  2. Additionally, I see an error related to SQLite:
    sqlite3.OperationalError: database is locked

    It seems the variables are not loaded correctly after a container restart, and the metadata database is locked during operations. A parse-safe workaround for the first error is sketched right after this list.
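
For context, the traceback comes from DagBag parsing, which suggests one of the DAG files calls Variable.get("ENV") at import time. A parse-safe pattern, just a sketch (ENV stands for whichever variable the failing DAG reads, and the "dev" default is purely illustrative), is to supply default_var so parsing survives an empty metadata database:

from airflow.models import Variable

# With default_var, a missing variable no longer raises KeyError during
# DAG parsing, e.g. right after the local metadata DB has been reset.
ENV = Variable.get("ENV", default_var="dev")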

Steps to Reproduce:

  1. Start the container and set Airflow variables via the DAG airflow_variable_set (source below).
  2. Run composer-dev restart.

Expected Behavior: After restarting, the variables should persist without causing errors.

Actual Behavior: The restart results in errors saying the variable does not exist, and the database is locked.
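
As a possible stopgap, assuming the mounted ~/gcs directories survive a restart while the container's SQLite metadata database does not: dump the variables to a JSON file on the mount before restarting and re-import them afterwards. A minimal sketch (the variables_backup.json path and both helper names are only illustrative); the Airflow CLI offers the same via airflow variables export / airflow variables import:

import json
import os

from airflow.models import Variable
from airflow.utils.session import provide_session

# Illustrative backup location on the mounted volume; adjust to your setup.
BACKUP_PATH = os.path.join(os.environ["HOME"], "gcs", "data", "variables_backup.json")

@provide_session
def export_variables(session=None):
    # Dump every Airflow Variable to JSON so it survives a restart.
    data = {row.key: row.val for row in session.query(Variable).all()}
    with open(BACKUP_PATH, "w") as f:
        json.dump(data, f, indent=2)

def import_variables():
    # Restore Variables from the backup after composer-dev restart.
    with open(BACKUP_PATH) as f:
        for key, value in json.load(f).items():
            Variable.set(key, value)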

Environment:

Composer Image Version: composer-2.8.2-airflow-2.7.3
Python Version: 3.10.14

airflow_variable_set.py:

import json
import os

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.utils.dates import days_ago

HOME = os.environ["HOME"]
VARIABLE_PATH = f"{HOME}/gcs/dags/src/common/variables"

default_args = {
    "owner": "airflow",
    "start_date": days_ago(1),
}

# Load variables from "<target_variable>_variable.json" under VARIABLE_PATH.
def get_variables(target_variable):
    filename = f"{target_variable}_variable.json"
    with open(os.path.join(VARIABLE_PATH, filename)) as f:
        return json.load(f)

with DAG(
    dag_id="airflow_variable_set",
    description="DAG that sets Airflow variables",
    schedule_interval=None,
    default_args=default_args,
    tags=["common"],
) as dynamic_generated_dag:

    @task(task_id="set_variable_task")
    def variable_set():
        GCP_PROJECT_ID = os.environ["GCP_PROJECT"]

        data = get_variables(GCP_PROJECT_ID)
        for key, value in data.items():
            Variable.set(key, value)

    # Since WORK_BUCKET changes when Composer is created, make it a separate task
    @task(task_id="set_variable_work_bucket_task")
    def variable_set_work_bucket():
        WORK_BUCKET = os.environ["GCS_BUCKET"]

        Variable.set("WORK_BUCKET", WORK_BUCKET)

    variable_set() >> variable_set_work_bucket()

    globals()[dynamic_generated_dag.dag_id] = dynamic_generated_dag
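
Since Variable.set in the tasks above writes to the container's SQLite database, which allows only one writer at a time, a retry wrapper might paper over the transient "database is locked" errors. A sketch under that assumption (set_variable_with_retry is not part of the DAG above; SQLAlchemy surfaces the lock as an OperationalError):

import time

from airflow.models import Variable
from sqlalchemy.exc import OperationalError

def set_variable_with_retry(key, value, retries=5, delay=1.0):
    # Retry Variable.set while the SQLite metadata DB is briefly locked.
    for attempt in range(retries):
        try:
            Variable.set(key, value)
            return
        except OperationalError as exc:
            # Re-raise anything that is not a transient lock, and give up
            # after the final attempt.
            if "database is locked" not in str(exc) or attempt == retries - 1:
                raise
            time.sleep(delay)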

I would greatly appreciate any guidance or fixes for this issue. Thanks again for your hard work!

liepelt-cornelius commented 1 week ago

I am having the same error: sqlite3.OperationalError: database is locked