cockroachdb / cockroach

CockroachDB — the cloud native, distributed SQL database designed for high availability, effortless scale, and control over data placement.
https://www.cockroachlabs.com
Other
30.11k stars 3.81k forks source link

roadmap: Support logical replication / CDC #2656

Closed maximecaron closed 5 years ago

maximecaron commented 9 years ago

My understanding is that CockroachDB is currently using Shadow paging for transaction. Postgresql have a feature called Logical Decoding http://www.postgresql.org/docs/9.4/static/logicaldecoding.html . But this feature is using the write-ahead log. Would it be possible for CockroachDB to also support logical replication without using a write-ahead log?

petermattis commented 9 years ago

@maximecaron CockroachDB uses MVCC for transactions. Shadow paging, as defined by wikipedia, is a different technique. Perhaps you have a different definition of shadow paging in mind.

CockroachDB has builtin replication. There are no current plans to support a logical stream of database modifications. Perhaps if you describe what you'd like to do with this stream (the high level goal) we can figure out what the equivalent in CockroachDB would be.

maximecaron commented 9 years ago

So the goal is to have another slave system (Cache Server or ElasticSearch) consume a stream of changes. For the case of ElasticSearch this would enable having data asynchronously indexed without polling the database intermittently to retrieve any new or updated data.

For a CacheServer this would let the cache server consume stream of key that need invalidation. Facebook is already doing something like this with Mysql and McSqueal. Invalidation pipeline

There is several case where this could be useful
Kafka

It seem to be possible to implement something like this with a trigger mechanism but I would prefer something more like a logical replication stream.

spencerkimball commented 9 years ago

Thank you for the explanation of use cases. This isn't something we currently support, though it's not hard to see us providing equivalent functionality in the future. Making the stream of events both consistent and ordered presents some challenges for Cockroach, so this is far from a trivial feature. The problem becomes more tractable if a Cockroach cluster is allowed to output N different update logs, where N is the number of active nodes in the cluster, and some external process be responsible for merging the output appropriately.

Something to keep in mind. Would you care to create an issue so we can track this feature request?

On Thu, Sep 24, 2015 at 5:09 PM, Maxime Caron notifications@github.com wrote:

So the goal is to have another slave system (Cache Server or ElasticSearch) consume a stream of changes. For the case of ElasticSearch this would enable having data asynchronously indexed without polling the database intermittently to retrieve any new or updated data.

For a CacheServer this would let the cache server consume stream of key that need invalidation. Facebook is already doing something like this with Mysql and McSqueal. [image: Invalidation pipeline] https://camo.githubusercontent.com/d58566d3ad609df7fc947bb88d6dce3eee74e184/687474703a2f2f7374617469632e6f736368696e612e6e65742f75706c6f6164732f73706163652f323031332f303432312f3134343932365f787336465f3135393332382e6a7067

There is several case where this could be useful

[image: Kafka] https://camo.githubusercontent.com/8c9538dba20efcfd63cfe8dad0d516de2e82ff85/68747470733a2f2f6d617274696e2e6b6c6570706d616e6e2e636f6d2f323031352f30342f626f74746c656477617465722d30322e706e67

It seem to be possible to implement something like this with a trigger mechanism but I would prefer something more like a logical replication stream.

— Reply to this email directly or view it on GitHub https://github.com/cockroachdb/cockroach/issues/2656#issuecomment-143051691 .

petermattis commented 9 years ago

@maximecaron Thanks for the pretty diagrams. I can see the utility in exporting a logical stream to ElasticSearch for indexing. Is the purpose of the caching to offload read-only traffic from the database? If yes, the system might be simpler by repurposing the cache nodes for cockroach nodes. This would expand the cockroach capacity and remove any delays between updates to the database and the read-only caches. Such an approach isn't always possible or there might be another purpose to the caching. But given CockroachDB's native scalability it might not be necessary to use caching to workaround database scalability bottlenecks.

maximecaron commented 9 years ago

I am mostly interested in having asynchronous index built in a consistent way. I don't have a design yet but I am currently thinking about one and would be happy to try implementing this feature if you already have a clear idea on how this should work.

When I have a better idea of how this should be implemented I will create issue for the feature request. In the meantime feel free to close this issue.

spencerkimball commented 9 years ago

I think we're going to be supporting asynchronous indexes internally. This would be very useful when maintaining a global full text index, for example. In an ideal world, would you prefer the indexes are supported natively, or do you need to connect cockroach to some kind of external indexer?

On Mon, Sep 28, 2015 at 2:10 PM, Maxime Caron notifications@github.com wrote:

I am mostly interested in having asynchronous index built in a consistent way. I don't have a design yet but I am currently thinking about one and would be happy to try implementing this feature one if you already have a clear idea on how this should work.

When I have a better idea of how this should be implemented I will create issue for the feature request. In the meantime feel free to close this issue.

— Reply to this email directly or view it on GitHub https://github.com/cockroachdb/cockroach/issues/2656#issuecomment-143830503 .

maximecaron commented 9 years ago

I would prefer to connect cockroach to some external indexer. The main idea is to use Cockroach to scale write and then use an external system to perform incremental (materialized view/index) update. So far I have tried to use a key range to implement a queue and append to the queue in the same transaction. But since keys from the queue are contiguous they all go to the same raft cell and limit the write throughput.

petermattis commented 9 years ago

@maximecaron Do you need to lookup the data in Cockroach, or are you looking to use Cockroach purely as a write queue? As you've noticed, there are some challenges to having a queue in a distributed system. If you have multiple writers writing to a queue, you either have to have a synchronization point on the ids assigned to the queue entries (which will limit scalability) or you have to give up a nice monotonically incrementing key. If you can get away with a "mostly sorted" queue, the experimental_unique_int() function might give you what you need. You'd use a table with a schema like:

CREATE TABLE queue (
  id INT DEFAULT experimental_unique_id(),
  value BYTES,
)

The challenge with this structure is that when you read from the "end" of the queue there is a small window of time where new values could be inserted before the "end". And you still have the problem with contiguous keys in the queue limiting the write throughput. The obvious fix to this problem is to make multiple queues. Not sure if that works for your system.

maximecaron commented 9 years ago

Actually having one queue per raft cell would work as long as a consumer reading from several queue is able to merge-sort the transaction back into one sequence. Ex:

If Keys A, B and C are in distinct Raft cell. And we have transaction T1: write to key A and key B then transaction T2 write to key B and key C.

A consumer reading from the 3 raft cells should be able to order the log entry for A B and C like below and create a index that contain entry for A,B and C that is consistent (have all of T1 changes but none of T2 changes) or (have all of T1 changes and all of T2 changes)

A updated by T1 B updated by T1 B updated by T2 C updated by T2

I think all that is needed for this is a way to write the transaction ID inside the queue.

petermattis commented 9 years ago

@maximecaron I might have lost the thread of this discussion. Are you proposing a general purpose mechanism for Cockroach to export a logical stream of changes? Having a queue per range (each range is a raft consensus group) would likely be unworkable. Ranges are ~64MB in size. There is an expectation that there will be hundreds of thousands or millions of ranges in a cluster. That is a lot of queues to merge-sort back into a single sequence.

maximecaron commented 9 years ago

Yes, this is exactly what I was proposing. Now that you say mention it, it's now clear having one queue per range is not practical. But having a (log/queue) per node might work. However, If we are not doing the log merge, changes from transaction touching multiple Node will result in separate logs that can arrive at the indexer at different times. One way to fix it is to also store a copy of all transaction that are touching several node in a global log.

Is that looking more realistic to you?

petermattis commented 9 years ago

I'm not clear on how the queue per node approach would work. Note that ranges (64MB contiguous chunks of the KV space) have replicas and those replicas are spread across nodes. But take any two nodes and they will have different sets of ranges. That seems to prohibit a per node queue approach, but perhaps I just haven't thought about it enough.

@spencerkimball, @bdarnell and @tschottdorf should all have better input on possibilities for supporting logical replication stream.

One thought I had was that perhaps you don't need a mechanism that operates at the KV transaction layer. If you're going through the SQL layer we could possibly mark a table as supporting an update stream. Not sure if this simplifies the problem or even if it would be workable in your desired use case. Do you have a specific target application use case in mind? If yes, do you mind describing it? Your diagrams above were useful, but they don't mention doing reads out of the DB other than for sending along to indexing and other downstream services. I'm guessing that isn't the full story

bdarnell commented 9 years ago

A queue per node isn't feasible, but when/if we start using copysets, a queue per copyset would be more manageable.

changes from transaction touching multiple Node will result in separate logs that can arrive at the indexer at different times.

Cockroach specifically avoids gathering information about all the keys touched in a transaction in one place, so I don't think there's any way to provide this level of consistency without creating a separate log at the application level.

In the long run, Cockroach will support more kinds of indexing including full-text and geospatial. When that functionality exists, would there still be a need to provide a replication stream to external indexers? It may be easier to push forward on alternate indexing schemes than to support a general-purpose external mutation log (at least when consistency is needed. A relaxed-consistency log would be much easier to support).

maximecaron commented 9 years ago

This is my main usecase. We have a graph of friend that follow each other. If we have user A B and C that are friend and we have that sequence of event. 1- User A unfriend user B. 2- User B unfriend user C. And we have a materialized view that store the feed for each user. We need to make sure that user A cannot see that user B unfriended user C.

If we do a full refresh of the materialized view inside a transaction this would work but refresh wont be real time. I was trying to apply the update to the materialized view in an incremental way and in the background.

My current solution is to have a queue of changes per user at the application layer and this is working well so far.

maximecaron commented 8 years ago

Any plan to add trigger at the KV or SQL layer?

petermattis commented 8 years ago

There are no near term plans to add support for triggers. Are you thinking you could use triggers to implement logical replication? Can you describe in more detail how you would like triggers to work?

rhauch commented 8 years ago

I'm also very interested in being able to capture committed changes to rows. Basically, the scenario is this: a "monitor" (see below) connects to the database, performs a consistent snapshot at some known point in time, and when the snapshot is complete it then starts capturing the committed changes starting with the same point in time the snapshot was performed. The monitor continues to monitor the stream of changes.

Using asynchronous change data capture (CDC) allows multiple downstream apps and services to consume the continuous and never-ending stream(s) of changes to databases. Each stream must be consistent and totally ordered, and could be used for replication, updating derived data sources (perhaps optimized for different domains and scenarios), caches, metrics, analytics, multi-system integration, etc.

Every database exposes their changes differently -- and some not at all. So it's usually very complicated to just read a consistent stream of changes from a DBMS server/cluster. And, doing this while never losing events is even more difficult. The Debezium project is a relatively new open source project whose goal is to capture these changes into durable, totally-ordered, persistent streams so that apps and services can easily consume them (or replay them from the beginning) without having to know how or where the streams originate.

We have a connector for MySQL, are working on a MongoDB connector, and plan connectors for PostgreSQL, Oracle, SQL Server, and many others. Would love to be able to create a connector for CockroachDB.

Cockroach has the ability to obtain consistent snapshots, but appears to be missing the ability to expose a log/stream of committed changes.

rhauch commented 8 years ago

While it would be very convenient and hopefully a long term goal, it certainly isn't necessary to start out by providing a simple way for clients to consume a single change log of events for a distributed database. It'd be a start, for example, to provide access to multiple logs (e.g., perhaps one on each node in the cluster) -- clients could at least work with that, though they may have to resolve the different logs into a single serializable stream, if that's what they require.

