procrastinate-org / procrastinate

PostgreSQL-based Task Queue for Python
https://procrastinate.readthedocs.io/
MIT License
800 stars 52 forks source link

Procrastinate doesn't play nicely with django-test-migrations #1090

Closed medihack closed 2 weeks ago

medihack commented 2 weeks ago

While integrating Procrastinate into our applications, we found that Procrastinate doesn't play well together with django-test-migrations (see error below). I don't blame that this is the fault of Procrastinate, but it is more I guess how django-test-migrations cleans the database.

This is what happens when running a test ...

@pytest.mark.django_db
def test_0007_convert_institutes_to_groups(migrator: Migrator):
    old_state = migrator.apply_initial_migration(("accounts", "0006_user_active_group"))
self = <django.db.backends.utils.CursorWrapper object at 0x725f69481040>
sql = 'CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;\n\nCREATE TABLE IF NOT EXISTS procrastinate_version (\...gger_scheduled_events_procedure();\n\nCREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id);\n'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x725f69481040>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.postgresql.base.Cursor [closed] [BAD] at 0x725f69b22750>
query = 'CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;\n\nCREATE TABLE IF NOT EXISTS procrastinate_version (\...gger_scheduled_events_procedure();\n\nCREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id);\n'
params = None

    def execute(
        self,
        query: Query,
        params: Optional[Params] = None,
        *,
        prepare: Optional[bool] = None,
        binary: Optional[bool] = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.DuplicateObject: type "procrastinate_job_status" already exists

/opt/pysetup/.venv/lib/python3.12/site-packages/psycopg/cursor.py:732: DuplicateObject

The above exception was the direct cause of the following exception:

self = <django_test_migrations.migrator.Migrator object at 0x725f699691f0>

    def reset(self) -> None:
        """
        Reset the state to the most recent one.

        Notably, signals are not muted here to avoid
        https://github.com/wemake-services/django-test-migrations/issues/128

        """
>       call_command('migrate', verbosity=0, database=self._database)

/opt/pysetup/.venv/lib/python3.12/site-packages/django_test_migrations/migrator.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/pysetup/.venv/lib/python3.12/site-packages/django/core/management/__init__.py:194: in call_command
    return command.execute(*args, **defaults)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/core/management/base.py:459: in execute
    output = self.handle(*args, **options)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/core/management/base.py:107: in wrapper
    res = handle_func(*args, **kwargs)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/core/management/commands/migrate.py:356: in handle
    post_migrate_state = executor.migrate(
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/migrations/executor.py:135: in migrate
    state = self._migrate_all_forwards(
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/migrations/executor.py:167: in _migrate_all_forwards
    state = self.apply_migration(
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/migrations/executor.py:252: in apply_migration
    state = migration.apply(state, schema_editor)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/migrations/migration.py:132: in apply
    operation.database_forwards(
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/migrations/operations/special.py:106: in database_forwards
    self._run_sql(schema_editor, self.sql)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/migrations/operations/special.py:133: in _run_sql
    schema_editor.execute(statement, params=None)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/backends/postgresql/schema.py:45: in execute
    return super().execute(sql, params)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/backends/base/schema.py:202: in execute
    cursor.execute(sql, params)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/backends/utils.py:79: in execute
    return self._execute_with_wrappers(
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/backends/utils.py:92: in _execute_with_wrappers
    return executor(sql, params, many, context)
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/backends/utils.py:100: in _execute
    with self.db.wrap_database_errors:
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/utils.py:91: in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
/opt/pysetup/.venv/lib/python3.12/site-packages/django/db/backends/utils.py:103: in _execute
    return self.cursor.execute(sql)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.postgresql.base.Cursor [closed] [BAD] at 0x725f69b22750>
query = 'CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;\n\nCREATE TABLE IF NOT EXISTS procrastinate_version (\...gger_scheduled_events_procedure();\n\nCREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id);\n'
params = None

    def execute(
        self,
        query: Query,
        params: Optional[Params] = None,
        *,
        prepare: Optional[bool] = None,
        binary: Optional[bool] = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           django.db.utils.ProgrammingError: type "procrastinate_job_status" already exists

/opt/pysetup/.venv/lib/python3.12/site-packages/psycopg/cursor.py:732: ProgrammingError
================================================================================= short test summary info ==================================================================================
ERROR adit_radis_shared/accounts/tests/test_migrations.py::test_0007_convert_institutes_to_groups - django.db.utils.ProgrammingError: type "procrastinate_job_status" already exists
================================================================================ 1 passed, 1 error in 5.62s ================================================================================

One workaround I found is to drop all the Procrastinate stuff manually

@pytest.mark.django_db
def test_0007_convert_institutes_to_groups(migrator: Migrator):
    with connection.cursor() as cursor:
        cursor.execute("DROP TABLE procrastinate_jobs CASCADE;")
        cursor.execute("DROP TABLE procrastinate_events CASCADE;")
        cursor.execute("DROP TABLE procrastinate_periodic_defers CASCADE;")
        cursor.execute("DROP TYPE procrastinate_job_status CASCADE;")
        cursor.execute("DROP TYPE procrastinate_job_event_type CASCADE;")
        cursor.execute("DROP FUNCTION procrastinate_notify_queue;")
        cursor.execute("DROP FUNCTION procrastinate_trigger_status_events_procedure_insert;")
        cursor.execute("DROP FUNCTION procrastinate_trigger_status_events_procedure_update;")
        cursor.execute("DROP FUNCTION procrastinate_trigger_scheduled_events_procedure;")
        cursor.execute("DROP FUNCTION procrastinate_unlink_periodic_defers;")
        cursor.execute(
            "DROP FUNCTION procrastinate_defer_job(character varying, character varying, integer ,text, text, jsonb, timestamp with time zone);"
        )
        cursor.execute(
            "DROP FUNCTION procrastinate_defer_job(character varying, character varying, text, text, jsonb, timestamp with time zone);"
        )
        cursor.execute(
            "DROP FUNCTION procrastinate_defer_periodic_job(character varying, character varying, character varying, character varying, integer, character varying,  bigint, jsonb);"
        )
        cursor.execute(
            "DROP FUNCTION procrastinate_defer_periodic_job(character varying, character varying, character varying, character varying, character varying,  bigint, jsonb);"
        )

    old_state = migrator.apply_initial_migration(("accounts", "0006_user_active_group"))

This is, of course, rather ugly (is there a more easy way?).

I wonder if it would make sense to always use CREATE OR REPLACE (even in the 00.00.00_01_initial.sql migration). That way, the duplication error will be avoided.

I totally understand if we don't do anything and just say it's the fault of django-test-migrations, especially as there is a similar open issue in the repo of django-test-migration).

ewjoachim commented 2 weeks ago

Trying to understand how it ends up with duplicate objects. I would imagine that doing the setup involves starting from a fresh DB, and as long as it doesn't do back-and-forth with migrations, then there's no reason it would do that.

But if it either doesn't start from a fresh DB or if the process involves applying and unapplying a migration, then of course, given our migrations don't include a reverse SQL, it's going to be problematic at some point. If that's the case, I believe it should be possible that RunProcrastinateSQL would be able to offer automated reverse SQL using migra.

medihack commented 2 weeks ago

I think it's not a problem with reverse migrations as they only use forward migrations. I guess it's more a problem that they only drop all model tables and don't respect anything that may be created by some custom SQL code. So I guess we can't do anything about it, but I have to just use that ugly workaround I mentioned.

medihack commented 2 weeks ago

Just in case anyone in the future is having that problem, another way to drop everything that is prefixed with procrastinate_:

with connection.cursor() as cursor:
    cursor.execute("""
        DO $$ 
        DECLARE
            prefix text := 'procrastinate_';
            rec record;
        BEGIN
            -- Drop tables
            FOR rec IN 
                SELECT tablename
                FROM pg_tables
                WHERE tablename LIKE prefix || '%'
            LOOP
                EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(rec.tablename) || ' CASCADE';
            END LOOP;

            -- Drop functions
            FOR rec IN 
                SELECT n.nspname as schema_name,
                    p.proname as func_name,
                    pg_catalog.pg_get_function_identity_arguments(p.oid) as args
                FROM pg_proc p
                    LEFT JOIN pg_namespace n ON n.oid = p.pronamespace
                WHERE p.proname LIKE prefix || '%'
            LOOP
                EXECUTE 'DROP FUNCTION IF EXISTS ' || quote_ident(rec.schema_name) || '.'
                || quote_ident(rec.func_name) || '(' || rec.args || ') CASCADE';
            END LOOP;

            -- Drop types
            FOR rec IN 
                SELECT typname
                FROM pg_type
                WHERE typname LIKE prefix || '%'
            LOOP
                EXECUTE 'DROP TYPE IF EXISTS ' || quote_ident(rec.typname) || ' CASCADE';
            END LOOP;
        END $$;
    """)