crate / crate-clients-tools

Clients, tools, and integrations for CrateDB.
https://crate.io/docs/clients/
Apache License 2.0
2 stars 1 forks source link

asyncpg issues a `WITH RECURSIVE` query when using `float[]` #52

Open matriv opened 1 year ago

matriv commented 1 year ago

I've tried to introduce a float[] test: https://gist.github.com/matriv/05bec849ff166789d0378625e43c06b0 in: https://github.com/crate/crate-qa/blob/master/tests/client_tests/python/asyncpg/test_asyncpg.py (method in question: float_vector_can_be_inserted_and_selected) and the driver issues the following query which currently CrateDB doesn't support (WITH RECURSIVE)

Maybe there is a way to avoid this query by tweaking asyncpg?

WITH RECURSIVE typeinfo_tree(
    oid, ns, name, kind, basetype, elemtype, elemdelim,
    range_subtype, attrtypoids, attrnames, depth)
AS (
    SELECT
        ti.oid, ti.ns, ti.name, ti.kind, ti.basetype,
        ti.elemtype, ti.elemdelim, ti.range_subtype,
        ti.attrtypoids, ti.attrnames, 0
    FROM
            (
        SELECT
            t.oid                           AS oid,
            ns.nspname                      AS ns,
            t.typname                       AS name,
            t.typtype                       AS kind,
            (CASE WHEN t.typtype = 'd' THEN
                (WITH RECURSIVE typebases(oid, depth) AS (
                    SELECT
                        t2.typbasetype      AS oid,
                        0                   AS depth
                    FROM
                        pg_type t2
                    WHERE
                        t2.oid = t.oid

                    UNION ALL

                    SELECT
                        t2.typbasetype      AS oid,
                        tb.depth + 1        AS depth
                    FROM
                        pg_type t2,
                        typebases tb
                    WHERE
                       tb.oid = t2.oid
                       AND t2.typbasetype != 0
               ) SELECT oid FROM typebases ORDER BY depth DESC LIMIT 1)

               ELSE NULL
            END)                            AS basetype,
            t.typelem                       AS elemtype,
            elem_t.typdelim                 AS elemdelim,
            COALESCE(
                range_t.rngsubtype,
                multirange_t.rngsubtype)    AS range_subtype,
            (CASE WHEN t.typtype = 'c' THEN
                (SELECT
                    array_agg(ia.atttypid ORDER BY ia.attnum)
                FROM
                    pg_attribute ia
                    INNER JOIN pg_class c
                        ON (ia.attrelid = c.oid)
                WHERE
                    ia.attnum > 0 AND NOT ia.attisdropped
                    AND c.reltype = t.oid)

                ELSE NULL
            END)                            AS attrtypoids,
            (CASE WHEN t.typtype = 'c' THEN
                (SELECT
                    array_agg(ia.attname::text ORDER BY ia.attnum)
                FROM
                    pg_attribute ia
                    INNER JOIN pg_class c
                        ON (ia.attrelid = c.oid)
                WHERE
                    ia.attnum > 0 AND NOT ia.attisdropped
                    AND c.reltype = t.oid)

                ELSE NULL
            END)                            AS attrnames
        FROM
            pg_catalog.pg_type AS t
            INNER JOIN pg_catalog.pg_namespace ns ON (
                ns.oid = t.typnamespace)
            LEFT JOIN pg_type elem_t ON (
                t.typlen = -1 AND
                t.typelem != 0 AND
                t.typelem = elem_t.oid
            )
            LEFT JOIN pg_range range_t ON (
                t.oid = range_t.rngtypid
            )
            LEFT JOIN pg_range multirange_t ON (
                t.oid = multirange_t.rngmultitypid
            )
    )
 AS ti
    WHERE
        ti.oid = any($1::oid[])

    UNION ALL

    SELECT
        ti.oid, ti.ns, ti.name, ti.kind, ti.basetype,
        ti.elemtype, ti.elemdelim, ti.range_subtype,
        ti.attrtypoids, ti.attrnames, tt.depth + 1
    FROM
            (
        SELECT
            t.oid                           AS oid,
            ns.nspname                      AS ns,
            t.typname                       AS name,
            t.typtype                       AS kind,
            (CASE WHEN t.typtype = 'd' THEN
                (WITH RECURSIVE typebases(oid, depth) AS (
                    SELECT
                        t2.typbasetype      AS oid,
                        0                   AS depth
                    FROM
                        pg_type t2
                    WHERE
                        t2.oid = t.oid

                    UNION ALL

                    SELECT
                        t2.typbasetype      AS oid,
                        tb.depth + 1        AS depth
                    FROM
                        pg_type t2,
                        typebases tb
                    WHERE
                       tb.oid = t2.oid
                       AND t2.typbasetype != 0
               ) SELECT oid FROM typebases ORDER BY depth DESC LIMIT 1)

               ELSE NULL
            END)                            AS basetype,
            t.typelem                       AS elemtype,
            elem_t.typdelim                 AS elemdelim,
            COALESCE(
                range_t.rngsubtype,
                multirange_t.rngsubtype)    AS range_subtype,
            (CASE WHEN t.typtype = 'c' THEN
                (SELECT
                    array_agg(ia.atttypid ORDER BY ia.attnum)
                FROM
                    pg_attribute ia
                    INNER JOIN pg_class c
                        ON (ia.attrelid = c.oid)
                WHERE
                    ia.attnum > 0 AND NOT ia.attisdropped
                    AND c.reltype = t.oid)

                ELSE NULL
            END)                            AS attrtypoids,
            (CASE WHEN t.typtype = 'c' THEN
                (SELECT
                    array_agg(ia.attname::text ORDER BY ia.attnum)
                FROM
                    pg_attribute ia
                    INNER JOIN pg_class c
                        ON (ia.attrelid = c.oid)
                WHERE
                    ia.attnum > 0 AND NOT ia.attisdropped
                    AND c.reltype = t.oid)

                ELSE NULL
            END)                            AS attrnames
        FROM
            pg_catalog.pg_type AS t
            INNER JOIN pg_catalog.pg_namespace ns ON (
                ns.oid = t.typnamespace)
            LEFT JOIN pg_type elem_t ON (
                t.typlen = -1 AND
                t.typelem != 0 AND
                t.typelem = elem_t.oid
            )
            LEFT JOIN pg_range range_t ON (
                t.oid = range_t.rngtypid
            )
            LEFT JOIN pg_range multirange_t ON (
                t.oid = multirange_t.rngmultitypid
            )
    )
 ti,
        typeinfo_tree tt
    WHERE
        (tt.elemtype IS NOT NULL AND ti.oid = tt.elemtype)
        OR (tt.attrtypoids IS NOT NULL AND ti.oid = any(tt.attrtypoids))
        OR (tt.range_subtype IS NOT NULL AND ti.oid = tt.range_subtype)
        OR (tt.basetype IS NOT NULL AND ti.oid = tt.basetype)
)

