cockroachdb / cockroach

CockroachDB — the cloud native, distributed SQL database designed for high availability, effortless scale, and control over data placement.
https://www.cockroachlabs.com

changefeedccl: consider adding additional sinks via go-cloud/pubsub #36982

Open rolandcrosby opened 5 years ago

rolandcrosby commented 5 years ago

Since we announced CDC support targeting Kafka, we've had a handful of requests to support different message queue systems for CDC: Google Cloud pubsub, NATS.io, AWS Kinesis, RabbitMQ, ActiveMQ, and others. @gedw99 pointed me to the Go CDK pubsub implementation, an extensible module which aims to support a variety of sinks across cloud providers, including many of the ones we've gotten questions about. We'd need to do more analysis to make sure the delivery and ordering guarantees offered by individual sinks are appropriate for CockroachDB, but aside from that overhead this seems like a relatively cheap and painless way to add new sink types.

Note: This issue has been scoped down to focus on the go/cloudpubsub package

Jira issue: CRDB-4467

glerchundi commented 5 years ago

This would be super useful as we think google pubsub would be enough in our microservices infrastructure. Mainly because:

Do you think this is something we could push forward?

lopezator commented 5 years ago

+1 for this

rolandcrosby commented 4 years ago

@danhhz Would you mind taking a look at the GCP pubsub ordering documentation to confirm whether it'd be possible to reliably use pubsub to materialize a table sent through CDC? It doesn't appear that pubsub natively provides any ordering guarantees, but they have a metric that the consumer can monitor to determine how caught-up a pubsub feed is. I think this means that you could buffer data until that metric advances, then reorder the data based on updated/resolved timestamps and process it in strongly-ordered batches. Do you agree? Or are there subtleties that I'm missing that would prevent this from working as expected? (To be clear, this question is specifically about the GCP pubsub product, not the go-cloud pubsub library, which provides an interface to various cloud and self-hosted messaging systems.)
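The buffer-then-reorder idea in this question can be sketched roughly as follows. This is a minimal illustration, not CockroachDB code: the `Row` type, the integer timestamps, and the `Buffer` helper are all hypothetical, and the sketch simply assumes the consumer already knows that every message at or below the resolved timestamp has arrived, which is exactly the open question here.

```go
package main

import (
	"fmt"
	"sort"
)

// Row is a hypothetical decoded changefeed row message. Updated stands in
// for the row's "updated" timestamp (a plain integer for simplicity).
type Row struct {
	Key     string
	Value   string
	Updated int64
}

// Buffer holds rows that have arrived but are not yet covered by a
// resolved timestamp.
type Buffer struct {
	pending []Row
}

// Add stores a row in arrival order, which may differ from timestamp order.
func (b *Buffer) Add(r Row) { b.pending = append(b.pending, r) }

// Flush returns every buffered row with Updated <= resolved, sorted by
// timestamp, and keeps the remaining rows buffered for a later flush.
// Assumption: the caller has some way to know that no more rows at or
// below the resolved timestamp are still in flight.
func (b *Buffer) Flush(resolved int64) []Row {
	var ready, rest []Row
	for _, r := range b.pending {
		if r.Updated <= resolved {
			ready = append(ready, r)
		} else {
			rest = append(rest, r)
		}
	}
	b.pending = rest
	sort.SliceStable(ready, func(i, j int) bool {
		return ready[i].Updated < ready[j].Updated
	})
	return ready
}

func main() {
	var b Buffer
	// Rows arrive out of timestamp order.
	b.Add(Row{Key: "a", Value: "2", Updated: 20})
	b.Add(Row{Key: "a", Value: "1", Updated: 10})
	b.Add(Row{Key: "b", Value: "1", Updated: 35})
	for _, r := range b.Flush(30) {
		fmt.Println(r.Key, r.Value, r.Updated)
	}
}
```

The row with timestamp 35 stays buffered until a later resolved timestamp covers it.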

danhhz commented 4 years ago

I think you'd need some way to tie that "how caught-up a pubsub feed is" metric back to our resolved timestamps. Basically, when some consumer sees a resolved timestamp message, it needs a way to wait for every message before that one to have been provably flushed out of the pubsub. To do this with the metric you mention (an age in seconds), we'd need a way for a pubsub consumer to compute now() - a possibly new timestamp on the resolved message vs the pubsub age metric. The tricky part here is that now() and a possibly new timestamp on the resolved message have to be somehow comparable to whatever google clock is computing the age metric. Or am I missing something?

That said, I have two thoughts here.

1) We already send partial progress updates from the nodes that emit row update messages to the single node that emits resolved timestamp messages. If each producer attached its node id, a sequence number, and a per-job-restart session id, then we could plumb the sequence numbers down in the partial progress updates. Then, the resolved timestamp message would include a job session id along with a high-water sequence number per node. If a consumer could prove that it had processed, for every node, every message with a sequence number from 0 to the high-water, then it would have the resolved timestamp guarantee. Unless I'm missing something, this would work in any pubsub that can guarantee at-least-once delivery (which is many/most of them). The big downside here is that actually proving the 0 to high-water thing involves a lot of distributed state and potentially unbounded buffering.

(Interestingly, all of the above metadata is already now present for the cloud storage sink.)

cc @ajwerner any thoughts on above? ^^

2) One thing that I realize is that if we do ever figure out how to do (1), the way we'd emit the updated row messages would be identical to the naive way (modulo adding new metadata to the messages, which is backward compatible). This means that right now, we could build a google pubsub integration that would give at least once delivery guarantees of all row changes, including timestamps. It's likely that this would even be a vanilla enough use of pubsub that we could do it using a generic interface over pubsubs like is mentioned in the issue. We'd have to document that it intentionally doesn't include resolved timestamps because the pubsubs it targets don't offer the ordering guarantees we need for them, but it sounds like there's demand for this even without that.

ajwerner commented 4 years ago

If a consumer could prove that it had processed, for every node, every message with a sequence number from 0 to the high-water, then it would have the resolved timestamp guarantee.

Sounds right to me.

The big downside here is that actually proving the 0 to high-water thing involves a lot of distributed state and potentially unbounded buffering.

Technically this is true given the complete lack of ordering guarantees. I suspect that in the common case the amount of state which needs to be tracked will be quite small. Most pub sub systems provide some sort of latency expectations. I think it's reasonable to think of these pub sub systems as at least offering something of a synchrony guarantee. I don't expect a message to take 5 days to arrive and then eventually arrive. Maybe a message doesn't arrive at all, that'd be a problem, but that'd be a problem for all of the sinks.

The state tracking is sort of a pain. It effectively boils down to keeping track of the frontier except you probably need to do it in a distributed way. It wouldn't be too bad to do if the client was talking to a scalable, consistent SQL database in the course of processing the message.

I think it's a good idea to play around with implementing the GCP sink on top of the generic interface with a prototype, maybe side-stepping the guarantees on a first pass just to get a feel for it. Then we should add on the necessary metadata to do frontier tracking and verify that we can make it work. I agree with your assessment about the additional metadata that it would take and that we already have it. My guess is while the code is non-trivial the amount of state you practically need to maintain isn't going to be that large in use cases which don't have high throughput.

lopezator commented 4 years ago

Quoting @glerchundi :

We don't require ordering (just a timestamp)

Quoting @danhhz:

We'd have to document that it intentionally doesn't include resolved timestamps because the pubsubs it targets don't offer the ordering guarantees we need for them, but it sounds like there's demand for this even without that.

He's right, there are a lot of use cases where you don't need ordering, just an "at least once delivery" guarantee.

Thanks for pushing this forward @rolandcrosby

danhhz commented 4 years ago

Technically this is true given the complete lack of ordering guarantees. I suspect that in the common case the amount of state which needs to be tracked will be quite small.

Yeah, I think you're right that I'm being unfairly pessimistic here. The worst case is unbounded but that's certainly not the common case.

It wouldn't be too bad to do if the client was talking to a scalable, consistent SQL database in the course of processing the message.

I had the same thought! 😆 I guess by definition you do have access to one.

I think it's a good idea to play around with implementing the GCP sink on top of the generic interface with a prototype, maybe side-stepping the guarantees on a first pass just to get a feel for it. Then we should add on the necessary metadata to do frontier tracking and verify that we can make it work.

Agreed, I also think these are the next steps. The library mentioned in the issue even has an in-mem implementation, so maybe it's easy to hook up to testFeed.

lopezator commented 4 years ago

Quoting directly from PubSub documentation:

Typically, Cloud Pub/Sub delivers each message once and in the order in which it was published. However, messages may sometimes be delivered out of order or more than once. In general, accommodating more-than-once delivery requires your subscriber to be idempotent when processing messages. You can achieve exactly once processing of Cloud Pub/Sub message streams using Cloud Dataflow PubsubIO. PubsubIO de-duplicates messages on custom message identifiers or those assigned by Cloud Pub/Sub. You can also achieve ordered processing with Cloud Dataflow by using the standard sorting APIs of the service. Alternatively, to achieve ordering, the publisher of the topic to which you subscribe can include a sequence token in the message. See Message Ordering for more information.

https://cloud.google.com/pubsub/docs/subscriber

So there would be a couple of options here:

mgoddard commented 4 years ago

Having worked a lot with RabbitMQ in the past, within the Cloud Foundry realm, I'm a big fan and would love to see it as an option here.

camerondavison commented 4 years ago

Not sure if y'all have seen this or not: https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/v2/cdc-parent#deploying-the-connector. It could be a helpful example of how to set up CDC with Dataflow. Specifically, I'm trying to make sure that I can get the changes that are in CockroachDB into BigQuery.

lopezator commented 4 years ago

Any news on this?

We are using EXPERIMENTAL CHANGEFEED FOR and not considering the switch to CREATE CHANGEFEED until this is implemented.

glerchundi commented 4 years ago

I don't know if the proposed solution would vary with the ordering guarantees they provide in Pre-GA.

Just pinging you all so it can be taken into account.

ajwerner commented 4 years ago

Thanks for the update. We can take this into account when we do finish this off.

bobvawter commented 3 years ago

Here's a first draft of a CDC -> PubSub bridge that uses an HTTP feed: https://github.com/bobvawter/cdc-pubsub

stevendanna commented 3 years ago

I'm looking into moving this forward. Below are some notes, many of which just re-iterate ideas presented above.


Google Pub/Sub Ordering Keys

Google Cloud Pub/Sub provides opt-in ordered message delivery (https://cloud.google.com/pubsub/docs/ordering):

  1. You opt into the ordered delivery when creating a subscription:
        sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
                Topic:                 topic,
                AckDeadline:           20 * time.Second,
                EnableMessageOrdering: true,
        })

We'll likely want to think about the UX around topic & subscription creation since a subscription only receives messages sent to a topic after the point at which it was created.

  2. For messages to be ordered, the producer must publish messages with the same ordering key and into the same region.

  3. Ordering keys interact with Google's publisher throughput quotas (https://cloud.google.com/pubsub/quotas):

    If messages have ordering keys, the maximum publisher throughput is 1 MB/s per ordering key.

    I believe one can request that such limits be increased.

  4. All messages in a given PublishRequest must share the same ordering key (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage)

  5. Ordering keys can be a maximum of 1024 bytes long:

ERROR: (gcloud.pubsub.topics.publish) INVALID_ARGUMENT: An ordering key in the request is too long. The length is 1600 characters, but the maximum allowed is 1024. Refer to https://cloud.google.com/pubsub/quotas for more information.

Ordering Concerns

When thinking about ordering for this feature, there are at least two things we might care about:

  1. Can we give the consumer of the pubsub subscription the same ordering guarantees that CHANGEFEEDS are documented as providing? (https://www.cockroachlabs.com/docs/v20.2/stream-data-out-of-cockroachdb-using-changefeeds.html#ordering-guarantees)

    Without ordering keys, I don't believe so. With ordering keys, I believe we can, using any of several choices of ordering key.

  2. Can we provide RESOLVED messages that still provide the same guarantee? Can the RESOLVED timestamps still be effectively used by consumers to materialize a consistent view of a table?

The method outlined by @danhhz in https://github.com/cockroachdb/cockroach/issues/36982#issuecomment-555192185 would work without ordering turned on at all. This is nice because (1) it may be that some users want to be able to use the RESOLVED messages but don't want the restrictions that may come with turning on ordered delivery, and (2) it could be reused for other sinks that don't provide ordering.

The summary of that method as I understand it:

  1. Attach per-node, per-job sequence number to every emitted row as message metadata (or somewhere in the payload)
  2. The resolved message would then include the list of all per-node-per-job sequence numbers that must be reached for the resolved timestamp to be valid.
  3. The client would have to track the sequence numbers it has seen. When it receives a RESOLVED timestamp, it would either (a) know it is already valid, or (b) have to continue to process messages (buffering any messages with sequence numbers beyond those included in the resolved message) until it has seen all of the messages.
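The three steps above can be sketched on the consumer side as follows. This is only an illustration under stated assumptions: the `Tracker` type, integer node IDs, string session IDs, sequence numbers starting at 0, and an inclusive per-node high-water mark are all hypothetical choices, not an existing changefeed message format.

```go
package main

import "fmt"

// nodeState tracks which sequence numbers have been seen from one node.
type nodeState struct {
	next  uint64              // every sequence number below next has been seen
	ahead map[uint64]struct{} // numbers >= next that arrived out of order
}

// Tracker records per-node sequence numbers for a single job session.
type Tracker struct {
	session string
	nodes   map[int]*nodeState
}

// NewTracker returns a tracker for the given (hypothetical) session ID.
func NewTracker(session string) *Tracker {
	return &Tracker{session: session, nodes: map[int]*nodeState{}}
}

// Observe records one row message. Duplicates are harmless, which matters
// under at-least-once delivery.
func (t *Tracker) Observe(session string, node int, seq uint64) {
	if session != t.session {
		return // different job session; ignored in this sketch
	}
	st, ok := t.nodes[node]
	if !ok {
		st = &nodeState{ahead: map[uint64]struct{}{}}
		t.nodes[node] = st
	}
	if seq < st.next {
		return // already counted
	}
	st.ahead[seq] = struct{}{}
	// Advance the contiguous prefix as far as the out-of-order set allows.
	for {
		if _, seen := st.ahead[st.next]; !seen {
			break
		}
		delete(st.ahead, st.next)
		st.next++
	}
}

// Covered reports whether, for every node listed in highWater, all sequence
// numbers from 0 through highWater[node] (inclusive) have been observed.
func (t *Tracker) Covered(session string, highWater map[int]uint64) bool {
	if session != t.session {
		return false
	}
	for node, hw := range highWater {
		st, ok := t.nodes[node]
		if !ok || st.next <= hw {
			return false
		}
	}
	return true
}

func main() {
	tr := NewTracker("session-1")
	tr.Observe("session-1", 1, 0)
	tr.Observe("session-1", 1, 2) // arrives before sequence number 1
	tr.Observe("session-1", 2, 0)

	resolved := map[int]uint64{1: 2, 2: 0}
	fmt.Println(tr.Covered("session-1", resolved)) // gap at node 1, seq 1

	tr.Observe("session-1", 1, 1)
	fmt.Println(tr.Covered("session-1", resolved)) // gap filled
}
```

The `ahead` set is where the buffering cost discussed earlier in the thread shows up: it stays small when messages arrive roughly in order and grows without bound in the worst case.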

If we wanted to rely on ordered delivery, I believe we would need to send a RESOLVED message for every ordering key we are using, like we do for kafka partitions. We would likely want to provide some option for how many "ordering keys" we shard rows over.
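One way to shard rows over a configurable number of ordering keys is to hash the row's key and take it modulo the shard count. The sketch below assumes that approach; the `OrderingKey` function, the `shard-N` naming, and the choice of FNV-1a are illustrative and not part of any existing sink.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// OrderingKey maps a row's key to one of `shards` ordering keys. All changes
// to the same row get the same ordering key, so per-row order is preserved
// on an ordered subscription, while different rows spread across shards.
func OrderingKey(rowKey []byte, shards uint32) (string, error) {
	if shards == 0 {
		return "", errors.New("shards must be positive")
	}
	h := fnv.New32a()
	h.Write(rowKey) // hash.Hash.Write never returns an error
	return fmt.Sprintf("shard-%d", h.Sum32()%shards), nil
}

func main() {
	for _, k := range []string{"/users/1", "/users/2", "/users/1"} {
		key, err := OrderingKey([]byte(k), 8)
		if err != nil {
			panic(err)
		}
		fmt.Println(k, "->", key)
	}
}
```

Keys of this form stay far below the 1024-byte limit noted above. The shard count trades throughput against bookkeeping: more shards spread load across more ordering keys (each subject to the per-key throughput quota), but a RESOLVED message would be needed for each one.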

ajwerner commented 3 years ago

I am fully on board with not doing anything related to ordering at this point. Adding ordering would be purely additive if we have testing and support without it. Adding it can also be a very small change where we'd do something like set the hash of the row's key as the ordering key.

ajwerner commented 3 years ago

FWIW the per-row ordering guarantee can be very useful even in the absence of an easier to use resolved timestamp. Imagine you're synchronizing rows to some other system, like, say, elasticsearch, you can do an atomic CAS on a value using a version integer. I'd do something like read the existing row and then insert the new one if the timestamp on my changefeed event is higher than the one that's in there. In that world, I'd get correctness out of per-key ordering without ever needing to think about a resolved timestamp.
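A minimal sketch of that compare-and-set pattern, using an in-memory map as a stand-in for the downstream system. The `Store` type and its methods are hypothetical, and timestamps are plain integers; a real target such as Elasticsearch would use its own versioning mechanism instead of a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// versioned is a stored value together with the changefeed timestamp of
// the event that wrote it.
type versioned struct {
	value   string
	updated int64
}

// Store is an in-memory stand-in for the downstream system.
type Store struct {
	mu   sync.Mutex
	rows map[string]versioned
}

// NewStore returns an empty store.
func NewStore() *Store { return &Store{rows: map[string]versioned{}} }

// ApplyIfNewer writes value only if updated is strictly greater than the
// timestamp already stored for key. It reports whether the write happened.
// The check and the write occur under one lock, mimicking an atomic CAS.
func (s *Store) ApplyIfNewer(key, value string, updated int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.rows[key]; ok && cur.updated >= updated {
		return false // stale or duplicate event
	}
	s.rows[key] = versioned{value: value, updated: updated}
	return true
}

// Get returns the current value for key, if any.
func (s *Store) Get(key string) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.rows[key]
	return v.value, ok
}

func main() {
	s := NewStore()
	fmt.Println(s.ApplyIfNewer("user/1", "v2", 20)) // first write
	fmt.Println(s.ApplyIfNewer("user/1", "v1", 10)) // older event
	fmt.Println(s.ApplyIfNewer("user/1", "v2", 20)) // redelivered event
	v, _ := s.Get("user/1")
	fmt.Println(v)
}
```

Because stale and redelivered events are rejected, the same guard also makes the consumer idempotent under at-least-once delivery. Deletes are not handled in this sketch.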

stevendanna commented 3 years ago

Adding it can also be a very small change where we'd do something like set the hash of the row's key as the ordering key. ... FWIW the per-row ordering guarantee can be very useful even in the absence of an easier to use resolved timestamp.

+1 I am starting with some per-row ordering and going from there.

miretskiy commented 1 year ago

@amruss this one is a bit old... Should we close this issue?