sqlalchemy / alembic

A database migrations tool for SQLAlchemy.
MIT License
2.78k stars 241 forks source link

Q: custom driver level execute()? #638

Open Andrei-Pozolotin opened 4 years ago

Andrei-Pozolotin commented 4 years ago
  1. in a scenario when using multi-master cluster with pglogical https://www.2ndquadrant.com/en/resources/pglogical/pglogical-docs/

  2. ddl should be issued on a single node and then replicated by pglogical to all other nodes: https://www.2ndquadrant.com/en/resources/pglogical/pglogical-docs/#usage-additional-functions

        SELECT pglogical.replicate_ddl_command(
            command := '{command}',
            replication_sets := ARRAY['default']
        );
  3. by looking at the docs and sources: https://alembic.sqlalchemy.org/en/latest/search.html?q=execute https://github.com/sqlalchemy/alembic/search?l=Python&q=execute there seems to be no clear way to inject custom driver level execute(command)?

zzzeek commented 4 years ago

hi there -

you can invoke any textual SQL desired using either op.execute():

def upgrade():
    op.execute("arbitrary SQL commands")

or use the Connection directly, particularly if you need to do cursor -level commands:

def upgrade():
    connection = op.get_bind()
    connection.execute("arbitrary SQL commands")

docs for both of these concepts at https://alembic.sqlalchemy.org/en/latest/ops.html#alembic.operations.Operations.execute

hope this helps!

Andrei-Pozolotin commented 4 years ago
  1. thank you Mike.

  2. sorry I was not clear: the command above is not "arbitrary", it is in fact ddl sql expression generated from either upgrade() or downgrade()

  3. in other words, I need to intercept ddl on the level of alembic/env.py

and replace each internal alembic invocation of:

connection.execute("ddl from upgrade()")

with a wrapper in the form:

connection.execute("""
        SELECT pglogical.replicate_ddl_command(
            command := 'ddl from upgrade()'
        );
""")
zzzeek commented 4 years ago
1. thank you Mike.

2. sorry I was not clear: the `command` above is not "arbitrary",
   it is in fact ddl sql expression generated from either `upgrade()` or `downgrade()`

3. in other words, I need to intercept ddl on the level of `alembic/env.py`

and replace each internal alembic invocation of:

connection.execute("ddl from upgrade()")

with a wrapper in the form:

connection.execute("""
        SELECT pglogical.replicate_ddl_command(
            command := 'ddl from upgrade()'
        );
""")

OK, it seemed like you were referring to something like that but it was not clear.

to intercept SQL you use SQLAlchemy's connection events in conjunction with the connection you establish in your env.py:

https://docs.sqlalchemy.org/en/13/core/events.html#sql-execution-and-connection-events

so it would be like in env.py:

with connectable.connect() as connection:
    @event.listens_for(connection, "before_cursor_execute", retval=True)
    def add_special_ddl(conn, cursor, statement, parameters, context,
                                            executemany):

        # use preferred string processing style here to modify the statement
        if "ddl from upgrade" in statement:
              statement = """SELECT pglogical etc etc """

        return statement, parameters

    context.configure(
        compare_type=True,
        connection=connection, target_metadata=target_metadata
    )

    with context.begin_transaction():
        context.run_migrations()
Andrei-Pozolotin commented 4 years ago
  1. @event.listens_for works, thank you.
  2. perhaps worth adding to https://alembic.sqlalchemy.org/en/latest/cookbook.html