
changefeedccl: support emitting binary-encoded row data #77849

Open stevekuznetsov opened 2 years ago

stevekuznetsov commented 2 years ago

When consuming a stream of changefeed events today, users must manually decode the wire data into row values. Internally, CRDB uses the Postgres JSON encoder to send data out, so users must know how to mirror that logic when decoding.

This causes issues when naively consuming the JSON with a built-in decoder such as Go's. For instance, Postgres encodes BLOB values as hex strings, whereas Go expects []byte to be encoded as a base64 string.
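To make the mismatch concrete, here is a minimal sketch in Go; the payload shape and the \x hex form are assumptions modeled on the Postgres bytea encoding, not copied from a real changefeed:

package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
	"strings"
)

// Hypothetical changefeed value for a row with a BYTES column. CRDB's
// JSON encoder emits bytes Postgres-style, as a \x-prefixed hex string,
// not as the base64 string that Go's encoding/json expects for []byte.
const payload = `{"after": {"id": 1, "blob": "\\x68656c6c6f"}}`

func main() {
	var naive struct {
		After struct {
			ID   int    `json:"id"`
			Blob []byte `json:"blob"` // assumes base64 on the wire
		} `json:"after"`
	}
	// Fails with "illegal base64 data": the hex string is not base64.
	fmt.Println(json.Unmarshal([]byte(payload), &naive))

	// What users must do today: decode to a string, then undo the
	// Postgres hex encoding by hand.
	var manual struct {
		After struct {
			Blob string `json:"blob"`
		} `json:"after"`
	}
	_ = json.Unmarshal([]byte(payload), &manual)
	b, _ := hex.DecodeString(strings.TrimPrefix(manual.After.Blob, `\x`))
	fmt.Printf("%s\n", b) // prints "hello"
}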

Instead of exposing this detail to the user, CRDB should allow users to request that the data be sent as a composite type, which would allow language-specific drivers to handle the decoding into client-native types.

Ideally, this would land as a WITH ... option but after a deprecation period it may be useful to default to this encoding style.

/cc @ajwerner

Jira issue: CRDB-13795

Epic CRDB-19382


blathers-crl[bot] commented 2 years ago

cc @cockroachdb/cdc

ajwerner commented 2 years ago

The biggest challenge here is going to be reworking the abstractions. The changefeed has a notion of an Encoder and a Sink. The benefit of this approach is that it keeps the logic for handling the SQL data out of the sink abstraction. The problem is that, for the core changefeed, we implement the sink using this logic:

https://github.com/cockroachdb/cockroach/blob/3b3d080521d07edea92861b2ce59b20f233c57d3/pkg/ccl/changefeedccl/sink.go#L308-L337

Instead of being passed []byte we want to be passed datums for the key and the value.
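As a rough sketch of that shift (these interfaces are hypothetical simplifications for illustration, not the actual Sink API):

package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// bytesSink is a simplified rendering of today's shape: the sink
// receives rows that have already been encoded to bytes.
type bytesSink interface {
	EmitRow(ctx context.Context, topic string, key, value []byte) error
}

// datumSink is the hypothetical variant this issue asks for: the core
// changefeed's sink receives SQL datums, leaving the wire encoding to
// pgwire and the client driver.
type datumSink interface {
	EmitRow(ctx context.Context, topic string, key, value tree.Datum) error
}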

There will definitely be some trickiness constructing the appropriate types to be plumbed here: https://github.com/cockroachdb/cockroach/blob/0a3fded95d1db7e7e0eba294eed8b76a78471824/pkg/ccl/changefeedccl/changefeed_stmt.go#L85

All-in-all, it's not a giant project, but it touches a lot of moving parts.

stevekuznetsov commented 2 years ago

@ajwerner do we have some understanding of what the schema should look like for this? We expect one pgwire-encoded row per event, right?

Or are we expecting rows emitted with BLOB at the top level for, e.g., before and after, which are then decoded a second time into rows themselves?

ajwerner commented 2 years ago

What I envisioned here was to keep everything parallel to what we have for JSON: namely, 3 top-level columns (table, key, value), but instead of those being JSON, they'd be emitted as tuple types. In the type system we have a notion of a "labeled tuple", which is a tuple type with named element types. I would love to see the types for updated rows reflect the tuple type of the underlying table.

https://github.com/cockroachdb/cockroach/blob/cd7e855980cb9ba4c9f8762f55d16ed56cbc5028/pkg/sql/types/types.go#L1128-L1130

Additionally, we have a record type that we create synthetically from tables, but I'm not sure I'd use it; I wouldn't at first. I'd just make the tuple types myself.

https://github.com/cockroachdb/cockroach/blob/cfe853766e2eccec33d2a3279d30d12ad6b75b7a/pkg/sql/catalog/typedesc/table_implicit_record_type.go#L49-L54

On its own, this might be mostly useless: drivers don't, as far as I know, make it easy to deal with tuple types. But it could be combined nicely with a CTE to select the relevant portions of the row. Consider:

CREATE TABLE t (a INT PRIMARY KEY, b STRING);

Then it'd be cool to do the following:

WITH cf AS (CHANGEFEED FOR t WITH resolved, updated) 
SELECT key.a, value.after.b, value.updated, value.resolved FROM cf

For this, we'd want to make key a type that is just types.MakeLabeledTuple([]*types.T{types.Int}, []string{"a"}) (or rather, you'd pull the types and label names from the table descriptor). For the value column, we'd need a few levels of nesting.

Consider the below for a sketch of how to construct the appropriate result column type.

var columnTypes []*types.T = ... // pulled from the table descriptor
var columnNames []string = ...
// The "after" field mirrors the row type of the watched table.
afterType := types.MakeLabeledTuple(columnTypes, columnNames)
// resolved and updated are timestamps, emitted here as decimals.
valueTypes := []*types.T{afterType, types.Decimal, types.Decimal}
valueLabels := []string{"after", "resolved", "updated"}
valueType := types.MakeLabeledTuple(valueTypes, valueLabels)
valueColumn := colinfo.ResultColumn{Name: "value", Typ: valueType, /* ... */}
// ...

Then, at runtime, you'll need code to produce datums of the appropriate types: https://github.com/cockroachdb/cockroach/blob/928f605c3e84efb30cab213b47b2f4a66b816701/pkg/sql/sem/tree/datum.go#L3809-L3812.
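As a minimal sketch of that step, assuming the valueType construction above (makeValueDatum is a hypothetical helper, not existing code):

package sketch

import (
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// makeValueDatum is a hypothetical helper: it wraps one row's datums
// in tuples carrying the labeled types built above, so the value
// column reaches pgwire with its named fields intact.
func makeValueDatum(
	afterType, valueType *types.T,
	row tree.Datums,
	resolved, updated *tree.DDecimal,
) tree.Datum {
	after := tree.NewDTuple(afterType, row...)
	return tree.NewDTuple(valueType, after, resolved, updated)
}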

Hopefully this sketches out the idea. I'm sure there's going to be some interesting details to get the pieces to align. At the end, it should result in a lower-overhead and more generally useful wire format.

miretskiy commented 2 years ago

I think we might have a complication around returning correct header information when planning the changefeed. As things stand, all columns are byte columns, not tuples. To get the tuple type, you need all the column types a priori; this also means the schema change policy must be set to 'stop'.

ajwerner commented 2 years ago

this also means schema change policy must be set to 'stop'

That's a good point! Once we have a new syntax to represent projections, we could potentially detect whether a re-planning needs to occur. For this use case, I think stop is the right answer.

dvarrazzo commented 2 years ago

I am playing with CHANGEFEED and psycopg 3. I can see:

>>> for row in cur.stream("experimental changefeed for myfeed"):
...     print(cur.description)
...     print(row)
[<Column 'table', type: text (oid: 25)>, <Column 'key', type: bytea (oid: 17)>, <Column 'value', type: bytea (oid: 17)>]
('myfeed', b'[768417200588816385]', b'{"after": {"data": "hello", "id": 768417200588816385}}')

i.e. the key and value are returned as bytea which, once decoded, contain JSON. It would be better to receive jsonb objects instead.

Do I understand correctly that the issue I describe here is related to this ticket?

Thank you :)

ajwerner commented 2 years ago

That's a separate issue; what you propose seems like the right behavior. This issue is about making it possible to get scalar values of the appropriate SQL type back out of the changefeed, as opposed to the JSON structure.

dvarrazzo commented 2 years ago

I see, thank you for the explanation, @ajwerner.

shermanCRL commented 1 year ago

Seems like https://github.com/cockroachdb/cockroach/pull/90491 might address this request.

ajwerner commented 1 year ago

Seems like https://github.com/cockroachdb/cockroach/pull/90491 might address this request.

Can you say more? I do think that it's loosely related, but only loosely, and arguably not at all.

shermanCRL commented 1 year ago

My read is that the request here is “don’t make me use JSON, give me a native composite type”. But having discussed it with others (here, internal) I am convinced I don’t quite understand and so will defer. :)

stevekuznetsov commented 1 year ago

@shermanCRL with that new functionality, can I get data out of a changefeed without incurring the round-trip compute cost of JSON encoding?

ajwerner commented 1 year ago

@stevekuznetsov that's this issue. I believe the implementation has gotten even more straightforward. @miretskiy expressed some interest in following through.