olirice / alembic_utils

An alembic/sqlalchemy extension for migrating sql views, functions, triggers, and policies
https://olirice.github.io/alembic_utils
MIT License

Can't create trigger on the table from the same migration #79

Open · LKay opened this issue 2 years ago

LKay commented 2 years ago

As the title suggests, there seems to be a bug when creating a trigger on a table that doesn't exist yet and is part of the same migration. In that case the autogenerated create fails with an error.

Example definition:

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql as pg
from alembic_utils.pg_extension import PGExtension
from alembic_utils.pg_function import PGFunction
from alembic_utils.pg_trigger import PGTrigger

metadata = sa.MetaData()

schema = 'public'

extension_uuid = PGExtension(
    schema=schema,
    signature='uuid-ossp',
)

function_updated_at_auto = PGFunction(
    schema=schema,
    signature="updated_at_auto()",
    definition="""
        RETURNS TRIGGER LANGUAGE plpgsql AS $$
        BEGIN
          NEW.updated_dt = NOW();
          RETURN NEW;
        END;
        $$;
    """
)

test = sa.Table(
    "test",
    metadata,
    sa.Column("id", pg.UUID, nullable=False, server_default=sa.func.uuid_generate_v1mc(), primary_key=True),
    sa.Column("name", pg.TEXT, nullable=False),
    sa.Column("created_dt", pg.TIMESTAMP(timezone=True), nullable=False, server_default=sa.func.now()),
    sa.Column("updated_dt", pg.TIMESTAMP(timezone=True), nullable=False, server_default=sa.func.now()),
)

trigger_test = PGTrigger(
    schema=schema,
    signature=f"{test.name}_updated_at_trigger",
    on_entity=f"{schema}.{test.name}",
    definition=f"""
        BEFORE UPDATE ON {schema}.{test.name}
        FOR EACH ROW EXECUTE PROCEDURE {schema}.updated_at_auto()
    """,
)

And registering this in env.py:

from alembic_utils.replaceable_entity import register_entities

register_entities([
    extension_uuid,
    function_updated_at_auto,
    trigger_test
])

This results in error:

INFO  [alembic_utils.replaceable_entity] Detecting required migration op PGTrigger PGTrigger: public.test_updated_at_trigger False public.test
...
sqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.UndefinedTableError'>: relation "public.test" does not exist
[SQL: CREATE TRIGGER "test_updated_at_trigger" BEFORE UPDATE ON public.test FOR EACH ROW EXECUTE PROCEDURE public.updated_at_auto()]
(Background on this error at: https://sqlalche.me/e/14/f405)

Creating the trigger works, however, if the table already exists. As a workaround, the migration has to be broken up into two: the first with all other entities and the second with only the triggers attached to the tables. This seems like a bug; splitting into separate migrations should not be necessary.
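
For reference, the split ends up looking roughly like this (a sketch of the two upgrade() bodies, not verbatim autogenerated output; op.create_entity is the migration directive that alembic_utils registers with alembic):

# migration 1 (sketch): extension, function, and table
def upgrade():
    op.create_entity(extension_uuid)
    op.create_entity(function_updated_at_auto)
    op.create_table(
        "test",
        sa.Column("id", pg.UUID, nullable=False,
                  server_default=sa.func.uuid_generate_v1mc(), primary_key=True),
        # ... remaining columns as defined above ...
    )

# migration 2 (sketch, generated after `alembic upgrade head`): only the trigger
def upgrade():
    op.create_entity(trigger_test)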

olirice commented 2 years ago

Hi, this is the same challenge as https://github.com/olirice/alembic_utils/issues/41

This seems like a bug and splitting to separate migrations should not be necessary.

I agree that it's clunky. The summary is that alembic_utils can not refer to alembic entities in a single migration due to the way it has to do dependency resolution. The difference is: alembic inspects the database and diffs the sqla models with the db schema in python, while alembic_utils has to apply each entity's definition against the database (inside a transaction that is later rolled back) to detect changes.

So in the case of a trigger being created in the same migration as the table it refers to, things break down at the apply step b/c the table doesn't exist yet.

As a workaround the migration has to be broken up into two, first with all entities and second with only triggers attached to the tables

yes, this is currently the best workaround

sambrilleman commented 2 years ago

I have a similar issue to the above, but related to a PGView referencing a newly added column in an sqlalchemy table, and both trying to be handled in the same migration. The main part of the traceback in my case being:

sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column "my_new_column" does not exist

As a workaround the migration has to be broken up into two, first with all entities and second with only triggers attached to the tables

yes, this is currently the best workaround

Re this suggestion ☝️ - can you recommend a way to do this in a single alembic revision --autogenerate? That is, is there a way I can still have the registered entities, but then set up part of my env.py file so that each "auto-generated" revision runs in a two-step process... first ignoring the registered entities, then a second time not ignoring them? (Or perhaps this doesn't make sense if, e.g., alembic upgrade needs to be run in between...?)

olirice commented 2 years ago

first ignoring the registered entities, then a second time not ignoring the registered entities? (or perhaps this doesn't make sense, if for e.g. alembic upgrade needs to be run in between...?)

alembic requires the database to be up-to-date (no unapplied migrations) in order to autogenerate a new migration. If it weren't for that restriction, producing two migrations in one pass would be a great option

can you recommend a way to do this in a single alembic revision --autogenerate

unfortunately, it isn't currently possible. The workaround would be to split the changes into two migrations, as described above

Andrei-Pozolotin commented 1 year ago

perhaps it would be easier to switch this project to pglast, a native postgres AST parser (rather than rendering with a live postgres server); that would also remove multiple hacks used with the current py.parse template engine

https://github.com/lelit/pglast

https://github.com/pganalyze/libpg_query
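
For reference, pglast can parse and re-render SQL with no database connection at all, something like:

# Sketch: parse a statement into the Postgres AST and normalize its text.
from pglast import parse_sql, prettify  # pip install pglast

sql = "CREATE TRIGGER t BEFORE UPDATE ON public.test FOR EACH ROW EXECUTE PROCEDURE public.updated_at_auto()"
tree = parse_sql(sql)  # parse tree backed by libpg_query, no live server
print(prettify(sql))   # canonical re-rendering, usable for text comparison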

oc-ben-ellis commented 1 year ago

The difference is: alembic inspects the database and diffs the sqla models with the db schema in python, while alembic_utils has to apply each entity's definition against the database to detect changes.

@olirice Is there a reason why alembic_utils couldn't take the same approach? Is it just easier this way?

olirice commented 1 year ago

Tables, indexes, and (some) constraints are highly structured. That makes it possible to look at local definitions in python and check the database to see if they exist. They can also be broken down into a clear dependency tree by following foreign keys. That's how alembic works.

Functions, triggers, and views allow arbitrary SQL in their definitions, so they're much less structured. Postgres stores them internally as text, but that text isn't exactly what you provided: it parses and reformats the text, so your local text blob does not match the text blob stored in e.g. the database's definition of a function.

The way alembic_utils checks to see if a function's definition has changed is to look at the function's current definition in the database, apply the local definition (in a transaction), check if the definition in the db changed, and then roll back the transaction.
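
In code, that check is roughly the following (a simplified sketch of the idea, not alembic_utils' actual implementation; pg_get_functiondef is the Postgres catalog function that returns the server's normalized rendering, and overloaded functions are ignored for brevity):

from sqlalchemy import text
from sqlalchemy.orm import Session

def definition_changed(sess: Session, schema: str, name: str, local_create_sql: str) -> bool:
    read_def = text(
        "SELECT pg_get_functiondef(p.oid) FROM pg_proc p "
        "JOIN pg_namespace n ON n.oid = p.pronamespace "
        "WHERE p.proname = :name AND n.nspname = :schema"
    )
    before = sess.execute(read_def, {"name": name, "schema": schema}).scalar()
    savepoint = sess.begin_nested()
    try:
        sess.execute(text(local_create_sql))  # CREATE OR REPLACE FUNCTION ...
        after = sess.execute(read_def, {"name": name, "schema": schema}).scalar()
    finally:
        savepoint.rollback()  # leave the database untouched
    return before != after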

Functions can also have references to other functions or tables in arbitrary and potentially dynamic SQL. More than one function can change at a time, so we have to solve for a resolution order that allows all of the local declarative definitions to be applied in a single step.
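
Solving that order amounts to something like the following greedy loop (a minimal sketch of the idea, not the library's exact code; it assumes each entity exposes to_sql_statement_create(), as alembic_utils entities do, and that the caller rolls the whole transaction back afterwards):

from sqlalchemy.orm import Session

def sketch_resolution_order(sess: Session, entities: list) -> list:
    ordered, remaining = [], list(entities)
    while remaining:
        progressed = False
        for entity in list(remaining):
            savepoint = sess.begin_nested()
            try:
                sess.execute(entity.to_sql_statement_create())
                savepoint.commit()  # keep it applied so dependents can resolve
                ordered.append(entity)
                remaining.remove(entity)
                progressed = True
            except Exception:
                savepoint.rollback()  # dependency not applied yet; retry later
        if not progressed:
            ordered.extend(remaining)  # no progress; emit the rest in input order
            break
    return ordered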

Creating functions, views, and triggers is fast and uses few resources. For that reason, we can very quickly try to solve the resolution order inside transactions, and ultimately roll them back, while you wait for the migration to generate.

In contrast, the commands that alembic issues might be extremely slow: for example, adding a new column with a default on a large table, or creating an index.

If we simulated the alembic ops in a transaction the same way, the database could lock up.

Technically users shouldn't be autogenerating migrations while pointing at their production database, but from reading issues in alembic/alembic_utils it's clear that a large portion do.

I'd be open to (and would love to use) a contribution that made it configurable to execute the alembic ops prior to running alembic_utils' diffing, as long as it defaulted to off. Unfortunately it's a larger feature than I currently have time to tackle.

oc-ben-ellis commented 1 year ago

@olirice Thanks for the explanation :)

firdavsik commented 5 months ago

Call register_entities at the bottom of env.py (after run_migrations...).

It is enough 😃
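
If it helps, that would mean an env.py shaped roughly like this (a sketch; run_migrations_online/run_migrations_offline are the names from the standard alembic template, and myapp.entities is a placeholder for your own module):

# env.py (sketch)
from alembic_utils.replaceable_entity import register_entities
from myapp.entities import extension_uuid, function_updated_at_auto, trigger_test  # placeholder

# ... standard alembic env.py template: config, target_metadata,
# run_migrations_offline() / run_migrations_online() definitions ...

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

# the suggestion: register entities at the very bottom, after the run call
register_entities([extension_uuid, function_updated_at_auto, trigger_test])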

guyarad commented 5 months ago

@firdavsik

Call register_entities at the bottom of env.py (after run_migrations...)

This doesn't work for me. It simply didn't consider the registered entities during the migration at all, so the migration succeeded but didn't add the alembic_utils entities.

Can you share more about your process/configuration?

tulinev commented 4 weeks ago

Another workaround. The idea is to apply all previous migration ops before creating the trigger operations. I added some code below the comments 'Triggers in same migration'.

But patching alembic_utils is another challenge, so I just copied the source into my project and changed it.

Hope this helps to implement it in alembic_utils.

Change the compare_registered_entities function in alembic_utils/replaceable_entity.py:

# Imports the patch adds (everything else comes from the module's existing imports):
from collections import deque

from alembic.migration import MigrationContext
from alembic.operations import Operations

@comparators.dispatch_for("schema")
def compare_registered_entities(
    autogen_context: AutogenContext,
    upgrade_ops,
    schemas: List[Optional[str]],
):
    connection = autogen_context.connection

    entities = registry.entities()

    # Triggers in same migration
    # Store the original ops so we can apply only these, without the new ops
    # that get appended to upgrade_ops inside this method.
    original_upgrade_ops = list(upgrade_ops.ops)

    # https://alembic.sqlalchemy.org/en/latest/autogenerate.html#controlling-what-to-be-autogenerated
    # if EnvironmentContext.configure.include_schemas is True, all non-default "schemas" should be included
    # pulled from the inspector
    include_schemas: bool = autogen_context.opts["include_schemas"]

    reflected_schemas = set(
        autogen_context.inspector.get_schema_names() if include_schemas else []  # type: ignore
    )
    sqla_schemas: Set[Optional[str]] = set(schemas)
    manual_schemas = set(registry.schemas or set())  # Deprecated for remove in 0.6.0
    entity_schemas = {x.schema for x in entities}  # from ReplaceableEntity instances
    all_schema_references = reflected_schemas | sqla_schemas | manual_schemas | entity_schemas  # type: ignore

    # Remove excluded schemas
    observed_schemas: Set[str] = {
        schema_name
        for schema_name in all_schema_references
        if (
            schema_name
            not in (registry.exclude_schemas or set())  # user defined. Deprecated for remove in 0.6.0
            and schema_name not in {"information_schema", None}
        )
    }

    # Solve resolution order
    transaction = connection.begin_nested()
    sess = Session(bind=connection)
    try:
        ordered_entities: List[ReplaceableEntity] = solve_resolution_order(sess, entities)
    finally:
        sess.rollback()

    # entities that are receiving a create or update op
    has_create_or_update_op: List[ReplaceableEntity] = []

    # database rendered definitions for the entities we have a local instance for
    # Note: used for drops
    local_entities = []

    # Required migration OPs, Create/Update/NoOp
    for entity in ordered_entities:
        logger.info(
            "Detecting required migration op %s %s",
            entity.__class__.__name__,
            entity.identity,
        )

        if entity.__class__ not in registry.allowed_entity_types:
            continue

        if not include_entity(entity, autogen_context, reflected=False):
            logger.debug(
                "Ignoring local entity %s %s due to AutogenContext filters",
                entity.__class__.__name__,
                entity.identity,
            )
            continue

        transaction = connection.begin_nested()
        sess = Session(bind=connection)
        try:

            # Triggers in same migration
            # Try to execute all previously generated operations before diffing
            # this ReplaceableEntity, so objects created earlier in the same
            # migration (e.g. tables) exist when the entity is applied.
            # ###################################################################
            migration_ctx = MigrationContext.configure(connection)
            operations = Operations(migration_ctx)
            queue = deque(original_upgrade_ops)
            op_transaction = connection.begin_nested()
            while queue:
                elem = queue.popleft()
                if hasattr(elem, "ops"):
                    # container ops hold nested ops; push them, preserving order
                    queue.extendleft(reversed(elem.ops))
                else:
                    operations.invoke(elem)

            op_transaction.commit()  # release the savepoint; the outer transaction is rolled back below
            # ###################################################################

            maybe_op = entity.get_required_migration_op(sess, dependencies=has_create_or_update_op)

            local_db_def = entity.get_database_definition(
                sess, dependencies=has_create_or_update_op
            )
            local_entities.append(local_db_def)

            if maybe_op:
                upgrade_ops.ops.append(maybe_op)
                has_create_or_update_op.append(entity)

                logger.info(
                    "Detected %s op for %s %s",
                    maybe_op.__class__.__name__,
                    entity.__class__.__name__,
                    entity.identity,
                )
            else:
                logger.debug(
                    "Detected NoOp op for %s %s",
                    entity.__class__.__name__,
                    entity.identity,
                )

        finally:
            # Triggers in same migration
            # sess.rollback() alone does not undo the ops applied above;
            # rolling back the outer nested transaction does.
            transaction.rollback()

    # Required migration OPs, Drop
    # Start a parent transaction
    # Bind the session within the parent transaction
    transaction = connection.begin_nested()
    sess = Session(bind=connection)
    try:
        # All database entities currently live
        # Check if anything needs to drop
        subclasses = collect_subclasses(alembic_utils, ReplaceableEntity)
        for entity_class in subclasses:

            if entity_class not in registry.allowed_entity_types:
                continue

            # Entities within the schemas that are live
            for schema in observed_schemas:

                db_entities: List[ReplaceableEntity] = entity_class.from_database(
                    sess, schema=schema
                )

                # Check for functions that were deleted locally
                for db_entity in db_entities:

                    if not include_entity(db_entity, autogen_context, reflected=True):
                        logger.debug(
                            "Ignoring remote entity %s %s due to AutogenContext filters",
                            db_entity.__class__.__name__,
                            db_entity.identity,
                        )
                        continue

                    for local_entity in local_entities:
                        if db_entity.identity == local_entity.identity:
                            break
                    else:
                        # No match was found locally
                        # If the entity passes the filters,
                        # we should create a DropOp
                        upgrade_ops.ops.append(DropOp(db_entity))
                        logger.info(
                            "Detected DropOp op for %s %s",
                            db_entity.__class__.__name__,
                            db_entity.identity,
                        )

    finally:
        sess.rollback()

olirice commented 4 weeks ago

I didn't test the code but the idea sounds good

One situation where that can fall down is if someone diffs against a database that has some data in it and their migrations include slow processes like creating indexes or data transformations.