SELECT DISTINCT
    *,
    basetype::regtype::text AS basetype_name,
    elemtype::regtype::text AS elemtype_name,
    range_subtype::regtype::text AS range_subtype_name
FROM
    typeinfo_tree
ORDER BY
    depth DESC
amotl commented 1 year ago

Hi Marios,

thank you for reporting this. The thing is, that asyncpg trips into this code at ^1, where types_with_missing_codecs is {1021}.

Does this sound familiar to you?

With kind regards, Andreas.

amotl commented 1 year ago

I've discovered a few more references here.

This might be a possible solution/workaround?

amotl commented 1 year ago

When preventing the client to go into the loop processing the elements of types_with_missing_codecs, effectively letting it skip this section, asyncpg will fail next with:

asyncpg.exceptions._base.InternalClientError: no decoder for OID 1021

Hope this helps.

matriv commented 1 year ago

Thanks for investigating! The thing is that OID 1021 is not a custom type, it's the standard float[] for Postgres, so it seems that asyncpg is not properly registering them in the first place.

amotl commented 1 year ago

Yes, it looks like OID 1021 hasn't been added to the driver yet. In this spirit, I suspect asyncpg will fail in the same way on pgvector as well?

-- https://github.com/MagicStack/asyncpg/blob/v0.28.0/asyncpg/protocol/pgtypes.pxi#L52-L56

amotl commented 1 year ago

A corresponding codec would need to be registered at ^1, and implemented at pgproto ^2.

amotl commented 1 year ago

Trying to exercise what is recommended at https://github.com/MagicStack/asyncpg/issues/1078#issuecomment-1725902124, both set_builtin_type_codec() and set_type_codec() are invoking _introspect_type(typename, schema), which will again attempt to introspect the PostgreSQL server.

So, effectively, when running such code upfront:

await conn.set_type_codec(
    "float[]",
    encoder=lambda s: s,
    decoder=lambda s: s,
    format="binary")

it will emit an SQL statement like:

SELECT
    t.oid,
    t.typelem     AS elemtype,
    t.typtype     AS kind
FROM
    pg_catalog.pg_type AS t
    INNER JOIN pg_catalog.pg_namespace ns ON (ns.oid = t.typnamespace)
WHERE
    t.typname = $1 AND ns.nspname = $2

with arguments:

['float[]', 'public']

It will fail with

ValueError: unknown type: public.float[]
amotl commented 1 year ago

All right, after pulling the most recent crate/crate:nightly OCI image, and inspecting the type table using

select typname from pg_catalog.pg_type;

I can observe a new type called oidvector. Is it legit?

Now, after adjusting the initialization code accordingly, like:

await conn.set_type_codec(
    "oidvector",
    encoder=lambda s: s,
    decoder=lambda s: s,
    schema="pg_catalog",
    format="binary")

asyncpg will use those arguments on the SQL statement outlined in my previous post:

['oidvector', 'pg_catalog']

