adriangb / pgpq

Stream Arrow data into Postgres
MIT License

How should I handle arrays of structs, #17

Open dandexcare opened 1 year ago

dandexcare commented 1 year ago

How do I handle a field with array-of-struct values like the following? I will send a full example shortly:

df.with_columns(pl.struct('aggregate_ratings.sub_ratings').alias('aggregate_ratings.sub_ratings').map(to_json, return_dtype=pl.Utf8))

id = pa.array([1, 2, 3])
complicated = pa.array([
    [{'average_rating': 4.9, 'crawled_date': '2023-06-06'}, {'average_rating': 4.7, 'crawled_date': '2023-06-04'}],
    [{'average_rating': 4.8, 'crawled_date': '2023-05-06'}, {'average_rating': 4.6, 'crawled_date': '2023-05-04'}],
    [{'average_rating': 4.7, 'crawled_date': '2023-04-06'}, {'average_rating': 4.5, 'crawled_date': '2023-04-04'}],
])
df = pa.RecordBatch.from_arrays(
    [
        id,
        pa.array(
            array_to_json(complicated),
            type=pa.list_(pa.struct([
                pa.field("average_rating", pa.double()),
                pa.field("crawled_date", pa.large_string()),
            ])),
        ),
    ],
    schema=pa.schema([
        ("id", pa.int32()),
        pa.field(
            "complicated",
            pa.list_(pa.struct([
                pa.field("average_rating", pa.double()),
                pa.field("crawled_date", pa.large_string()),
            ])),
        ),
    ]),
).to_pandas()

print(df)
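For reference, one way the struct-to-JSON step could work, before handing the column to pgpq, is plain `json.dumps` over the Python values. This is only a sketch of what a helper like `array_to_json` above might do, not pgpq's own API:

```python
import json

# Plain-Python sketch: each struct in each row becomes one JSON document
# (orient="records"-style), so the column becomes a list of strings.
rows = [
    [{"average_rating": 4.9, "crawled_date": "2023-06-06"},
     {"average_rating": 4.7, "crawled_date": "2023-06-04"}],
    [{"average_rating": 4.8, "crawled_date": "2023-05-06"}],
]
json_rows = [[json.dumps(item) for item in row] for row in rows]
# json_rows can then be wrapped with
# pa.array(json_rows, type=pa.list_(pa.large_string())) for encoding.
```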

I made an attempt with the following encoder, but it fails on the COPY because the output type is Jsonb() instead of jsonb[]:

encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
    schema,
    {
        'main_key': Int32EncoderBuilder(schema.field('main_key')),
        'aggregate_ratings.sub_ratings': LargeStringEncoderBuilder.new_with_output(
            schema[schema.get_field_index('aggregate_ratings.sub_ratings')],
            Jsonb(),
        ),
    },
)

Error:

psycopg.errors.QueryCanceled: COPY from stdin failed: error from Python: PanicException - called `Result::unwrap()` on an `Err` value: ColumnTypeMismatch { field: "aggregate_ratings.sub_ratings", expected: "arrow_array::array::byte_array::GenericByteArray<arrow_array::types::GenericStringType<i64>>", actual: LargeList(Field { name: "item", data_type: Struct([Field { name: "average_rating", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "crawled_date", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "metric", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) }
adriangb commented 1 year ago

See #5

adriangb commented 1 year ago

Sorry I think I got confused by the mention of struct columns. Seems like the issue is something else, let me take a look.

Regardless, the error message should be better

dandexcare commented 1 year ago

My best guess is that it's the transformation of the arrays of struct values to json.


adriangb commented 1 year ago

I think the issue is how you were creating the encoders. In particular, I don't see ListEncoderBuilder anywhere. Try adapting this example plus the one from #13:

import psycopg
import pyarrow as pa
from pgpq import ArrowToPostgresBinaryEncoder
from pgpq.encoders import ListEncoderBuilder, LargeStringEncoderBuilder
from pgpq.schema import Jsonb

# The struct fields that the JSON strings below encode:
fields = [
    ('a', pa.int32()),
    ('b', pa.bool_()),
]
schema = pa.schema([
    ('col', pa.list_(pa.large_string())),
])

batch = pa.RecordBatch.from_arrays(
    [
        pa.array([['{"a":123,"b":false}', '{"a":456,"b":true}']])
    ],
    schema=schema,
)

encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
    batch.schema,
    {
        'col': ListEncoderBuilder.new_with_inner(
            schema.field(0),
            LargeStringEncoderBuilder.new_with_output(
                schema.field(0).type.field(0),
                Jsonb(),
            )
        )
    }
)

ddl = "DROP TABLE IF EXISTS data; CREATE TABLE data (data jsonb[]);"

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)  # type: ignore
        with cursor.copy("COPY data FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(batch))
            copy.write(encoder.finish())
    conn.commit()

I do feel like there should be a simpler way to build these encoders, but I'm not sure what the right API is just yet.

dandexcare commented 1 year ago

I noticed that you have single quotes surrounding the struct in the array of structs. So I will do that transformation:

pa.array([['{"a":123,"b":false}', '{"a":456,"b":true}']])

And I am receiving the "jsonb not-supported error", and I'm not clear on the workaround while using release 0.7.3. I tried making the receiving table's column json, but I don't know how to set the encoder's output type to Json().

encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
    batch.schema,
    {
        'col': ListEncoderBuilder.new_with_inner(
            schema.field(0),
            LargeStringEncoderBuilder.new_with_output(
                schema.field(0).type.field(0),
                Jsonb(),
            )
        )
    }
)
adriangb commented 1 year ago

The single quotes are just because JSON uses double quotes, so quoting with single quotes avoids escaping. You should get the same result if you convert a struct to JSON; I just skipped that step to simplify the example.

And I am receiving the "jsonb not-supported error", and I am not clear the work-around while using the release 0.7.3. I tried making the receiving table JSON, but I don't know how to set the output type of the encoder to Json().

I'm not sure I understand what issue you're having or where you're getting that error. The gist of it is that you need to make the encoder a list of Jsonb (what you have there looks about right) and make the column in the table jsonb[] (see my DDL above).

dandexcare commented 1 year ago

[screenshot of the error]

I'm not sure I understand what issue you're having or where you're getting that error. The gist of it is that you need to make the encoder a list of Jsonb (what you have there looks about right) and make the column in the table jsonb[] (see my DDL above).

I did not change your code that you sent 12 hours ago... Any ideas?

Warm regards, Dan

adriangb commented 1 year ago

Are you sure you're using 0.7.3? Does my example above run for you as-is?

dandexcare commented 1 year ago

yes, but just to make sure, I am going to create a venv

dandexcare commented 1 year ago

numpy==1.24.3
pgpq==0.7.3
polars==0.18.3
psycopg==3.1.9
pyarrow==12.0.1
typing_extensions==4.6.3

It didn't error! But the result looks like a struct within a struct, not an array of structs: [screenshot of the query result]

adriangb commented 1 year ago

try doing something like SELECT col[1] FROM data

dandexcare commented 1 year ago

It worked, so is that functionally the same as [{'a': 123, 'b': False}, {'a': 456, 'b': True}] ?

adriangb commented 1 year ago

Yes, at that point the data is in Postgres; what I gave you is just the Postgres syntax for working with arrays: https://www.postgresql.org/docs/current/arrays.html
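The equivalence can be sketched on the Python side. Note that Postgres arrays are 1-indexed, so `col[1]` in SQL corresponds to Python index 0:

```python
import json

# What went into the jsonb[] column in the example earlier:
stored = ['{"a":123,"b":false}', '{"a":456,"b":true}']

# SELECT data[1] FROM data returns the first jsonb element (1-indexed),
# which deserializes back to the original struct values:
first = json.loads(stored[0])
```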

dandexcare commented 1 year ago

It worked! So, to finish off this evaluation, I need to create an encoder for all the arrays of structs:

encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
    schema,
    {
        field: ListEncoderBuilder.new_with_inner(
            schema.field(field),
            LargeStringEncoderBuilder.new_with_output(
                schema.field(field).type.field(0),
                Jsonb(),
            )
        )
        for field in struct_array_fields
    }
)

and then possibly apply the uint64 -> uint32 transformation that I saw in the outstanding issues. Hopefully it will just work! Much appreciated!

adriangb commented 1 year ago

I think this is resolved, can we close this issue?

dandexcare commented 1 year ago

I am blocked by this. I reduced the number of fields to just one to try to isolate the problem:

ColumnTypeMismatch { field: "aggregate_ratings.sub_ratings", expected: "arrow_array::array::list_array::GenericListArray<i32>", actual: LargeList(Field { name: "item", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) }

When converting to json, I need to set orient='records' so the metadata falls away. I don't know how to stick that into this function:

def to_json(s: pl.Series) -> pl.Series:
    print('*********************s')
    print(s)
    a = s.to_arrow()
    a = array_to_utf8_json_array(a)
    print(a)
    return pl.from_arrow(a)

This is how I was able to convert the items in the arrays of structs to JSON strings. I think it is capturing the metadata, but I am having difficulty troubleshooting:


# Add a row number column
df = df.with_row_count(name="row_num", offset=1)

exploded_df = df.explode("aggregate_ratings.sub_ratings")
exploded_df = exploded_df.with_columns(
    pl.col("aggregate_ratings.sub_ratings").map(to_json, return_dtype=pl.Utf8)
)

aggregated_df = exploded_df.groupby("row_num").agg(
        [
            pl.col('aggregate_ratings.sub_ratings'),
        ]
    )
df = aggregated_df.select(['aggregate_ratings.sub_ratings'])