Hmmm... That's odd. I specifically added a step to make sure the replicas are synced before running the SELECT ... query.
I haven't seen this error before, and I wasn't able to replicate it. Perhaps we can leave this issue open to see if this error appears again?
@karencfv is there any instrumentation we could add so that next time we hit this, we'll be closer to root cause? (e.g., are there questions you wish you had the answer to that we could answer next time with more logging)
I guess we could log the results from SELECT * ... if the assertion fails, to see if the information did not sync properly or if there is some other random information in there.
Thanks for the report @rcgoodfellow. It looks like we're inserting data on one replica, and then syncing on the other via SYSTEM SYNC REPLICA. I think this might be important, but I'm not sure yet.
It's not entirely clear from the documentation, but here's what it says about SYSTEM SYNC REPLICA:
Wait until a ReplicatedMergeTree table will be synced with other replicas in a cluster
This appears to ask the replica to process entries in the shared ZooKeeper log, and run the replication commands through its own replication queue. Ok, that seems reasonable, though the details will clearly matter a lot. But this does seem to rely on the ZooKeeper log having distributed the common log entries to all the nodes. That is, I'm not sure this will actively pull anything new from ZK, so maybe this can miss data if the time between the insertion and the sync-replica command is small. (The log messages we have are 3ms apart, so this is quite quick.)
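For concreteness, the command in question looks something like the sketch below; the local table name here is just an assumption for illustration, not necessarily what the test uses:

    -- Run on node 2: wait for this replica to work through its replication queue.
    -- Per the hypothesis above, this may only cover log entries the replica has
    -- already seen, not an insert whose log entry hasn't propagated yet.
    SYSTEM SYNC REPLICA oximeter.measurements_cumulativeu64_local;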
We're also inserting into the Distributed table in this test. How does that work again? From here:
Each shard can have the internal_replication parameter defined in the config file. If this parameter is set to true, the write operation selects the first healthy replica and writes data to it. Use this if the tables underlying the Distributed table are replicated tables (e.g. any of the Replicated*MergeTree table engines). One of the table replicas will receive the write, and it will be replicated to the other replicas automatically.
Ok, so we're writing to some replica, which does the replication itself. How does that part work? Now from this page:
Replication is asynchronous and multi-master. INSERT queries (as well as ALTER) can be sent to any available server. Data is inserted on the server where the query is run, and then it is copied to the other servers. Because it is asynchronous, recently inserted data appears on the other replicas with some latency. If part of the replicas are not available, the data is written when they become available. If a replica is available, the latency is the amount of time it takes to transfer the block of compressed data over the network.
By default, it looks like the client insert request only waits until the data is on the local node, not replicated. We need to set insert_quorum to wait until multiple nodes have the data. So following this through, I think the following can happen:
1. The test inserts data into the Distributed table. This selects some replica and inserts it there.
2. The ReplicatedMergeTree table on that replica inserts the data, and then returns to the original server.
3. The Distributed engine returns to the client, our test program.
4. The ReplicatedMergeTree engine asynchronously sends the data to the other node(s).
5. The test runs SYSTEM SYNC REPLICA on node 2. At this point, it seems consistent with this failure that node 2 has not received the ZK command indicating that there is new data to replicate from node 1.
6. The test SELECTs some data from node 2, receives nothing, and fails.
I'm not saying this is happening, but it appears consistent with the test failure, as far as I can tell. If this is all accurate, I think there are a few things we can do:
1. Use SYSTEM FLUSH DISTRIBUTED on the Distributed table to tell it to flush the data to its nodes synchronously (sketched below). Since we're using internal_replication = true, and relying on the ReplicatedMergeTree tables themselves to do the replication, I'm not sure whether this means "send data to one replica, waiting for it to ACK" or "send data to all replicas, even if they use internal replication".
2. Set insert_quorum = 2, so that the ReplicatedMergeTree engine actually waits until data resides on both replicas.
3. Other SYSTEM commands related to replication may be useful. There doesn't appear to be a "flush" variant, but there are many related to managing the replication queues for ReplicatedMergeTree tables.
I also think it might be useful to experiment with these separately, in some kind of stress-testing environment. I'd love a binary that creates the oximeter replicated database with tweakable configuration parameters (e.g., insert_quorum), and then tries to insert and fetch a ton of data to smoke out these kinds of issues. It would be nice to know whether the above tweaks actually work, and I think waiting for the next test flake is not a great way to learn that with confidence. It will tell us if they don't work, but not if they do.
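A rough sketch of what the first two options could look like at the SQL level; the table name here is an assumption for illustration, not necessarily what's in replicated/db-init.sql:

    -- Option 1 (assumed table name): push any blocks the Distributed engine
    -- has queued for its shards before syncing or selecting.
    SYSTEM FLUSH DISTRIBUTED oximeter.measurements_cumulativeu64;

    -- Option 2: make inserts wait until two replicas have acknowledged the
    -- data. Over the HTTP interface this would be passed as a per-query
    -- setting; shown here as a session setting for brevity.
    SET insert_quorum = 2;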
@bnaecker I think your analysis is correct here. However, I have some questions about the purpose of the test and the proposed fixes.
Ideally the test would mimic what we want to use in production. From what I understand we aren't sharding yet, and so shouldn't need to use Distributed tables yet. I'm not positive of this fact, but got it from reading one of the many disjoint, incomplete clickhouse doc pages. We can instead choose which node to write to ourselves, and the On Cluster setting should allow async replication of the ReplicatedMergeTree. We can then read from any single node, and eventually all data will be there. In short, this is the model I have in my head for clickhouse (whether we have distributed tables in use or not): let clickhouse asynchronously replicate each ReplicatedMergeTree.
With this, we won't be able to read from any node and immediately see the data. We may be able to use SYSTEM FLUSH DISTRIBUTED in this test as you recommend and fix the flake. We won't want to use this in production because:
Forces ClickHouse to send data to cluster nodes synchronously. If any nodes are unavailable, ClickHouse throws an exception and stops query execution. You can retry the query until it succeeds, which will happen when all nodes are back online.
I think it is the wrong approach to use insert_quorum = 2. It's unlikely we'd want to use this in production, as it would incur higher latency on writes and add more complexity to that path, which is a potential source of error. Using it in the test doesn't really give us the fidelity we want.
Alternatively, instead of relying on clickhouse to ensure replication is complete, if we want to read from both nodes we can perform a wait_for in our test. We may have to do that anyway, even with a flush command.
Is this how you were thinking of using distributed clickhouse @bnaecker, or is the model in my head different from yours? I only just started reading the clickhouse docs last week, and so I have some catching up to do.
Oh, also, I plan to take a cut at fixing this today.
Thanks for taking a look @andrewjstone!
Ideally the test would mimic what we want to use in production. From what I understand we aren't sharding yet, and so shouldn't need to use Distributed tables yet
It's correct that we do not shard data right now. I also think distributed tables are still useful. They help simplify the client implementation, and I think will also minimize changes if / when we do shard the data. The idea is that we can always operate on a table with the same name in both the single-node and replicated case -- e.g., oximeter.measurements_cumulativeu64. That will refer to either a single-node MergeTree engine table, or a Distributed engine table, which points at the ReplicatedMergeTree engine "local" table on each node. It's true that this is normally done to support sharding, since the distributed table will then automatically merge data across shards. I also thought it would be helpful for the client not to really have to know whether it's talking to a single- or multi-node cluster, and be able to run queries the same way.
In practice, the client often needs to know that anyway in order to do things like initialize the database, so it's less of a benefit than I might have originally hoped. But I do think we should remove it only after some more consideration. It very well might make life easier to eschew the Distributed table completely, if we decide sharding isn't valuable, but it's a good hedge in the meantime.
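For anyone following along, the layering looks roughly like the sketch below. This is simplified, and the column, cluster, and table names are assumptions; the real definitions live in replicated/db-init.sql:

    -- Per-node "local" table that actually stores and replicates the data.
    CREATE TABLE oximeter.measurements_cumulativeu64_local
    (
        timeseries_key UInt64,
        timestamp DateTime64(9),
        datum UInt64
    )
    ENGINE = ReplicatedMergeTree(
        '/clickhouse/tables/{shard}/measurements_cumulativeu64_local',
        '{replica}'
    )
    ORDER BY (timeseries_key, timestamp);

    -- Cluster-wide table the client talks to; reads and writes fan out to the
    -- local table on each node.
    CREATE TABLE oximeter.measurements_cumulativeu64
    AS oximeter.measurements_cumulativeu64_local
    ENGINE = Distributed('oximeter_cluster', 'oximeter',
                         'measurements_cumulativeu64_local', rand());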
That said, your mental model is essentially the same as mine. I would like to read and write data on any node, and have that transparently replicated. The Distributed table seems like a good approach to doing that, especially if we do think we will shard data at some point in the future.
I think it is the wrong approach to use insert_quorum = 2. It's unlikely we'd want to use this in production as it would incur higher latency on writes, and adds more complexity to that path, which is a potential for error. Using it in the test doesn't really give us the fidelity we want
It seems like the quorum setting is a tradeoff to me, but I could be misunderstanding something. Yes, the latency might be higher. It also seems like keeping a quorum of 1 comes with its own risks. For example, from this page:
By default, an INSERT query waits for confirmation of writing the data from only one replica. If the data was successfully written to only one replica and the server with this replica ceases to exist, the stored data will be lost.
That seems potentially bad! It also seems like we could run into either split-brain issues or silent partitions that we can't really diagnose without some other checks (which could be a good idea anyway!). All that might be "OK" for telemetry data, but it still seems plausible to me that paying the additional latency of a quorum > 1 is worth it to avoid these issues.
Alternatively, instead of relying on clickhouse to ensure replication is complete, if we want to read from both nodes we can perform a wait_for in our test
This seems reasonable. Have the test insert on one node, and then retry selecting from both until the data is the same. It might also get us some hard numbers about the latency in ClickHouse's replication.
This particular assertion in the test is essentially testing that ClickHouse actually replicates the data, right? That seems like a bit of an odd test, to be honest, but if we do want to test that, something like your suggestion seems more appropriate, given that we have no clear, unambiguous controls over when ClickHouse actually "completes" the replication. And as you point out, we probably should not rely on those controls anyway.
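If we go that route, the check itself can stay very simple: something like the query below, issued against each node's HTTP port in a retry loop, stopping once both nodes return the expected count (table name assumed for illustration):

    -- Run against node 1 and node 2 separately; the test retries until the
    -- counts match the number of inserted samples (or a timeout expires).
    SELECT count() FROM oximeter.measurements_cumulativeu64_local;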
So far I have not been able to reproduce this locally. test_replicated takes around 3 minutes to run for me. I shortened the test significantly, so that it runs between 8 and 13 seconds, by only including the first two subtests, including the failing one: data_is_replicated_test. I've run it in a loop for about 15 minutes without reproduction. Part of the problem is that this test in particular takes ~7.8 seconds to run, with ~7.1 of those seconds being spent initializing the db. This same 7 seconds is repeated throughout many of the subtests and accumulates to the full 3 minute test run. This is completely unreasonable IMO; it shouldn't take more than a few hundred ms at most to initialize the DB.
There is some obvious inefficiency in that we read and parse the db-init.sql file into multiple independent statements and send a post for each one, but I don't think that's the whole issue. Each statement execution has a lot of other things going on and I haven't figured it all out yet. Ideally, though, we'd be able to just send all statements in a single post and let them run together.
There is some obvious inefficiency in that we read and parse the db-init.sql file into multiple independent statements and send a post for each one, but I don't think that's the whole issue. Each statement execution has a lot of other things going on and I haven't figured it all out yet. Ideally, though, we'd be able to just send all statements in a single post and let them run together.
Unfortunately, the ClickHouse HTTP interface does not support multiple statements. That's the point of the shitty statement splitting we do in the client today. I'd love to understand how much of the time is on the client vs the server side.
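One way to get at the client-vs-server split, assuming the servers have the (default) query_log enabled, would be to compare the per-statement wall-clock time in the test against what ClickHouse itself records:

    -- Server-side duration of each recently executed DDL statement, to
    -- compare against the client's per-statement timing.
    SELECT
        event_time,
        query_duration_ms,
        substring(query, 1, 60) AS query_prefix
    FROM system.query_log
    WHERE type = 'QueryFinish'
      AND query ILIKE 'CREATE%'
    ORDER BY event_time DESC
    LIMIT 20;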
Unfortunately, the ClickHouse HTTP interface does not support multiple statements. That's the point of the shitty statement splitting we do in the client today. I'd love to understand how much of the time is on the client vs the server side.
I kinda figured. Otherwise you wouldn't have written it that way. Sorry if my comment sounded overly harsh.
Based on the facts that the single node tests take ~4s to run, and that removing the Engine = Distributed(... and related create table statements from replicated/db-init.sql didn't change much in the timing of db-init, I think most of the cost is in the replicated table setup, which is strictly necessary. I'm going to look into the config to see if there's anything I can play around with there.
Just as an added datapoint: I looked at the latest run on main, and test_replicated takes 258s to run. The only tests that take longer are saga integration tests, and nothing else even comes close. I don't believe the value of this test is worth the >4 minute run in its current state, and perhaps it should be shrunk to only run those tests that are strictly related to being distributed.
For the future, when we integrate the automated configuration generation as implemented in clickward, we may be able to get away with parallel test runs, which would speed this up dramatically.
All that being said, the test that is specifically failing is one of the replicated ones. I believe I know how to fix it. Yet, without being able to reproduce the failure, I'm hard pressed to mark it as "fixed" with my changes.
@karencfv Now that we have the clickward-based replicated tests, which act as a superset of the functionality of data_is_replicated_test, maybe we can just go ahead and remove that test. It's the only one failing with a race here.
@andrewjstone Totally agree! We don't need that test anymore
Failure: https://github.com/oxidecomputer/omicron/pull/5163/checks?check_run_id=22103712249
I don't think this PR has anything in it that would cause this failure.
Failure details: