cockroachdb / cockroach

CockroachDB — the cloud native, distributed SQL database designed for high availability, effortless scale, and control over data placement.
https://www.cockroachlabs.com
Other
29.86k stars 3.77k forks source link

roachtest: cdc/scan/catchup/nodes=5/cpu=16/rows=1G/ranges=100k/protocol=rangefeed/format=json/sink=null failed #113489

Closed cockroach-teamcity closed 10 months ago

cockroach-teamcity commented 10 months ago

roachtest.cdc/scan/catchup/nodes=5/cpu=16/rows=1G/ranges=100k/protocol=rangefeed/format=json/sink=null failed with artifacts on release-23.2 @ 348d11425a1184ce4de8a6f8dc85995cd3c653bc:

(monitor.go:153).Wait: monitor failure: unexpected node event: n6: cockroach process for system interface died (exit code 15)
test artifacts and logs in: /artifacts/cdc/scan/catchup/nodes=5/cpu=16/rows=1G/ranges=100k/protocol=rangefeed/format=json/sink=null/run_1

Parameters: ROACHTEST_arch=amd64 , ROACHTEST_cloud=gce , ROACHTEST_cpu=16 , ROACHTEST_encrypted=false , ROACHTEST_ssd=0

Help

See: [roachtest README](https://github.com/cockroachdb/cockroach/blob/master/pkg/cmd/roachtest/README.md) See: [How To Investigate \(internal\)](https://cockroachlabs.atlassian.net/l/c/SSSBr8c7) See: [Grafana](https://go.crdb.dev/roachtest-grafana/teamcity-12428970/cdc-scan-catchup-nodes-5-cpu-16-rows-1g-ranges-100k-protocol-ra/1698752648932/1698757548506)

/cc @cockroachdb/cdc

This test on roachdash | Improve this report!

Jira issue: CRDB-33017

Epic CRDB-26372

miretskiy commented 10 months ago

The node oomed:

Screenshot 2023-11-02 at 5 14 12 PM

@erikgrinaker As part of https://github.com/cockroachdb/cockroach/pull/109346, we changed client side semaphore from fixed one (8) to rate based (100/s). More importantly, we have changed the condition when we release catchup scan allocation from

// If we see the first non-empty checkpoint, we know we're done with the catchup scan.
if !t.ResolvedTS.IsEmpty() && active.catchupRes != nil {
  active.catchupRes.Release()
  active.catchupRes = nil
}

to if active.catchupRes != nil {... That is, we release alloc as soon as we see any checkpoint, not just non-empty checkpoint. Do you recall why this was done? I'm under impression that the empty checkpoint is emitted before all of the catchup events are emitted -- it's the first event that the range will receive. As a result, if the consumer is slower than the rate of incoming events, we could wind up in a situation where we keep starting catchup scans -- even before previous ones have completed.

I think the goal was to manage the goroutine creation rate, and we didn't think about the indirect impact the fixed sized semaphore had on the (untracked) memory usage in the http2/grpc land.

I wonder if we should add the "non-empty" checkpoint bit. Thoughts?

erikgrinaker commented 10 months ago

The empty checkpoint is emitted after the catchup scan. Its purpose is exactly to signal to the client that the catchup scan is completed.

miretskiy commented 10 months ago

Hmm... Then it's rather strange in this case, and I'm confused how we could have used up so much memory in http2 buffers. We clearly allocated more than 2MB * 100 worth of memory... And because we're doing catchup scan -- we're not running workload (i.e. we are running w/ end_time); there shouldn't be any events other than checkpoints coming in after the catchup scan completes.

miretskiy commented 10 months ago

We definitely started lots of catchup ranges:

Screenshot 2023-11-02 at 6 20 19 PM

but it's very strange to see them on node 6... which has no ranges.

miretskiy commented 10 months ago

Okay, so there were few strange things going on: first, the captured metrics seem to indicate that 5 nodes (all but the 6th one) were dead -- which sort of makes sense since after the restart, all of the ranges were assigned to node 6. I don't know if there was some partitioning, or whatnot, but, the changefeed was restarted:

W231031 12:52:59.162781 5334 ccl/changefeedccl/changefeed_stmt.go:1310 ⋮ [T1,Vsystem,n6,job=‹CHANGEFEED id=913195922462900230›] 127  Changefeed job 913195922462900230 encountered transient error: ‹inbox communication error›: grpc: ‹context canceled› [code 1/Canceled] (attempt 1)

And after that, all ranges were assigned to node 6 -- why? not clear:

Screenshot 2023-11-03 at 4 20 43 PM

FInally, the OOM -- the change to acquire catchup scan quota based on a rate -- I think it works exactly as it should. The catchup scan for each range completes -- and completes quickly. So we're able to keep starting more and more rangefeeds; the consumer -- even though it's writing to null sink cannot keep up with a large fan-in factor; but the data produced by catchup scans fits in the network buffers (2MB) -- and so, eventually the node ooms.

We could consider lowering 100/sec down to something smaller -- but the reality is that there was always a possiblity of OOMs -- catchup scan or not, if enough ranges generate enough data, and the consumer cannot keep up, regular rangefeeds can cause OOM. That's why there is mux rangefeed.

I suspect it would be prudent to temp down this setting when using regular rangefeed.

erikgrinaker commented 10 months ago

I think this was a cascading failure.

The first node that started to struggle was n3. At 12:53:00 it saw some kind of transient stall, which caused high Raft commit latencies and a spike in runnable goroutines.

Screenshot 2023-11-07 at 13 12 58 Screenshot 2023-11-07 at 13 03 31

We also see signs of this in the logs:

I231031 12:52:59.018028 501 kv/kvserver/replica_raft.go:1571 ⋮ [T1,Vsystem,n3,s3,r42/4:‹/Table/{39-40}›,raft] 100701  slow non-blocking raft commit: commit-wait 993.925532ms sem 812ns
I231031 12:53:00.022247 446 kv/kvserver/store_raft.go:699 ⋮ [T1,Vsystem,n3,s3,r4/2:‹/System{/tsd-tse}›,raft] 100709  raft ready handling: 0.99s [append=0.00s, apply=0.99s, , other=0.00s], wrote [apply=913 KiB (4 in 3 batches)]; node might be overloaded
W231031 12:53:00.064912 1974 2@rpc/clock_offset.go:291 ⋮ [T1,Vsystem,n3,rnode=2,raddr=‹10.142.0.177:26257›,class=system,rpc] 911  latency jump (prev avg 0.41ms, current 2039.55ms)

This in turn led to it disconnecting from all RPC peers, likely because it wasn't responding to gRPC pings:

E231031 12:52:59.032693 39 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=?,raddr=‹34.138.110.78:26257›,class=system,rpc] 100703  disconnected (was healthy for 1h8m15.801s): grpc: ‹connection error: desc = "transport: error while dialing: connection interrupted (did the remote node shut down or are there networking issues?)"› [code 14/Unavailable]
E231031 12:53:00.065741 285 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=1,raddr=‹10.142.0.214:26257›,class=default,rpc] 100755  disconnected (was healthy for 1h8m16.814s): grpc: ‹connection error: desc = "transport: error while dialing: connection interrupted (did the remote node shut down or are there networking issues?)"› [code 14/Unavailable]
E231031 12:53:00.067112 1871 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=2,raddr=‹10.142.0.177:26257›,class=default,rpc] 100763  disconnected (was healthy for 1h8m14.548s): grpc: ‹error reading from server: EOF› [code 14/Unavailable]
E231031 12:53:00.068378 1855 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=4,raddr=‹10.142.0.104:26257›,class=default,rpc] 100765  disconnected (was healthy for 1h8m14.577s): grpc: ‹connection error: desc = "transport: error while dialing: connection interrupted (did the remote node shut down or are there networking issues?)"› [code 14/Unavailable]
E231031 12:53:00.069328 537 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=1,raddr=‹10.142.0.214:26257›,class=system,rpc] 100768  disconnected (was healthy for 1h8m16.835s): grpc: ‹connection error: desc = "transport: error while dialing: connection interrupted (did the remote node shut down or are there networking issues?)"› [code 14/Unavailable]
E231031 12:53:00.069540 2116 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=5,raddr=‹10.142.0.113:26257›,class=system,rpc] 100770  disconnected (was healthy for 1h8m13.402s): grpc: ‹connection error: desc = "transport: error while dialing: connection interrupted (did the remote node shut down or are there networking issues?)"› [code 14/Unavailable]
E231031 12:53:00.072918 6597153 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=6,raddr=‹10.142.1.180:26257›,class=default,rpc] 100772  disconnected (was healthy for 18m14.231s): grpc: ‹connection error: desc = "transport: error while dialing: connection interrupted (did the remote node shut down or are there networking issues?)"› [code 14/Unavailable]

When this happens, n6's changefeed restarts:

W231031 12:52:59.162781 5334 ccl/changefeedccl/changefeed_stmt.go:1310 ⋮ [T1,Vsystem,n6,job=‹CHANGEFEED id=913195922462900230›] 127  Changefeed job 913195922462900230 encountered transient error: ‹inbox communication error›: grpc: ‹context canceled› [code 1/Canceled] (attempt 1)

Note that at this point, n6 is still connected to all other RPC nodes except n3. However, it still schedules all changefeed processors locally:

Screenshot 2023-11-07 at 13 21 36

At this point, n6 is slammed by traffic (probably checkpoints?).

Screenshot 2023-11-07 at 13 22 18

20 seconds later, n6 is so overwhelmed that it loses RPC connections with all other peers since it can't respond to gRPC heartbeats.

E231031 13:00:19.555282 6712057 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n1,rnode=6,raddr=‹10.142.1.180:26257›,class=default,rpc] 103881  disconnected (was healthy for 29m1.404s): grpc: ‹error reading from server: read tcp 10.142.0.214:57394->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 13:00:19.556763 6711980 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n1,rnode=6,raddr=‹10.142.1.180:26257›,class=system,rpc] 103883  disconnected (was healthy for 29m1.432s): grpc: ‹error reading from server: read tcp 10.142.0.214:57378->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 13:00:19.554896 6406128 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n2,rnode=6,raddr=‹10.142.1.180:26257›,class=default,rpc] 100127  disconnected (was healthy for 28m1.354s): grpc: ‹error reading from server: read tcp 10.142.0.177:51436->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 13:00:19.555154 6234295 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n2,rnode=6,raddr=‹10.142.1.180:26257›,class=system,rpc] 100128  disconnected (was healthy for 29m1.419s): grpc: ‹error reading from server: read tcp 10.142.0.177:40926->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 12:53:00.072918 6597153 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=6,raddr=‹10.142.1.180:26257›,class=default,rpc] 100772  disconnected (was healthy for 18m14.231s): grpc: ‹connection error: desc = "transport: error while dialing: connection interrupted (did the remote node shut down or are there networking issues?)"› [code 14/Unavailable]
E231031 13:00:19.554872 6597153 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=6,raddr=‹10.142.1.180:26257›,class=default,rpc] 100925  disconnected (was healthy for 7m19.481s): grpc: ‹error reading from server: read tcp 10.142.1.181:58698->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 13:00:19.556042 6277791 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n3,rnode=6,raddr=‹10.142.1.180:26257›,class=system,rpc] 100926  disconnected (was healthy for 29m1.425s): grpc: ‹error reading from server: read tcp 10.142.1.181:44466->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 13:00:19.554924 7063443 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n4,rnode=6,raddr=‹10.142.1.180:26257›,class=default,rpc] 99954  disconnected (was healthy for 25m33.715s): grpc: ‹error reading from server: read tcp 10.142.0.104:35132->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 13:00:19.555191 6745845 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n4,rnode=6,raddr=‹10.142.1.180:26257›,class=system,rpc] 99955  disconnected (was healthy for 29m1.419s): grpc: ‹error reading from server: read tcp 10.142.0.104:38034->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 13:00:19.554976 7029439 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n5,rnode=6,raddr=‹10.142.1.180:26257›,class=default,rpc] 101430  disconnected (was healthy for 25m33.713s): grpc: ‹error reading from server: read tcp 10.142.0.113:60984->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
E231031 13:00:19.555238 6705254 2@rpc/peer.go:580 ⋮ [T1,Vsystem,n5,rnode=6,raddr=‹10.142.1.180:26257›,class=system,rpc] 101431  disconnected (was healthy for 29m1.325s): grpc: ‹error reading from server: read tcp 10.142.0.113:50134->10.142.1.180:26257: read: connection reset by peer› [code 14/Unavailable]
  1. Why did losing contact with a single node restart the entire changefeed? Is this because DistSQL has to restart the entire flow if it loses contact with a single node?

  2. When we restart the flow, why do we schedule it on the local node? n6 was still connected to all other nodes except n3 at this time, and could have started the flows there.

  3. Can we reproduce this simply by disconnecting (e.g. pausing) a single node while a changefeed is running?

miretskiy commented 10 months ago

Why did losing contact with a single node restart the entire changefeed? Is this because DistSQL has to restart the entire flow if it loses contact with a single node? Yes

When we restart the flow, why do we schedule it on the local node? n6 was still connected to all other nodes except n3 at this time, and could have started the flows there. No, that shouldn't happen -- but we use distsql to plan the flow and if that information is out of whack on n6 -- we can get into this mode (we have 0 observability here -- something that should be fixed).

  1. perhaps we can.
miretskiy commented 10 months ago

We are going to reintroduce catchup semaphore (in addition to rate limiter) for regular rangefeeds. This will need to be backported; The PR will be sent out soon.

miretskiy commented 10 months ago

I just confirmed that we got pretty bad plan when changefeed restarted:

{
  "sql": "job:913195922462900230",
  "nodeNames": [
    "6"
  ],
  "processors": [
    {
      "nodeIdx": 0,
      "inputs": [],
      "core": {
        "title": "ChangeAggregator/0",
        "details": [
          "/Table/106/{1-2}"
        ]
      },
      "outputs": [],
      "stage": 1,
      "processorID": 0
    },
    {
      "nodeIdx": 0,
      "inputs": [],
      "core": {
        "title": "ChangeFrontier/1",
        "details": [
          "Out: @2,@3,@4"
        ]
      },
      "outputs": [],
      "stage": 2,
      "processorID": 1
    },
    {
      "nodeIdx": 0,
      "inputs": [],
      "core": {
        "title": "Response",
        "details": []
      },
      "outputs": [],
      "stage": 0,
      "processorID": -1
    }
  ],
  "edges": [
    {
      "sourceProc": 0,
      "sourceOutput": 0,
      "destProc": 1,
      "destInput": 0,
      "streamID": 0
    },
    {
      "sourceProc": 1,
      "sourceOutput": 0,
      "destProc": 2,
      "destInput": 0,
      "streamID": 0
    }
  ],
  "flow_id": "36459508-8538-40de-997f-5e600240b460",
  "flags": {
    "ShowInputTypes": false,
    "MakeDeterministic": false
  }
}