adriangb / pgpq

Stream Arrow data into Postgres
MIT License
248 stars 17 forks

Allow partial encoders with `new_with_encoders` #31

Closed tomasfarias closed 9 months ago

tomasfarias commented 9 months ago

Amazing library, thank you for your work. I'm using it to stream Apache Arrow record batches into PG in binary format, which I assume is the main use case.

I have some string fields that I wish to encode as JSONB. I'm following samples from other issues like https://github.com/adriangb/pgpq/issues/21.

My problem is that my records have many fields, and new_with_encoders requires me to set an encoder for every field, even though every field except the string-to-JSONB ones could just use the default.

Example:

import pyarrow as pa
import pgpq
import pgpq.schema
import pgpq.encoders

# I'm reading records from a file-like source, but for testing purposes, imagine the following:
jsonb_arr = pa.array(['{}', '{"test": 1}'])
not_jsonb_arr = pa.array([1, 2])
rec = pa.RecordBatch.from_pydict({"jsonb_arr": jsonb_arr, "not_jsonb_arr": not_jsonb_arr})

enc = {
    "jsonb_arr": pgpq.encoders.StringEncoderBuilder.new_with_output(
        pa.field("jsonb_arr", pa.string()), pgpq.schema.Jsonb()
    )
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(rec.schema, enc)

Fails with:

called `Result::unwrap()` on an `Err` value: EncoderMissing { field: "not_jsonb_arr" }
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: EncoderMissing { field: "not_jsonb_arr" }

Even for well-known fields, having to set an encoder for every field that could just resolve to the default (as dictated in this match expression) is tedious.

Expected:

An ArrowToPostgresBinaryEncoder is initialized with the encoder I set for jsonb_arr and default encoders for other fields.

Potential solution:

Extend new_with_encoders here to call build_encoders on each schema field that doesn't have an encoder set in the Python dictionary.
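
A minimal sketch of the fallback described above, written in plain Python rather than as a patch to pgpq itself. The names `fill_missing_encoders` and `infer_default` are hypothetical and used only for illustration, and the string values stand in for real encoder builders:

```python
from typing import Callable, Dict, Iterable, TypeVar

E = TypeVar("E")


def fill_missing_encoders(
    field_names: Iterable[str],
    overrides: Dict[str, E],
    infer_default: Callable[[str], E],
) -> Dict[str, E]:
    """Return one encoder per field: the override if given, else a default.

    `infer_default` is a hypothetical callable standing in for whatever
    produces the default encoder for a field.
    """
    names = list(field_names)
    # Reject overrides that do not match any schema field, so typos surface early.
    unknown = set(overrides) - set(names)
    if unknown:
        raise KeyError(f"overrides for fields not in schema: {sorted(unknown)}")
    return {
        name: overrides[name] if name in overrides else infer_default(name)
        for name in names
    }


# Placeholder strings instead of real encoder builders.
encoders = fill_missing_encoders(
    ["jsonb_arr", "not_jsonb_arr"],
    {"jsonb_arr": "jsonb-encoder"},
    lambda name: f"default-for-{name}",
)
```

With this shape, only the fields that need special handling have to be listed explicitly, and every other field falls back to its default.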

Happy to contribute a patch and samples if that seems to be the way to go.

tomasfarias commented 9 months ago

Just as I was starting to write a dictionary to map fields to encoders, I noticed that the infer_encoder method exists. I completely missed it, my bad.

My problem can be solved with:

inferred = {field.name: pgpq.ArrowToPostgresBinaryEncoder.infer_encoder(field) for field in rec.schema}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(rec.schema, inferred | enc)
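
The order of the operands matters here: with `|`, the right-hand dictionary wins on duplicate keys, so the explicit encoders override the inferred ones. A small illustration of that, using placeholder strings in place of real encoder builders (the dict `|` operator needs Python 3.9+; `{**a, **b}` behaves the same on older versions):

```python
# Placeholder values standing in for encoder builders.
inferred = {"jsonb_arr": "default-string", "not_jsonb_arr": "default-int64"}
enc = {"jsonb_arr": "jsonb"}

# Right-hand operand wins: the explicit JSONB encoder replaces the default.
merged = inferred | enc

# Equivalent spelling that also works before Python 3.9.
merged_compat = {**inferred, **enc}

# Swapping the operands would silently keep the inferred default instead.
reversed_merge = enc | inferred
```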

Happy to contribute samples if they can improve discovery of this method. Otherwise happy to close this issue.

adriangb commented 9 months ago

Samples/documentation would be great!

adriangb commented 9 months ago

I think we're all set here, right @tomasfarias? If so, let's close the issue 🥳

tomasfarias commented 9 months ago

Happy to close! Looking forward to the development of pgpq, thank you @adriangb

adriangb commented 9 months ago

Feel free to leave any other feedback on API or otherwise.