BritishGeologicalSurvey / etlhelper

ETL Helper is a Python ETL library to simplify data transfer into and out of databases.
https://britishgeologicalsurvey.github.io/etlhelper/
GNU Lesser General Public License v3.0
104 stars 25 forks source link

Add on_error handling to executemany and related methods #115

Closed volcan01010 closed 2 years ago

volcan01010 commented 2 years ago

Description

This merge request add the on_error parameter to executemany. If a chunk raises an error then it will be retried as individual rows and the failing rows and their exceptions collected into a list. The on_error function is then called on the list.

This provides a flexible way for users to handle failing rows in "real" time, as any function can be used. The updated README.md has details.

Closes #91, which has much more discussion (and also #113, which is a bug-fix)

The changes also include making chunk_size a named parameter on many of the functions. This makes it more convenient to change, rather than having to modify a module-level constant.

To test

To experiment, use the script below to test out inserting data to a SQLite database. Try writing your own on_error functions.

"""Script to create database and load observations data from csv file

Generate observations.csv with:
curl 'https://sensors.bgs.ac.uk/FROST-Server/v1.1/Observations?$select=@iot.id,result,phenomenonTime&$top=20000&$resultFormat=csv' -o observations.csv
"""
import csv
import datetime as dt
import sqlite3
from typing import Iterable

from etlhelper import execute, load, DbParams

def load_observations(csv_file, conn):
    """Load observations from csv_file to db_file."""
    # Drop table (helps with repeated test runs!
    drop_table_sql = """
        DROP TABLE observations
        """
    execute(drop_table_sql, conn)

    # Create table (reject ids with no remainder when divided by 1000)  
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS observations (
          id INTEGER PRIMARY KEY CHECK (id % 1000),
          time TIMESTAMP,
          result FLOAT
          )"""
    execute(create_table_sql, conn)

    # Load data
    with open(csv_file, 'rt') as f:
        reader = csv.DictReader(f)
        load('observations', conn, transform(reader), on_error=on_error)

def on_error(failed_rows):
    """Print the IDs of failed rows"""
    rows, exceptions = zip(*failed_rows)
    failed_ids = [row['id'] for row in rows]
    print(f"Failed IDs: {failed_ids}")

# A transform function that takes an iterable and yields one row at a time
# returns a "generator".  The generator is also iterable, and records are
# processed as they are read so the whole file is never held in memory.
def transform(rows: Iterable[dict]) -> Iterable[dict]:
    """Rename time column and convert to Python datetime."""
    for row in rows:
        row['time'] = row.pop('phenomenonTime')
        row['time'] = dt.datetime.strptime(row['time'], "%Y-%m-%dT%H:%M:%S.%fZ")
        yield row

if __name__ == "__main__":
    import logging
    from etlhelper import logger
    logger.setLevel(logging.INFO)

    db = DbParams(dbtype="SQLITE", filename="observations.sqlite")
    with db.connect() as conn:
        load_observations('/tmp/observations.csv', conn)
volcan01010 commented 2 years ago

I've added WIP until I have updated the README.

volcan01010 commented 2 years ago

README updated.

volcan01010 commented 2 years ago

@ximenesuk - I've tagged you as reviewer for this but I know that you are on leave. There is no rush to do this before the new year, so unless you get coding withdrawal symptoms then it can wait until you are back. I only did it now because the ideas were fresh in my head and I had some non-sprint days.

volcan01010 commented 2 years ago

I just added the demonstration script above to the README documentation as the data loading function. It will be useful to have it their as a demo that people can play with.

ximenesuk commented 2 years ago

Tests all passed. I tried the script as-is, with a different on_error and with the default argument. It all worked fine.