MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0
6.88k stars 399 forks source link

Redshift with sqlalchemy 2 #1038

Closed raphaelauv closed 1 year ago

raphaelauv commented 1 year ago

I am facing errors like the following when connecting to Redshift with SQLAlchemy 2 and asyncpg:

unknown type: pg_catalog.jsonb

unknown type: pg_catalog.json

unrecognized configuration parameter "standard_conforming_strings"


I found a way to make Redshift work with SQLAlchemy 2 and asyncpg.

solution

import os

from sqlalchemy.dialects.postgresql.asyncpg import PGDialect_asyncpg

from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine

# Connection settings are read from the environment; only the database name
# has a fallback ("dwh").
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")

POSTGRES_DB = os.getenv("POSTGRES_DB", "dwh")
POSTGRES_SERVER = os.getenv("POSTGRES_SERVER")


def build_database_url(user, password, server, database, port=5439):
    """Return a ``postgresql+asyncpg://`` URL with safely encoded credentials.

    The user name and password are percent-encoded so that characters with a
    meaning inside a URL (``@``, ``/``, ``:``, ``%``, spaces, ...) cannot
    corrupt it.  Credentials that are ``None`` are left out instead of being
    rendered as the literal text ``"None"``.

    :param user: login name, or ``None`` to omit the credentials entirely.
    :param password: password, or ``None`` to omit it.
    :param server: host name, inserted as given.
    :param database: database name, inserted as given.
    :param port: TCP port; defaults to 5439, the value this snippet
        previously hard-coded.
    :return: the assembled database URL string.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    credentials = ""
    if user is not None:
        # safe="" also encodes "/", which quote() would otherwise keep.
        credentials = quote(user, safe="")
        if password is not None:
            credentials += ":" + quote(password, safe="")
        credentials += "@"
    return f"postgresql+asyncpg://{credentials}{server}:{port}/{database}"


SQLALCHEMY_DATABASE_URL = build_database_url(
    POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_SERVER, POSTGRES_DB
)

def custom_initialize(self, connection):
    """Replacement ``initialize`` for the dialect that tolerates Redshift.

    Runs the ``initialize`` of the class that follows ``PGDialect`` in the
    MRO (bypassing ``PGDialect``'s own implementation) and then sets the
    version-dependent feature flags by hand.  A server reporting a version
    below 8.2 is treated as potentially Redshift; for it the
    ``_set_backslash_escapes`` probe is skipped and fixed values are used.

    NOTE(review): presumably modelled on the stock ``PGDialect.initialize``
    of the SQLAlchemy release in use -- re-check after upgrading.

    :param connection: the connection passed in during dialect
        initialization; forwarded unchanged.
    """
    from sqlalchemy.dialects.postgresql.base import PGDialect

    super(PGDialect, self).initialize(connection)

    server_version = self.server_version_info

    # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
    self.supports_smallserial = server_version >= (9, 2)

    if server_version >= (8, 2):
        self._set_backslash_escapes(connection)
    else:
        # Potentially Redshift: do not probe the server, use fixed values.
        self._backslash_escapes = False
        self._supports_create_index_concurrently = False

    self._supports_drop_index_concurrently = server_version >= (9, 2)
    self.supports_identity_columns = server_version >= (10,)

async def custom_setup_asyncpg_json_codec(self, conn):
    """Do nothing in place of the dialect's JSON codec setup.

    Intended to be bound onto the dialect so that no JSON codec is
    registered on new connections (the report mentions
    ``unknown type: pg_catalog.json``).

    :param conn: the connection being set up; unused.
    :return: always ``None``.
    """
    return None

async def custom_setup_asyncpg_jsonb_codec(self, conn):
    """Do nothing in place of the dialect's JSONB codec setup.

    Intended to be bound onto the dialect so that no JSONB codec is
    registered on new connections (the report mentions
    ``unknown type: pg_catalog.jsonb``).

    :param conn: the connection being set up; unused.
    :return: always ``None``.
    """
    return None

# Build the async engine from the URL assembled earlier in this snippet.
engine = create_async_engine(SQLALCHEMY_DATABASE_URL)

# Patch this engine's dialect instance only: bind each replacement function
# to the instance so it shadows the class-level method of the same name.
dialect: PGDialect_asyncpg = engine.dialect
setattr(
    dialect,
    "setup_asyncpg_json_codec",
    custom_setup_asyncpg_json_codec.__get__(dialect, PGDialect_asyncpg),
)
setattr(
    dialect,
    "setup_asyncpg_jsonb_codec",
    custom_setup_asyncpg_jsonb_codec.__get__(dialect, PGDialect_asyncpg),
)
setattr(
    dialect,
    "initialize",
    custom_initialize.__get__(dialect, PGDialect_asyncpg),
)

# Session factory bound to the patched engine, plus one session created
# from it at import time.
async_session = async_sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)
db_client = async_session()

Tested using:

sqlalchemy[asyncio]==2.0.15
asyncpg==0.27.0