Closed anoopj closed 6 years ago
Are tables partitioned? What was the planning time for this query in both versions?
None of the tables are partitioned. The query planning time (totalPlanningTime) was actually lower in 0.206: 93 ms for 0.206 vs 420 ms for 0.172. So planning time does not seem to be the issue.
The queries are blocked more often in 0.206 than 0.172. totalBlockedTime for 0.172 was 19 hours while it was 1.77 days for 0.206. So this could explain the slowness.
How do I find out why the queries are blocked? It could be caused by straggler tasks or some sort of concurrency or scheduling issue.
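To put the two blocked-time figures on the same scale, here is a minimal sketch that converts duration strings to seconds and compares them. The unit suffixes (ms, s, m, h, d) and the helper names `to_seconds` and `ratio` are assumptions made for illustration, not something taken from the Presto code.

```python
import re

# Seconds per unit for duration strings such as "93ms", "19h" or "1.77d".
# The set of suffixes is an assumption about how the stats are rendered.
_UNIT_SECONDS = {
    "ns": 1e-9, "us": 1e-6, "ms": 1e-3,
    "s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0,
}


def to_seconds(text):
    """Convert a string like '1.77d' into a number of seconds."""
    match = re.fullmatch(r"\s*([0-9]*\.?[0-9]+)\s*([a-z]+)\s*", text)
    if match is None or match.group(2) not in _UNIT_SECONDS:
        raise ValueError(f"unrecognized duration: {text!r}")
    return float(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def ratio(old, new):
    """How many times larger the new duration is than the old one."""
    return to_seconds(new) / to_seconds(old)


if __name__ == "__main__":
    # Figures quoted in this thread: 0.172 first, 0.206 second.
    print(f"blocked time:  {ratio('19h', '1.77d'):.2f}x")
    print(f"planning time: {ratio('420ms', '93ms'):.2f}x")
```

With the numbers above, blocked time roughly doubles in 0.206 while planning time shrinks, which matches the observation that planning is not the culprit.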
@nezihyigitbasi and @dain I'm hearing this same issue from folks at other companies, not just us at Amazon. I managed to spend some time with @anoopj on this and couldn't pinpoint the cause, but I did see that the jitter for split completion time was extremely high (in fact, most of the query latency differences seem to come down to a handful of 'straggler' splits that would prevent a stage from completing). @anoopj also mentioned to me some changes in memory management in recent releases, though GC didn't seem to be a red flag. I'm not sure what could be causing a small number of splits to consistently take longer on 0.206 vs earlier versions. It almost feels like the kind of issue that causes other frameworks to use things like speculative execution (not saying that would be a good idea here, just trying to describe the qualitative issue I think we are seeing).
I deleted the misleading comments, so we can focus on the core issue.
Looking at the explain plan, it appears that the issue is not in CPU usage, but in latency. There are lots of issues that can cause latency for a single query (e.g., skew), but systemic latency issues are typically caused by either not enough parallelism, scheduling unevenness, or latency bugs in communication layers.
For the first one, do you have roughly the same number of drivers in each stage? Specifically, are both versions using the same local parallelism? Also, are both queries operating on exactly the same data? If you have fewer files (or better compression) in one version it can result in less parallelism. If any of that happened, change the config/data to be consistent and rerun the trial.
For the second one, scheduling, I look at the normal query UI to figure out which tasks are "slower", and why. Did they get more splits? If so, the task queues could be too large, or there is communication lag causing observations to go stale. Did they have more input or output rows (or bytes)? I don't expect this to be the issue, but it is worth checking off.
For the last one, latency bugs, I use the "splits" view in the UI. For each task it shows: created, first split started, last split started, last split finished, and finished. You should see tight banding across the stage, where basically all tasks are created at the same time, all splits start together, and end together. Depending on the size of the input data, you should see the scan stages go to "last split started" instantly, and generally the distance between "last split finished" and "finished" should be smallish... IIRC it is the time taken to drain the output data. Basically, the picture should explain where the latency comes from.
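The "tight banding" check can also be done numerically. Below is a minimal sketch, assuming each task has been reduced to a dict holding the five milestones as seconds since query start. The key names and the sample numbers are hypothetical; they are not the field names used by the Presto UI or its JSON.

```python
# The five per-task milestones shown in the splits view (key names are hypothetical).
MILESTONES = (
    "created",
    "first_split_started",
    "last_split_started",
    "last_split_finished",
    "finished",
)


def banding(tasks):
    """Per milestone, the spread (max - min) across all tasks of one stage.

    Small spreads mean tight banding; a large spread on
    'last_split_finished' points at straggler tasks.
    """
    return {
        m: max(t[m] for t in tasks) - min(t[m] for t in tasks)
        for m in MILESTONES
    }


def drain_gaps(tasks):
    """Per task, the time between the last split finishing and the task finishing."""
    return [t["finished"] - t["last_split_finished"] for t in tasks]


if __name__ == "__main__":
    # Made-up numbers for two tasks of one stage, purely for illustration.
    stage = [
        {"created": 0.0, "first_split_started": 0.5, "last_split_started": 1.0,
         "last_split_finished": 10.0, "finished": 10.5},
        {"created": 0.0, "first_split_started": 0.5, "last_split_started": 1.0,
         "last_split_finished": 45.0, "finished": 45.5},
    ]
    print(banding(stage))
    print(drain_gaps(stage))
```

In this made-up stage every milestone lines up except "last split finished", so the latency comes from one task running its splits for much longer than the other.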
@dain thanks for the pointers. Appreciate that.
Do you have roughly the same number of drivers in each stage?
The number of drivers are roughly the same. For some table scans, there are small differences (less than 5%) in the number of drivers. For the intermediate stages, it's exactly 60. (task.concurrency * # of nodes)
Also, are both queries operating on exactly the same data?
Both queries are operating on exactly the same Parquet dataset on S3.
Scheduling
There is not much jitter between the tasks in the execution time or the I/O data volume. For every task, the CPU time is lower in 0.206 (which means the operators in 0.206 run faster and there is better pushdown of predicates), but the wall time is consistently higher. The stages are showing a higher blocked time, so this could be why the wall time is higher. What would cause the stages to block?
Interestingly, the cumulative memory reported in the UI is consistently higher in 0.172. This makes me think that the query is more starved for memory in 0.206. For instance, stage# 5 reported a cumulative memory of 27.4 GB in 0.172 while it was 6.08 GB in 0.206. Is it possible that the blocked time is due to memory pressure?
I see that there were some recent changes to the Presto memory pool to eliminate the reserved system memory pool. We only run one query at a time, so Presto is configured to have a large reserved pool (around 15 GB) and queries are running in the reserved pool.
Latency bug in communication layers.
The "splits" view shows a tighter banding across the tasks in 0.172. For instance, in stage# 5, some of the tasks are running for much longer in 0.206 than the others. The input and output sizes are the same across tasks.
I'm attaching the split view comparison between these two versions. Please note that 0.172 UI is inconsistent with the explain analyze. The UI seems to merge the last two stages into one. So when you compare, stage# 5 in 0.206 is actually stage# 4 in 0.172 and so on.
Splits in 0.172
Splits in 0.206
There is something strange going on. The scan stages are taking much longer. For example, in 0.206, it looks like the scan is taking like 45s, but in 0.172 it was taking like 10s. Heck, it takes like 20s to get all the splits scheduled.
Is this query able to max out the cluster in 0.172? I would check if you have the same number of task executor threads on 0.206. If the number is different, then that is likely the issue. If it is not, try cranking that number up to see if you can get the latency down. If you can, then I would guess there is something wrong in the scheduler. If you can't, then I would expect the data to be backing up somewhere in the output buffer between two stages. When you find that, look in the stage above that one, and figure out why it is "slow". I typically look for the local exchange buffers, and which operator is using a lot of CPU. The "slow" place in the query causes everything below it to slow down, so we don't run out of memory.
I would check if you have the same number of task executor threads on 0.206.
@dain I assume you meant task.max-worker-threads. It's set to 32 (2 * Number of vCPUs) in both clusters. The other concurrency settings, task.concurrency (set to 2) and query.initial-hash-partitions (set to 30), are identical across both clusters.
# On a 0.172 worker
$ grep task.max-worker-threads /mnt/var/log/presto/server.log
2018-08-22T06:26:32.673Z INFO main Bootstrap task.max-worker-threads 32 32
# On a 0.206 worker
$ grep task.max-worker-threads /mnt/var/log/presto/server.log
2018-08-22T06:25:40.922Z INFO main Bootstrap task.max-worker-threads 32 32
0.172 cluster is showing better CPU utilization than 0.206.
0.172 CPU usage during query execution
0.206 CPU usage during query execution
I cranked up the worker thread count from 32 to 64 on the 0.206 cluster and the query execution dropped from 4:35 to 3:24. It is still way higher than the 1:58 taken by 0.172. I then tried cranking up the worker threads to 96, and the time went up to 3:46, possibly due to context switches.
You can see that the CPU utilization went up when I increased the worker threads from 32 to 64.
0.206 CPU usage with worker thread count of 32, 64 and 96
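For a quick comparison of the runs quoted above, here is a small sketch that converts the m:ss timings to seconds and expresses each 0.206 run as a multiple of the 0.172 baseline. The function and variable names are made up for the example; the timings are the ones reported in this thread.

```python
def to_seconds(mmss):
    """Convert a 'minutes:seconds' string such as '4:35' to seconds."""
    minutes, seconds = mmss.split(":")
    return int(minutes) * 60 + int(seconds)


# task.max-worker-threads -> wall time on 0.206, as reported above.
RUNS_0206 = {32: "4:35", 64: "3:24", 96: "3:46"}
BASELINE_0172 = "1:58"


def slowdown_vs_baseline(runs, baseline):
    """Wall time of each run divided by the baseline wall time."""
    base = to_seconds(baseline)
    return {threads: to_seconds(t) / base for threads, t in runs.items()}


if __name__ == "__main__":
    for threads, factor in slowdown_vs_baseline(RUNS_0206, BASELINE_0172).items():
        print(f"{threads} threads: {factor:.2f}x the 0.172 wall time")
```

Even the best setting (64 threads) is still well above the baseline, so thread count alone does not close the gap.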
So that points to a back-pressure kicking in. Can you find the place that is blocking up the query? Look at the operators after the last full buffer.
Dain --
Back-pressure is a possibility, and I will dive deeper into this. But I think some sort of scheduling issue is more likely. This is because during query execution, some stages have zero to very little (<10 MB) output buffering, but not all of the splits for that stage are scheduled even though there are enough splits available in the cluster and all of its input stages are completed.
I made a screencast of the live query plan at https://goo.gl/pTWKV6 . For instance, if you forward to 3:33, you can see that all of the input stages of stage 4 have completed and there is no output buffering. But the rest of the splits for stage 4 are not scheduled yet. If you wait a few seconds, you can see that stage 4 eventually processes 180 splits.
Do you think some sort of back-pressure could explain this behavior or does this point to some sort of regression in the scheduler?
I looked through the recent commit history of NodeScheduler, but can't find any recent commit that could explain this.
The scans have all completed by 0:39 of the video, and at that point everything is running, so even if there was a split scheduling issue, it only affected the first 40s of the video. After this, all of the data is in memory, and everything is running.
One thing to note is the joins are skewed. Looking at 1:06, stage 7 only has 40 splits running and stage 6 only has 7. I would look at the full query info for those stages to see which operator is taking all of the wall time, or alternatively, is an operator that we expect to be running 100% of the time blocking a lot (e.g., the join operator should be 100% busy). Also, is all of the data getting sent to a single join operator instance?
Any updates?
@dain Thanks for the followup. I was away for a couple of days.
What I meant by split scheduling was the scheduling of the splits for the non-leaf stages. In 0.172, all the splits for the intermediate (non-leaf) stages are running right at the start of the query. Whereas in 0.206, some of the splits for the intermediate stages are run much later. (As you saw from the video at https://goo.gl/pTWKV6)
What decides when the splits for the intermediate stage are run? My understanding is that Presto scheduler is only involved in the scheduling of splits of only leaf stages. For intermediate stages, is there a policy or are they run as and when the upstream input data is available?
There does not seem to be any skews. From the tasks reported in the UI, all tasks in stage7 take in an equal amount of input data.
Also, is all of the data getting sent to a single join operator instance?
How do I verify this? From the UI, all tasks seem to be getting an equal amount of data.
I would look at the full query info for those stages to see which operator is taking all of the wall time, or alternatively, is an operator that we expect to be running 100% of the time blocking a lot .
For stage 7, most of the wall time is taken by the HashBuilderOperator and the LookupJoinOperator. The LookupJoinOperator gets blocked too.
For stage 6 and 5, the wall time was taken up by HashBuilderOperator, ExchangeOperator and then the LookupJoinOperator. Also, the ExchangeOperator seems to take way more wall time in 0.206 than 0.172.
I'm attaching a couple of stages from both versions for comparison.
Stage 7 in 0.206
Stage 7 in 0.172
Stage 6 in 0.206
Stage 6 in 0.172
What I meant by split scheduling was the scheduling of the splits for the non-leaf stages. In 0.172, all the splits for the intermediate (non-leaf) stages are running right at the start of the query. Whereas in 0.206, some of the splits for the intermediate stages are run much later. (As you saw from the video at https://goo.gl/pTWKV6)
The timelines posted above show that all intermediate splits/tasks start immediately. The video shows that all stages are fully scheduled and running (that is why they are light green).
What decides when the splits for the intermediate stage are run? My understanding is that Presto scheduler is only involved in the scheduling of splits of only leaf stages. For intermediate stages, is there a policy or are they run as and when the upstream input data is available?
There are two scheduler policies. The default policy, all-at-once, immediately schedules all intermediate and leaf splits. The phased policy starts the leaf and intermediate tasks in phases so only what is needed to make progress is running. In the timeline and the video, everything starts quickly.
There does not seem to be any skews. From the tasks reported in the UI, all tasks in stage7 take in an equal amount of input data.
Maybe skew is the wrong term. If you look at the video, you will see that in stages 6 and 7 most of the tasks finish immediately. This says to me that you are only joining to a few distinct values, which means that only a few cores can do the joining, which typically means long wall time.
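To illustrate why a handful of distinct join keys limits parallelism, here is a toy sketch. It routes rows to partitions by key modulo partition count, which is only a stand-in for whatever hash partitioning Presto actually uses; the row counts are made up.

```python
from collections import Counter


def partition_load(keys, partitions):
    """Count rows per partition when each row is routed by key % partitions.

    The modulo is a simplification standing in for a real hash function.
    """
    return Counter(key % partitions for key in keys)


if __name__ == "__main__":
    # 5 distinct join keys with 1000 rows each, spread over 60 partitions.
    rows = [key for key in range(5) for _ in range(1000)]
    load = partition_load(rows, 60)
    print(f"partitions with work: {len(load)} of 60")
    print(f"rows on the busiest partition: {max(load.values())}")
```

However many partitions exist, rows with the same key land together, so at most as many partitions as there are distinct keys ever receive work, and the rest finish immediately.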
Also, is all of the data getting sent to a single join operator instance?
How do I verify this? From the UI, all tasks seem to be getting an equal amount of data.
Sometimes you can see it in the task list by looking at the row counts of the tasks in each stage. The issue is the UI aggregates stuff, so sometimes the real issue is hidden. At some point, I just download the query json document and read it. I also sometimes download all of the task json documents from every worker and see what happened.
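Reading the downloaded query json by hand gets tedious, so here is a minimal sketch that walks the stage tree and lists input rows per task. The field names used (outputStage, subStages, stageId, tasks, taskStatus.taskId, stats.rawInputPositions) are assumptions about the document layout and may differ between Presto versions; adjust them to whatever your file actually contains.

```python
def iter_stages(stage):
    """Yield a stage and, recursively, all of its sub-stages."""
    yield stage
    for child in stage.get("subStages", []):
        yield from iter_stages(child)


def rows_per_task(query_info):
    """Map stage id -> {task id -> input rows}. Field names are assumptions."""
    result = {}
    for stage in iter_stages(query_info["outputStage"]):
        result[stage["stageId"]] = {
            task["taskStatus"]["taskId"]: task["stats"]["rawInputPositions"]
            for task in stage.get("tasks", [])
        }
    return result


def imbalance(rows):
    """Busiest task's row count divided by the mean; 1.0 means perfectly even."""
    values = list(rows.values())
    mean = sum(values) / len(values) if values else 0
    return max(values) / mean if mean else 0.0


if __name__ == "__main__":
    # Tiny synthetic document with the assumed layout, purely for illustration.
    sample = {"outputStage": {
        "stageId": "q.0",
        "tasks": [
            {"taskStatus": {"taskId": "q.0.0"}, "stats": {"rawInputPositions": 900}},
            {"taskStatus": {"taskId": "q.0.1"}, "stats": {"rawInputPositions": 100}},
        ],
        "subStages": [],
    }}
    for stage_id, rows in rows_per_task(sample).items():
        print(stage_id, rows, f"imbalance={imbalance(rows):.2f}")
```

For a real query, load the saved document with `json.load` and pass the resulting dict to `rows_per_task`.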
I would look at the full query info for those stages to see which operator is taking all of the wall time, or alternatively, is an operator that we expect to be running 100% of the time blocking a lot .
For stage 7, most of the wall time is taken by the HashBuilderOperator and the LookupJoinOperator. The LookupJoinOperator gets blocked too. For stage 6 and 5, the wall time was taken up by HashBuilderOperator, ExchangeOperator and then the LookupJoinOperator. Also, the ExchangeOperator seems to take way more wall time in 0.206 than 0.172.
In these cases, I would guess the wall time is long because it is not getting data fast enough.... it is simply waiting. You can see that in the task level json documents. Each driver is single threaded, so you can reason about the ratio between scheduled time and blocked time.
The distribution across all tasks is important. You need the task json documents for that.
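As a rough illustration of reasoning about scheduled vs blocked time across tasks, here is a minimal sketch. It assumes the two values have already been extracted per task and converted to seconds; the function names and the sample numbers are hypothetical.

```python
import statistics


def busy_fraction(scheduled_s, blocked_s):
    """Share of a driver's lifetime spent scheduled rather than blocked."""
    total = scheduled_s + blocked_s
    return scheduled_s / total if total else 0.0


def summarize(tasks):
    """Min, median and max busy fraction over (scheduled_s, blocked_s) pairs."""
    fractions = sorted(busy_fraction(s, b) for s, b in tasks)
    return {
        "min": fractions[0],
        "median": statistics.median(fractions),
        "max": fractions[-1],
    }


if __name__ == "__main__":
    # Made-up (scheduled, blocked) seconds for three tasks of one stage.
    print(summarize([(10, 30), (20, 20), (30, 10)]))
```

A low fraction on every task suggests the whole stage is waiting for data, while a wide spread between min and max suggests a few tasks are doing most of the work.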
The timelines posted above show that all intermediate splits/tasks start immediately.
This may be a noob question, but the timelines show that the tasks start immediately, but not the splits, right? During the execution, if you look at an intermediate stage (like 7 or 6), the sum of the running splits and finished splits is not the same as the final number of splits for that stage after the query is completed.
For instance, if you go to 0:40 in the video, stage 7 has 40 splits running and 60 finished. Please note that the input stages of 7 have completed already. Eventually stage 7 does process 180 splits. So why are not all of them either running or finished at this point?
@dain
There does not seem to be skew in the join. From the task JSONs, it seems like the input and output data sizes for the LookupJoin operator in all tasks are pretty much the same.
Stage 7 tasks take over a minute of CPU and wall time. A couple of observations about this stage:
Bulk of the CPU and wall time is in pipeline# 2, which does the join, project and partitioning the output. However, if you add up the CPU time of the operators, it does not add up to the pipeline's CPU time. So it's not clear where the CPU time is spent.
Most of the wall time is spent in blocked.
A couple of sample task JSONs for reference
Stage6 sample task Stage7 sample task
We are using the default scheduler policy of all-at-once and I can repro this on a Presto 0.208 cluster.
Another possibility is that this might be related to a performance issue with spill to disk:
The slow-down happens only on large datasets (TPCH scale 1000 onwards). The same query is about 22% faster on scale 10 and 100.
As I mentioned in the previous comment, the CPU time spent by operators doesn't add up to the time spent by the task.
The query or task JSON does not have any stats on spilling. But I did a jstack on a worker node and see that some of the threads are indeed spilling to disk.
How do we verify if there is a spill performance change in the new versions of Presto? Also, is there a way to verify this using a micro-benchmark?
I poked around and it looks like spill support was added to HashBuilderOperator after 0.172. So a wild guess is that the hash join triggers a spill to disk on 0.20x because the memory usage exceeded experimental.memory-revoking-threshold, while the older version kept the data in RAM (but the data still fit into the reserved memory pool).
Does that make sense?
Is spill to disk enabled in your configuration?
I have been talking to Sameer on Slack and he said "The root cause was the default number of per user max file descriptors allowed by Linux. For certain hosts this number exceeded for the truststore file." They are still testing the final changes.
Is spill to disk enabled in your configuration?
That is correct. Spill to disk is enabled in both the 0.172 and 0.20x clusters we had, and is configured to write to an EBS volume. The older Presto version spilled to disk only for aggregations, while the new version does it for joins and aggregations.
I have been talking to Sameer on Slack and he said "The root cause was the default number of per user max file descriptors allowed by Linux.[...]
Sameer is from a different team at Amazon and the issues they face are unrelated to this issue. We are not anywhere close to hitting the file descriptor limit.
@anoopj lol :) Amazon is so big
cc @nezihyigitbasi as recommended by Dain.
I think I'm close to finding out the root cause of the slowness. I believe it is related to when a query is getting promoted to the reserve pool. In my test, there can only be one query at a time in the cluster. So ideally we want a large reserve pool, a tiny general pool and want the query to be promoted to the reserve pool at the earliest.
In my initial tests, I had a reserve pool of about 15 GB and a general pool of 2 Gigs. After a few seconds of starting the query, the Presto UI says the query is running in reserved pool, but I'm not really sure if the query is actually running in the reserved pool in the workers.
So then I changed the settings to further shrink the general pool, to force the query to run in the reserve pool. This time the reserve pool has 17 GB and the general pool has only a few megabytes. This time the query runs 4X faster than with the previous setting!
What decides when a query is promoted to the reserve pool? Is it a decision made at a cluster-level by the coordinator or is it a local decision?
I saw some code in ClusterMemoryManager, but was not sure if I was looking at the right place.
@anoopj thanks for investigating this. This is interesting, so please keep us posted.
In my test, there can only be one query at a time in the cluster. So ideally we want a large reserve pool, a tiny general pool and want the query to be promoted to the reserve pool at the earliest.
Why not just a large general pool? Reserved pool is used to handle the case when general pool is full on some worker to prevent deadlocks and it will eventually be removed.
the Presto UI says the query is running in reserved pool, but I'm not really sure if the query is actually running in the reserved pool in the workers.
When you see that in the UI it should be the case on the workers too. The links to workers on the query details page (in the list of tasks) will show which queries run on the reserved pool on a particular worker, you can use those worker pages to confirm.
What decides when a query is promoted to the reserve pool? Is it a decision made at a cluster-level by the coordinator or is it a local decision?
It's a coordinator level decision, and that decision is sent to the workers to assign a query to the reserved pool on each worker. The logic is here. The biggest query is moved to the reserved pool if there are no queries running in the reserved pool and some worker has less than zero free bytes in its general pool (ran out of memory -- this is the generalPool.getBlockedNodes() > 0 condition in the code).
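The promotion rule described above can be paraphrased as a small sketch. This is only an illustration of the rule as stated in this comment, not the actual ClusterMemoryManager code; the `Query` class, its fields, and the function name are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Query:
    query_id: str
    reserved_bytes: int  # memory the query currently holds (hypothetical field)


def pick_query_to_promote(queries, reserved_pool_queries, general_pool_blocked_nodes):
    """Return the query to move to the reserved pool, or None.

    Promotion happens only when nothing is running in the reserved pool
    and at least one worker has run out of general pool memory.
    """
    if reserved_pool_queries > 0 or general_pool_blocked_nodes <= 0:
        return None
    if not queries:
        return None
    # The biggest query is the one that gets promoted.
    return max(queries, key=lambda q: q.reserved_bytes)


if __name__ == "__main__":
    running = [Query("small", 1 << 20), Query("big", 1 << 30)]
    print(pick_query_to_promote(running, 0, 1))  # the big query is chosen
    print(pick_query_to_promote(running, 0, 0))  # no blocked worker, so None
```

Note that under this rule nothing is promoted until some worker's general pool is actually exhausted, so a query can run in the general pool for a while before it moves.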
@nezihyigitbasi Thanks for the response.
Why not just a large general pool? Reserved pool is used to handle the case when general pool is full on some worker to prevent deadlocks and it will eventually be removed.
We can't do that because currently Presto conflates the reserve pool size and the size of the largest query that can be run into the same setting (query.max-total-memory-per-node in the newer versions). If we set this too low, the general pool will be large, but it will prevent large queries from being run, which defeats the purpose. Presto's plan to get rid of the reserve pool in future releases would be a great improvement, but until then, we would need to do something like this to pin the large query to the reserve pool. Please correct me if I'm wrong - we'd love to be not doing this.
The biggest query is moved to the reserved pool if there are no queries running in the reserved pool and some worker has less than zero free bytes in its general pool.
That makes sense and I verified that the query gets moved to the reserve pool in the worker nodes within a few seconds after the query starts.
But if I shrink the general pool to be close to 0, the query runs four times faster. That is, in the same cluster, if I change the below setting:
query.max-total-memory-per-node = 16.7GB
to:
query.max-total-memory-per-node = 16.8GB
the query latency drops by a factor of 4!
With this setting change, I'm shrinking the general pool from 100MB to 0 or one byte and that made a big difference. So clearly something memory related is going on here and I'd appreciate your thoughts. Please note that we have spill to disk enabled.
@nezihyigitbasi
Please let me know your thoughts.
With the info I have my guess is spilling may be causing this. I recommend you to monitor the spill activity across the cluster during these tests.
the query latency drops by a factor of 4!
For this particular experiment, do you observe any spilling for the slow run? And if yes, do you observe more spilling in the slow run than the fast run?
Again, for this particular experiment, for the slow run, how long does it take for the query to get promoted from general pool to the reserved pool?
I'm shrinking the general pool from 100MB to 0 or one byte and that made a big difference.
Since your general pool is small (~100MB) it's possible that aggregation operators start running on some workers and start processing data, and since the general pool is small they start spilling. When you have a very tiny general pool (0/1 byte) the query gets immediately moved to the reserved pool before aggregations start running and spilling, and after that point aggregations won't spill at all (or much) when they run in the much larger reserved pool. To confirm whether that's the case you should monitor the spill activity during your tests.
Since your general pool is small (~100MB) it's possible that aggregation operators start running on some workers and start processing data, and since the general pool is small they start spilling. When you have a very tiny general pool (0/1 byte) the query gets immediately moved to the reserved pool before aggregations start running and spilling, and after that point aggregations won't spill at all (or much) when they run in the much larger reserved pool.
@nezihyigitbasi that totally makes sense. My thinking as well. The proper fix would be to allocate revocable memory from the reserved pool directly -- https://github.com/prestodb/presto/issues/10512 @anoopj would you take a stab at that?
Thank you both for the response. I think it makes sense and in line with my guess also.
Quick question: What is the recommended way to quantitatively measure the amount of spill to disk for a query? jstack showed that spill to disk was indeed happening, but I'd like a way to quantify that.
@anoopj I don't think there is such a metric today. You can revive the old PR https://github.com/prestodb/presto/pull/8010 to get you going or get some inspiration from the comment https://github.com/prestodb/presto/pull/8010#issuecomment-314844081 explaining why that PR was not merged. Having such a metric would be generally valuable.
I added the spill metric all the way from the operator to the Web UI and reran the tests.
I can confirm that the slowness is caused by spilling. Even though the query gets promoted from the general pool to the large reserve pool, the aggregations have already started running and spilling. The query above spills over 281 GB of data.
I then disabled the reserve pool and allocated most of the heap to the general pool. For disabling the reserve pool, I used the experimental setting (experimental.reserved-pool-enabled) that Nezih recently added. This time the query has a larger pool to work with and spills 0 bytes.
As a result, the query runs 4x faster.
Can you submit a PR for the UI changes?
@findepi, we should also add spill to the explain analyze output
I can submit a PR to add spill to the UI and explain analyze output also.
Resolving, since we found the root cause.
@anoopj nice investigation & thanks for the follow up.
I can submit a PR to add spill to the UI and explain analyze output also.
@anoopj if you have problems rebasing your changes to current master, you can post a PR based on 0.206 (or whatever you currently have) and then it will be easier to move this forward.
@findepi
Piotr - I can rebase from the current master, just been busy with other projects. I hope to send out a PR in the next few days.
Created pull request https://github.com/prestodb/presto/pull/11910 to add the spill stats to the web UI and query summary.
I was recently benchmarking Presto 0.206 vs 0.172. The tests are run on Parquet datasets stored on S3.
We found that while Presto 0.206 was generally faster on smaller datasets, there were some significant performance regressions on larger datasets. The CPU time reported by EXPLAIN ANALYZE was lower in 0.206 than 0.172, but the wall time was much longer.
This possibly indicates either stragglers or some sort of scheduling bug that adversely affects parallelism. Note that the concurrency settings like task.concurrency are the same in both clusters.
For instance, on the TPCH scale 1000 dataset, query#7 slowed down by a factor of 2x in wall time. The query was:
I compared the output of EXPLAIN ANALYZE from both versions of Presto and cannot find anything that could explain this. Here are some observations: