snowplow / snowbridge

For replicating streams across clouds, accounts and regions

Release/1.0.0 #143

Closed colmsnowplow closed 2 years ago

colmsnowplow commented 2 years ago

v1 Release PR

Creating this now in order to record a 'snag list' of things we need to fix before release.

TODO:

colmsnowplow commented 2 years ago

Two commits don't have related issues; these need to be created and added before release.

colmsnowplow commented 2 years ago

I've just realised #140 should also have removed cmd/serverless.go.

colmsnowplow commented 2 years ago

Authorship of 'Add ability to configure TLS for Kafka and HTTP' changed to me in the rebase - this isn't accurate, as this work was entirely @TiganeteaRobert's. In the rebase, I'll edit the authorship to correct that.

colmsnowplow commented 2 years ago

@istreeter bear with me as a few of those questions require a bit of effort to answer properly. I'll pop in my answers to the easier ones first though. :)

colmsnowplow commented 2 years ago

Acking messages out-of-order. For example, acking filtered messages before acking unfiltered. In KCL this would definitely be wrong, because you would accidentally checkpoint records that hadn't been safely processed yet. I would like to understand more about how Kinsumer handles checkpointing under-the-hood and whether this is safe to do.

This is handled in our fork of kinsumer.

There's an in-memory checkpointer state for each shard, which keeps track of the latest checkpointed sequence number. When the checkpointer's commit function is called, it updates DDB with the latest sequence number. This is called periodically on a ticker.

With 'manual checkpointing', kinsumer returns an updatefunc, which is responsible for updating the latest processed sequence number. When we do so, we don't update the sequence number until the previous one has been updated. This is what prevents us from accidentally committing past an unprocessed sequence number. It does open up the possibility of having more duplicates, but we accept this trade-off, and elsewhere we take measures to mitigate that risk.
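
To make that invariant concrete, here's a minimal sketch of the idea (hypothetical code, not kinsumer's actual implementation - kinsumer achieves this by blocking the manual update function until the previous record has been checkpointed): acks can arrive in any order, but the committed sequence number only ever advances over a contiguous prefix of acked records, so we can never checkpoint past an unprocessed one.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedCheckpointer is an illustration only: records are acked in any order,
// but the committed sequence number only advances over a contiguous prefix of
// acked records, so we never checkpoint past an unprocessed record.
type orderedCheckpointer struct {
	mu        sync.Mutex
	pending   []string        // sequence numbers in the order they were read
	acked     map[string]bool // which of them have been processed
	committed string          // latest sequence number safe to persist to DDB
}

func (c *orderedCheckpointer) track(seq string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending = append(c.pending, seq)
}

// ack marks a record as processed and advances the committed pointer as far as
// the contiguous run of acked records allows.
func (c *orderedCheckpointer) ack(seq string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.acked[seq] = true
	for len(c.pending) > 0 && c.acked[c.pending[0]] {
		c.committed = c.pending[0]
		c.pending = c.pending[1:]
	}
}

// commit is what a periodic ticker would call to persist c.committed.
func (c *orderedCheckpointer) commit() string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.committed
}

func main() {
	c := &orderedCheckpointer{acked: map[string]bool{}}
	for _, seq := range []string{"1", "2", "3"} {
		c.track(seq)
	}
	c.ack("2")
	fmt.Println(c.commit()) // "" - nothing committed yet, "1" is still unprocessed
	c.ack("1")
	fmt.Println(c.commit()) // "2" - the contiguous acked prefix now reaches "2"
}
```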

I do think that we could do with a test which demonstrates this behaviour, either in our kinsumer fork or in this project. I'll need to think about this a bit, but I'll see what I can come up with.

colmsnowplow commented 2 years ago

Graceful shutdown. I can see you call Stop() on the kinsumer client, but I've been trying to figure out what happens next in the concurrent process which is looping over incoming messages. Ideally, a stream processing app should finish writing messages to the target, then ack all outstanding messages, then exit. I think you might be OK based on reading this code block but I would like to understand this shutdown better.

This is in kinsumer as well - when we call Stop(), kinsumer waits for the main waitgroup. Once that completes, the Run() function returns, which triggers some deferred functions that stop all the different processes involved.

The most relevant of these is stopConsumers - which closes the k.stop channel. The effect of this is:

  1. Where we're attempting to capture shards, we immediately stop doing so
  2. For processes currently processing data, we:

The timeout in that last bullet point exists because, while we're still processing, we prevent any other consumer from seizing ownership of the shard. If something has gone wrong in checkpointing - or checkpointing is simply taking too long (the timeout is configurable in kinsumer but not yet in stream replicator) - we do need to give up ownership.

So basically, kinsumer won't shut down until we've acked all the records, unless we take too long to ack the records in which case it will just shut itself down and we'll get duplicates.
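
As a rough sketch of that shutdown behaviour (hypothetical code, not kinsumer's actual implementation; the 5-second deadline is just a placeholder for the configurable timeout): once the stop channel is closed, the consumer stops pulling new records, waits for outstanding acks, and only gives up the shard if the deadline passes first.

```go
package main

import (
	"fmt"
	"time"
)

// process stands in for the real pipeline: write the record to the target,
// then signal the ack.
func process(rec string, acked chan<- struct{}) {
	// ... write rec to the target ...
	acked <- struct{}{}
}

// consumeShard illustrates the shutdown pattern: when stop is closed, we stop
// pulling new records and drain outstanding acks, but only up to a deadline so
// that another consumer can eventually claim the shard if acking stalls.
func consumeShard(stop <-chan struct{}, records <-chan string) {
	acked := make(chan struct{})
	outstanding := 0
	for {
		select {
		case <-stop:
			deadline := time.After(5 * time.Second) // placeholder for the configurable timeout
			for outstanding > 0 {
				select {
				case <-acked:
					outstanding--
				case <-deadline:
					fmt.Println("timed out waiting for acks; giving up shard, duplicates possible")
					return
				}
			}
			fmt.Println("all records acked; clean shutdown")
			return
		case rec := <-records:
			outstanding++
			go process(rec, acked)
		case <-acked:
			outstanding--
		}
	}
}

func main() {
	stop := make(chan struct{})
	records := make(chan string)
	go consumeShard(stop, records)
	records <- "r1"
	close(stop)
	time.Sleep(100 * time.Millisecond)
}
```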

For this one also, I'd love to add some kind of unit or integration test for it, but I'll need some time to figure that out.

colmsnowplow commented 2 years ago

@istreeter I've tried to explain the mechanisms involved in your two main comments - I'm not sure how well I've explained the complexity so please shout if discussing it would help!

istreeter commented 2 years ago

Regarding checkpointing. You explained it well, and your links were helpful. I understand now that kinsumer blocks on the checkpointing function until the previous record has also been checkpointed.

But this means that you must checkpoint every message. If you ever accidentally don't checkpoint a message, then kinsumer will end up permanently blocked. Am I right on this?

It makes me concerned about this retrying logic. If there are some failures and some successes, then you drop the successes and retry calling t.Write with the failures only. This means you never ack the dropped successes. Please tell me if I'm wrong about this, because it looks like a big problem.

More generally, it makes me nervous about the code structure with respect to acking. Currently acking is happening in multiple different places: in t.Write and in ft.WriteOversized and in ft.WriteInvalid and completely separately again for filtered messages. To be 100% sure that you are acking every single message, I would prefer the code if I could see that acking all happens in one place.

colmsnowplow commented 2 years ago

But this means that you must checkpoint every message. If you ever accidentally don't checkpoint a message, then kinsumer will end up permanently blocked. Am I right on this?

As I understand it, basically yeah - but what happens in this scenario is that the shard owner that is blocked will give up ownership of the shard, and another will assume control from the point at which it last successfully checkpointed. So in that scenario we'd get lots of dupes rather than anything more serious like data loss.

(Edit to add:)

The important logic responsible for that is kinsumer's clientRecordMaxAge setting - if the client hasn't been able to checkpoint with DDB for longer than this period of time, other clients will stop recognising it as owning the shard and will eventually claim ownership of it. The new consumer will keep trying to claim ownership until the max age has passed, after which it'll just claim the shard from the last successful checkpoint.

But that's only if we don't ack every message, which I don't think will happen unless we have some error or crash.
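
In rough Go terms, the rule is something like this (an illustration only - kinsumer's real shard-claiming logic is more involved):

```go
package main

import (
	"fmt"
	"time"
)

// canClaimShard is a rough illustration of the rule described above: if the
// current owner hasn't managed to checkpoint within clientRecordMaxAge, other
// clients stop recognising its ownership and one of them claims the shard from
// the last successful checkpoint.
func canClaimShard(lastCheckpointedAt time.Time, clientRecordMaxAge time.Duration) bool {
	return time.Since(lastCheckpointedAt) > clientRecordMaxAge
}

func main() {
	stale := time.Now().Add(-10 * time.Minute)
	fmt.Println(canClaimShard(stale, 5*time.Minute)) // true - the shard can be taken over
}
```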

It makes me concerned about this retrying logic. If there are some failures and some successes, then you drop the successes and retry calling t.Write with the failures only. This means you never ack the dropped successes. Please tell me if I'm wrong about this, because it looks like a big problem.

We ack in the Write function. For the kinesis target, either the whole batch succeeds and we ack, or we fail the whole batch (this isn't great; I'd love to improve it).

For other targets, we always ack successes straight away.

So we do ignore successes at this point, but only after they've been acked.

By the way, currently the kinesis source only returns messages one by one, rather than in batches, so we don't have any possibility of hitting that scenario until we fix batching regardless.
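
As a sketch of that pattern (hypothetical code, not the project's actual Write implementation - the real message type and target interfaces look different): each successfully written message is acked inside the write call before it's discarded, and only the failures are handed back for retry, so retrying never drops an un-acked success.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a pared-down stand-in for the project's message type: a payload
// plus an AckFunc to call once the message has been safely written.
type Message struct {
	Data    []byte
	AckFunc func()
}

// writeBatch illustrates the acking pattern: successes are acked immediately
// inside the write, and only failures are returned to the caller for retry.
func writeBatch(batch []*Message, send func([]byte) error) (failed []*Message) {
	for _, msg := range batch {
		if err := send(msg.Data); err != nil {
			failed = append(failed, msg) // retried by the caller
			continue
		}
		msg.AckFunc() // acked straight away, before the success is dropped
	}
	return failed
}

func main() {
	flakySend := func(b []byte) error {
		if string(b) == "bad" {
			return errors.New("target rejected message")
		}
		return nil
	}
	batch := []*Message{
		{Data: []byte("ok"), AckFunc: func() { fmt.Println("acked: ok") }},
		{Data: []byte("bad"), AckFunc: func() { fmt.Println("acked: bad") }},
	}
	failed := writeBatch(batch, flakySend)
	fmt.Println("failures to retry:", len(failed)) // 1
}
```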

More generally, it makes me nervous about the code structure with respect to acking. Currently acking is happening in multiple different places: in t.Write and in ft.WriteOversized and in ft.WriteInvalid and completely separately again for filtered messages. To be 100% sure that you are acking every single message, I would prefer the code if I could see that acking all happens in one place.

I take the point, we could likely improve readability of the code. I'd be curious about the latency implications of moving the acks to one place at the very end - perhaps it's minimal. I'm definitely open to better code design here though, so am interested to surface ideas.

However, I would challenge you on this point:

To be 100% sure that you are acking every single message, I would prefer the code if I could see that acking all happens in one place.

Failure targets are just targets with a wrapper on the data to construct a bad row. So we actually only ever have one thing responsible for acking - the Write function of a target. We call it here and here.

Overall, this is a design that pre-existed my involvement in the project, and I haven't had need to expend effort to redesign it to date - I'm definitely open to doing that.

To be 100% sure that you are acking every single message,

I should ask - in light of this explanation, do you think there is a more serious concern that something might be wrong here? (in which case I'm keen to flesh that out sooner), or is this a code readability issue? (in which case I agree, but perhaps it can be addressed in the next release).

istreeter commented 2 years ago

I should ask - in light of this explanation, do you think this is a code readability issue? (in which case I agree, but perhaps it can be addressed in the next release), or is there a more serious concern that something might be wrong here?

Before your explanation, I thought there was a more serious issue, and something was wrong. But you have now satisfied me that every target acks the messages immediately after a successful write.

So is there a readability issue... I don't know. It took me a while to untangle the combination of retrying + acking, because of how it was spread across different files. But now that I understand it, it no longer seems so bad.

So we actually only ever have one thing responsible for acking - the Write function of a target

Well... strictly speaking this line too. But I agree it's not as bad as I made out.

colmsnowplow commented 2 years ago

OK nice, good to hear I've managed to explain it.

Well... strictly speaking this line too. But I agree it's not as bad as I made out.

Ah, yes you're right! I forgot about this - this is basically my fault for somewhat crowbarring in that part of the filtering logic, and in retrospect it does kind of conflict with the overall design. This is for sure something to think about, it's somewhat jarring design-wise now that I reflect on it through a wider lens.

colmsnowplow commented 2 years ago

As far as my own review goes, once the filters are fixed up (ie this PR: https://github.com/snowplow-devops/stream-replicator/pull/185), I think I'm ready to approve.

There are quite a few things that could be better, but they don't necessarily warrant blocking release. The below follow-up issues have been created:

Transform config could be better: https://github.com/snowplow-devops/stream-replicator/issues/189
Unit tests could be simpler: https://github.com/snowplow-devops/stream-replicator/issues/190

Filters:

Base filter should fail earlier on misconfiguration: https://github.com/snowplow-devops/stream-replicator/issues/191
API should change to negative lookahead: https://github.com/snowplow-devops/stream-replicator/issues/188
Remove some jank: https://github.com/snowplow-devops/stream-replicator/issues/186

Analytics SDK-related:

Two bugs and one potential improvement have been opened on the analytics SDK repo:

Bug - https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/37
Bug - https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/36
Gjson - https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/38

When they get addressed, these issues are to track the changes in this project that should follow:

Fixup after bugs fixed: https://github.com/snowplow-devops/stream-replicator/issues/187
API change if we do so in analytics SDK: https://github.com/snowplow-devops/stream-replicator/issues/193

colmsnowplow commented 2 years ago

@istreeter I can't approve my own PR - which in fairness makes sense!

For me, this is ready to rebase and release. Before I squash the latest commits in and force push, I just want to check whether there's anything unanswered for you, or anything remaining that you'd like to dig into.

istreeter commented 2 years ago

Hi @colmsnowplow I can see you've addressed (or considered) everything I brought up in my previous review. So I'm happy to approve it just based on that.

If there is any section of code that you would still like my opinion on, then please just point me to where. Otherwise I will skip giving it a full re-review.

colmsnowplow commented 2 years ago

Note for posterity: there are/will be a few force pushes, because some of the changes from review are complicated to rebase in.

I'm doing it in stages, checking the diff against remote and against a local backup, and pushing at various sensible points before I attempt the trickier parts.