adriangb / pgpq

Stream Arrow data into Postgres
MIT License

Unable to write to jsonb column using pgpq.encoders and ArrowToPostgresBinaryEncoder #21

Closed · valterartur closed this issue 11 months ago

valterartur commented 11 months ago

Firstly, I want to extend my gratitude to the maintainers and contributors of the pgpq library. It's an invaluable tool, and I truly appreciate the work put into it.

I have been working with PySpark's mapInArrow functionality, processing data and intending to write it to a PostgreSQL table with a column of type jsonb. While the processing using Arrow functions works flawlessly, I am encountering issues during the write operation to the jsonb column. To better understand this, I isolated the problem and tried some basic operations with pgpq and PyArrow.

However, even in this simplified scenario, writing to a jsonb column fails, while the same process works fine for a jsonb[] column.

Reproducible code

import pyarrow as pa
import psycopg
import pgpq.schema
import pgpq.encoders
from pgpq import ArrowToPostgresBinaryEncoder
from pgpq.schema import PostgresSchema

# Sample data
batch = pa.RecordBatch.from_arrays(
    [
        pa.array(
            [["{}"], ['{"foo":"bar"}'], ["{}"]],
            type=pa.list_(pa.field("field", pa.string())),
        ),
    ],
    schema=pa.schema(
        [
            pa.field(
                "json_list",
                pa.list_(pa.field("field", pa.string())),
            ),
        ]
    ),
)

encoders = {
    "json_list": pgpq.encoders.ListEncoderBuilder.new_with_inner(
        batch.schema.field("json_list"),
        pgpq.encoders.StringEncoderBuilder.new_with_output(
            batch.schema.field("json_list").type.value_field, pgpq.schema.Jsonb()
        ),
    )
}

# Serialize data
encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(batch.schema, encoders)
buffer = bytearray()
buffer.extend(encoder.write_header())
buffer.extend(encoder.write_batch(batch))
buffer.extend(encoder.finish())

# Connect to PostgreSQL and write data
dsn = 'YOUR_DSN_HERE'  # Replace with your DSN
schema = 'schema'
t_name = 'data'
ddl = f"""
CREATE TABLE IF NOT EXISTS {schema}.{t_name}
(
    json_list jsonb
)
"""
with psycopg.connect(dsn) as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)
        conn.commit()
        # This COPY command does not work
        with cursor.copy(f"COPY {schema}.{t_name} FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(buffer)
        conn.commit()

The above example fails for a jsonb column but succeeds for a jsonb[] column.

schema = 'schema'
t_name = 'data1'
ddl = f"""
CREATE TABLE IF NOT EXISTS {schema}.{t_name}
(
    json_list jsonb[]
)
"""
with psycopg.connect(dsn) as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)
        conn.commit()
        with cursor.copy(f"COPY {schema}.{t_name} FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(buffer)
        conn.commit()

The exception is:

/tmp/ipykernel_941951/1265249406.py in <module>
     56         # This COPY command does not work
     57         with cursor.copy(f"COPY {schema}.{t_name} FROM STDIN WITH (FORMAT BINARY)") as copy:
---> 58             copy.write(buffer)
     59         conn.commit()

/usr/lib64/python3.8/contextlib.py in __exit__(self, type, value, traceback)
    118         if type is None:
    119             try:
--> 120                 next(self.gen)
    121             except StopIteration:
    122                 return False

/proj/venv/lib64/python3.8/site-packages/psycopg/cursor.py in copy(self, statement, params, writer)
    914                 yield copy
    915         except e._NO_TRACEBACK as ex:
--> 916             raise ex.with_traceback(None)
    917 
    918         # If a fresher result has been set on the cursor by the Copy object,

InternalError_: unsupported jsonb version number 0
CONTEXT:  COPY data, line 1, column json_list

Is this a known limitation or oversight? Or could I be misusing the serialization process? Any guidance would be immensely helpful.

valterartur commented 11 months ago

OK, I understand now that this encoder setup produces a jsonb[] array rather than a single jsonb value. I would appreciate any code example I can use to store plain jsonb.

valterartur commented 11 months ago

I think I figured it out; at least this example finally works for me. I'm going to give it a shot with df.mapInArrow.

import pyarrow as pa
import psycopg
import pgpq.schema
import pgpq.encoders
from pgpq import ArrowToPostgresBinaryEncoder
from pgpq.schema import PostgresSchema

# Sample data
batch = pa.RecordBatch.from_arrays(
    [
        pa.array(
            ["{}", '{"foo":"bar"}', "{}"],
            type=pa.string(),
        ),
    ],
    schema=pa.schema(
        [
            pa.field(
                "json_list",
                pa.string(),
            ),
        ]
    ),
)
# Encode the string column directly as jsonb (no ListEncoderBuilder this time)
encoders = {
    "json_list": pgpq.encoders.StringEncoderBuilder.new_with_output(
        batch.schema.field("json_list"), pgpq.schema.Jsonb()
    ),
}

# Serialize data
encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(batch.schema, encoders)
buffer = bytearray()
buffer.extend(encoder.write_header())
buffer.extend(encoder.write_batch(batch))
buffer.extend(encoder.finish())
dsn = "DSN"
conn = psycopg.connect(dsn)
schema = 'schema'
t_name = 'data'
ddl = f"""
CREATE TABLE IF NOT EXISTS {schema}.{t_name}
(
    json_list jsonb
)
"""
with psycopg.connect(dsn) as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)
        conn.commit()
        with cursor.copy(f"COPY {schema}.{t_name} FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(buffer)
        conn.commit()
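
For the df.mapInArrow step, here's a rough, untested sketch of what I'm planning: each partition encodes its batches with pgpq and COPYs them into Postgres, then yields a row count so Spark gets record batches back. The DSN, target table, and column name are placeholders.

import pyarrow as pa
import psycopg
import pgpq.schema
import pgpq.encoders
from pgpq import ArrowToPostgresBinaryEncoder

DSN = "YOUR_DSN_HERE"    # placeholder: replace with your connection string
TABLE = "schema.data"    # placeholder: target table with a single jsonb column


def write_partition(batches):
    """Encode each Arrow record batch with pgpq and COPY it into Postgres.

    Runs on the executors via df.mapInArrow; yields one row per batch with
    the number of rows written so the function still returns record batches.
    """
    with psycopg.connect(DSN) as conn:
        with conn.cursor() as cursor:
            for batch in batches:
                # Encode the string column directly as jsonb
                encoders = {
                    "json_list": pgpq.encoders.StringEncoderBuilder.new_with_output(
                        batch.schema.field("json_list"), pgpq.schema.Jsonb()
                    ),
                }
                encoder = ArrowToPostgresBinaryEncoder.new_with_encoders(
                    batch.schema, encoders
                )
                buffer = bytearray()
                buffer.extend(encoder.write_header())
                buffer.extend(encoder.write_batch(batch))
                buffer.extend(encoder.finish())
                with cursor.copy(
                    f"COPY {TABLE} FROM STDIN WITH (FORMAT BINARY)"
                ) as copy:
                    copy.write(bytes(buffer))
                conn.commit()
                yield pa.RecordBatch.from_arrays(
                    [pa.array([batch.num_rows], type=pa.int64())],
                    names=["rows_written"],
                )


# df is a Spark DataFrame with a string column "json_list" holding JSON text:
# df.mapInArrow(write_partition, schema="rows_written long").collect()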

Thanks again for such an awesome tool. If you have any comments on this code, please let me know.

adriangb commented 11 months ago

That looks good to me! Glad you were able to figure it out. Please feel free to suggest changes to docs, add an example, etc. Contributions or donations are welcome :)