open-contracting / ocdskit

A suite of command-line tools for working with OCDS data
https://ocdskit.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Test PostgreSQL backend (slower) #116

Closed: jpmckinney closed this issue 4 years ago

jpmckinney commented 4 years ago

Since PostgreSQL has native JSON types (json and jsonb), I wanted to see if it would spend less time serializing/deserializing JSON. However, it spends much more time in execute_values, which makes it slower overall. Putting the code here for posterity.
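
For reference, a profile along these lines can be reproduced with a small cProfile harness like the sketch below. It is not the exact setup behind the observation above; the connection string and sample data are placeholders.

# profile_execute_values.py (standalone sketch, not part of ocdskit)
import cProfile
import pstats

import psycopg2
import psycopg2.extras


def profile_inserts(connection_string, count=100000):
    # Build rows shaped like the backend's buffer: (ocid, uri, JSON-adapted release).
    rows = [('ocds-213czf-{}'.format(i), 'http://example.com/{}'.format(i),
             psycopg2.extras.Json({'ocid': 'ocds-213czf-{}'.format(i), 'date': '2020-01-01T00:00:00Z'}))
            for i in range(count)]

    connection = psycopg2.connect(connection_string)
    cursor = connection.cursor()
    cursor.execute("CREATE TEMP TABLE releases (ocid text, uri text, release json)")

    profiler = cProfile.Profile()
    profiler.enable()
    psycopg2.extras.execute_values(cursor, "INSERT INTO releases VALUES %s", rows)
    profiler.disable()

    # Print the 10 functions with the highest cumulative time.
    pstats.Stats(profiler).sort_stats('cumulative').print_stats(10)

    cursor.close()
    connection.close()


if __name__ == '__main__':
    profile_inserts('dbname=ocdskit_test')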

Adding the len(self.buffer) >= 10000 condition improved performance, but it's still not as fast as SQLite. Using prepared statements instead of bulk inserts didn't change performance much, either. (I don't know if there's a way to prepare bulk inserts…)

# packager.py
try:
    import psycopg2
    import psycopg2.extras

    # Serialize with packager.py's json_dumps instead of the default json.dumps.
    class MyJson(psycopg2.extras.Json):
        def dumps(self, obj):
            return json_dumps(obj)

    using_postgresql = True
except ImportError:
    using_postgresql = False

class PostgreSQLBackend(AbstractBackend):
    def __init__(self, connection_string):
        self.connection = psycopg2.connect(connection_string)

        # A regular client-side cursor, used to create the temporary table and insert rows.
        self.cursor = self.connection.cursor()

        # https://www.postgresql.org/docs/9.4/sql-createtable.html#SQL-CREATETABLE-TEMPORARY
        self.cursor.execute("CREATE TEMP TABLE releases (ocid text, uri text, release json)")

        self.buffer = []

    def add_release(self, ocid, uri, release):
        self.buffer.append((ocid, uri, MyJson(release)))

    def flush(self, force=False):
        if force or len(self.buffer) >= 10000:
            # http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values
            psycopg2.extras.execute_values(self.cursor, "INSERT INTO releases VALUES %s", self.buffer)

            self.buffer = []

    def get_releases_by_ocid(self):
        self.flush(force=True)

        self.cursor.execute("CREATE INDEX IF NOT EXISTS ocid_idx ON releases(ocid)")

        # http://initd.org/psycopg/docs/usage.html#server-side-cursors
        # A named cursor is server-side, so rows are streamed rather than fetched all at once.
        cursor = self.connection.cursor('ocdskit')
        cursor.execute("SELECT * FROM releases ORDER BY ocid")
        for ocid, rows in groupby(cursor, lambda row: row[0]):
            yield ocid, rows
        cursor.close()

    def close(self):
        self.cursor.close()
        self.connection.close()

# class Packager:
    def __init__(self, connection_string=None):
        self.connection_string = connection_string
# def __enter__(self):
        if using_postgresql and self.connection_string:
            self.backend = PostgreSQLBackend(self.connection_string)

# compile.py
# def add_arguments(self):
        self.add_argument('--connection-string', help='the libpq connection string, if using PostgreSQL to '
                                                      'temporarily store releases')
# def handle(self):
        kwargs['connection_string'] = self.args.connection_string

        if not (packager.using_postgresql and self.args.connection_string) and not packager.using_sqlite:
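
With the compile.py change above, usage would look something like the following (hypothetical invocation; the file and database names are placeholders):

cat release_packages.json | ocdskit compile --connection-string "dbname=ocdskit" > compiled.json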

Prepared statements looked like:

# def __init__(self, connection_string):
        self.cursor.execute("PREPARE insertreleases(text, text, json) AS INSERT INTO releases VALUES ($1, $2, $3)")
# def flush(self, force=False):
            for args in self.buffer:
                self.cursor.execute("EXECUTE insertreleases(%s,%s,%s)", args)