project-koku / masu

This is a READ ONLY repo. See https://github.com/project-koku/koku for current masu implementation
GNU Affero General Public License v3.0

Integrity Error as a result of download/processing race condition #92

Closed dccurtis closed 6 years ago

dccurtis commented 6 years ago

See #69 for more background on this

Backtrace of situation described in #69.

Setup to reproduce (as of 7/12/18):

  1. Remove filtering in AWSNotificationHandler
  2. Set up SNS as normal
  3. Use nise to force S3 bucket changes and trigger SNS notifications in rapid succession. You may get lucky and hit the traceback below.
[2018-07-12 09:21:00,672: ERROR/ForkPoolWorker-4] Task masu.processor.tasks.process_report_file[168930b5-e1d2-47f4-b67f-655ba8ab78f5] raised unexpected: IntegrityError('(psycopg2.IntegrityError) duplicate key value violates unique constraint "reporting_awscostentryproduct_sku_key"\nDETAIL:  Key (sku)=(DTBHBIIXSMUZ) already exists.\n',)
Traceback (most recent call last):
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 508, in do_execute
    cursor.execute(statement, parameters)
psycopg2.IntegrityError: duplicate key value violates unique constraint "reporting_awscostentryproduct_sku_key"
DETAIL:  Key (sku)=(DTBHBIIXSMUZ) already exists.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/curtisd/projects/repos/masu_fork/masu/celery/__init__.py", line 56, in __call__
    return self.run(*args, **kwargs)
  File "/Users/curtisd/projects/repos/masu_fork/masu/processor/tasks.py", line 90, in process_report_file
    _process_report_file(schema_name, report_path, compression)
  File "/Users/curtisd/projects/repos/masu_fork/masu/processor/_tasks/process.py", line 59, in _process_report_file
    last_cursor_position = processor.process()
  File "/Users/curtisd/projects/repos/masu_fork/masu/processor/report_processor.py", line 130, in process
    product_id = self._create_cost_entry_product(row)
  File "/Users/curtisd/projects/repos/masu_fork/masu/processor/report_processor.py", line 446, in _create_cost_entry_product
    self.report_db.flush_db_object(product)
  File "/Users/curtisd/projects/repos/masu_fork/masu/database/report_db_accessor.py", line 188, in flush_db_object
    self._session.flush()
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2254, in flush
    self._flush(objects)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2380, in _flush
    transaction.rollback(_capture_exception=True)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
    raise value
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2344, in _flush
    flush_context.execute()
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 391, in execute
    rec.execute(self)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 556, in execute
    uow
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj
    mapper, table, insert)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 866, in _emit_insert_statements
    execute(statement, params)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 948, in execute
    return meth(self, multiparams, params)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
    context)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
    exc_info
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 508, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "reporting_awscostentryproduct_sku_key"
DETAIL:  Key (sku)=(DTBHBIIXSMUZ) already exists.
 [SQL: 'INSERT INTO testcustomer.reporting_awscostentryproduct (sku, product_name, product_family, service_code, region, instance_type, memory, vcpu, memory_unit) VALUES (%(sku)s, %(product_name)s, %(product_family)s, %(service_code)s, %(region)s, %(instance_type)s, %(memory)s, %(vcpu)s, %(memory_unit)s) RETURNING testcustomer.reporting_awscostentryproduct.id'] [parameters: {'sku': 'DTBHBIIXSMUZ', 'product_name': 'Amazon Elastic Compute Cloud', 'product_family': 'Compute Instance', 'service_code': 'AmazonEC2', 'region': None, 'instance_type': 'm5.large', 'memory': 8.0, 'vcpu': 2, 'memory_unit': 'GiB'}] (Background on this error at: http://sqlalche.me/e/gkpj)
[2018-07-12 09:21:00,715: ERROR/ForkPoolWorker-6] Task masu.processor.tasks.process_report_file[7e87671c-02e2-4cf9-931a-126a2e9166f3] raised unexpected: IntegrityError('(psycopg2.IntegrityError) duplicate key value violates unique constraint "reporting_awscostentryproduct_sku_key"\nDETAIL:  Key (sku)=(DTBHBIIXSMUZ) already exists.\n',)
Traceback (most recent call last):
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 508, in do_execute
    cursor.execute(statement, parameters)
psycopg2.IntegrityError: duplicate key value violates unique constraint "reporting_awscostentryproduct_sku_key"
DETAIL:  Key (sku)=(DTBHBIIXSMUZ) already exists.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/curtisd/projects/repos/masu_fork/masu/celery/__init__.py", line 56, in __call__
    return self.run(*args, **kwargs)
  File "/Users/curtisd/projects/repos/masu_fork/masu/processor/tasks.py", line 90, in process_report_file
    _process_report_file(schema_name, report_path, compression)
  File "/Users/curtisd/projects/repos/masu_fork/masu/processor/_tasks/process.py", line 59, in _process_report_file
    last_cursor_position = processor.process()
  File "/Users/curtisd/projects/repos/masu_fork/masu/processor/report_processor.py", line 130, in process
    product_id = self._create_cost_entry_product(row)
  File "/Users/curtisd/projects/repos/masu_fork/masu/processor/report_processor.py", line 446, in _create_cost_entry_product
    self.report_db.flush_db_object(product)
  File "/Users/curtisd/projects/repos/masu_fork/masu/database/report_db_accessor.py", line 188, in flush_db_object
    self._session.flush()
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2254, in flush
    self._flush(objects)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2380, in _flush
    transaction.rollback(_capture_exception=True)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
    raise value
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2344, in _flush
    flush_context.execute()
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 391, in execute
    rec.execute(self)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 556, in execute
    uow
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj
    mapper, table, insert)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 866, in _emit_insert_statements
    execute(statement, params)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 948, in execute
    return meth(self, multiparams, params)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
    context)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
    exc_info
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/Users/curtisd/py_env/koku/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 508, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "reporting_awscostentryproduct_sku_key"
DETAIL:  Key (sku)=(DTBHBIIXSMUZ) already exists.
 [SQL: 'INSERT INTO testcustomer.reporting_awscostentryproduct (sku, product_name, product_family, service_code, region, instance_type, memory, vcpu, memory_unit) VALUES (%(sku)s, %(product_name)s, %(product_family)s, %(service_code)s, %(region)s, %(instance_type)s, %(memory)s, %(vcpu)s, %(memory_unit)s) RETURNING testcustomer.reporting_awscostentryproduct.id'] [parameters: {'sku': 'DTBHBIIXSMUZ', 'product_name': 'Amazon Elastic Compute Cloud', 'product_family': 'Compute Instance', 'service_code': 'AmazonEC2', 'region': None, 'instance_type': 'm5.large', 'memory': 8.0, 'vcpu': 2, 'memory_unit': 'GiB'}] (Background on this error at: http://sqlalche.me/e/gkpj)
adberglund commented 6 years ago

Assuming product inserts are working as expected (there could be a bug there), this is a good example of the race condition that occurs when multiple report files from the same source are processed at the same time.

For every line item in a cost usage report, we pull the referenced product.

When a batch of line items has been processed, we commit the products and all other relevant tables to the database.

The race condition seen here could come about from the following scenario:

  1. Task 1 starts processing a file. It sees new products that are not yet in the database and adds them to its session.
  2. Task 2 starts processing a file. It sees the same new products, which are still not in the database, and adds them to its session.
  3. Task 1 commits to the database.
  4. Task 2 attempts to commit to the database, but it tries to insert the products that were just inserted by Task 1, and we get the IntegrityError seen in the traceback; a minimal sketch of this interleaving follows.
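
A minimal, hypothetical sketch of that interleaving (plain SQLAlchemy against a throwaway SQLite database; the Product model, file name, and SKU value are stand-ins, not masu code):

```python
# Minimal sketch of the race: two sessions both see the SKU as absent,
# both stage an INSERT, and the second commit violates the unique constraint.
import sqlalchemy as sa
from sqlalchemy import exc
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Product(Base):
    __tablename__ = "product"
    id = sa.Column(sa.Integer, primary_key=True)
    sku = sa.Column(sa.String, unique=True)   # mirrors reporting_awscostentryproduct_sku_key

# Delete race_demo.db before re-running so the table starts empty.
engine = sa.create_engine("sqlite:///race_demo.db")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

task1, task2 = Session(), Session()

# Both "tasks" check for the product before either has committed, so both
# conclude it is new and stage an INSERT on their own session.
for session in (task1, task2):
    if session.query(Product).filter_by(sku="DTBHBIIXSMUZ").first() is None:
        session.add(Product(sku="DTBHBIIXSMUZ"))

task1.commit()            # task 1 wins the race
try:
    task2.commit()        # task 2 hits the unique constraint, as in the traceback
except exc.IntegrityError as err:
    print("second commit failed:", err.orig)
```

Both sessions pass the "does this SKU exist?" check before either flush, so the second commit can only fail; the same thing happens when two Celery workers process overlapping report files.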

adberglund commented 6 years ago

This can likely be solved by using PostgreSQL's INSERT ... ON CONFLICT clause when inserting products.
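
A hedged sketch of what that could look like through SQLAlchemy's PostgreSQL dialect (the get_or_create_product helper and its model argument are hypothetical, not existing masu code):

```python
# Hypothetical sketch of the suggested fix: PostgreSQL's
# INSERT ... ON CONFLICT DO NOTHING makes a duplicate-SKU insert a no-op
# instead of an IntegrityError.
from sqlalchemy.dialects.postgresql import insert as pg_insert

def get_or_create_product(session, model, **values):
    stmt = (
        pg_insert(model.__table__)
        .values(**values)
        .on_conflict_do_nothing(index_elements=["sku"])
    )
    session.execute(stmt)
    # ON CONFLICT DO NOTHING returns no row for a skipped insert, so look the
    # record up afterwards to get its primary key in either case.
    return session.query(model).filter_by(sku=values["sku"]).one()
```

Alternatives would be ON CONFLICT DO UPDATE, or catching the IntegrityError and re-querying, but DO NOTHING keeps whichever row the first writer inserted.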