mara / mara-pipelines

A lightweight opinionated ETL framework, halfway between plain scripts and Apache Airflow
MIT License
2.07k stars 100 forks source link

Use run_id as the primary key #29

Closed ierosodin closed 4 years ago

ierosodin commented 4 years ago

Avoid duplicated key in data_integration_system_statistics table

martin-loetzsch commented 4 years ago

Thanks!

I think the run_id needs to be nullable though, otherwise updating an existing project would require a manual migration, which I'd like to avoid:

make migrate-mara-db 
migrate-mara-db: FLASK_APP=app/app.py .venv/bin/flask mara_db.migrate
migrate-mara-db: ALTER TABLE data_integration_system_statistics ADD COLUMN run_id TIMESTAMP WITH TIME ZONE NOT NULL;
migrate-mara-db: 
migrate-mara-db: Traceback (most recent call last):
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1245, in _execute_context
migrate-mara-db:     self.dialect.do_execute(
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 581, in do_execute
migrate-mara-db:     cursor.execute(statement, parameters)
migrate-mara-db: psycopg2.errors.NotNullViolation: column "run_id" contains null values
migrate-mara-db: 
migrate-mara-db: 
migrate-mara-db: The above exception was the direct cause of the following exception:
migrate-mara-db: 
migrate-mara-db: Traceback (most recent call last):
migrate-mara-db:   File ".venv/bin/flask", line 10, in <module>
migrate-mara-db:     sys.exit(main())
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/flask/cli.py", line 966, in main
migrate-mara-db:     cli.main(prog_name="python -m flask" if as_module else None)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/flask/cli.py", line 586, in main
migrate-mara-db:     return super(FlaskGroup, self).main(*args, **kwargs)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/click/core.py", line 717, in main
migrate-mara-db:     rv = self.invoke(ctx)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/click/core.py", line 1137, in invoke
migrate-mara-db:     return _process_result(sub_ctx.command.invoke(sub_ctx))
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/click/core.py", line 956, in invoke
migrate-mara-db:     return ctx.invoke(self.callback, **ctx.params)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/click/core.py", line 555, in invoke
migrate-mara-db:     return callback(*args, **kwargs)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/packages/mara-db/mara_db/cli.py", line 13, in migrate
migrate-mara-db:     if not mara_db.auto_migration.auto_discover_models_and_migrate():
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/packages/mara-db/mara_db/auto_migration.py", line 125, in auto_discover_models_and_migrate
migrate-mara-db:     return auto_migrate(engine('mara'), models)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/packages/mara-db/mara_db/auto_migration.py", line 96, in auto_migrate
migrate-mara-db:     connection.execute(statement)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 976, in execute
migrate-mara-db:     return self._execute_text(object_, multiparams, params)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1143, in _execute_text
migrate-mara-db:     ret = self._execute_context(
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1249, in _execute_context
migrate-mara-db:     self._handle_dbapi_exception(
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
migrate-mara-db:     util.raise_from_cause(sqlalchemy_exception, exc_info)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
migrate-mara-db:     reraise(type(exception), exception, tb=exc_tb, cause=cause)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
migrate-mara-db:     raise value.with_traceback(tb)
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1245, in _execute_context
migrate-mara-db:     self.dialect.do_execute(
migrate-mara-db:   File "/Users/mloetzsch/Projects/xxx/.venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 581, in do_execute
migrate-mara-db:     cursor.execute(statement, parameters)
migrate-mara-db: sqlalchemy.exc.IntegrityError: (psycopg2.errors.NotNullViolation) column "run_id" contains null values
migrate-mara-db: 
migrate-mara-db: [SQL: ALTER TABLE data_integration_system_statistics ADD COLUMN run_id TIMESTAMP WITH TIME ZONE NOT NULL;
migrate-mara-db: 
migrate-mara-db: ]
migrate-mara-db: (Background on this error at: http://sqlalche.me/e/gkpj)
migrate-mara-db: 2 seconds
make: *** [migrate-mara-db] Error 1
ierosodin commented 4 years ago

Hi @martin-loetzsch , Technically, primary_key are both unique and not null. I'm not sure if setting it as nullable can let it automatically generate unique run_id. (Or if setting it as autoincrement can solve this problem?)

martin-loetzsch commented 4 years ago

When I'm migrating, I still get this:

sqlalchemy.exc.IntegrityError: (psycopg2.errors.NotNullViolation) column "run_id" contains null values

I think you can't use a column with null values as a primary key. Is there a way to have defaults?

ghost commented 4 years ago

@ierosodin I think there are only two options for this:

  1. go with the current solution but add a migration script setting all the nullable values to a value
  2. use a unique index constraint, not a primary key --> then nullable values are supported, and there can be multiple nullable values in a unique index constraint
ierosodin commented 4 years ago

@hz-lschick I prefer the second option which is easier to implement and changeless for the existing projects. But I'm wondering if "multiple unique columns" means the combination of these two columns is unique. Also, the primary key is still repeated.

ghost commented 4 years ago

@ierosodin right, the combination of the given columns must be unique unless it is a null-value; then no check is performend. I'd recommend to take a look at "The Test" of this article or creating a testing table and testing around with it to understand its functionality better.

martin-loetzsch commented 4 years ago

I'm also for option 2. In a few months. By default, run logs are kept for 30 days in the mara db. So in half a year we can go for option 1.

@ierosodin can you make a commit that does that?

ierosodin commented 4 years ago

I'm trying to do option 2; however, there is still a problem which is the column of the primary key can neither null nor duplicated. Also, for sqlalchemy orm, there must exist a primary key, which cannot only set the run_id to be unique and no primary key. But I can set neither timestamp nor run_id column to be the primary key. (Or I misunderstood the option2?)

(I also find a method to circumvent the primary key constraint in sqlalchemy: https://stackoverflow.com/questions/34283259/how-to-define-a-table-without-primary-key-with-sqlalchemy But I don't have env to test this.)

Another workaround is we can first set timestamp to be the primary key. And after half a year, we can then transfer to run_id. (Bad news is it cannot fully support multi-writer in the short term)

jankatins commented 4 years ago

If we make a quick release which just adds the run_id without changing the primary key, this would "start" the waiting period. Then you can can add a PR for changing the Primary Key (+ a small script which sets some default run_id?) in a branch and run this and we can merge it in a a few month when no-one has old data around with a warning that if someone overwrote the default throw-away period, they need to run the script.

ierosodin commented 4 years ago

This is the simple script I used to fill in the null value of run_id column in data_integration_system_statistics table:

import mara_db.postgresql
import mara_db.config
import mara_db.dbs

mara_db.config.databases \
    = lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}

with mara_db.postgresql.postgres_cursor_context('mara') as cursor:
    cursor.execute(f'''
UPDATE data_integration_system_statistics
SET run_id = -1
WHERE run_id IS NULL''')
jankatins commented 4 years ago

I just tried this, which sets a DEFAULT value on the server

    timestamp = sqlalchemy.Column(sqlalchemy.TIMESTAMP(timezone=True), primary_key=True, index=True)
    # server_default needs to be here to support the migration to -1 for old runs
    run_id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, nullable=False,
                               server_default=sqlalchemy.text('-1'))

Results in this migration:

ALTER TABLE data_integration_system_statistics ADD COLUMN run_id INTEGER DEFAULT -1 NOT NULL;

-> This successfully adds the new column as non-nullable but unfortunately, it doesn't reset the primary key. Seems we are not the only one having this problem: https://stackoverflow.com/a/29034858/1380673

So my suggestion would be this:

jankatins commented 4 years ago

I'll push the required changes for that to the PR branch...

martin-loetzsch commented 4 years ago

ok, whatever doesn't require manual migrations

jankatins commented 4 years ago

Have a look at https://github.com/mara/data-integration/pull/38 which includes the proposed change. Sorry for opening a new PR, this was easier than trying to get this included in the original :-/

ierosodin commented 4 years ago

Look good to me.

martin-loetzsch commented 4 years ago

Fixed in #38