and will fail with a different exception:

  File "/Users/amo/dev/crate/drivers/asyncpg/test_cratedb.py", line 120, in exec_queries_pooled
    await conn.set_type_codec(
  File "/Users/amo/dev/crate/drivers/asyncpg/asyncpg/connection.py", line 1252, in set_type_codec
    raise exceptions.InterfaceError(
asyncpg.exceptions._base.InterfaceError: cannot use custom codec on type pg_catalog.oidvector: it is neither a scalar type nor a composite type
amotl commented 1 year ago

When using the other type of type registry helper function, set_builtin_type_codec():

await conn.set_builtin_type_codec(
    "oidvector",
    schema="pg_catalog",
    codec_name="binary",
)

the exception is like:

  File "/Users/amo/dev/crate/drivers/asyncpg/test_cratedb.py", line 129, in exec_queries_pooled
    await conn.set_builtin_type_codec(
  File "/Users/amo/dev/crate/drivers/asyncpg/asyncpg/connection.py", line 1332, in set_builtin_type_codec
    raise exceptions.InterfaceError(
asyncpg.exceptions._base.InterfaceError: cannot alias non-scalar type pg_catalog.oidvector
matriv commented 1 year ago

oidvector is old: https://github.com/crate/crate/pull/10137 Actually I was trying to test our new float_vector which reflects to float[] (oid: 1021) as far as postgres protocol is concerned.

amotl commented 1 year ago

Now that the introspection query works, by using a snippet like

await conn.set_type_codec(
    "oidvector",
    encoder=lambda s: s,
    decoder=lambda s: s,
    schema="pg_catalog",
)

yielding an SQL of:

SELECT
    t.oid,
    t.typelem     AS elemtype,
    t.typtype     AS kind
FROM
    pg_catalog.pg_type AS t
    INNER JOIN pg_catalog.pg_namespace ns ON (ns.oid = t.typnamespace)
WHERE
    t.typname = 'oidvector' AND ns.nspname = 'pg_catalog'
;

and a result of:

+-----+----------+------+
| oid | elemtype | kind |
+-----+----------+------+
|  30 |       26 | b    |
+-----+----------+------+

we can inspect what goes wrong. Internally, asyngpc wraps that into:

typeinfo = <Record oid=30 elemtype=26 kind='b'>

Please note that elemtype is defined and not zero, and that the value type of kind is string, and not bytes.

amotl commented 1 year ago

Now, I would like to guide you to the implementation where the exception from asyncpg is originating from, see ^1. Oh, that has been improved already (I am on master), see ^2.

Nevertheless, this is why it can't be classified as "scalar type":

If it should be classified as "composite type", it would need to satisfy typeinfo['kind'] == b'c', at least when taking the most recent code of asyncpg into consideration, which is yet unreleased (not yet in 0.28.0).

amotl commented 1 year ago

oidvector is old: https://github.com/crate/crate/pull/10137 Actually I was trying to test our new float_vector which reflects to float[] (oid: 1021) as far as postgres protocol is concerned.

Thank you. Unfortunately, I can't spot such a type when running this SQL statement.

select typname, typelem AS elemtype, typtype AS kind from pg_catalog.pg_type;

Does it miss to list a corresponding item like?

amotl commented 1 year ago

I was able to register something with ConnectionSettings.register_data_types(), another type of registry method, effectively mimicking missing response records from the pg_type table.

vector_type = dict(
    oid=1021,
    ns="pg_catalog",
    name="float[]",
    kind=b"b",
    basetype=0,
    elemtype=26,
    elemdelim=",",
    range_subtype=None,
    attrtypoids=None,
)
settings = conn._protocol.get_settings()
settings.register_data_types([vector_type])

Most probably, I made many mistakes, that's why the results of the float_vector_can_be_inserted_and_selected test case are still incorrect. At least, no exceptions raised. Does it help?

AssertionError: Lists differ: [1066192077, 1074580685, 1079194419, 1082969293] != [1.1, 2.2, 3.3, 4.4]

First differing element 0:
1066192077
1.1

- [1066192077, 1074580685, 1079194419, 1082969293]
+ [1.1, 2.2, 3.3, 4.4]
amotl commented 1 year ago

Using FLOAT4OID = 700 as elemtype is of course closer to what we would expect here.

Type Definition

vector_type = dict(
    oid=1021,
    ns="pg_catalog",
    name="float[]",
    kind=b"b",
    basetype=0,
    elemtype=700,
    elemdelim=",",
    range_subtype=None,
    attrtypoids=None,
)

Outcome

- [1.100000023841858, 2.200000047683716, 3.299999952316284, 4.400000095367432]
+ [1.1, 2.2, 3.3, 4.4]
matriv commented 1 year ago

Thx a lot for your work here! at least we have some kind of workaround if users ask for it.

matriv commented 1 year ago

I'd say to leave this issue open for the time being and I'll raise a discussion for maybe implementing WITH RECURSIVE soon.

amotl commented 1 year ago

Thanks. What about adding the missing type definition to the corresponding table in CrateDB's pg_catalog?

On 25 September 2023 17:25:54 CEST, Marios Trivyzas @.***> wrote:

I'd say to leave this issue open for the time being and I'll raise a discussion for maybe implementing WITH RECURSIVE soon.

-- Reply to this email directly or view it on GitHub: https://github.com/crate/crate-clients-tools/issues/52#issuecomment-1733949828 You are receiving this because you commented.

Message ID: @.***> -- Sent from my mind. This might have been typed on a mobile device, so please excuse my brevity.

matriv commented 1 year ago

float[] is already registered in pg_type for CrateDB (it's _float4, _ prefix denotes array of)

amotl commented 1 year ago

Aha, all right. Does it make sense then to ask at asyncpg about adding support for OID 1021?

On 25 September 2023 19:12:04 CEST, Marios Trivyzas @.***> wrote:

float[] is already registered in pg_type for CrateDB (it's _float4, _ prefix denotes array of)

-- Reply to this email directly or view it on GitHub: https://github.com/crate/crate-clients-tools/issues/52#issuecomment-1734154887 You are receiving this because you commented.

Message ID: @.***> -- Sent from my mind. This might have been typed on a mobile device, so please excuse my brevity.

matriv commented 1 year ago

I think yes, because all arrays have the same issue and the with recursive query is issued, e.g.: boolean array:

fv = [True, False, True, False]
await conn.execute('drop table if exists tbl_fv')
await conn.execute('create table tbl_fv (id int, fv boolean[])')
await conn.execute('insert into tbl_fv (id, fv) values (1, ?)', fv)
await conn.execute('refresh table tbl_fv')
result = await conn.fetch('select id, fv from tbl_fv order by id')
test.assertEqual(result[0][1], fv)
seut commented 10 months ago

Lets document the required steps for a user to use the float[] with asyncpg. Thus we probably need to create a section related to asyncpg with the documentation what limitations exists or what workarounds are needed.

JamesHutchison commented 5 months ago

Just to chime in, I'm seeing 14 second queries with CockroachDB doing this inspection for _varchar

JamesHutchison commented 5 months ago

sorry, just realized this isn't the right repo for this.