rwolniak opened this issue 4 years ago

I've been doing some research into speeding up my ETLs for Oracle -> Postgres using etlhelper, and came across this article for loading data into PostgreSQL. It looks like using psycopg2's copy_from in combination with io.StringIO could result in up to 5x performance improvements on the Postgres side. Is there a way to leverage this for etlhelper? Maybe an optional flag for the postgres db_helper's executemany function to use it? It would be amazing to be able to cut my ETL time down for multi-million-row tables.
Hi, thanks for sharing this, it looks great! As you know, etlhelper already uses execute_batch with an iterator and a default page size (chunksize) of 5000, so I'm pleased that it performs relatively well. It has always performed quickly enough for my needs, so I haven't had to look any further.

execute_values looks nice. It has the same (cursor, query, iterator, page_size) signature as execute_batch, so we could consider modifying the PgDbHelper to use that function instead when the query matches a regex such as "INSERT INTO .* VALUES". That would be an easy way to double the performance. We don't yet have a good performance test suite to measure it with, though.
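Something along these lines might work (an untested sketch; fast_executemany and the regex are placeholders, not actual etlhelper code):

import re
from psycopg2.extras import execute_batch, execute_values

INSERT_RE = re.compile(r"INSERT\s+INTO\s+\S+.*VALUES\s*\(.*\)", re.IGNORECASE | re.DOTALL)

def fast_executemany(cursor, query, rows, page_size=5000):
    """Send simple INSERT ... VALUES queries via execute_values, everything else via execute_batch."""
    if INSERT_RE.search(query):
        # execute_values expects a single %s placeholder for the whole VALUES list,
        # so collapse the per-row placeholder group, e.g. "(%s, %s)" -> "%s"
        values_query = re.sub(r"VALUES\s*\(.*\)", "VALUES %s", query,
                              flags=re.IGNORECASE | re.DOTALL)
        execute_values(cursor, values_query, rows, page_size=page_size)
    else:
        execute_batch(cursor, query, rows, page_size=page_size)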
As the article says, the copy_from approach has extra issues with datatype conversions and escaping. I'd wondered about float format precision and spatial data, too. So I wouldn't want to put it into executemany. But you could definitely still use the copy_from approach with etlhelper. If it works, it would be a good "recipe" for the documentation.

The method on the webpage uses the StringIteratorIO class with an iterator of CSV lines. You can create one of those with iter_rows and a custom row_factory:
from etlhelper import iter_rows, DbParams

SRC_DB = DbParams(dbtype="ORACLE", ...)
DEST_DB = DbParams(dbtype="PG", ...)

select_sql = "SELECT id, value FROM my_table"

def csv_string_row_factory(cursor):
    """Replace default tuple output with text string e.g. csv."""
    def create_row(row):
        # newline-terminate each row so copy_from sees one record per line
        return "{}|{}\n".format(*row)
    return create_row

# Do the transformation
with SRC_DB.connect('PASSWORD_VARIABLE') as src_conn:
    csv_string_generator = iter_rows(select_sql, src_conn,
                                     row_factory=csv_string_row_factory)
    # StringIteratorIO is the file-like wrapper around an iterator of strings
    # described in the article
    dummy_csv_file = StringIteratorIO(csv_string_generator)
    with DEST_DB.connect('ANOTHER_PASSWORD_VARIABLE') as dest_conn:
        with dest_conn.cursor() as cursor:
            cursor.copy_from(dummy_csv_file, 'target_table', sep='|', size=5000)
Have a go and let me know how you get on.
It may be possible to write a more generic csv_string_row_factory using StringIO and the Python csv module: https://stackoverflow.com/questions/9157314/how-do-i-write-data-into-csv-format-as-string-not-file

This would have the benefit of taking care of arbitrary row lengths, quoting of text strings, etc. We could include that in the main etlhelper as it could be more widely useful. We could also include the StringIteratorIO code to make it really easy. What's the licence of code posted on Stack Overflow?
Stack Overflow code is CC-BY-SA-4.0. But a StringIteratorIO could live in a module of its own with the different licence acknowledged there. https://stackoverflow.com/legal/terms-of-service#licensing
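For reference, it only needs to be a small class, something along these lines (a rough, untested paraphrase of the wrapper the article describes rather than the Stack Overflow code itself):

import io

class StringIteratorIO(io.TextIOBase):
    """Read-only file-like object that streams from an iterator of strings."""

    def __init__(self, str_iter):
        self._iter = str_iter
        self._buffer = ""

    def readable(self):
        return True

    def _read_chunk(self, n=None):
        # Top up the buffer from the iterator, then hand back at most n characters
        while not self._buffer:
            try:
                self._buffer = next(self._iter)
            except StopIteration:
                break
        chunk = self._buffer[:n]
        self._buffer = self._buffer[len(chunk):]
        return chunk

    def read(self, n=None):
        parts = []
        if n is None or n < 0:
            # Read everything remaining
            while True:
                chunk = self._read_chunk()
                if not chunk:
                    break
                parts.append(chunk)
        else:
            while n > 0:
                chunk = self._read_chunk(n)
                if not chunk:
                    break
                n -= len(chunk)
                parts.append(chunk)
        return "".join(parts)

Because it streams rows on demand, the whole result set never has to be held in memory.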
Oh nice! Thanks for the detailed explanation :) Good to note about the license as well. I'll give it a try today and keep you posted on how it goes!
Managed to get it working using this:
import csv
import io

def csv_string_row_factory(cursor):
    """Replace default tuple output with text string e.g. csv."""
    def create_row(row):
        si = io.StringIO()
        cw = csv.writer(si, delimiter='|')
        # csv.writer appends a line terminator, so each row ends with a newline
        cw.writerow("None" if x is None else x for x in row)
        return si.getvalue()
    return create_row
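It plugs into copy_from in the same way as the example above; the only wrinkle is telling copy_from how NULLs are spelled. Roughly (reusing select_sql, src_conn, dest_conn and StringIteratorIO from the earlier example; 'target_table' is still a placeholder):

# null="None" matches the placeholder written by create_row above, so those
# values are loaded as SQL NULLs rather than the literal text 'None'
csv_string_generator = iter_rows(select_sql, src_conn,
                                 row_factory=csv_string_row_factory)
dummy_csv_file = StringIteratorIO(csv_string_generator)
with dest_conn.cursor() as cursor:
    cursor.copy_from(dummy_csv_file, 'target_table', sep='|', null="None", size=5000)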
Still testing speed with some etl trials.
As an update, I've been seeing best results (i.e. fewest errors) using:
import csv
import io

DELIMITER = '\t'

def csv_string_row_factory(cursor):
    """Replace default tuple output with text string e.g. csv."""
    def create_row(row):
        si = io.StringIO()
        cw = csv.writer(si, delimiter=DELIMITER, quoting=csv.QUOTE_NONE,
                        escapechar='\\')
        cw.writerow(row)
        return si.getvalue()
    return create_row
Not sure if it's something that would need to be integrated into etlhelper at this point, but figured I'd post it here in case it helps anyone else. I'm also using primary key sorting on the Oracle side to allow for pagination of the source table, so that a new connection is created for every x-million rows (thereby avoiding timeout issues).
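In case it helps anyone, the pagination looks roughly like this (Oracle 12c+ syntax; the table and column names are invented, CHUNK is tuned per table, and SRC_DB is the DbParams from the example above):

from etlhelper import fetchall

CHUNK = 1_000_000
paged_sql = """
    SELECT id, value FROM my_table
    WHERE id > :last_id
    ORDER BY id
    FETCH FIRST {chunk} ROWS ONLY
""".format(chunk=CHUNK)

last_id = 0
while True:
    # A fresh connection per chunk avoids long-running-session timeouts
    with SRC_DB.connect('PASSWORD_VARIABLE') as src_conn:
        rows = fetchall(paged_sql, src_conn, parameters={"last_id": last_id})
    if not rows:
        break
    last_id = rows[-1][0]  # highest id in this chunk (first column; rows are sorted)
    # ...convert `rows` to CSV lines and copy_from them into PostgreSQL as above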
Great! Thanks for the update. Keep us updated if you find any other tweaks. I think it is worth adding as a recipe in the README at some point, at least. There is a ticket for executemany optimisations for the MSSQL connections, so we may write a benchmarking test. It would be interesting to apply the same test to the different PostgreSQL options.
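The sort of comparison I have in mind is roughly this (a sketch only; the bench table and row shape are made up, it assumes bench has no constraints, and DEST_DB is the PostgreSQL DbParams from the example above):

import io
import time
from psycopg2.extras import execute_batch, execute_values

def time_it(label, load):
    """Run one load strategy and print how long it took."""
    start = time.perf_counter()
    load()
    print(f"{label}: {time.perf_counter() - start:.1f} s")

rows = [(i, f"value {i}") for i in range(1_000_000)]
csv_data = "".join(f"{i}\t{v}\n" for i, v in rows)

with DEST_DB.connect('ANOTHER_PASSWORD_VARIABLE') as conn:
    with conn.cursor() as cur:
        # Each call appends the same rows again; truncate between runs in a real test
        time_it("execute_batch", lambda: execute_batch(
            cur, "INSERT INTO bench (id, value) VALUES (%s, %s)", rows, page_size=5000))
        time_it("execute_values", lambda: execute_values(
            cur, "INSERT INTO bench (id, value) VALUES %s", rows, page_size=5000))
        time_it("copy_from", lambda: cur.copy_from(io.StringIO(csv_data), "bench"))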
I also wonder whether a csv_row_factory would always be specific to the data, or if there is a sufficiently generic version that would be worth including for wider use.
Hi @rwolniak,
This issue has been around a while, but I've kept it open because I'd still like to create a "Recipe" for this once we move to a ReadTheDocs style documentation page.
In the meantime, I was working on an Oracle to PostgreSQL ETL and used pyinstrument to see where I could speed it up. It turned out that the read_lob part, which was extracting a text representation from an Oracle Spatial geometry object, was a massive bottleneck.

I found there is another way to read CLOB objects directly as strings. When I applied it, I got a 10x speed-up. I plan to make this the default behaviour in a future release: https://github.com/BritishGeologicalSurvey/etlhelper/issues/110#issuecomment-988663929
If your pipeline involves CLOBs or BLOBs it might benefit from this approach.
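For anyone curious, the usual cx_Oracle way to do this is an output type handler on the connection, e.g. (a sketch of the general pattern, not necessarily the exact etlhelper change):

import cx_Oracle

def fetch_lobs_as_strings(cursor, name, default_type, size, precision, scale):
    """Fetch CLOB/BLOB columns as str/bytes instead of LOB locators."""
    if default_type == cx_Oracle.DB_TYPE_CLOB:
        return cursor.var(cx_Oracle.DB_TYPE_LONG, arraysize=cursor.arraysize)
    if default_type == cx_Oracle.DB_TYPE_BLOB:
        return cursor.var(cx_Oracle.DB_TYPE_LONG_RAW, arraysize=cursor.arraysize)

# on a raw cx_Oracle connection:
# connection.outputtypehandler = fetch_lobs_as_strings

Reading the values this way avoids fetching each LOB individually via its locator.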
Cheers John