confluentinc / bottledwater-pg

Change data capture from PostgreSQL into Kafka
http://blog.confluent.io/2015/04/23/bottled-water-real-time-integration-of-postgresql-and-kafka/
Apache License 2.0
2 stars 149 forks source link

Include type when publishing message #54

Open ruimarinho opened 8 years ago

ruimarinho commented 8 years ago

Currently bottledwater-pg does not include the type of row modification that occurred - an insert versus an update. Deletes are also not broadcasted possibly with the last available row representation.

Would you be open to a PR adding this?

ept commented 8 years ago

Hi @ruimarinho, I can see how this would be useful. For updates, people may also want to know the old contents of the row. How do you imagine this would work? Would you add some metadata to messages sent to Kafka?

The difficulty I see here is that this would interact strangely with Kafka's log compaction. That is, if you have log compaction turned on, the message you get for a particular key represents the latest value for that key; prior values for that key may be garbage-collected (Kafka does not retain the full history of all changes to a value). In particular, a deletion (i.e. the new value is non-existent) is represented with a null value, which allows Kafka to discard that key entirely.

If the Kafka messages representing deletions include the old value, you can no longer use the null-value feature of Kafka for log compaction of deletes. Also, including a flag indicating whether a row was inserted or updated seems odd if you cannot access prior versions of the row.

My suggestion would be the following: rather than including additional metadata in messages sent to Kafka, we open up the API such that applications can run a Bottled Water client directly in-process, and thus receive the callbacks for inserts, updates and deletes directly (which would include the old row for updates and deletes, if the table is configured to use "replica identity full").

I discuss this idea (e.g. JNI bindings for Java) in #2, and @gonzalo-bulnes has been exploring Ruby bindings in #43. Would this do what you require? If not, it would be helpful to know a bit more about what you're trying to do.

ruimarinho commented 8 years ago

Thanks for the well though feedback @ept.

For updates, people may also want to know the old contents of the row.

Definitely - even when deleting a row, being able to see the last known representation should be possible. Having access to the row's old values would allow for complex use case where deep diff would be applicable. I imagine this diff would be too much of a performance penalty to calculate before publishing, specially if not all consumers take advantage of the resulting diff. Logical decoding does not work like this anyway, so it'd still need to be a plugin/client implementation.

How do you imagine this would work? Would you add some metadata to messages sent to Kafka?

Yes. This would require nesting the current row under a key such as values with possible metadata in its siblings. For example (ignoring avro's schemas):

{
  "type": "insert",
  "values": {
    "foo": "bar"
  }
}

The difficulty I see here is that this would interact strangely with Kafka's log compaction. That is, if you have log compaction turned on, the message you get for a particular key represents the latest value for that key; prior values for that key may be garbage-collected (Kafka does not retain the full history of all changes to a value). In particular, a deletion (i.e. the new value is non-existent) is represented with a null value, which allows Kafka to discard that key entirely.

This concept is really interesting for actual replicating the database content, as log compaction provides the much needed garbage collection. However, I expect this mode of publishing database "events" as inherently requiring turning off log compaction.

My suggestion would be the following: rather than including additional metadata in messages sent to Kafka, we open up the API such that applications can run a Bottled Water client directly in-process, and thus receive the callbacks for inserts, updates and deletes directly (which would include the old row for updates and deletes, if the table is configured to use "replica identity full").

I have tested several JSON logical decoders and they more or less work. I don't know bottledwater well enough to know if it is a better candidate for this type of work or not. I did notice some issues that it has in avro-encoding common postgres fields like JSON/JSONB, but those are documented issues. What would be the advantages of running bottledwater in-process compared to those decoders (e.g. wal2json)?

I discuss this idea (e.g. JNI bindings for Java) in #2, and @gonzalo-bulnes has been exploring Ruby bindings in #43. Would this do what you require?

In theory, yes. If we're able to receive callbacks directly from bottledwater, I think we could the publishing on our side with our own format. The only downside - which I don't know "who" would be responsible for - is how bottledwater would handle messages consumed from the replication slot but not processed by the client due to an unexpected error (e.g. unreachable Kafka host).

If not, it would be helpful to know a bit more about what you're trying to do.

Connecting microservices using CDC and being able to react to any/all database changes. It's not only important to process the last row's values but also react to changes that have occurred (for instance, analytics).

sigmascord commented 8 years ago

@ept Following (and adding to) Rui's mindset, would it be possible to add parameters to Bottled Water's client so that a list of tables would be passed along to it as a white list, in order to create a 2nd topic for each of the referred tables in that list, where each Insert/Delete/Update operation would be logged in parallel with the new row in the main topic? Like having a table called Audit and 2 corresponding topics in Kafka called Audit and OPS_HISTORY_Audit?

Also, this would be the base to implement an option to make BW an operations logger in addition to the database logger feature, for the whole or only a part of the database or even to split one from the other completely (although).

This, of course:

I believe this would not affect Bottled Water's default purpose/behavior, it would be a nice feature to add to the client and would answer to Rui's request.

ept commented 8 years ago

What would be the advantages of running bottledwater in-process compared to those decoders (e.g. wal2json)?

@ruimarinho The biggest feature of Bottled Water, in my opinion, is that it coordinates the consistent shapshot with the subsequent change stream, so you can get a full copy of a database. Most logical decoding plugins only give you the changes from the time that the replication slot was created, but nothing before. Implementing that was surprisingly fiddly!

how bottledwater would handle messages consumed from the replication slot but not processed by the client due to an unexpected error (e.g. unreachable Kafka host).

The Postgres replication protocol has a facility for the client to acknowledge which events have been durably written out to storage. At the moment, the Bottled Water client only acknowledges events after the Kafka client confirms that they have been published. Thus, if Kafka is unreachable or the client crashes, nothing should be lost (it will simply stall replication until the fault is fixed).

If we build an API for using the BW client in an application process, we'd also expose that acknowledgement facility, so that you can tell Postgres when you have fully processed some events.

Like having a table called Audit and 2 corresponding topics in Kafka called Audit and OPS_HISTORY_Audit?

@scordeiro Yes, that would be possible. My instinct is still is that you'd be better off with an in-process API for the BW client, and that it would be best to prioritise development of the language bindings for a few popular languages. However, if you think separate history topics in Kafka would be the best way forward and you want to implement it, I'd be happy to merge the change.

A suggestion for implementation: BW currently uses an Avro-based wire protocol between the logical decoding plugin and the client (defined in ext/protocol.c). It encodes all the metadata we get from Postgres, such as whether a change is insert/update/delete, transaction IDs, LSNs, and old row values (where available). Currently the client decodes that wire protocol. The simplest solution for a history topic may be to simply send those events straight to Kafka, unmodified. The consumer would have to do some additional work to parse the events (since the actual row data is encoded as a byte string in those events, i.e. as Avro-encoded records inside Avro-encoded records), but it would get full access to all the relevant metadata.