spotify / heroic

The Heroic Time Series Database
https://spotify.github.io/heroic/
Apache License 2.0

Heroic Bigtable Consumer does not handle failures as expected #724

Closed · samfadrigalan closed this issue 3 years ago

samfadrigalan commented 3 years ago

DoD

Heroic should properly handle exceptions instead of silently swallowing them.

Background

The Heroic Bigtable consumer failed to write to Bigtable because the new column family had not been created yet. No exception was logged, and the consumer ack-ed the message as if the write had been successful. I only got the exception below through hacky debugger evaluations.

io.grpc.StatusRuntimeException: NOT_FOUND: Error while mutating the row '\023\023distribution-test-1\004\003\003env\007\007staging\004\004host\017\017samanthanoellef\013\013metric_type\014\014distribution\004\004what\005\005stuff\000\000\001u\000\000\000\000' (projects/xpn-heroic-1/instances/metrics-staging-guc/tables/metrics) : Requested column family not found.
    at io.grpc.Status.asRuntimeException(Status.java:517)
    at com.google.cloud.bigtable.grpc.async.BulkMutation.toException(BulkMutation.java:77)
    at com.google.cloud.bigtable.grpc.async.BulkMutation.access$400(BulkMutation.java:59)
    at com.google.cloud.bigtable.grpc.async.BulkMutation$Batch.handleEntries(BulkMutation.java:227)
    at com.google.cloud.bigtable.grpc.async.BulkMutation$Batch.handleResult(BulkMutation.java:200)
    at com.google.cloud.bigtable.grpc.async.BulkMutation$Batch$1.onSuccess(BulkMutation.java:170)
    at com.google.cloud.bigtable.grpc.async.BulkMutation$Batch$1.onSuccess(BulkMutation.java:167)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1021)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1137)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:957)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:726)
    at com.google.cloud.bigtable.grpc.async.AbstractRetryingOperation$GrpcFuture.set(AbstractRetryingOperation.java:90)
    at com.google.cloud.bigtable.grpc.async.RetryingMutateRowsOperation.onOK(RetryingMutateRowsOperation.java:91)
    at com.google.cloud.bigtable.grpc.async.AbstractRetryingOperation.onClose(AbstractRetryingOperation.java:167)
    at com.google.cloud.bigtable.grpc.async.ThrottlingClientInterceptor$1$1.onClose(ThrottlingClientInterceptor.java:125)
    at com.google.cloud.bigtable.grpc.io.ChannelPool$InstrumentedChannel$2.onClose(ChannelPool.java:209)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor$UnAuthResponseListener.onClose(RefreshingOAuth2CredentialsInterceptor.java:81)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:678)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
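
For illustration, here is a minimal sketch (not Heroic's actual consumer code; the Acker interface and writeThenAck name are hypothetical) of the suspected pattern: if the Bigtable mutation future's failure is never inspected before the message is ack-ed, an error like the NOT_FOUND above is silently swallowed. Attaching a callback and only acking on success would surface it:

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.MoreExecutors;

    public class WriteThenAckSketch {
        // Hypothetical stand-in for the consumer's ack/nack hook.
        interface Acker { void ack(); void nack(); }

        static void writeThenAck(ListenableFuture<Void> writeFuture, Acker acker) {
            Futures.addCallback(writeFuture, new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    acker.ack(); // only ack once the Bigtable write actually succeeded
                }

                @Override
                public void onFailure(Throwable t) {
                    // Log and surface the failure instead of silently acking.
                    System.err.println("Bigtable write failed: " + t);
                    acker.nack();
                }
            }, MoreExecutors.directExecutor());
        }
    }
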
ao2017 commented 3 years ago

Just to avoid any confusion, this problem is not related to the distribution implementation :).
About these suggestions:

  1. Create the new column family in prod manually. Running init on a table that is actively receiving read and write requests could cause locking issues.
  2. Create a task to address exception handling in Heroic. You should have been able to see the write failure in the log, since the column family didn't exist.
  3. Ensure that init is indeed broken. Make sure configure is set to false during startup.
samfadrigalan commented 3 years ago
  3. Ensure that init is indeed broken. Make sure configure is set to false during startup.

Good call @ao2017. It turns out the column family creation works; we just have to pass the proper config variable and set configure to true. I created a PR to the internal config repo.

I will repurpose this ticket to address the exception handling.

sming commented 3 years ago

@samfadrigalan - some evidence lending weight to this issue:

hexedpackets commented 3 years ago

I have concerns about moving to late acks. Right now unprocessable messages get dropped; if we switch to late acks, those go back to PubSub and get retried continuously. A relatively small number of unprocessable messages could plausibly lock up the system. If we know that a message is bad we could ack it despite not processing it successfully, but then we have a system where only known failures are handled safely and unknown failures cause queue buildup. I think in general we should let messages blow up when we don't know what to do with them and only retry if we're very certain it's safe to do so.
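
To make the trade-off concrete, a minimal sketch using the google-cloud-pubsub client (not Heroic's consumer module; the subscription name and handleWrite helper are hypothetical). Acking in the failure branch is today's behaviour of dropping the message; nacking is the late-ack route that sends it back for redelivery:

    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.cloud.pubsub.v1.MessageReceiver;
    import com.google.cloud.pubsub.v1.Subscriber;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.PubsubMessage;

    public class ConsumerAckSketch {
        public static void main(String[] args) {
            ProjectSubscriptionName subscription =
                ProjectSubscriptionName.of("my-project", "metrics-sub"); // hypothetical names

            MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
                try {
                    handleWrite(message); // hypothetical: decode the metric and write to Bigtable
                    consumer.ack();
                } catch (Exception e) {
                    // consumer.ack() here = current behaviour: the bad message is dropped.
                    // consumer.nack() = late-ack behaviour: PubSub redelivers it, and a
                    // poison message keeps coming back until something breaks the loop.
                    consumer.nack();
                }
            };

            Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
            subscriber.startAsync().awaitRunning();
        }

        static void handleWrite(PubsubMessage message) {
            // stand-in for the real decode + Bigtable write path
        }
    }
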

samfadrigalan commented 3 years ago

For that concern, can we use dead-letter queues (https://cloud.google.com/pubsub/docs/dead-letter-topics)? That way we get visibility into the failures and avoid data loss without clogging up the main queues. Because we would have copies of the failed messages, we could easily reproduce the failures in a non-prod pipeline, and then redirect the messages back into the main consumer flow once a fix is released.
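
If we go that route, attaching a dead-letter topic to the consumer's subscription is a small admin change; a sketch with the PubSub admin client follows (project, subscription, and topic names are hypothetical):

    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.protobuf.FieldMask;
    import com.google.pubsub.v1.DeadLetterPolicy;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.Subscription;
    import com.google.pubsub.v1.TopicName;
    import com.google.pubsub.v1.UpdateSubscriptionRequest;

    public class DeadLetterSketch {
        public static void main(String[] args) throws Exception {
            ProjectSubscriptionName subscription =
                ProjectSubscriptionName.of("my-project", "metrics-sub");                    // hypothetical
            TopicName deadLetterTopic = TopicName.of("my-project", "metrics-dead-letter");  // hypothetical

            try (SubscriptionAdminClient admin = SubscriptionAdminClient.create()) {
                Subscription updated = Subscription.newBuilder()
                    .setName(subscription.toString())
                    .setDeadLetterPolicy(DeadLetterPolicy.newBuilder()
                        .setDeadLetterTopic(deadLetterTopic.toString())
                        // after this many failed deliveries, PubSub forwards the
                        // message to the dead-letter topic
                        .setMaxDeliveryAttempts(5)
                        .build())
                    .build();

                admin.updateSubscription(UpdateSubscriptionRequest.newBuilder()
                    .setSubscription(updated)
                    .setUpdateMask(FieldMask.newBuilder().addPaths("dead_letter_policy").build())
                    .build());
            }
        }
    }
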

hexedpackets commented 3 years ago

Yeah, that sounds like a good plan; the visibility would be great. I wonder how useful redirecting will be in practice, though - if we need to make a code change and redeploy, the queue could become way too big to process.

hexedpackets commented 3 years ago

So it turns out PubSub only dead-letters a message after a minimum of 5 delivery attempts, and that can't be configured any lower. For an issue like the missing column family that spawned this ticket, the cluster ends up doing 5x as much work before rejecting each message.

I'm working on improving the logging and tracing so there is better visibility for write failures, but I'm planning to leave the PubSub acking as-is.

hexedpackets commented 3 years ago

I'm having trouble reproing the lack of logs. I tried running IT tests without the column family created, and the exception was very visibly logged:

SEVERE: Could not complete RPC. Failure #0, got: Status{code=NOT_FOUND, description=table "projects/fake/instances/fake/tables/heroic_it_ff0cb89d-c12b-4f43-9a48-7aad4b9d778c" not found, cause=null} on channel 65.
Trailers: Metadata(content-type=application/grpc,bigtable-channel-id=65)

java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: INTERNAL: unknown family "points"
/* full exception omitted for brevity */
samfadrigalan commented 3 years ago

I'm having trouble reproing the lack of logs. I tried running IT tests without the column family created, and the exception was very visibly logged:

SEVERE: Could not complete RPC. Failure #0, got: Status{code=NOT_FOUND, description=table "projects/fake/instances/fake/tables/heroic_it_ff0cb89d-c12b-4f43-9a48-7aad4b9d778c" not found, cause=null} on channel 65.
Trailers: Metadata(content-type=application/grpc,bigtable-channel-id=65)

java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: INTERNAL: unknown family "points"
/* full exception omitted for brevity */

Were you able to reproduce that log in staging or locally? I wonder if there is a difference between the setup in the IT tests and running the consumers as-is in prod or locally (i.e. are the gRPC calls in the production consumer flow wrapped in async functions while the IT calls aren't?). I noticed that log does not exactly match the stack trace in the description. I had to set up the consumers locally and evaluate expressions in debug mode to get that stack trace, since there were no error logs in the cloud or in the local setup when the consumer tried to process the messages.

hexedpackets commented 3 years ago

It should be the same flow - the writes happen in a flush call that runs on a timer, so it's not tied to the actual request. I'll try running the consumers locally with the BT emulator and see what happens.
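
For reference, a common way to point a local run at the Bigtable emulator is the gcloud emulator plus the BIGTABLE_EMULATOR_HOST environment variable, which the Bigtable clients pick up automatically (the port is arbitrary, and whether Heroic needs any extra config on top of this is an assumption to verify):

    # start the emulator (requires the cloud-bigtable-emulator gcloud component)
    gcloud beta emulators bigtable start --host-port=localhost:8086 &

    # export BIGTABLE_EMULATOR_HOST so clients connect to the emulator instead of GCP
    $(gcloud beta emulators bigtable env-init)
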