supabase / pg_replicate

Build Postgres replication apps in Rust
Apache License 2.0

Add support for decoding built-in types from the replication stream #1

Open imor opened 6 months ago

imor commented 6 months ago

Support built-in types from the following table: https://www.postgresql.org/docs/current/datatype.html#DATATYPE-TABLE

| Done | Name | Aliases | Description |
|---|---|---|---|
| ✅ | bigint | int8 | signed eight-byte integer |
| ✅ | bigserial | serial8 | autoincrementing eight-byte integer |
| ⬜️ | bit [ (n) ] | | fixed-length bit string |
| ⬜️ | bit varying [ (n) ] | varbit [ (n) ] | variable-length bit string |
| ✅ | boolean | bool | logical Boolean (true/false) |
| ⬜️ | box | | rectangular box on a plane |
| ✅ | bytea | | binary data (“byte array”) |
| ✅ | character [ (n) ] | char [ (n) ] | fixed-length character string |
| ✅ | character varying [ (n) ] | varchar [ (n) ] | variable-length character string |
| ⬜️ | cidr | | IPv4 or IPv6 network address |
| ⬜️ | circle | | circle on a plane |
| ✅ | date | | calendar date (year, month, day) |
| ✅ | double precision | float8 | double precision floating-point number (8 bytes) |
| ⬜️ | inet | | IPv4 or IPv6 host address |
| ✅ | integer | int, int4 | signed four-byte integer |
| ⬜️ | interval [ fields ] [ (p) ] | | time span |
| ✅ | json | | textual JSON data |
| ✅ | jsonb | | binary JSON data, decomposed |
| ⬜️ | line | | infinite line on a plane |
| ⬜️ | lseg | | line segment on a plane |
| ⬜️ | macaddr | | MAC (Media Access Control) address |
| ⬜️ | macaddr8 | | MAC (Media Access Control) address (EUI-64 format) |
| ⬜️ | money | | currency amount |
| ✅ | numeric [ (p, s) ] | decimal [ (p, s) ] | exact numeric of selectable precision |
| ⬜️ | path | | geometric path on a plane |
| ⬜️ | pg_lsn | | PostgreSQL Log Sequence Number |
| ⬜️ | pg_snapshot | | user-level transaction ID snapshot |
| ⬜️ | point | | geometric point on a plane |
| ⬜️ | polygon | | closed geometric path on a plane |
| ✅ | real | float4 | single precision floating-point number (4 bytes) |
| ✅ | smallint | int2 | signed two-byte integer |
| ✅ | smallserial | serial2 | autoincrementing two-byte integer |
| ✅ | serial | serial4 | autoincrementing four-byte integer |
| ✅ | text | | variable-length character string |
| ✅ | time [ (p) ] [ without time zone ] | | time of day (no time zone) |
| ⬜️ | time [ (p) ] with time zone | timetz | time of day, including time zone |
| ✅ | timestamp [ (p) ] [ without time zone ] | | date and time (no time zone) |
| ✅ | timestamp [ (p) ] with time zone | timestamptz | date and time, including time zone |
| ⬜️ | tsquery | | text search query |
| ⬜️ | tsvector | | text search document |
| ⬜️ | txid_snapshot | | user-level transaction ID snapshot (deprecated; see pg_snapshot) |
| ✅ | uuid | | universally unique identifier |
| ⬜️ | xml | | XML data |

cvauclair commented 2 months ago

Hey @imor, I'm working on a project for which I want to use pg_replicate with a custom sink. However, my source DB contains a lot of types that are currently unsupported (most notably the numeric type).

I took a look around the codebase as well as the Diesel codebase (the latter does support all built-in types) to see how we could extend the supported types.

Diesel essentially wraps the raw pgoutput bytes in a PgValue type, and implements the FromSql trait to convert the PgValue raw bytes to Rust types (BigDecimal in the case of the numeric postgres type).

Since those trait implementations parse the raw bytes, it seems like most of the FromSql implementations could be reused (if not outright copied) in your CdcEventConverter::from_tuple_data to support additional types, but I wanted to check in with you to see what you think of the idea.
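To make "parsing the raw bytes" concrete, here is a minimal sketch of decoding a numeric value from the Postgres binary wire layout (four 16-bit header fields, then base-10000 digit groups) into a decimal string. It is neither Diesel's FromSql implementation nor pg_replicate code: the function name `decode_numeric_binary` is hypothetical, it returns a String instead of a BigDecimal to stay dependency-free, and it rejects the infinity sign values. It only applies where values actually arrive in binary format.

```rust
/// Hypothetical helper (not from Diesel or pg_replicate): decodes a Postgres
/// `numeric` in binary wire format into a decimal string.
/// Layout, all big-endian: ndigits, weight, sign, dscale, then `ndigits`
/// base-10000 digit groups.
pub fn decode_numeric_binary(buf: &[u8]) -> Result<String, String> {
    if buf.len() < 8 {
        return Err(format!("header needs 8 bytes, got {}", buf.len()));
    }
    let ndigits = u16::from_be_bytes([buf[0], buf[1]]) as usize;
    let weight = i16::from_be_bytes([buf[2], buf[3]]) as i32;
    let sign = u16::from_be_bytes([buf[4], buf[5]]);
    let dscale = u16::from_be_bytes([buf[6], buf[7]]) as usize;
    if buf.len() != 8 + 2 * ndigits {
        return Err(format!("expected {} bytes, got {}", 8 + 2 * ndigits, buf.len()));
    }
    let digits: Vec<u16> = buf[8..]
        .chunks(2)
        .map(|c| u16::from_be_bytes([c[0], c[1]]))
        .collect();
    if digits.iter().any(|&d| d > 9999) {
        return Err("digit group out of range".to_string());
    }

    let mut out = String::new();
    match sign {
        0x0000 => {}
        0x4000 => out.push('-'),
        0xC000 => return Ok("NaN".to_string()),
        // Infinity sign values are deliberately not handled in this sketch.
        other => return Err(format!("unsupported sign field {:#06x}", other)),
    }

    // The group at position `pos` is worth 10000^(weight - pos); positions
    // outside the stored digits are implicit zeros.
    let group = |pos: i32| -> u16 {
        if pos < 0 {
            0
        } else {
            digits.get(pos as usize).copied().unwrap_or(0)
        }
    };

    // Integer part: groups 0..=weight, only the first without zero padding.
    if weight < 0 {
        out.push('0');
    } else {
        for pos in 0..=weight {
            if pos == 0 {
                out.push_str(&group(pos).to_string());
            } else {
                out.push_str(&format!("{:04}", group(pos)));
            }
        }
    }

    // Fractional part: groups after `weight`, cut to exactly `dscale` digits.
    if dscale > 0 {
        let mut frac = String::new();
        let mut pos = weight + 1;
        while frac.len() < dscale {
            frac.push_str(&format!("{:04}", group(pos)));
            pos += 1;
        }
        frac.truncate(dscale);
        out.push('.');
        out.push_str(&frac);
    }
    Ok(out)
}
```

For example, the digit groups [1, 2345, 6780] with weight 1 and dscale 3 decode to 12345.678. A real converter would more likely hand the decoded value to a decimal type than keep it as a string.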

Love the project, keep up the good work!

imor commented 1 month ago

Your ideas are directionally correct. Just want to point out that apart from CdcEventConverter::from_tuple_data there's also TableRowConverter::get_cell_value, which is called when the initial table is copied, before CdcEvents start arriving.

Apart from the conversions themselves, you might face another hurdle if you are implementing a custom sink. Methods in the BatchSink (and the older Sink) traits hard-code the error type as SinkError, which might not work for a custom sink impl outside the pg_replicate crate. There's an issue for this but I haven't found the time to tackle it yet.

cvauclair commented 1 month ago

Yes I noticed pretty early there were two converters to handle :sweat_smile:

Quick question about this: is there a reason why the input of both converters is different? As in, why are the bytes in CdcEventConverter::from_tuple_data parseable with a simple from_utf8 (even for the numeric type), while the bytes in TableRowConverter::get_cell_value require more elaborate decoding? (Sorry if this is obvious, I'm pretty new to this low-level Postgres stuff.)

On the SinkError issue, I noticed that there is already a fork out there which implements the error as an associated type (I took inspiration from that fork for my own). Is that something you would consider merging if I opened a PR for it?
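For readers unfamiliar with the pattern, here is a rough sketch of what "the error as an associated type" could look like. Everything here is hypothetical and simplified: `SimpleSink`, `MemorySink`, `CapacityError` and `drain_into` are illustration-only names, the trait is synchronous, and none of it mirrors the actual BatchSink trait or the fork mentioned above.

```rust
use std::fmt;

/// Hypothetical, simplified sink trait (not pg_replicate's BatchSink):
/// each implementation picks its own error type instead of a hard-coded one.
pub trait SimpleSink {
    type Error: std::error::Error;

    fn write_rows(&mut self, rows: Vec<String>) -> Result<(), Self::Error>;
}

/// Error type owned by the custom sink, defined outside the trait's crate.
#[derive(Debug)]
pub struct CapacityError {
    pub limit: usize,
}

impl fmt::Display for CapacityError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "sink is full (limit {})", self.limit)
    }
}

impl std::error::Error for CapacityError {}

/// Toy sink that keeps rows in memory up to a fixed limit.
pub struct MemorySink {
    pub rows: Vec<String>,
    pub limit: usize,
}

impl MemorySink {
    pub fn new(limit: usize) -> Self {
        MemorySink { rows: Vec::new(), limit: limit }
    }
}

impl SimpleSink for MemorySink {
    type Error = CapacityError;

    fn write_rows(&mut self, rows: Vec<String>) -> Result<(), Self::Error> {
        if self.rows.len() + rows.len() > self.limit {
            return Err(CapacityError { limit: self.limit });
        }
        self.rows.extend(rows);
        Ok(())
    }
}

/// Generic caller: it propagates whatever error type the sink declares.
pub fn drain_into<S: SimpleSink>(
    sink: &mut S,
    batches: Vec<Vec<String>>,
) -> Result<usize, S::Error> {
    let mut written = 0;
    for batch in batches {
        let n = batch.len();
        sink.write_rows(batch)?;
        written += n;
    }
    Ok(written)
}
```

With this shape, a sink defined outside the crate can return its own error, and generic code such as `drain_into` stays agnostic about which error that is.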

imor commented 1 month ago

> is there a reason why the input of both converters is different?

There are two separate code paths for decoding tuple values because the values come from two different operations. When we replicate a table we want two things: the data in the table as it exists right now, and any changes to the table data as they happen in real time. The TableRowConverter::get_cell_value code path is for the initial table copy (data as it exists in the table right now) and CdcEventConverter::from_tuple_data is for the real-time changes to the table data. If you dig into the input types for these two operations (BinaryCopyOutRow and TupleData), you'll notice that the first is a binary format and the second is a text format (clients can request a binary format instead, but it's less portable, as mentioned in the Postgres docs). That's another reason for the difference.
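As a small illustration of the text versus binary difference, the sketch below decodes the same int4 value from both representations. The function names are hypothetical and this is not the code used by either converter: in text format the bytes are the UTF-8 digits of the number, while in binary format an int4 is four big-endian bytes.

```rust
/// Hypothetical helper: decode an int4 sent in text format,
/// where the bytes are the UTF-8 digits of the number, e.g. b"42".
pub fn decode_int4_text(bytes: &[u8]) -> Result<i32, String> {
    let s = std::str::from_utf8(bytes).map_err(|e| e.to_string())?;
    s.parse::<i32>().map_err(|e| e.to_string())
}

/// Hypothetical helper: decode an int4 sent in binary format,
/// which is exactly four bytes in big-endian (network) order.
pub fn decode_int4_binary(bytes: &[u8]) -> Result<i32, String> {
    if bytes.len() != 4 {
        return Err(format!("expected 4 bytes, got {}", bytes.len()));
    }
    Ok(i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}
```

Feeding binary bytes to the text decoder (or the reverse) fails or yields a wrong value, which is why each code path needs a decoder that matches the format it receives.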

> Is that something you would consider merging if I opened a PR for it?

PRs are always welcome to make the code better :)