kvesteri / postgresql-audit

Audit trigger for PostgreSQL
BSD 2-Clause "Simplified" License

Using p-a with migrations #20

Open antoine-lizee opened 7 years ago

antoine-lizee commented 7 years ago

If I understand correctly, the table_listeners that put the tooling in place, as well as the individual audit_table listeners that enable auditing for a single table, are fired on an 'after_create' event bound to a table instance.

This seems to work only when using the metadata constructors directly, like create_all(), because those use the Table objects as defined. Alembic migrations, on the other hand, lose track of the objects the listeners are bound to when creating the tables, preventing them from ever being fired. (see http://stackoverflow.com/a/16907683/1877609)
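To make the failure mode concrete, here is a minimal sketch (the table name and connection URL are made up) of how an 'after_create'-bound DDL fires under create_all() but not under Alembic:

import sqlalchemy as sa

metadata = sa.MetaData()
users = sa.Table('users', metadata, sa.Column('id', sa.Integer, primary_key=True))

# postgresql-audit attaches its DDL to the table's 'after_create' event,
# conceptually like this:
sa.event.listen(users, 'after_create', sa.DDL("SELECT audit_table('users')"))

engine = sa.create_engine('postgresql://localhost/mydb')
metadata.create_all(engine)  # fires 'after_create': the trigger gets installed

# An Alembic migration calls op.create_table() with its own Table object,
# so the listener bound above never fires.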

It would be amazing to have some kind of interface to either (i) easily generate migrations that do the job of the listeners, or (ii) fire the listeners' content at will.

[This issue is related to #7]

Any thoughts on that?

antoine-lizee commented 7 years ago

For your information, I ended up heavily monkey-patching the Flask version of the VersioningManager to issue the actual tooling & auditing statements on 'after_configured' instead of binding them to 'after_create' table listeners that never got fired. Here is how it goes:

import sqlalchemy as sa
from postgresql_audit import ImproperlyConfigured
from postgresql_audit.base import array, StatementExecutor
from postgresql_audit.flask import VersioningManager as BaseVersioningManager, context_available
from sqlalchemy.exc import ProgrammingError

from app import db

class MigratedVersioningManager(BaseVersioningManager):

    def audit_table(self, table, exclude_columns=None):
        """ Issues SQL to audit a table.

        This is a near-complete copy of the corresponding super() function, but actually firing the events instead of
        binding them to uncalled 'after_create' listeners.
        This function is itself called when the 'after_configured' is fired, calling `configure_versioned_classes()` as
        per the `postgresql_audit.VersionManager.listeners`
        """
        args = [table.name]
        if exclude_columns:
            for column in exclude_columns:
                if column not in table.c:
                    raise ImproperlyConfigured(
                        "Could not configure versioning. Table '{}'' does "
                        "not have a column named '{}'.".format(
                            table.name, column
                        )
                    )
            args.append(array(exclude_columns))

        if self.schema_name is None:
            func = sa.func.audit_table
        else:
            func = getattr(getattr(sa.func, self.schema_name), 'audit_table')
        query = sa.select([func(*args)])
        executor = StatementExecutor(query)
        self.fire_event(executor)

    def fire_event(self, event):
        with db.engine.begin() as connection:
            if isinstance(event, sa.schema.DDL):
                connection.execute(event)
            else:
                # listener callables expect a (target, connection) signature;
                # the target is unused here, hence the dummy first argument
                event('dummy_table_argument', connection)

    def fire_table_listeners(self):
        table_listener_events = (
            [l[1] for l in self.table_listeners['activity']]
            + [l[1] for l in self.table_listeners['transaction']]
        )
        for event in table_listener_events:
            self.fire_event(event)

    def configure_versioned_classes(self):
        """ Fire the tooling building requests + configures the audited classes

        This is bound to the 'after_configured' listener.
        """
        try:
            self.fire_table_listeners()
            super().configure_versioned_classes()
        except ProgrammingError:
            print("Aborting auditing setup because tables are inexistent yet. Try again after migrating the database to"
                  "create the missing tables.")

    def attach_listeners(self):
        """ Monkey-patched from super() to not attache the table listeners.

        """
        for listener in self.listeners:
            sa.event.listen(*listener)
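
To use it, you'd instantiate this class in place of the stock Flask manager; a minimal sketch, assuming the usual postgresql-audit init call on the declarative base:

versioning_manager = MigratedVersioningManager()
versioning_manager.init(db.Model)  # same init() call as with the stock manager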

karl-dv commented 6 years ago

I've been struggling with the same problem, and I think I found a better solution.

The next flask db migrate doesn't detect any changes, and when I edit a record through the application, a transaction/activity trail is created. So it all seems good so far...

Here's the complete listing of the migration script. Everything between the "generated by alembic" markers came from flask db migrate; I only added the two for-loops.

"""
Add 'postgresql-audit' support

Revision ID: 4cac610a6d91
Revises: 185801ec6c0e
Create Date: 2018-04-26 12:40:15.979353

"""
from alembic import op
import sqlalchemy as sa
from postgresql_audit.base import versioning_manager, cached_statements
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '4cac610a6d91'
down_revision = '185801ec6c0e'
branch_labels = None
depends_on = None

def upgrade():
    # start of "generated by alembic"
    op.create_table('transaction',
        sa.Column('id', sa.BigInteger(), nullable=False),
        sa.Column('native_transaction_id', sa.BigInteger(), nullable=True),
        sa.Column('issued_at', sa.DateTime(), nullable=True),
        sa.Column('client_addr', postgresql.INET(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_transaction_native_transaction_id'), 'transaction', ['native_transaction_id'], unique=False)

    op.create_table('activity',
        sa.Column('id', sa.BigInteger(), nullable=False),
        sa.Column('schema_name', sa.Text(), nullable=True),
        sa.Column('table_name', sa.Text(), nullable=True),
        sa.Column('relid', sa.Integer(), nullable=True),
        sa.Column('issued_at', sa.DateTime(), nullable=True),
        sa.Column('native_transaction_id', sa.BigInteger(), nullable=True),
        sa.Column('verb', sa.Text(), nullable=True),
        sa.Column('old_data', postgresql.JSONB(astext_type=sa.Text()), server_default='{}', nullable=True),
        sa.Column('changed_data', postgresql.JSONB(astext_type=sa.Text()), server_default='{}', nullable=True),
        sa.Column('transaction_id', sa.BigInteger(), nullable=True),
        sa.ForeignKeyConstraint(['transaction_id'], ['transaction.id'], ),
        sa.PrimaryKeyConstraint('id')
    )
    # end of "generated by alembic"

    # add 'postgresql-audit' functions and operators
    for table in versioning_manager.table_listeners:
        for trig, event in versioning_manager.table_listeners[table]:
            if isinstance(event, sa.schema.DDL):
                op.execute(event)
            else:
                # listener callables expect (target, connection); the target is unused
                event('dummy_table_argument', op.get_bind())

    # add __versioned__ table triggers
    versioning_manager.configure_versioned_classes()
    for query in cached_statements:
        event = cached_statements[query]
        if isinstance(event, sa.schema.DDL):
            op.execute(event)
        else:
            event('dummy_table_argument', op.get_bind())

    # alternatively, an explicit call per table is possible too, better mirroring the DROP in downgrade():
    ### op.execute("SELECT audit_table('{}');".format('user_def'))

def downgrade():
    # start of "generated by alembic"
    op.execute('DROP TRIGGER audit_trigger_row ON user_def')

    op.drop_table('activity')

    op.drop_index(op.f('ix_transaction_native_transaction_id'), table_name='transaction')
    op.drop_table('transaction')
    # end of "generated by alembic"

antoine-lizee commented 6 years ago

Thanks for sharing your solution @karl-dv. Mine was clearly a hack that does the setup at each app start; the goal was to have a blanket check that would put in place any trigger that might be required.

Your solution does the versioning setup in the migration itself. It's cleaner, but it doesn't seem to address the case of adding new versioned tables later on (after the Activity and Transaction models are created & the tables are migrated).
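(For such a later table, I imagine a follow-up migration could simply call the audit_table function directly, like the explicit variant above; the table name here is hypothetical:)

def upgrade():
    op.create_table(
        'new_versioned_table',
        sa.Column('id', sa.Integer(), primary_key=True),
    )
    # install the row trigger that create_all() would have set up
    op.execute("SELECT audit_table('new_versioned_table')")

def downgrade():
    op.execute('DROP TRIGGER audit_trigger_row ON new_versioned_table')
    op.drop_table('new_versioned_table')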

What I would do if I had the time:

People from Alan might have some time to work on this, since we rely quite heavily on pg-a in prod right now :-)

zwerg44 commented 5 years ago

I am struggling with the same things now.

It's also sad that we don't have any rollback code to clean up the changes made by pg-a during setup. My current Alembic downgrade for 0.10.0 is here:

def downgrade():
    # VERSIONED is assumed to be a list of dicts like {'table_name': 'user_def'},
    # one entry per audited table, defined elsewhere in this migration script
    drop_triggers = '''
        DROP TRIGGER IF EXISTS audit_trigger_row ON public.%(table_name)s;
        DROP TRIGGER IF EXISTS audit_trigger_insert ON public.%(table_name)s;
        DROP TRIGGER IF EXISTS audit_trigger_update ON public.%(table_name)s;
        DROP TRIGGER IF EXISTS audit_trigger_delete ON public.%(table_name)s;
    '''
    for params in VERSIONED:
        op.execute(drop_triggers % params)

    drop_functions = '''
        DROP FUNCTION IF EXISTS public.audit_table(regclass);
        DROP FUNCTION IF EXISTS public.audit_table(regclass, text[]);
        DROP FUNCTION IF EXISTS public.jsonb_change_key_name(jsonb, text, text);
        DROP FUNCTION IF EXISTS public.create_activity();
        DROP FUNCTION IF EXISTS public.jsonb_subtract(jsonb, jsonb) CASCADE;
        DROP FUNCTION IF EXISTS public.jsonb_subtract(jsonb, text[]) CASCADE;
        DROP FUNCTION IF EXISTS public.jsonb_subtract(jsonb, text) CASCADE;
        DROP FUNCTION IF EXISTS public.jsonb_merge(jsonb, jsonb) CASCADE;

        DROP OPERATOR IF EXISTS public.- (jsonb, jsonb);
        DROP OPERATOR IF EXISTS public.- (jsonb, text[]);
        DROP OPERATOR IF EXISTS public.- (jsonb, text);
    '''
    op.execute(drop_functions)
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('activity')
    op.drop_index(op.f('ix_transaction_native_transaction_id'), table_name='transaction')
    op.drop_table('transaction')
    # ### end Alembic commands ###