confluentinc / bottledwater-pg

Change data capture from PostgreSQL into Kafka
http://blog.confluent.io/2015/04/23/bottled-water-real-time-integration-of-postgresql-and-kafka/
Apache License 2.0

Support table and column blacklisting #21

Open xstevens opened 9 years ago

xstevens commented 9 years ago

A feature that's useful to financial institutions (and probably others too) is a configurable blacklist of tables and/or columns that should not be sent to Kafka. Usually this is because the data is considered sensitive and subject to strict compliance requirements, but it could also simply be that the data isn't useful for analytical purposes.

I'm not aware of a way to achieve this with Postgres itself, but I'm open to that solution too if one exists.

ept commented 9 years ago

Yes, this would be a good feature to add to Bottled Water. I think we'd have to do it in the Bottled Water extension. Pull requests welcome :)

bchazalet commented 8 years ago

Being able to choose the tables would be awesome indeed!

samstokes commented 8 years ago

One extra benefit of doing this: if you're using Bottled Water with a database generated by ActiveRecord migrations, you currently have to run with --allow-unkeyed, since the schema_migrations table that ActiveRecord uses to track migration state has no primary key.

It's hard to see a use case for replicating schema_migrations to Kafka, so you could just blacklist it and run with the default mode.

bchazalet commented 8 years ago

@ept Say I want to implement whitelisting or blacklisting of table names. For the snapshot part, I think I can filter in get_table_list by giving it the right table pattern. But I don't see where I would filter the tables in the log decoder part. Do you think you could give me a hint?

In particular, I am confused about the transaction begin/commit functions (output_avro_begin_txn and output_avro_commit_txn): do those need to filter consistently too, or do you think updating the output_avro_change function would be enough?

ept commented 8 years ago

@bchazalet It would be great if you want to try making a patch.

For the snapshot: yes, get_table_list is the place to go. It already has rudimentary filtering support via the table_pattern parameter, which snapshot_start currently hard-codes to %, but which could be exposed as a command-line parameter. It might be better, though, to instead allow an explicit whitelist or blacklist of table names (or table name patterns with wildcards). See the sketch below for the general idea.
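
For illustration, here's a rough sketch (not Bottled Water's actual get_table_list code) of how the client could resolve a table pattern to relation Oids over libpq; the function name and pattern format are made up:

```c
/* Hypothetical sketch: resolve a LIKE-style table pattern to relation Oids
 * on the client, in the spirit of get_table_list. Names here (e.g.
 * list_tables_matching) are illustrative, not Bottled Water's actual API. */
#include <stdio.h>
#include <libpq-fe.h>

/* Prints oid/schema/name for each ordinary table matching `pattern`
 * (a SQL LIKE pattern such as "public.%"); returns the match count. */
static int list_tables_matching(PGconn *conn, const char *pattern)
{
    const char *params[1] = { pattern };
    PGresult *res = PQexecParams(conn,
        "SELECT c.oid, n.nspname, c.relname "
        "FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE c.relkind = 'r' "
        "  AND n.nspname NOT IN ('pg_catalog', 'information_schema') "
        "  AND (n.nspname || '.' || c.relname) LIKE $1",
        1, NULL, params, NULL, NULL, 0);

    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, "table query failed: %s", PQerrorMessage(conn));
        PQclear(res);
        return -1;
    }

    int matches = PQntuples(res);
    for (int i = 0; i < matches; i++)
        printf("oid=%s table=%s.%s\n", PQgetvalue(res, i, 0),
               PQgetvalue(res, i, 1), PQgetvalue(res, i, 2));

    PQclear(res);
    return matches;
}
```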

For log decoding: I think you only need to update output_avro_change. If a transaction only modifies tables whose changes you're filtering out, you'll get a transaction-begin event followed by a commit event with nothing in between, but that's ok. (That pattern already occurs with DDL transactions, which produce begin and commit events, but no data change events.)
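
To make that concrete, here's a minimal sketch of what the early return might look like, assuming the filter Oids have been stashed in the plugin's private state during startup (the plugin_state layout and oid_whitelist_contains helper are hypothetical; the callback signature is the standard logical decoding one):

```c
#include "postgres.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "utils/rel.h"

/* Hypothetical plugin state holding the set of tables to include. */
typedef struct
{
    Oid *oids;      /* relation Oids the user wants replicated */
    int  noids;
} plugin_state;

static bool oid_whitelist_contains(plugin_state *state, Oid relid)
{
    for (int i = 0; i < state->noids; i++)
        if (state->oids[i] == relid)
            return true;
    return false;
}

/* Standard logical decoding change callback signature. */
static void output_avro_change(LogicalDecodingContext *ctx,
                               ReorderBufferTXN *txn,
                               Relation rel,
                               ReorderBufferChange *change)
{
    plugin_state *state = (plugin_state *) ctx->output_plugin_private;

    /* Drop changes for filtered-out tables; the surrounding begin/commit
     * events are still emitted, which is harmless as noted above. */
    if (!oid_whitelist_contains(state, RelationGetRelid(rel)))
        return;

    /* ... existing Avro encoding of the change would follow here ... */
}
```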

To filter on tables, I would suggest first translating the whitelist/blacklist into a list of Oids identifying the tables to be included/excluded, by querying the catalog from the client (similar to what get_table_list does). Then you should be able to pass that list of Oids to the logical replication plugin via options to the START_REPLICATION command. You should be able to pick up those options in output_avro_startup and add them to the plugin state. You might need to look at the Postgres source to figure out exactly how those options get passed around, as I don't think the docs cover it.
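
As a sketch of that last step: the startup callback can walk ctx->output_plugin_options, which Postgres populates from the options given to START_REPLICATION as a List of DefElem nodes. The table-oids option name and comma-separated format below are made up for illustration:

```c
#include "postgres.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "replication/logical.h"
#include "replication/output_plugin.h"

static void output_avro_startup(LogicalDecodingContext *ctx,
                                OutputPluginOptions *opt,
                                bool is_init)
{
    ListCell *option;

    foreach(option, ctx->output_plugin_options)
    {
        DefElem *elem = (DefElem *) lfirst(option);

        /* Hypothetical option, passed e.g. as:
         * START_REPLICATION SLOT bw LOGICAL 0/0 ("table-oids" '16385,16402') */
        if (strcmp(elem->defname, "table-oids") == 0 && elem->arg != NULL)
        {
            char *oid_csv = strVal(elem->arg);
            /* parse the comma-separated Oids into the plugin state here */
            (void) oid_csv;
        }
    }
}
```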

Let me know if I can help further!

samstokes commented 8 years ago

As a workaround, if your Kafka cluster is configured to not automatically create topics (auto.create.topics.enable=false), you can achieve the same goal by manually creating Kafka topics for only those tables that you want to sync, and then running bottledwater with the --on-error=log command-line flag. Bottled Water should ignore updates to the other tables because it can't produce to the corresponding topics. (--on-error=log is needed because otherwise Bottled Water's default policy is to stop the sync and exit on error.)

(The keen-eyed will notice that this workaround actually simulates a whitelist, not a blacklist :))