Grafana metrics:
At roughly 9:00, 12 goroutines started batch-inserting data into the database (batch size = 90); write throughput was 1,200 rows per second. At roughly 9:10 I changed the parameters to 24 goroutines. Write throughput remained the same (1,200 rows per second).
As you can see, the servers are underutilized, with iowait around 4%, CPU ~20%, network ~2%, and more than 30 GB of free RAM on the busiest server.
Currently I need to ingest 25 TB of data, and I planned to use CockroachDB for that. But it seems like it doesn't perform well for some reason.
Hi @kaxap, thanks for filing this issue. There are a few things you can try for improving the performance of this bulk import:
- Pre-split the large_table table a number of times (try 100).
- Run ALTER TABLE large_table SCATTER to manually force all of the table's ranges to scatter around your cluster. This will happen automatically even if you don't run the command, but running it ensures that the ranges scatter quickly.
- Drop RETURNING NOTHING from your INSERT statements, as it shouldn't have any effect outside of a transaction.
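For illustration, a minimal Go sketch of the first two suggestions, issuing the pre-splits and the scatter through the same lib/pq driver the load generator already uses. The connection string and the two-digit split values are assumptions; split values must match the table's primary key.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver used by the client in this issue
)

func main() {
	// Hypothetical connection string; point it at any node (or the load balancer).
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Pre-split the table ~100 times. The split values here are example
	// two-digit prefixes and must match the table's primary key ordering.
	for i := 0; i < 100; i++ {
		stmt := fmt.Sprintf("ALTER TABLE large_table SPLIT AT VALUES ('%02d')", i)
		if _, err := db.Exec(stmt); err != nil {
			log.Fatalf("split %02d: %v", i, err)
		}
	}

	// Force the new ranges to rebalance across the cluster right away.
	if _, err := db.Exec("ALTER TABLE large_table SCATTER"); err != nil {
		log.Fatal(err)
	}
}
```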
Hi @nvanbenschoten, thank you for the reply.
I want to make a composite primary key on (field1, timestamp) and split on that. While field1 is fairly arbitrary, timestamp is continuously increasing. Would it be a good idea at all?
While field1 is fairly arbitrary, timestamp is continuously increasing. Would it be a good idea at all?
As long as field1 comes first in the order of your composite primary key it will be the dominant sort key, so as long as field1 doesn't have hotspots in your workload, there shouldn't be any problems with that approach. What we want to avoid is a primary key with a skewed distribution (like timestamp), which would cause all operations to take place in the same part of the keyspace, creating a load imbalance that's difficult to distribute.
@nvanbenschoten Thank you!
I made the following changes:
The CockroachDB instances were reinstalled on 9 servers with a new schema, this time with a composite primary key on field1 and timestamp:
CREATE TABLE "public"."large_table" (
"timestamp" int4 NOT NULL,
"field1" varchar(16) NOT NULL,
"field2" varchar(20) NOT NULL,
"field3" varchar(20) NOT NULL,
"field4" int4,
"field5" int4,
"field6" bool NOT NULL,
"field7" int4,
PRIMARY KEY (field1, timestamp)
);
Then the table was split in the following manner:
ALTER TABLE large_table SPLIT AT VALUES ('00');
ALTER TABLE large_table SPLIT AT VALUES ('01');
ALTER TABLE large_table SPLIT AT VALUES ('02');
...
ALTER TABLE large_table SPLIT AT VALUES ('99');
This is because field1 is numerical 99.9% of the time.
The ranges were scattered as recommended above:
ALTER TABLE large_table SCATTER;
The batch size was set to 1000, and 24 goroutines simultaneously performing inserts were spawned.
Insert speed was 34-35k rows per second at the start, then it slowly degraded to 20k rows per second over 2 hours:
Grafana metrics:
It can be seen that CPU utilization was below 40%, iowait was below 7%, around 30 GB of RAM was free on the busiest server, and network utilization was below 8% (there is a 1 Gbps link between the servers).
Is it some kind of congestion? Then why did the CPU, disk, and network utilization drop the moment the workers stopped inserting the data?
It looks like the drops in throughput correspond to increases in replica count. Is it possible that individual workers were each inserting into only a single range per statement at first, but started inserting across multiple ranges per statement once the range splits occurred? Insert statements can achieve maximum performance if they operate on only a single range because this requires less coordination. If you can partition your worker goroutines such that they are each inserting into a single range, even if this requires slightly smaller batches, you should be able to improve throughput.
You can also try increasing the number of goroutines you're using. A good rule of thumb would be a goroutine per cockroach core. So in the case of 9 32-core machines, I'd try using 288 goroutines. This should work as long as you send your SQL to all nodes. HAProxy should ensure this. You could also try load balancing in your client app itself by opening SQL connections in a round-robin pattern and avoiding the extra overhead of HAProxy.
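A rough sketch of that client-side round-robin idea, assuming one *sql.DB per CockroachDB node (the pool type and the node addresses shown are illustrative; the addresses mirror the ones in the HAProxy config later in the thread):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"
	"sync/atomic"

	_ "github.com/lib/pq"
)

// roundRobinPool hands out one *sql.DB per CockroachDB node in round-robin
// order, so SQL load is spread across all nodes without going through HAProxy.
type roundRobinPool struct {
	dbs  []*sql.DB
	next uint64
}

func newRoundRobinPool(hosts []string) (*roundRobinPool, error) {
	p := &roundRobinPool{}
	for _, h := range hosts {
		dsn := fmt.Sprintf("postgresql://root@%s:26257/defaultdb?sslmode=disable", h)
		db, err := sql.Open("postgres", dsn)
		if err != nil {
			return nil, err
		}
		p.dbs = append(p.dbs, db)
	}
	return p, nil
}

// Get returns the next node's connection pool.
func (p *roundRobinPool) Get() *sql.DB {
	n := atomic.AddUint64(&p.next, 1)
	return p.dbs[n%uint64(len(p.dbs))]
}

func main() {
	// Example node addresses; in this setup there would be nine of them.
	pool, err := newRoundRobinPool([]string{"192.168.0.1", "192.168.0.2", "192.168.0.3"})
	if err != nil {
		log.Fatal(err)
	}
	// Each worker goroutine would grab pool.Get() before sending its batch.
	_ = pool.Get()
}
```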
Out of curiosity, how many rows are you planning to insert in total?
Out of curiosity, how many rows are you planning to insert in total?
~500 billion
@nvanbenschoten Thank you for the insights.
So I rebooted the cluster (removing the data folder) and changed the number of goroutines to 288, as you recommended.
Each goroutine maintains a map of rows keyed by partition, e.g. map[string][]*Row; once a slice's length reaches 1000, it is inserted into the database.
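That batching approach might look roughly like the sketch below. The Row struct, the flush helper, and the channel plumbing are assumptions; only the per-prefix batching mirrors what is described, with buckets keyed by the same two-character field1 prefixes used for the SPLIT AT statements so each flushed batch mostly touches a single range.

```go
package loader

import (
	"database/sql"
	"log"
)

// Row mirrors the columns of large_table; the exact struct is an assumption.
type Row struct {
	Timestamp              int32
	Field1, Field2, Field3 string
	Field4, Field5, Field7 int32
	Field6                 bool
}

const batchSize = 1000

// worker buckets incoming rows by the two-character prefix of field1 and
// flushes a bucket whenever it reaches batchSize.
func worker(db *sql.DB, in <-chan *Row) {
	batches := make(map[string][]*Row)
	for row := range in {
		key := row.Field1
		if len(key) >= 2 {
			key = key[:2]
		}
		batches[key] = append(batches[key], row)
		if len(batches[key]) >= batchSize {
			if err := flush(db, batches[key]); err != nil {
				log.Printf("flush %q: %v", key, err)
			}
			batches[key] = batches[key][:0]
		}
	}
	// Flush the leftovers once the input channel is closed.
	for key, rows := range batches {
		if len(rows) > 0 {
			if err := flush(db, rows); err != nil {
				log.Printf("flush %q: %v", key, err)
			}
		}
	}
}

// flush would build one multi-row INSERT for the batch (omitted here).
func flush(db *sql.DB, rows []*Row) error { return nil }
```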
After a minute, the insert rate reached an astonishing 400 batch inserts per second (400k rows per second), then it dropped to 100. Lots of driver: bad connection errors appeared in the client logs.
After 10 minutes "waiting on split that failed" errors popped up:
2018/08/07 21:37:31 pq: waiting on split that failed: failed to send RPC: sending to all 3 replicas failed; last error: {<nil> context deadline exceeded}
2018/08/07 21:37:31 pq: waiting on split that failed: failed to send RPC: sending to all 3 replicas failed; last error: {<nil> context deadline exceeded}
2018/08/07 21:37:31 pq: waiting on split that failed: failed to send RPC: sending to all 3 replicas failed; last error: {<nil> context deadline exceeded}
2018/08/07 21:37:31 pq: waiting on split that failed: failed to send RPC: sending to all 3 replicas failed; last error: {<nil> context deadline exceeded}
2018/08/07 21:37:31 pq: waiting on split that failed: failed to send RPC: sending to all 3 replicas failed; last error: {<nil> context deadline exceeded}
2018/08/07 21:37:31 pq: waiting on split that failed: failed to send RPC: sending to all 3 replicas failed; last error: {<nil> context deadline exceeded}
2018/08/07 21:37:31 pq: waiting on split that failed: failed to send RPC: sending to all 3 replicas failed; last error: {<nil> context deadline exceeded}
First 10 minutes dashboard:
Network utilization first 10 minutes (yellow is HAProxy node)
Seems like HAProxy is waiting on (dead?) CockroachDB nodes, hence the "driver: bad connection" errors.
Dashboard 30 minutes:
Metrics 30 minutes:
In CockroachDB's logs, the following error was spotted:
E180808 01:50:56.474893 299807 server/server.go:1539 [n4,client=192.168.0.18:43700] write tcp 192.168.0.2:26257->192.168.0.18:43700: write: broken pipe
Here 192.168.0.2 is the CockroachDB node and 192.168.0.18 is the HAProxy node.
As well as the following warnings:
W180808 01:50:51.061498 312364 kv/dist_sender.go:1305 [n4] have been waiting 1m0s sending RPC to r77 (currently pending: [(n3,s3):6]) for batch: [txn: 9e4d3888], EndTransaction [/Table/51/1/"44213918777"/1565249113/0,/Min)
W180808 01:50:53.913753 312992 kv/dist_sender.go:1305 [n4] have been waiting 1m0s sending RPC to r77 (currently pending: [(n3,s3):6]) for batch: [txn: 1dde1157], EndTransaction [/Table/51/1/"44887312077"/1565248586/0,/Min)
HAProxy config:
global
maxconn 4096
defaults
mode tcp
# Timeout values should be configured for your specific use.
# See: https://cbonte.github.io/haproxy-dconv/1.8/configuration.html#4-timeout%20connect
timeout connect 10s
timeout client 1m
timeout server 1m
# TCP keep-alive on client side. Server already enables them.
option clitcpka
listen psql
bind :26258
mode tcp
balance roundrobin
option httpchk GET /health?ready=1
server cockroach1 192.168.0.1:26257 check port 14081
server cockroach2 192.168.0.2:26257 check port 14081
server cockroach3 192.168.0.3:26257 check port 14081
server cockroach4 192.168.0.4:26257 check port 14081
server cockroach5 192.168.0.5:26257 check port 14081
server cockroach6 192.168.0.6:26257 check port 14081
server cockroach7 192.168.0.7:26257 check port 14081
server cockroach8 192.168.0.8:26257 check port 14081
server cockroach9 192.168.0.9:26257 check port 14081
I changed timeout client and timeout server to 3 minutes for HAProxy and restarted the service. The bad connection errors are gone. The insert rate is currently at 160k rows per second:
@kaxap the pq: waiting on split that failed errors indicate to me that pre-splitting more would help. Next time you try this, let's try splitting 10x more before applying the initial load.
It's hard to tell exactly what's hitting a limit here, but I wouldn't be surprised if it's IO to your disks. I suspect that because we have almost exclusively done performance testing with SSDs and have seen performance penalties for using HDDs in the past. Do you mind providing a screenshot from your "Storage" dashboard? Specifically, I'm interested in the log commit latencies during this test.
Taking a step back here, 500 billion rows is a lot to ingest using standard INSERT statements. I think it would be worth exploring some of CockroachDB's bulk data ingestion mechanisms: https://www.cockroachlabs.com/docs/stable/import-data.html. Specifically, the IMPORT statement imports data from CSV files and is significantly faster than INSERT statements because it allows the database to generate all SST files using a map-reduce style workflow across the cluster. The pre-generated SST files are then replicated and ingested directly into Cockroach's storage system, allowing it to avoid several layers of overhead.
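For reference, a minimal Go sketch of writing rows out as CSV shards for that path. The file name, column order, and sample values are assumptions; the resulting files would then be ingested with an IMPORT statement per the linked docs.

```go
package main

import (
	"encoding/csv"
	"log"
	"os"
)

// writeCSV writes one CSV shard for IMPORT to ingest. The column order must
// match the column list given in the IMPORT statement.
func writeCSV(path string, rows [][]string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	w := csv.NewWriter(f)
	for _, r := range rows {
		if err := w.Write(r); err != nil {
			return err
		}
	}
	w.Flush()
	return w.Error()
}

func main() {
	// One example row: timestamp, field1 .. field7 (values are made up).
	rows := [][]string{
		{"1533600000", "44213918777", "a", "b", "1", "2", "true", "4"},
	}
	if err := writeCSV("large_table-000.csv", rows); err != nil {
		log.Fatal(err)
	}
}
```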
One of the instances ate all the memory on the server; after restarting it, the memory consumption dropped substantially:
Do you mind providing a screenshot from your "Storage" dashboard? Specifically, I'm interested in the log commit latencies during this test.
Taking a step back here, 500 billion rows is a lot to ingest using standard INSERT statements.
Thank you for pointing that out, I will definitely try importing directly from CSV. However, I am currently testing for how much load CockroachDB can take on our specific hardware configuration in order to provide a budget estimation for an upcoming project.
let's try splitting 10x more before applying the initial load
I tried to split into 1000 and 10000 ranges; the nodes were constantly failing, with little data being written between DOWN-UP events.
I tried with the 100-way split again. Performance is OK, but it degrades almost to zero over 24 hours. I understand this is due to more and more ranges being spawned and more server time being spent coordinating writes?
@nvanbenschoten Moreover, one of the nodes consumes more and more RAM over time, resulting in swap file growth and subsequent sub-optimal performance.
By the way, is it possible to split by field1 and timestamp separately? Something like this: split field1 by first two chars and split timestamp by an interval, let's say 86400?
So I tried with 1000x split again.
Batch size = 1000, with 288 goroutines writing simultaneously. The insert rate was 0-4 inserts per second.
pq: context deadline exceeded errors were popping up in the client logs.
In the server logs, lots of errors like the following were logged:
E180808 22:56:22.054925 795195 storage/queue.go:778 [raftlog,n1,s1,r720/4:/Table/51/1/"xx{x"-x"}] requested entry at index is unavailable
However, I am currently testing for how much load CockroachDB can take on our specific hardware configuration in order to provide a budget estimation for an upcoming project.
Got it. Thanks for explaining.
The storage dashboard graphs are pretty telling. They show that during peak load WAL sync latencies are reaching upwards of 10s, but surprisingly only on one machine. Is it possible that this is the same machine that experienced the high memory consumption? Regardless, multi-second disk sync latencies generally mean that we're pushing IO a little too hard.
I tried to split into 1000 and 10000 ranges; the nodes were constantly failing, with little data being written between DOWN-UP events.
In what way were the nodes failing?
I understand this is due to more and more ranges being spawned and more server time being spent coordinating writes?
That's not quite correct. There's a fixed amount of extra overhead for writes that need to span multiple ranges. If a write can span only a single range then it can avoid this overhead, but once a write is spanning at least two ranges, there's no incremental overhead to it spanning extra ranges.
Moreover, one of the nodes consumes more and more RAM over time, resulting in swap file growth and subsequent sub-optimal performance.
That's strange and indicative of a load imbalance. Next time you see this, could you navigate to https://<bad_node_hostname>:<adminui_port>/debug/pprof/ui/heap/ and take a screenshot?
By the way, is it possible to split by field1 and timestamp separately? Something like this: split field1 by first two chars and split timestamp by an interval, let's say 86400?
Yes, that is possible. You can run a command like:
ALTER TABLE large_table SPLIT AT VALUES ('00', '2016-01-25');
--or like
ALTER TABLE large_table SPLIT AT VALUES ('00', '2016-01-25'::timestamp + (25 * 86400::interval));
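Given that the timestamp column in the schema above is an int4 Unix timestamp, a generated grid of integer split points might be closer to what was described. A rough Go sketch, where the prefix count, day count, and start epoch are assumptions:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq"
)

// splitByPrefixAndDay creates a split point for every (two-digit field1 prefix,
// day boundary) pair, i.e. 100 x days ranges in total.
func splitByPrefixAndDay(db *sql.DB, startUnix int64, days int) error {
	for p := 0; p < 100; p++ {
		for d := 0; d < days; d++ {
			ts := startUnix + int64(d)*86400
			stmt := fmt.Sprintf(
				"ALTER TABLE large_table SPLIT AT VALUES ('%02d', %d)", p, ts)
			if _, err := db.Exec(stmt); err != nil {
				return fmt.Errorf("split ('%02d', %d): %v", p, ts, err)
			}
		}
	}
	return nil
}

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Example: 10 day boundaries under each of the 100 prefixes (1,000 splits).
	if err := splitByPrefixAndDay(db, 1533600000, 10); err != nil {
		log.Fatal(err)
	}
}
```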
Why don't we drop the number of concurrent writers in half (144) and see if we can stabilize on a certain throughput. Once there, we can begin to ramp it up slowly and see where we hit a limit.
@nvanbenschoten
In what way were the nodes failing?
I guess it was some hardware issue, I reinstalled all 9 nodes with 1000x split again and the cluster is currently stable.
There's a fixed amount of extra overhead for writes that need to span multiple ranges.
I noticed that throughput drops at the times when the number of replicas grows:
Is this normal behaviour?
Is it possible that this is the same machine that experienced the high memory consumption?
@nvanbenschoten quite possibly. So if one node experiences large write latency, would the whole cluster's performance suffer?
Why don't we drop the number of concurrent writers in half (144) and see if we can stabilize on a certain throughput. Once there, we can begin to ramp it up slowly and see where we hit a limit.
I reduced the number of goroutines to 144 and split the data by 100x field1, 100x timestamp (10k ranges total). Throughput dropped every time the number of replicas per node rose:
(sorry, there is no option for 2 day range in dashboard)
So if one node experiences large write latency, would the whole cluster's performance suffer?
No, only writes that are modifying data that is resident to that node should suffer. However, what you may be seeing is that because you're batching so many writes together at once, all batches are writing at least in part to the node with the large write latencies.
This problem is alleviated to some extent because a write only requires a majority of nodes to acknowledge it to commit. This means that temporary blips in write latency on a minority of nodes should not slow down writes. However, if a node has consistently large write latency then we'll slow down all writes to ranges that have replicas on that node to prevent the replica from falling further and further behind.
Throughput dropped every time number of replicas per node rose:
It's possible that each of the range splits corresponds to a reduction in the number of transactions that are able to commit entirely on a single range. We can verify this by looking at the graph at:
https://<hostname>:8080/#/debug/chart?metrics=%5B%7B%22downsampler%22%3A1%2C%22aggregator%22%3A2%2C%22derivative%22%3A0%2C%22perNode%22%3Afalse%2C%22source%22%3A%22%22%2C%22metric%22%3A%22cr.node.txn.commits%22%7D%2C%7B%22downsampler%22%3A1%2C%22aggregator%22%3A2%2C%22derivative%22%3A0%2C%22perNode%22%3Afalse%2C%22source%22%3A%22%22%2C%22metric%22%3A%22cr.node.txn.commits1PC%22%7D%5D
We can verify this by looking at the graph
@nvanbenschoten This is screenshot of the chart from the link above:
Which corresponds to this overview dashboard
@nvanbenschoten any ideas?
Thanks for the graphs @kaxap! We can see from them that most requests aren't going to only a single range. That's preventing us from hitting a special fast-path, but I don't think it's actually the cause of the slowdown you're observing. Still, it's a little hard to see because the txn.commits graph is so much larger than the txn.commits1PC graph. Let's take a look at:
https://<hostname>:8080/#/debug/chart?metrics=%5B%7B%22downsampler%22%3A1%2C%22aggregator%22%3A2%2C%22derivative%22%3A1%2C%22perNode%22%3Atrue%2C%22source%22%3A%22%22%2C%22metric%22%3A%22cr.node.txn.commits1PC%22%7D%5D
Let's also try reducing the concurrency of your load generator until we get the 99th percentile service latency beneath 10s. While we do that, we can continue to monitor the log commit latency that we looked at before. The only real difference between this test and the kind we do regularly is the use of HDDs, so I'm trying to determine whether that might be responsible for some of the differences here.
Also, could we take a look at the following graph during the same period:
https://<hostname>:8080/#/debug/chart?metrics=%5B%7B%22downsampler%22%3A1%2C%22aggregator%22%3A2%2C%22derivative%22%3A0%2C%22perNode%22%3Atrue%2C%22source%22%3A%22%22%2C%22metric%22%3A%22cr.node.sys.cpu.sys.percent%22%7D%5D
@nvanbenschoten Really appreciate your help.
Still, it's a little hard to see because the txn.commits graph is so much larger than the txn.commits1PC graph. Let's take a look at
It has a single peak at the beginning of batch insert:
With a legend:
Also, could we take a look at the following graph during the same period:
Here it is
Those graphs correspond to the following overview dashboard:
Let's also try reducing the concurrency of your load generator until we get the 99th percentile service latency beneath 10s. While we do that, we can continue to monitor the log commit latency that we looked at before. The only real difference between this test and the kind we do regularly is the use of HDDs, so I'm trying to determine whether that might be responsible for some of the differences here.
I see. Should I reset the roach cluster and start from scratch, or will the current state do as well?
Should I reset the roach cluster and start from scratch, or will the current state do as well?
The current state will do fine for this.
@nvanbenschoten I went all the way down to 20 goroutines simultaneously inserting batches of 1000 before Service Latency: SQL, 99th percentile dropped under 10 seconds.
Log Commit Latency: 99th Percentile is around 500-600 ms (there are 2 nodes with below-10ms latency though).
The insert rate is really low: 7-8k rows per second.
Screenshots:
Storage:
Overview:
Complete set of metrics panel screenshots: https://imgur.com/a/P3WLvgD
@nvanbenschoten any possible insights on this?
Thanks for the screenshots @kaxap! Unfortunately, nothing jumps out to me in them.
I'm still trying to figure out why we're seeing a drop in throughput when ranges split. That shouldn't happen. One thought I had was that we could be running into concurrency limits in the component that fans out requests to ranges. We can check this by looking at the following graph over a period where ranges split (the Replicas per Node graph increases) and throughput drops:
https://<hostname>:8080/#/debug/chart?metrics=%5B%7B"downsampler"%3A1%2C"aggregator"%3A2%2C"derivative"%3A2%2C"perNode"%3Afalse%2C"source"%3A""%2C"metric"%3A"cr.node.distsender.batches.async.throttled"%7D%2C%7B"downsampler"%3A1%2C"aggregator"%3A2%2C"derivative"%3A2%2C"perNode"%3Afalse%2C"source"%3A""%2C"metric"%3A"cr.store.intentresolver.async.throttled"%7D%5D
Hi @kaxap, do you have any update about this issue?
Hi @nvanbenschoten
Unfortunately we had to disassemble our test environment since the servers were borrowed from other parts of the project.
In retrospect, it might be a case of the commit log being located on the same partition as the data. With other databases we usually move the commit log to a separate disk to increase throughput (since we are using 7200 RPM SAS disks instead of SSDs). Is there any way to do the same for CockroachDB?
The setup is as follows: 15 servers, each with 32 cores, 128 GB RAM, and a 10 TB SAS HDD.
5 of the 15 have other low CPU/RAM/IO tasks running.
The CockroachDB cluster has 1 table.
I am using the Golang lib/pq driver. The CockroachDB nodes are behind HAProxy on one of the machines.
Inserting batches in the following manner
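The original snippet is not reproduced in this extract; the sketch below shows the general shape of such a batch insert with lib/pq, with column names taken from the table schema shown earlier in the thread (the helper name and row layout are assumptions):

```go
package loader

import (
	"database/sql"
	"fmt"
	"strings"

	_ "github.com/lib/pq"
)

// insertBatch builds one multi-row INSERT for a batch of rows. Each row is a
// slice of 8 values in column order: timestamp, field1 .. field7.
func insertBatch(db *sql.DB, rows [][]interface{}) error {
	const cols = 8
	placeholders := make([]string, 0, len(rows))
	args := make([]interface{}, 0, len(rows)*cols)
	for i, r := range rows {
		ph := make([]string, cols)
		for j := range ph {
			ph[j] = fmt.Sprintf("$%d", i*cols+j+1)
		}
		placeholders = append(placeholders, "("+strings.Join(ph, ", ")+")")
		args = append(args, r...)
	}
	stmt := `INSERT INTO large_table ("timestamp", field1, field2, field3, field4, field5, field6, field7) VALUES ` +
		strings.Join(placeholders, ", ")
	_, err := db.Exec(stmt, args...)
	return err
}
```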
I tried different batch sizes from 10 to 500, and different numbers of goroutines (from 2 to 48) performing inserts simultaneously.
The maximum insert rate was ~12k rows per second (roughly ~0.6 MB/s). With batch size = 360 and this number of goroutines, P50 latency = 10737 ms and P99 latency = 17179 ms.
Information on the nature of the inserted data:
- {field1 + field2} set cardinality = 30 million
- field3 is an item from the field2 set, thus {field1 + field3} set cardinality = 30 million
- {field4 + field5} set cardinality = 270 thousand
- field7 is Poisson distributed with lambda = 4
- timestamp is a Unix timestamp
With batch size = 360 and this number of goroutines, the performance is roughly the same: ~1400 rows per second / 0.1 MBps.
Dashboard:
What can be done to increase the performance on current servers?