snowflakedb / snowflake-ingest-java

Java SDK for the Snowflake Ingest Service -
http://www.snowflake.net
Apache License 2.0

Channel Invalidation Issues #547

Closed joshtree41 closed 1 year ago

joshtree41 commented 1 year ago

Hi Team, I'm trying to make sense of some channel invalidation errors in some pipelines that use the snowflake ingest SDK.

Here are some specs for my team's pipeline:

  1. Using Java 17 (OpenJDK 17.0.7).
  2. Using version 2.0.1 of the SDK.
  3. Using Apache Beam v2.48.0 as the vehicle for the pipeline, though I don't think this is relevant.

We have some long-running jobs that run smoothly for a while but eventually hit channel invalidation errors.

The sequence of events typically goes like this:

  1. We typically see warnings or errors with messaging like message=invalidate with matched sequencer or Background flush task failed, client=snowBurst:03740cf8-7673-4f07-9881-b67f4e8abd87, exception=net.snowflake.ingest.utils.SFException: Register blob request failed: Read timed out., detail=Register blob request failed: Read timed out., trace=.
  2. The code picks up an invalid channel, so we attempt to close and commit with something like ingestChannel.close().get().
  3. This throws an SFException, because the channel has been invalidated.
  4. We re-open a channel and continue streaming data, but we lose some of the data that wasn't committed prior to the channel invalidation.

Before I go further: I realize this is what offset tokens and the SnowflakeStreamingIngestChannel::getLatestCommittedOffsetToken method are for, but without going into too much detail, in our case it's a bit tricky to re-open our stream at the offset token prior to the failure. We can do this from outside the pipeline, but it's much harder to do from within it. As a result, I'd like to better understand how to mitigate these issues and reduce the frequency at which we have to rewind and re-launch. The recovery flow we'd need is roughly what's sketched below.
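This is just a simplified illustration, not our actual Beam code; the helper name reopenAndGetReplayOffset and the way the OpenChannelRequest gets passed around are placeholders for our own wiring.

```java
import java.util.concurrent.ExecutionException;

import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.utils.SFException;

public class ChannelRecovery {

  /**
   * Close the invalidated channel (best effort), reopen it under the same name,
   * and return the last offset token Snowflake actually committed, so the
   * upstream source can be rewound to that position.
   */
  static String reopenAndGetReplayOffset(
      SnowflakeStreamingIngestClient client,
      SnowflakeStreamingIngestChannel invalidChannel,
      OpenChannelRequest reopenRequest) throws InterruptedException {
    try {
      // Steps 2/3 above: closing an already-invalidated channel fails.
      invalidChannel.close().get();
    } catch (SFException | ExecutionException e) {
      // Expected when the channel was invalidated; nothing more to commit here.
    }
    // Step 4: reopen and ask Snowflake what was actually persisted.
    SnowflakeStreamingIngestChannel reopened = client.openChannel(reopenRequest);
    // May be null if nothing was ever committed on this channel.
    return reopened.getLatestCommittedOffsetToken();
  }
}
```

The hard part for us isn't this call sequence, it's seeking the upstream source back to whatever offset this returns from inside the running pipeline.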

From our perspective, it's a bit confusing - it feels as though nothing has gone wrong: no errors on our end in processing or serialization, we have plenty of resources on our workers, etc. It looks as though something in the flush service is breaking unexpectedly, but we can't really tell why.

It seems as though a flush method on the channel/client would help in these cases, which is also mentioned in this issue: https://github.com/snowflakedb/snowflake-ingest-java/issues/542.

Happy to provide more details or logs if needed!

sfc-gh-lsembera commented 1 year ago

Hi @joshtree41, there are multiple reasons why a channel could be invalidated. Are you always seeing the same error (Register blob request failed: Read timed out., detail=Register blob request failed: Read timed out.), or are the error messages different?

This particular error occurs when the SDK fails to register the data file in Snowflake and runs out of retries. Is there a way for you to measure network stability on the node? Are you ingesting from the same region as your Snowflake region?

joshtree41 commented 1 year ago

Hi @sfc-gh-lsembera, thanks for following up! Here are some extra details:


There are multiple reasons why a channel could be invalidated, ...

So there are a couple of logged warnings/errors that happen prior to the invalidation; the invalidation error itself always looks like:

Channel <channel> is invalid and might contain uncommitted rows, please consider reopening the channel to restart.

In most cases these error logs seem to kick things off:

Background flush task failed, client=<client>, exception=net.snowflake.ingest.utils.SFException: Register blob request failed: Read timed out., detail=Register blob request failed: Read timed out.,

or

Background flush task failed, client=<client>, exception=net.snowflake.ingest.utils.SFException: Register blob request failed: Connection reset., detail=Register blob request failed: Connection reset., trace=

So in both of the above cases there seem to be some read/connection issues.

That's followed by some warning logs like this:

Channel has been invalidated because of failure response, name=<channel>, channel_sequencer=0, status_code=21, message=The client provided an out-of-order message, please reopen the channel, executionCount=0
Channel is invalidated, name=<channel>, channel sequencer=0, row sequencer=41268, message=invalidate with matched sequencer
[SF_INGEST] Channel has been invalidated because of failure response, name=<channel>, channel_sequencer=0, status_code=22, message=Another channel managed by this Client had an issue which is preventing the current channel from ingesting data. Please re-open the channel,

There have been some cases where the Read timed out error log doesn't get triggered and the channel is still invalidated, but that's much less frequent.

The warning logs are a little confusing, but it sounds like the real error is the read/connection issue, so they may not matter.


Is there a way for you to measure network stability on the node? Are you ingesting from the same region as your Snowflake region?

Our Beam machines are running on GCP in us-central1 and our Snowflake account is hosted in us-west-2 (AWS). I'm not an expert at cross-cloud-provider region compatibility; those sound "close", but this could be contributing to the issue.

As far as network stability goes, I'll have to do a bit more digging. Things look pretty solid from my end, but I might have to check out the subnet metrics. I'll follow up in a bit with more details if I find any issues there.

joshtree41 commented 1 year ago

@sfc-gh-lsembera - Update: we were able to check out the NAT monitoring on the network we're using and things look pretty clean, no issues reported or anything to suggest network degradation.

Looks like the error log is firing from here.

Can I increase the timeout using this method by any chance?

sfc-gh-tzhang commented 1 year ago

@joshtree41 What's your account name, and could you share a time range when the issue was happening?

sfc-gh-lsembera commented 1 year ago

I see that one of your log lines contains status_code=21. We recently merged PR https://github.com/snowflakedb/snowflake-ingest-java/pull/543, which will be released in the next SDK version and should improve the situation. I would like to see the logs around this log line to tell whether it is the same issue or something different. Would you be able to share that?

joshtree41 commented 1 year ago

Hey team, thanks for continuing to follow up!

@sfc-gh-tzhang,

@sfc-gh-lsembera

Ah nice, those changes look sweet. I'm seeing logs like this right before and after in a bit of a cascade:

Channel is invalidated, name=<channel>, channel sequencer=0, row sequencer=2168, message=invalidate with matched sequencer
[SF_INGEST] Channel has been invalidated because of failure response, name=<channel>, channel_sequencer=0, status_code=22,  message=Another channel managed by this Client had an issue which is preventing the current channel from ingesting data. Please re-open the channel, executionCount=0

But not much else besides this and the connection/read timeout error.

Should I be seeing more logs here? I can look into our logger settings to see if we're silencing anything.

sfc-gh-lsembera commented 1 year ago

@joshtree41 Aren't there any stack traces that would mention which exceptions were thrown?

joshtree41 commented 1 year ago

Oops, the ones I sent above are from warning logs, so no stack trace. The only error with a stack trace is the Register blob request failed error log, something like this:

[SF_INGEST] Background flush task failed, client=<client>, exception=net.snowflake.ingest.utils.SFException: Register blob request failed: Read timed out., detail=Register blob request failed: Read timed out., trace=
  net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal.registerBlobs(SnowflakeStreamingIngestClientInternal.java:462)
  net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal.registerBlobs(SnowflakeStreamingIngestClientInternal.java:417)
  net.snowflake.ingest.streaming.internal.RegisterService.registerBlobs(RegisterService.java:199)
  net.snowflake.ingest.streaming.internal.FlushService.lambda$registerFuture$2(FlushService.java:244)
  java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
  java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  java.base/java.lang.Thread.run(Thread.java:833)

After this error fires, we'll see a bunch of additional warning logs like the ones I linked above.

We do get another error when we try to close the channel after we pick up that it's invalidated, but that one doesn't have any juicy details.

sfc-gh-tzhang commented 1 year ago

@sfc-gh-lsembera Do you think https://github.com/snowflakedb/snowflake-ingest-java/pull/543 would be enough? It looks like the first error is a Connection reset error, which is a SocketException rather than an SSLException, or do you mean that evicting idle connections would help here?

@joshtree41 Thanks for the account name! The invalid sequencer error is basically caused by these SocketExceptions; we will see if we can improve the retry logic on our side to handle them. But in general, your client logic needs to handle these kinds of transient errors: what you want to do with an invalidated channel is reopen it and restart from the latest committed offset token. Please see https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#offset-tokens for more detail if you haven't done so already.

joshtree41 commented 1 year ago

@sfc-gh-tzhang That makes sense, thank you! Yeah, I'm pretty familiar with the offset token docs. We're working on a solution to rewind based on the last offset prior to the invalidation.

For our current pipeline it's a bit of a pain to rewind/reset, though; not all streaming sources make this as easy as Kafka does. Because it's hard, I'm trying to set this up in a way that results in as few channel invalidations as possible. It does seem like some sort of flush-all-uncommitted-data method would help a lot in these cases.

Overall we're really digging the ingest SDK though; it's been a lot smoother for us and the latency is insanely low. Appreciate all of the work you guys are doing over there!

sfc-gh-lsembera commented 1 year ago

We discussed this with @sfc-gh-tjones last week and we need to have more resilient retries in the SDK. @sfc-gh-wfateem had some concerns about more aggressive retries, but the situation in the SDK is different from the JDBC driver.

In the driver, users have control over retries: they can easily wrap statement executions with their own retry logic and react dynamically to any exception that is thrown. This is not true for the SDK, where blob registration requests run asynchronously in a background thread; when the execution fails, the channel is invalidated and users cannot do anything except trigger a replay from the last committed offset, which can be a pain, just like in the case that @joshtree41 reported. I will open a new PR where we can discuss the change in more detail.

joshtree41 commented 1 year ago

Alright cool, I agree with that @sfc-gh-lsembera. Thanks for putting this together! Drop me a line if you need anything else to help with the PR.

I was actually thinking about this over the weekend and had a use case that I thought captured the issue pretty well, so I may as well include it here for posterity:

When processing an unbounded dataset, we'll often window the data and buffer each window into a grouping. That grouping is processed in our stream processing framework (something like Apache Beam/Flink/Spark Streaming) to fit the shape the Snowflake ingest SDK expects, then inserted. Oftentimes the invalidation results in a failure associated with a window that was previously processed and had all successful inserts according to the InsertValidationResponse, but in fact those records didn't make it to Snowflake. This is where the main problem is, IMO (a rough sketch follows).
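To make that concrete, here's a heavily simplified sketch of the per-window insert step (the window buffering and offset bookkeeping are stand-ins for what the Beam transform actually does), showing why a clean InsertValidationResponse isn't the end of the story:

```java
import java.util.List;
import java.util.Map;

import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class WindowedInsert {

  /**
   * Insert one window's buffered rows. A response without errors only means
   * the rows were accepted into the SDK's internal buffer; they aren't
   * guaranteed durable until the window's offset token shows up via
   * getLatestCommittedOffsetToken().
   */
  static void insertWindow(
      SnowflakeStreamingIngestChannel channel,
      List<Map<String, Object>> windowRows,
      String windowEndOffsetToken) {
    InsertValidationResponse response = channel.insertRows(windowRows, windowEndOffsetToken);
    if (response.hasErrors()) {
      // Per-row problems (e.g. type mismatches) surface here immediately.
      throw response.getInsertErrors().get(0).getException();
    }
    // A clean response does not protect against a later channel invalidation:
    // the window looks "done" to the pipeline, but the rows may never commit.
  }
}
```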

There are certainly ways to combat this, but it really can be a pain depending on your stream processing framework and unbounded source data.

Not sure if this explanation is redundant at this point - hopefully it helps out in the grand scheme of things.

sfc-gh-lsembera commented 1 year ago

Hi @joshtree41, we have just released SDK 2.0.2, which contains improvements to how the SDK retries network failures, so please try it out. There are more reasons why channels could be invalidated with status code 21; server-side fixes will be deployed within the next few weeks.

I am leaving this issue open and will close it once the fixes are deployed.

joshtree41 commented 1 year ago

@sfc-gh-lsembera awesome, thanks for letting me know. I'll try and re-deploy with the new version sometime today.

I really appreciate the continued work on this issue!

joshtree41 commented 1 year ago

We redeployed our production workload using 2.0.2; I'll keep this issue updated with our findings.

joshtree41 commented 1 year ago

Hey guys, wanted to give you a quick update. Version 2.0.2 is working nicely, but we're still seeing some sporadic channel invalidations/closes.

I also see some updated verbiage on the SFException, nice:

net.snowflake.ingest.utils.SFException: One or more channels [<table.channel>] might contain uncommitted rows due to server side errors, please consider reopening the channels to replay the data loading by using the latest persistent offset token.

FWIW, the only invalidation/close issues we've seen have the above verbiage. Not sure if this is useful, but I thought it was worth mentioning.

Pumped for the server-side changes!

joshtree41 commented 1 year ago

One more update:

We're actually seeing these invalidations exclusively on ingestChannel.close().get() operations, and I don't think there is any actual data loss in the channel either.

QQ: if we are doing buffered inserts over small groups of rows, is it possible that part of the bulk insert is committed into the table and another part is not?

sfc-gh-lsembera commented 1 year ago

Hi @joshtree41, thanks for the update! What errors do you get when calling ingestChannel.close().get()? I suppose it is not status_code=21 anymore, because those are caused by out-of-order chunks on the server side, which shouldn't be triggered by closing channels.

Batches passed to insertRows are either fully committed or not committed at all.
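As a rough illustration of what that guarantee buys you (assuming your offset tokens are numeric strings that increase per batch, which is a convention you would choose, not something the SDK requires):

```java
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class BatchCommitCheck {

  /**
   * Because a batch passed to insertRows commits atomically, tagging each
   * batch with a single increasing numeric offset token lets you reason at
   * batch granularity: every batch whose token is <= the latest committed
   * token is fully in the table, and everything after it can be replayed
   * wholesale.
   */
  static boolean isBatchCommitted(SnowflakeStreamingIngestChannel channel, long batchToken) {
    String committed = channel.getLatestCommittedOffsetToken();
    // null means nothing has been committed on this channel yet.
    return committed != null && Long.parseLong(committed) >= batchToken;
  }
}
```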

joshtree41 commented 1 year ago

Hi @sfc-gh-lsembera - yeah I'm not seeing status_code=21 anymore. Looks like we're seeing status_code=20.

The warning looks like this right before the SFException:

[SF_INGEST] Channel has failure status_code, name=DAVE.INCOME_DETECTION_SHADOW.PAYROLL_GROUP_ASSESSMENT.SNOWBURST, channel_sequencer=151, status_code=20

For some additional context, this happens when Apache Beam workers are decommissioned. We close the client/channels before the worker goes down, and sometimes we see invalidations here.

Batches passed to insertRows are either fully committed or not committed at all.

Okay great, I had a feeling but I wanted to check. In that case, we've not had any data loss after these invalidations.

sfc-gh-lsembera commented 1 year ago

@joshtree41 The enum you linked contains client-side errors, but these status codes come from the server side. The correct enum is here. status_code=20 means that somebody else reopened the channel and thus took its ownership. Is it possible that while Apache Beam workers are being decommissioned, somebody else starts ingesting again via the same Snowpipe Streaming channel(s)?

joshtree41 commented 1 year ago

@sfc-gh-lsembera Ohhh so that's what the client sequencer is used for, thanks for sending that.

I think this makes sense to me now, though. Theoretically it shouldn't happen in this context, because we have a teardown method on each instance that should close the client and channel.

But that teardown method is executed on a best-effort basis, so I have a feeling that another instance is coming up while the previous teardown is still executing, which causes the error.

It's probably because the .get() in our teardown has to wait for the CompletableFuture<Void> returned by close() to complete, which takes some time.

This doesn't seem to result in any data loss though, which is good because we don't have to rewind any of the Pub/Sub resources. I think there is just a timing issue between one instance going up and another going down (a rough sketch of our teardown is below).
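For reference, our teardown is shaped roughly like this (the class name, the timeout, and the omitted Beam lifecycle hooks are all illustrative). The race is simply that a replacement worker can run its setup and reopen the same channel name before this finishes:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.utils.SFException;

public class Teardown {

  /**
   * Best-effort shutdown: wait (bounded) for the channel close to flush and
   * commit, then close the client. If another worker has already reopened the
   * same channel name, the close on this side can fail with status_code=20
   * because the channel's ownership has moved on.
   */
  static void tearDown(SnowflakeStreamingIngestClient client,
                       SnowflakeStreamingIngestChannel channel) {
    try {
      channel.close().get(60, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (SFException | ExecutionException | TimeoutException e) {
      // The channel may already be invalidated or taken over; nothing left to do.
    }
    try {
      client.close();
    } catch (Exception e) {
      // Best effort, mirroring how the framework treats teardown.
    }
  }
}
```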

sfc-gh-lsembera commented 1 year ago

@joshtree41 Yeah, this would explain these status_code=20 errors. Are we good to close this issue?

joshtree41 commented 1 year ago

@sfc-gh-lsembera Yeah, I think we are good to go. One last question: have the server-side updates you referenced here been pushed on the Snowflake side yet?

sfc-gh-lsembera commented 1 year ago

@joshtree41 Yes, these changes have already been deployed. Channel invalidations due to status code 20 should now be much more rare (they can still occasionally occur, though).

joshtree41 commented 1 year ago

@sfc-gh-lsembera okay sweet, alright I think we are all set then. Thanks so much!

joshtree41 commented 1 year ago

Hey @sfc-gh-lsembera, one question for you guys (no need to re-open this issue; I can also open a new issue if you want me to start fresh with this).

Q: Is there a way that we can access the StreamingIngestResponseCode from the SFException?

We'll see these in the logs, but I'm not 100% sure how to get them programmatically.

sfc-gh-lsembera commented 1 year ago

Hi @joshtree41, they are currently not accessible from SFException. This is good feedback; I can see that it could be helpful in some cases. Please open a new issue and let's continue the discussion there.