Whether there are one logical log that contains all events, or multiple logs that together contain all of the information about the changes, the system would still need to retain logs for a period of time or until the log reaches a maximum size. At that point, older entries can be dropped.

A client could then connect and request that it receive the log starting at some position. As long as each event includes its position, the client can consume the events and keep track of the position of the last event it has completely processed. If the client disconnects, stops consuming for any reason (e.g., garbage collection), is taken down for maintenance, then when it reconnects it can start reading the log where it last left off.

Because the logs contain only recent history, a change data capture system would likely need to start out with a database by performing a consistent snapshot, and when complete start reading the log(s) starting with the first event after the snapshot time. Thus there needs to be a way to correlate the log position with the time travel snapshot time.

(Contrast this with a listener-based approach, where the database is expected to send a notification to one or more remote listeners. What happens in this case when a remote listener is not available? The database might enqueue the event, but then for how long? The database could drop the event, although that defeats the purpose of this feature. The database could use it as back pressure, although that's a horrible thought. Maintaining total order requires the remote listener return an acknowledgement, but what happens if that is not received? Should the database resend the event? How many times? You can see that this approach quickly becomes intractable.)

petermattis commented 8 years ago

@rhauch Thanks for your thoughts. A complexity with providing some sort of feed or log of changes in a CockroachDB is that not only is there not a single log, there are an unbounded number of logs. Specifically, transactions are allowed to span arbitrary "ranges" where each range is ~64MB in size. Each range has a log of KV operations performed upon it and as the ranges grow they split. Ignoring the complication that the range logs are GC'd fairly aggressively right now, how would a client keep track of the range logs it needs to poll given the dynamic splitting of ranges and their automatic movement around a cluster?

Providing some sort of logical replication, change feed or change log mechanism seems desirable, but I don't have a solid idea for how to achieve it yet. Putting aside the implementation problem, at what level would you want to receive events? Notification of any change to a particular table? Notification of a change to any table? Would you want the lower-level KV data or the SQL rows that were modified?

rhauch commented 8 years ago

Putting aside the implementation problem, at what level would you want to receive events? Notification of any change to a particular table? Notification of a change to any table? Would you want the lower-level KV data or the SQL rows that were modified?

This is a bit easier to address. If a logical change stream mechanism is targeted for clients, then it's likely to be more easily used if the mechanism expose the events at the SQL row level, since that's also the level at which they are logically/conceptually operating. (There may be value in composing events at the lower-level KV changes, such as providing continual updates for backups, etc. Given the current replication capability, maybe this isn't too big a deal.)

And, unless the implementation needs to, it shouldn't filter for the client. Instead, give the client a firehose and expect the client to find what is interesting in the stream. (DBMS log consumers typically require specific privileges to read the log.) It's also reasonable to expect a small number of clients, since each client will almost certainly induces a load on the system.

Ignoring the complication that the range logs are GC'd fairly aggressively right now, how would a client keep track of the range logs it needs to poll given the dynamic splitting of ranges and their automatic movement around a cluster?

Might it make sense for each node to have it's own log, perhaps including the changes for transactions spanning multiple ranges? The log would include whatever changes for each transaction processed by that node. IIUC, as ranges are GC'd and moved around, the log might simply stop including the changes for any ranges that are no longer on that node.

If that makes sense, then a client (or some imaginary Cockroach service) needs to be able to monitor all the logs and coalesce them into a cohesive logical log. That might be less complicated and more stable than tracking the range dynamics.

An alternative approach would be to have each node have a thread (or separate process) that scans that node's (append-only) log and sends the events to an external distributed log, such as Kafka. All events related to a single database might go to a single Kafka topic, or all events for the entire map might go to a single topic. I'd have to learn more about how CockroachDB captures the work associated with its transactions to figure out if there is sufficient info in the events for consumers to reconstruct the whole transactions (and know they have everything associated with a transaction, despite various failures). The nice thing about this approach is that each node is responsible for sending its log to the external distributed log (i.e., Cockroach makes the information available), but aggregation of that distributed log could be left for future work or simply left to the consumers.

I know this is a lot of hand waiving, but perhaps it helps.

petermattis commented 8 years ago

Each node does maintain a log of KV operations, but this log is also GC'd fairly aggressively. We also need to be aware of the complexity that replication causes. Replication is performed at the range level, not the node level, so a naive stream of KV operations from each node would contain 3 copies of each operation. Translating the KV operations back to SQL operations is possible, though a bit of a headache.

I feel a bit pessimistic thinking about this, but it is worth brainstorming about. Here's a more concrete proposal: when a transaction is perform KV mutations it has the node performing the mutation send a Kafka topic tagged by the transaction ID. When the transaction commits or aborts, we send that event to the Kafka topic. A consumer of the topic would see a series of operations for the transaction (interleaved with operations for other transactions) and then a commit or abort. If using a single Kafka topic is a bottleneck, we could shard the topics based on the transaction ID. It's probably also possible to do this only for committed transactions by piggy-backing on intent resolution. Either way, I'm not sure this would guarantee that the Kafka log would contain transactions in commit order. That is, transaction A could commit before transaction B by end up in the Kafka log after transaction B. That seems undesirable.

rhauch commented 8 years ago

@petermattis, I understand other features have priority, so thanks for spending the time to think about and discuss this.

Are the KV changes applied by each node committed, or might they also include some intents? If committed, then capturing the KV changes is interesting. First, having 3 copies of each operation is actually not a bad thing, since it always can be filtered out later. Secondly, with this approach the KV operations map very nicely to Kafka messages (the message key is K, the message value includes K,V, and transaction metadata) written to a single topic that can be partitioned by K. (In Kafka, every topic partition is actually a separate totally ordered log, and you can have large numbers of partitions in each topic. But it would also be possible to shard across multiple topics, too.) Also, the topic partitions can be set up with log compaction, whereby Kafka can remove old entries from the log when newer entries exist with the same key. (This is another reason why having 3 copies with the same K but different values is okay, since log compaction can remove all but the last one.) Even if the topics use log retention (keep messages for a max duration), just take the extra copies into account when configuring retention. Order would be maintained on the topics since the messages are partitioned by K. Translation from KV into SQL operations could be done by a downstream service, but IIUC may even be a stateless conversion which means it could be partitioned across n service instances. However, if the translation from KV operations into SQL operations is difficult or might change, then I could understand avoiding this approach. I guess it comes down to weighing that difficulty vs the apparent operational simplicity of this approach.

Recording the transaction operations in a Kafka topic should be possible as well. Using the transaction ID as a key would work, though that means that multiple messages would have the same key, and so Kafka logs must use retention rather than log compaction. This is okay, but it now requires that a downstream service/consumer has to be running to translate these into some other form (e.g., using SQL primary keys as the message key). Also, the topic could be partitioned by transaction ID, meaning that all events for a single transaction go to the same partition. IIUC, transaction ID is primarily based upon time (with a random priority), special partitioning logic would be required to ensure continual distribution across all partitions over time. Doable, but something to consider. Kafka will not maintain order across partitions, so that would also have to be handled by the downstream service and would require a fair amount of state.

petermattis commented 8 years ago

The KV changes applied by each node are "intents". Those "intents" do not become visible until the transaction is committed, which can occur on a different node.

Yes, the replicas of each operation can be filtered, but sending 3 times as much data will be a performance hit.

Transaction IDs are random (v4 UUIDs).

glerchundi commented 7 years ago

We're evaluating cockroachdb as our primary data store for a new globally scalable product. Having a CDC would be a huge win to feed search & cache engines.

Any plan to implement something like this?

spencerkimball commented 7 years ago

@glerchundi, sounds like an interesting use case; we'd like to hear more. We're very much intending to build this feature; the question is really about timing. @dianasaur323 can give you more details.

dianasaur323 commented 7 years ago

@glerchundi we are working on scoping it out, and it would be great if we could take this offline so that I can hear more about your use case so that we build something that meets your needs. What is the best way for me to connect with you?

glerchundi commented 7 years ago

Great, glad to hear that. By mail in glertxundi at gmail.com.

Thanks in advance!

On Tue, 10 Oct 2017 at 22:20, Diana Hsieh notifications@github.com wrote:

@glerchundi https://github.com/glerchundi we are working on scoping it out, and it would be great if we could take this offline so that I can hear more about your use case so that we build something that meets your needs. What is the best way for me to connect with you?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/cockroachdb/cockroach/issues/2656#issuecomment-335595395, or mute the thread https://github.com/notifications/unsubscribe-auth/ACIPljLZsq1xDMK9SZIpLwGMfJ6y2Y1Yks5sq9GSgaJpZM4GDUHp .

dianasaur323 commented 7 years ago

Excellent - just sent you an email. Looking forward to connecting.

csdigi commented 7 years ago

Just to add another data point, this is a feature which currently blocks our use of cockroach (vs a Postgres instance). We subscribe to the WAL and use it to automatically submit a protocol buffer format of table changes onto a series of Kafka topics which will eventually be processed in an analytics pipeline (using Apache Beam). We also process these messages to trigger offline jobs (for example malware scanning of documents after they are written, or running a series of ML classifiers on data).

We currently use a some custom binaries which handle our WAL subscriptions and forward relevant tables to processors, so we would not be against receiving several logical streams (as long as we are clear on the guarantees + strong consistency of them).

dianasaur323 commented 7 years ago

@csdigi Thanks for the additional color. It would be great to collect some requirements from you - namely, do you need things to be strictly ordered, do you need support for transactions, do you need support for cross-table transactions (I don't think anyone really conceivably supports this yet), and what latency threshold is acceptable to you?

Regarding your comment about receiving several logical streams, I'm assuming you mean that you would be okay with getting an ordered log from each node versus a combined log from the entire cluster. The problem with that is that I'm not sure how useful it would be to you, since I don't believe you would get the ordering guarantees and consistency levels that you would want. I may be wrong here though, so someone else might jump in to correct me.

andrew-bickerton commented 6 years ago

@dianasaur323, similar to the other posters we also have a requirement for some kind of changelog/transactionlog/feed of mutations.

To help continue the discussion on what that might mean.

For us, I don't mind if there are several changelogs to consume/interrogate, but rather than it being a changelog per node (I agree with your point around consistency), it needs to more be about the atomicity of the transactions that need to be maintained. For a traditional RDBMS this of course means a single transaction log file with some form of ordering (LSN), which is easier to do as you're dealing with a single master node, this does however make it challenging to guarantee consistency as the change may have relied implicitly on a change already implemented on another node (how to expose this dependency through a changelog?)

Given that in cockroachdb there is a leader per range of data, could each leader be exposing their log file in a consumable form? In terms of latency, we usually measure this in the low seconds, if we have apps that need to consume a feed with lower latency than that, we look to either get a direct feed from the consumer or setup a db structure that can be consumed from the DB directly (polling an audit table for instance).

As to why we need this, as the existing database solutions we employ all offer some form of changelog we have used them to add asynchronous feeds to downstream systems without needing to do any changes to the producing system (and without affecting their stability/performance). Some of those downstreams systems include: fraud detection (processing of changes), reporting systems (transforming the data into different structures with different indexing requirements as well), data science processing (for analysis/feedback mechanisms).

Hope that helps expand your understanding, happy to take this conversation offline.

csdigi commented 6 years ago

@dianasaur323 Sorry about the delay on getting back to you on this.

Regarding the multiple streams, I think we would be fine in getting an ordered log from each range leader (rather than per node perhaps). A consistent view of cross table / range transactions would be required, however this can also be in the format of several messages from the range leaders as long as they all have some form of consistent transaction logical timestamp to tie them together. Currently the WAL from Postgres does give a view of all tables changed in a transaction commit (although their problem is much simpler as all the data is local).

Our current design considers a single WAL slot, and multiplexes that to a series of processors (to create structured protobuf messages from consumed changes). The WAL cursor is only progressed on this stream when the downstream Kafka topic has confirmed the write.

It would not be out of the question to imagine processing multiple replica logs from range leaders (which are guaranteeing their output to be ordered by commit logical timestamp in the range) and progressing a cursor per-range when we managed to write the changes to a set of Kafka topics which represent higher level constructs (such as tables).

Obviously it would take care for the implementation to merge join the incoming streams (per logical table) so that it had consumed a commit of at least the tx logical timestamp from each (and perhaps a heartbeat stream of the latest known logical timestamp in the cluster for infrequent-write workloads). The ambition would be to read from the replica log for all relevant ranges (for some definition of relevant that would allow some distribution on the processing of unrelated tables), and ensure we had read all relevant data before outputting an event and moving the cursors of all relevant ranges forward (transactionally) and processing the next highest commit timestamp.

Obviously there is a lot of complexity for what happens when ranges come and go, and how to hold the range replica log cursors (probably in cockroach tables themselves given the transactional nature of the changes), and using these cursors to garbage collect replicated information. Also handling the creation of values for new ranges as they are created / split. I am not sure what timestamps you would have access to for cross-range transactions (which for example update multiple rows in multiple ranges).

dianasaur323 commented 6 years ago

@andrew-bickerton Thanks for the feedback! It would be great to catch up offline to run our current thoughts around implementation by you. What would be the best way to connect? I'm a bit curious to hear what trade-offs you've made with your existing architecture.

Regarding tracking changes on a leader basis, things get complicated once transactions run across multiple ranges that might have different nodes as leaders. That being said, we are thinking of a cluster-wide transaction log, although we have to consider the performance implications of that.

@csdigi Okay, the detail there is helpful. It's interesting to hear how you leverage cursors to manage delivery to Kafka. I think having to reconstruct transactions from multiple logs sounds like a lot of work, and in our implementation of CDC, we will probably try to do better (or at least do some of the work for you). It sounds like you are very comfortable with developing workarounds to get to where you want. Do you prefer that flexibility, or is it just that there is no better solution out there?

andrew-bickerton commented 6 years ago

@dianasaur323 I've just followed you on the twitter (@coder4hire) if you follow back I'll DM you my direct contact details.

Given that cockroachDB already handles its cross leader transaction changes and has an internal log of those changes (from my rough understanding), could you not have a second consumer of those logs that consolidates them and outputs a single stream of changes? (that way avoiding direct impact as downstream consumers are inherently going to be asynchronous)

dianasaur323 commented 6 years ago

@andrew-bickerton awesome! just did so, sorry for taking a while to get back to this issue.

Yes, that is probably the approach we will be taking. Essentially having a log at a range level, and then creating another layer to process that. However, there are performance impacts to that, so we have to be careful in terms of how we make certain design decisions. We are leaning in on working on this though, so there should be progress over the next couple months for you all to follow.

wingedpig commented 6 years ago

Adding a comment to subscribe to this topic, but also to say that we'd need this as well. We currently use Postgres, and logical stream changes into an Elasticsearch cluster. This works really well for us. We often need to regenerate our ES indexes, for various reasons. It's easy for us to take a snapshot of the database, index it, and then continuously stream all the changes since the snapshot.

dianasaur323 commented 6 years ago

@wingedpig thanks for your interest! you should see updates on this area soon as it is currently being scoped out in preparation for starting development.

rhzs commented 6 years ago

I have the same view with @wingedpig will be awesome, if CockroachDB has API to expose or stream their internal logs.

dianasaur323 commented 6 years ago

thanks @incubus8 for the feedback. I will update this chain with high-level updates as we decide upon them for your information and comments.

skunkwerk commented 6 years ago

+1 Thank you for making this part of the 2.1 milestone. Change data capture is essential to my application, both for syncing changes to other applications and for potentially helping with full-text search (sending changes to ElasticSearch) as an alternative to build-in search (ticket https://github.com/cockroachdb/cockroach/issues/7821).

dianasaur323 commented 6 years ago

@skunkwerk I'm glad you're looking forward to this feature - we are too! If you'd like, and have time to invest in this, would you be open to connecting with us to provide feedback as we build things out? We are currently prototyping and refining scope for 2.1.

lukesteensen commented 6 years ago

@dianasaur323 I'd be interested in providing feedback if you're looking for volunteers. My team is in the middle of implementing the second generation of our Postgres-based CDC system and I'd love to see support in CRDB.

dianasaur323 commented 6 years ago

@lukesteensen we are looking for volunteers! I would love to connect with you. My email is diana at cockroachlabs dot com, although if you give me the best way to contact you, that works as well.

awwx commented 6 years ago

@dianasaur323 I'll be interested in reviewing the high level RFC when that's available :)

dianasaur323 commented 6 years ago

@awwx of course! we have some dated RFCs that explore this idea, but now that we've gotten around to prototyping it, the RFC will likely look very different. I'll link to it here when it's in a form that is shareable.

msample commented 6 years ago

I too find the CDC pattern in Kafka topic(s) fed by a DB's transaction log valuable. Tungsten on MySQL feeding change events into a Kafka topic works quite well.

WRT to the distributed nature of CRDB making this more challenging than for single-master DBs, I wonder if Kafka's KSQL Transaction protocol would provide a good foundation for CRDB nodes to write to multiple different Kafka topic partitions and eventually send the commit or abort in-band? IIRC consumers can be configured to buffer events until the associated txn's commit or abort comes through. In Kafka there is one consumer/producer process that owns a transaction - hopefully using the same txnId across CRDB nodes would be sufficient.

https://www.confluent.io/blog/transactions-apache-kafka/

ligustah commented 6 years ago

I don't think tightly coupling this functionality to Kafka is the best way to go. On my previous project we built a CDC solution for AWS RDS which relied on Kinesis instead, for example.

glerchundi commented 6 years ago

Completely agree with @ligustah.

dianasaur323 commented 6 years ago

@msample thanks for your feedback - we are currently thinking of making the first iteration utilize Kafka as a first step, since Kafka provides some nice abstractions that we could take advantage of.

That being said, @ligustah and @glerchundi, we aren't aiming for tight coupling. What would be helpful is for you to let us know (a) what data streaming solution you use and need compatibility for (b) what downstream applications they support and (c) what guarantees you need from the data stream to support your downstream applications.

glerchundi commented 6 years ago

That being said, @ligustah and @glerchundi, we aren't aiming for tight coupling. What would be helpful is for you to let us know (a) what data streaming solution you use and need compatibility for (b) what downstream applications they support and (c) what guarantees you need from the data stream to support your downstream applications.

@dianasaur323 we have not decided what we're going to use because it'll be an ongoing investigation during this year. Kafka seems like too much operational cost but maybe its worth it. Maybe redis-streams, some managed service, who knows?

That said, one of our downstream apps will be microservices that will consume other microservices data and publish them as agreggated information. As a very simple example, imagine User, AccessPoint (with its display_name) and user accesses represented as User-AccessPoint relationship. Our end user (service consumer) wants to see preliminary information of the relationships before going to the access point details; i.e, user "gorka" has access to these access points and their display_names are these: hall, meeting room (this display_names will be consumed via CDC and updated in the corresponding relationship row(s)).

Also, cache and search engines will be feed from this CDC of course.