ckan / ckanext-xloader

Express Loader - quickly load data into DataStore. A replacement for DataPusher.
GNU Affero General Public License v3.0

xloader assumes the connection to sql server is live even after a long time #218

Open aimalkhan opened 1 month ago

aimalkhan commented 1 month ago

We are running into a database connection issue when processing larger files. xloader appears to assume that the DB connection stays open for as long as a file is being processed, which is not always the case. In the example below, a 360 MB file spends ~20 minutes in the "Creating column indexes" stage, which exceeds typical DB server idle-timeout settings. The quick fix is to increase the server's idle timeout, but that does not scale well. Are there other options, such as checking the SQL connection at each step and re-establishing it if it has been dropped?

xloader log:

2024-05-30 15:30:11,822 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Express Load starting: /dataset/confirmed-positive-cases-of-covid-19-in-ontario/resource/455fd63b-603d-4608-8216-7d8647f43350
Fetching from: https://data.ontario.ca/dataset/f4112442-bdc8-45d2-be3c-12efae72fb27/resource/455fd63b-603d-4608-8216-7d8647f43350/download/conposcovidloc.csv
2024-05-30 15:30:11,841 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Fetching from: https://data.ontario.ca/dataset/f4112442-bdc8-45d2-be3c-12efae72fb27/resource/455fd63b-603d-4608-8216-7d8647f43350/download/conposcovidloc.csv
Downloaded ok - 360.0 MB
2024-05-30 15:30:18,752 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Downloaded ok - 360.0 MB
File hash: e261d83a56969d10af3edae72695ea22
2024-05-30 15:30:18,769 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] File hash: e261d83a56969d10af3edae72695ea22
Loading CSV
2024-05-30 15:30:18,786 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Loading CSV
'use_type_guessing' mode is: False
2024-05-30 15:30:18,803 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] 'use_type_guessing' mode is: False
Ensuring character coding is UTF8
2024-05-30 15:30:18,832 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Ensuring character coding is UTF8
Deleting "455fd63b-603d-4608-8216-7d8647f43350" from DataStore.
2024-05-30 15:30:45,300 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Deleting "455fd63b-603d-4608-8216-7d8647f43350" from DataStore.
2024-05-30 15:30:45,738 DEBUG [ckanext.datastore.logic.action] Setting datastore_active=False on resource 455fd63b-603d-4608-8216-7d8647f43350
Fields: [{'id': 'Row_ID', 'type': 'text'}, {'id': 'Accurate_Episode_Date', 'type': 'text'}, {'id': 'Case_Reported_Date', 'type': 'text'}, {'id': 'Test_Reported_Date', 'type': 'text'}, {'id': 'Specimen_Date', 'type': 'text'}, {'id': 'Age_Group', 'type': 'text'}, {'id': 'Client_Gender', 'type': 'text'}, {'id': 'Outcome1', 'type': 'text'}, {'id': 'Reporting_PHU_ID', 'type': 'text'}, {'id': 'Reporting_PHU', 'type': 'text'}, {'id': 'Reporting_PHU_Address', 'type': 'text'}, {'id': 'Reporting_PHU_City', 'type': 'text'}, {'id': 'Reporting_PHU_Postal_Code', 'type': 'text'}, {'id': 'Reporting_PHU_Website', 'type': 'text'}, {'id': 'Reporting_PHU_Latitude', 'type': 'text'}, {'id': 'Reporting_PHU_Longitude', 'type': 'text'}]
2024-05-30 15:30:46,079 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Fields: [{'id': 'Row_ID', 'type': 'text'}, {'id': 'Accurate_Episode_Date', 'type': 'text'}, {'id': 'Case_Reported_Date', 'type': 'text'}, {'id': 'Test_Reported_Date', 'type': 'text'}, {'id': 'Specimen_Date', 'type': 'text'}, {'id': 'Age_Group', 'type': 'text'}, {'id': 'Client_Gender', 'type': 'text'}, {'id': 'Outcome1', 'type': 'text'}, {'id': 'Reporting_PHU_ID', 'type': 'text'}, {'id': 'Reporting_PHU', 'type': 'text'}, {'id': 'Reporting_PHU_Address', 'type': 'text'}, {'id': 'Reporting_PHU_City', 'type': 'text'}, {'id': 'Reporting_PHU_Postal_Code', 'type': 'text'}, {'id': 'Reporting_PHU_Website', 'type': 'text'}, {'id': 'Reporting_PHU_Latitude', 'type': 'text'}, {'id': 'Reporting_PHU_Longitude', 'type': 'text'}]
2024-05-30 15:30:46,329 DEBUG [ckanext.datastore.logic.action] Setting datastore_active=True on resource 455fd63b-603d-4608-8216-7d8647f43350
Copying to database...
2024-05-30 15:30:46,989 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Copying to database...
...copying done
2024-05-30 15:31:03,190 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] ...copying done
Creating search index...
2024-05-30 15:31:03,207 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Creating search index...
...search index created
2024-05-30 15:32:44,205 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] ...search index created
Calculating record count (running ANALYZE on the table)
2024-05-30 15:32:44,222 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Calculating record count (running ANALYZE on the table)
Setting resource.datastore_active = True
2024-05-30 15:32:47,851 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Setting resource.datastore_active = True
Setting resource.datastore_contains_all_records_of_source_file = True
2024-05-30 15:32:47,867 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Setting resource.datastore_contains_all_records_of_source_file = True
Data now available to users: /dataset/confirmed-positive-cases-of-covid-19-in-ontario/resource/455fd63b-603d-4608-8216-7d8647f43350
2024-05-30 15:32:48,359 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Data now available to users: /dataset/confirmed-positive-cases-of-covid-19-in-ontario/resource/455fd63b-603d-4608-8216-7d8647f43350
Creating column indexes (a speed optimization for queries)...
2024-05-30 15:32:48,375 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] Creating column indexes (a speed optimization for queries)...
...column indexes created.
2024-05-30 15:52:08,289 INFO  [cacc8af6-b467-453a-b6f0-b4f202de7ce8] ...column indexes created.
2024-05-30 15:52:08,334 ERROR [ckanext.xloader.jobs] xloader error: (psycopg2.errors.IdleInTransactionSessionTimeout) terminating connection due to idle-in-transaction timeout
SSL connection has been closed unexpectedly

[SQL: SELECT "user".password AS user_password, "user".id AS user_id, "user".name AS user_name, "user".fullname AS user_fullname, "user".email AS user_email, "user".apikey AS user_apikey, "user".created AS user_created, "user".reset_key AS user_reset_key, "user".about AS user_about, "user".activity_streams_email_notifications AS user_activity_streams_email_notifications, "user".sysadmin AS user_sysadmin, "user".state AS user_state, "user".image_url AS user_image_url, "user".plugin_extras AS user_plugin_extras
FROM "user"
WHERE "user".name = %(name_1)s OR "user".id = %(id_1)s ORDER BY "user".name
 LIMIT %(param_1)s]
[parameters: {'name_1': 'default', 'id_1': 'default', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/2j85), Traceback (most recent call last):
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1243, in _execute_context
    self.dialect.do_execute(
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 550, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.IdleInTransactionSessionTimeout: terminating connection due to idle-in-transaction timeout
SSL connection has been closed unexpectedly

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/ckan/default/src/ckanext-xloader/ckanext/xloader/jobs.py", line 76, in xloader_data_into_datastore
    xloader_data_into_datastore_(input, job_dict)
  File "/usr/lib/ckan/default/src/ckanext-xloader/ckanext/xloader/jobs.py", line 219, in xloader_data_into_datastore_
    direct_load()
  File "/usr/lib/ckan/default/src/ckanext-xloader/ckanext/xloader/jobs.py", line 184, in direct_load
    update_resource(resource={'id': resource['id'], 'hash': resource['hash']},
  File "/usr/lib/ckan/default/src/ckanext-xloader/ckanext/xloader/jobs.py", line 444, in update_resource
    user = get_action('get_site_user')({'ignore_auth': True}, {})
  File "/usr/lib/ckan/default/src/ckan/ckan/logic/__init__.py", line 504, in wrapped
    result = _action(context, data_dict, **kw)
  File "/usr/lib/ckan/default/src/ckan/ckan/logic/action/get.py", line 2401, in get_site_user
    user = model.User.get(site_id)
  File "/usr/lib/ckan/default/src/ckan/ckan/model/user.py", line 66, in get
    return query.first()
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3222, in first
    ret = list(self[0:1])
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3012, in __getitem__
    return list(res)
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3324, in __iter__
    return self._execute_and_instances(context)
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3349, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 988, in execute
    return meth(self, multiparams, params)
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1247, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1466, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 399, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value.with_traceback(tb)
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1243, in _execute_context
    self.dialect.do_execute(
  File "/usr/lib/ckan/default/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 550, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.InternalError: (psycopg2.errors.IdleInTransactionSessionTimeout) terminating connection due to idle-in-transaction timeout
SSL connection has been closed unexpectedly

[SQL: SELECT "user".password AS user_password, "user".id AS user_id, "user".name AS user_name, "user".fullname AS user_fullname, "user".email AS user_email, "user".apikey AS user_apikey, "user".created AS user_created, "user".reset_key AS user_reset_key, "user".about AS user_about, "user".activity_streams_email_notifications AS user_activity_streams_email_notifications, "user".sysadmin AS user_sysadmin, "user".state AS user_state, "user".image_url AS user_image_url, "user".plugin_extras AS user_plugin_extras
FROM "user"
WHERE "user".name = %(name_1)s OR "user".id = %(id_1)s ORDER BY "user".name
 LIMIT %(param_1)s]
[parameters: {'name_1': 'default', 'id_1': 'default', 'param_1': 1}]
(Background on this error at: http://sqlalche.me/e/2j85)

2024-05-30 15:52:08,884 INFO  [ckan.lib.jobs] Worker rq:worker:7a6dc2f3c3144be5b8bdf475bcc07384 has finished job cacc8af6-b467-453a-b6f0-b4f202de7ce8 from queue "default"
ThrawnCA commented 1 month ago

NB: 360 MB is larger than CKAN allows by default. If you adjust settings to allow it, you may also need to adjust your database accordingly, by increasing server capacity or lengthening timeouts.

aimalkhan commented 1 month ago

Yes, and adjusting the DB settings does work. The issue is that the processing time of a file is not fixed, for example when multiple files are being processed at once. I was thinking a more flexible solution will be for xloader to check for connection and retry connection at each step it needs one. Is that something worth looking into for a contribution, or is this a special case that other users never encounter?
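
To make the "check and reconnect at each step" idea concrete, here is a minimal sketch. It is not xloader code: `ensure_live` and `run_steps` are hypothetical helpers, and the standard-library `sqlite3` module stands in for the PostgreSQL connection that xloader reaches through SQLAlchemy and psycopg2. For connections handed out by a SQLAlchemy pool, the `pool_pre_ping=True` engine option performs a comparable liveness check at checkout, though it would not cover a connection that a session is already holding.

```python
import sqlite3
from typing import Any, Callable, List

Connect = Callable[[], sqlite3.Connection]


def ensure_live(conn: sqlite3.Connection, connect: Connect) -> sqlite3.Connection:
    """Return conn if it still answers a trivial query, else a fresh connection."""
    try:
        conn.execute("SELECT 1").fetchone()
        return conn
    except sqlite3.Error:
        # The old handle is unusable (closed or broken), so open a new one.
        return connect()


def run_steps(connect: Connect,
              steps: List[Callable[[sqlite3.Connection], Any]]) -> List[Any]:
    """Run each step with a connection that was checked just before the step."""
    conn = connect()
    results = []
    for step in steps:
        conn = ensure_live(conn, connect)
        results.append(step(conn))
    return results
```

A check like this only helps between steps. It cannot rescue a statement that is already running when the server drops the connection.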

ThrawnCA commented 1 month ago

> I was thinking a more flexible solution will be for xloader to check for connection and retry connection at each step it needs one.

Hmm. There is already a retry facility, but it is specific to locking errors. Do you want to check whether it could be expanded to include connection timeouts?

On the other hand, if a resource takes too long once, it will probably take too long the second time as well.
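
As a rough illustration of widening a retry to cover connection timeouts, the sketch below retries an operation a bounded number of times when it raises one of a configurable set of exceptions, and calls a reset hook between attempts. Everything here is an assumption for illustration: `retry`, `reset`, and `ConnectionTimedOut` are hypothetical names, not the existing xloader retry facility or a real driver exception.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class ConnectionTimedOut(Exception):
    """Hypothetical stand-in for a driver error raised after a dropped connection."""


def retry(operation: Callable[[], T],
          retryable: Tuple[Type[BaseException], ...],
          reset: Callable[[], None] = lambda: None,
          attempts: int = 3,
          delay: float = 0.0) -> T:
    """Call operation, retrying only on the listed exception types."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retryable:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            reset()  # e.g. discard the dead session and open a new one
            time.sleep(delay)
    raise AssertionError("unreachable")
```

In the log above, the error is raised in `update_resource`, after "...column indexes created." has been logged. If only that final step were wrapped this way, with a reset that opens a fresh session, a second attempt would not need to repeat the 20-minute indexing. Retrying the whole job, by contrast, would likely hit the same timeout again.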