Statistics Tech Debt #55043

Open Rustin170506 opened 1 month ago

Rustin170506 commented 1 month ago

Tech Debt

In this issue, I will record all tech debts I found in the TiDB statistics module.

Code duplication

The most serious technical debt in the statistics module is code duplication. The following implementation pattern is used almost everywhere there is concurrent processing:

  1. Keep single-threaded code untouched.
  2. Directly implement another concurrent version of the exact same code.

This pattern creates a lot of problems: some issues have to be fixed twice, and the amount of duplicated code is significant. In theory, single-threaded execution is just a special case of the multi-threaded implementation, so there is no need to copy and paste code.

Here are the relevant modules that have this problem:

  1. Saving analysis results for partitioned tables
  2. Init Stats
  3. Merge Global Stats, and so on.

Too many variables

image

We have a lot of variables related to statistics collection. Most of them were introduced together with concurrency support, and their names are inconsistent and hard to understand. The documentation also does not make clear how these variables affect the system. Some of them even depend on each other, which makes statistics very challenging for users to work with. We need to:

  1. check that these variables are actually in effect.
  2. explain how these variables affect the system.
  3. ensure that this information can be found in the documentation.

Rustin170506 commented 1 month ago

NCRMTA1: Surprise analyze-partition-concurrency-quota

Abbreviation: NCRMTA -> Nobody can really master TiDB analyze

This whole document is about how we can improve in the future. Please do not take it as criticism, even though I named it "nobody can really master TiDB analyze".

Background

In this PR, https://github.com/pingcap/tidb/pull/55046, I aimed to delete some duplicated code related to saving stats results. However, we were surprised to discover a new configuration called analyze-partition-concurrency-quota. There is no documentation or explanation for this configuration, but it does impact how we store stats results. In this document, I will explain why this configuration exists, how it works, and how to address it.

Concurrently saving analysis results

When analyzing partitioned tables, it is important to concurrently analyze their partitions instead of analyzing them one by one. The analysis process dataflow looks like this: image

As you can see, we spawn several workers to analyze the partitions concurrently and then collect all the analysis results in the result handler. In the result handler, we also need to save the statistics concurrently; otherwise, all the computed statistics pile up in memory while they queue for writing.

image

As you can see, we spawn several saving workers to save the results table by table. This is where analyze-partition-concurrency-quota comes into play.

analyze-partition-concurrency-quota

This is a configuration for the TiDB server. Its default value is 16.

tidb_servers:
  - host: 10.0.1.14
    config:
      performance.analyze-partition-concurrency-quota: 16

The name is somewhat misleading, as it suggests that this configuration controls the concurrency of partition analysis. In reality, it only governs a portion of the process, specifically the saving phase. The way it affects the analysis process is by controlling how many sessions you can use for statistics collection. These sessions can be used to execute internal SQL while analyzing the table.

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
if err != nil {
    return nil, err
}
subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota)
for i := 0; i < analyzeConcurrencyQuota; i++ {
    subCtxs2[i] = analyzeCtxs[i]
}
dom.SetupAnalyzeExec(subCtxs2)

While bootstrapping the domain, we read this configuration and initialize dedicated sessions for statistics collection, which are registered by calling dom.SetupAnalyzeExec(subCtxs2). Within each domain, these sessions are shared by all analyze statements. An analyze statement tries to fetch sessions from this mutex-protected structure by calling the FetchAnalyzeExec function.

analyzeMu struct {
    sync.Mutex
    sctxs map[sessionctx.Context]bool
}
// FetchAnalyzeExec gets needed exec for analyze
func (do *Domain) FetchAnalyzeExec(need int) []sessionctx.Context {
    if need < 1 {
       return nil
    }
    count := 0
    r := make([]sessionctx.Context, 0)
    do.analyzeMu.Lock()
    defer do.analyzeMu.Unlock()
    for sctx, used := range do.analyzeMu.sctxs {
       if used {
          continue
       }
       r = append(r, sctx)
       do.analyzeMu.sctxs[sctx] = true
       count++
       if count >= need {
          break
       }
    }
    return r
}

If no sessions are available, we return an empty slice. The next question is: what happens when no sessions are available? The answer is we fall back to using a single thread to save the analysis results one by one.

if partitionStatsConcurrency > 1 {
    subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency)
    ...
    if len(subSctxs) > 0 {
       sessionCount := len(subSctxs)
       logutil.BgLogger().Info("use multiple sessions to save analyze results", zap.Int("sessionCount", sessionCount))
       defer func() {
          dom.ReleaseAnalyzeExec(subSctxs)
       }()
       return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
    }
}
logutil.BgLogger().Info("use single session to save analyze results")
failpoint.Inject("handleResultsErrorSingleThreadPanic", nil)
subSctxs := []sessionctx.Context{e.Ctx()}
return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)

Before I demonstrate the problem here, let's look at another session variable first.

tidb_analyze_partition_concurrency

This is a session variable; it can be set at either the global level or the session level. As you can see, its name is very similar to the configuration above: the same confusing name and the same function. It works like this:

partitionStatsConcurrency := e.Ctx().GetSessionVars().AnalyzePartitionConcurrency
// the concurrency of handleResultsError cannot be more than partitionStatsConcurrency
partitionStatsConcurrency = min(taskNum, partitionStatsConcurrency)
if partitionStatsConcurrency > 1 {
    subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency)
    ...
    if len(subSctxs) > 0 {
       sessionCount := len(subSctxs)
       logutil.BgLogger().Info("use multiple sessions to save analyze results", zap.Int("sessionCount", sessionCount))
       defer func() {
          dom.ReleaseAnalyzeExec(subSctxs)
       }()
       return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
    }
}
logutil.BgLogger().Info("use single session to save analyze results")
failpoint.Inject("handleResultsErrorSingleThreadPanic", nil)
subSctxs := []sessionctx.Context{e.Ctx()}
return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)

So basically, tidb_analyze_partition_concurrency takes priority: we use it to decide whether to spawn multiple workers to save the analysis results, and then to fetch dedicated sessions for those workers. However, if we cannot get any available sessions, we fall back to using a single thread to save the results. This means that once the analysis of one table takes all the sessions, other analyze statements have to save their results one by one. Even after the first analysis, which used all the sessions, has completed, the other statements cannot pick up the released sessions or increase their concurrency. Because the workers are set up at the beginning of each analyze statement, the concurrency cannot be changed dynamically.
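
The interaction described above can be sketched with a toy model. Everything in the following Go program is hypothetical (`sessionPool`, `fetch`, and `savingConcurrency` are illustrative names, not TiDB code), and it assumes that three analyze statements run at the same time, that each covers 100 partitions with tidb_analyze_partition_concurrency set to 200, and that no session is released in between.

```go
package main

import "fmt"

// sessionPool is a toy stand-in for the dedicated analyze sessions.
// It only counts free sessions; all names here are hypothetical.
type sessionPool struct{ free int }

// minInt returns the smaller of two ints.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// fetch hands out at most `need` sessions. It may return fewer than
// requested, or zero when the pool is exhausted.
func (p *sessionPool) fetch(need int) int {
	if need < 1 {
		return 0
	}
	got := minInt(need, p.free)
	p.free -= got
	return got
}

// savingConcurrency models the decision described above: the variable is
// capped by the number of tasks, then by the sessions that are actually
// available, and falls back to 1 (the statement's own session) otherwise.
func savingConcurrency(p *sessionPool, taskNum, partitionConcurrency int) int {
	need := minInt(taskNum, partitionConcurrency)
	if need > 1 {
		if got := p.fetch(need); got > 0 {
			return got
		}
	}
	return 1
}

func main() {
	pool := &sessionPool{free: 16} // assumed quota of 16
	total := 0
	for i := 1; i <= 3; i++ {
		c := savingConcurrency(pool, 100, 200)
		total += c
		fmt.Printf("statement %d saves with concurrency %d\n", i, c)
	}
	fmt.Println("total saving concurrency:", total)
}
```

Under these assumptions the first statement takes all 16 sessions and the other two fall back to a single session each, no matter how large the variable is.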

What is the problem?

  1. There is no documentation explaining how it works.
    • If you click the original PR that introduced this configuration, you will find that there is no description for it at all. It simply states that it introduces a new approach to saving the analysis results concurrently. So nobody was aware of it until I submitted the PR to clean up some code.
  2. The analyze-partition-concurrency-quota makes tidb_analyze_partition_concurrency unusable.
    • From the current implementation, the actual concurrency depends on how many available analysis sessions remain. For example, on a single TiDB node, if you send three analysis statements to it, the maximum concurrency for saving results is 18. Even if you change tidb_analyze_partition_concurrency to 200, it still remains at 18.
    • Another reason I think it is unusable is that, since it is a server configuration, you need to restart the TiDB instance for the changes to take effect.
  3. The concurrent code and single-thread code are duplicates.
    • We directly copied the same code twice. This introduces many issues. For example, we need to fix the same bug twice, read the same code twice, change the same code twice, and even check which code is being used in users’ clusters before starting debugging.
    • My PR is a vivid example: compared with the single-threaded version, the concurrent version forgot to record the error after the statement was killed.

image

  4. The lack of tests
    • From the analysis above, we can see that the maximum value of tidb_analyze_partition_concurrency cannot be greater than analyze-partition-concurrency-quota. Otherwise, it is meaningless.
    • We do have code to check this, but it does not work. The reason is that the Type field is missing here:

{
    Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzePartitionConcurrency, Value: strconv.FormatInt(DefTiDBAnalyzePartitionConcurrency, 10),
    MinValue: 1, MaxValue: uint64(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota), SetSession: func(s *SessionVars, val string) error {
       s.AnalyzePartitionConcurrency = int(TidbOptInt64(val, DefTiDBAnalyzePartitionConcurrency))
       return nil
    },
},

// ValidateFromType provides automatic validation based on the SysVar's type
func (sv *SysVar) ValidateFromType(vars *SessionVars, value string, scope ScopeFlag) (string, error) {
    // Some sysvars in TiDB have a special behavior where the empty string means
    // "use the config file value". This needs to be cleaned up once the behavior
    // for instance variables is determined.
    if value == "" && ((sv.AllowEmpty && scope == ScopeSession) || sv.AllowEmptyAll) {
       return value, nil
    }
    // Provide validation using the SysVar struct
    switch sv.Type {
    case TypeUnsigned:
       return sv.checkUInt64SystemVar(value, vars)
    case TypeInt:
       return sv.checkInt64SystemVar(value, vars)
    case TypeBool:
       return sv.checkBoolSystemVar(value, vars)
    case TypeFloat:
       return sv.checkFloatSystemVar(value, vars)
    case TypeEnum:
       return sv.checkEnumSystemVar(value, vars)
    case TypeTime:
       return sv.checkTimeSystemVar(value, vars)
    case TypeDuration:
       return sv.checkDurationSystemVar(value, vars)
    }
    return value, nil // typeString
}
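
To make the effect of the missing Type concrete, here is a simplified, self-contained model of type-driven validation. The `sysVar` struct, the `typeString`/`typeInt` constants, and the `validate` method are hypothetical stand-ins rather than TiDB's real definitions. The sketch assumes that an omitted Type is the zero value and is treated as a string, and that an integer check adjusts out-of-range values to the nearest bound; the real check may behave differently.

```go
package main

import (
	"fmt"
	"strconv"
)

type varType int

const (
	typeString varType = iota // zero value: what a variable gets when Type is omitted
	typeInt
)

// sysVar is a hypothetical, stripped-down system variable definition.
type sysVar struct {
	Name     string
	Type     varType
	MinValue int64
	MaxValue int64
}

// validate only enforces MinValue and MaxValue for typeInt variables.
// For every other type, including the zero value, the input passes through.
func (sv sysVar) validate(value string) (string, error) {
	switch sv.Type {
	case typeInt:
		n, err := strconv.ParseInt(value, 10, 64)
		if err != nil {
			return value, err
		}
		if n < sv.MinValue {
			n = sv.MinValue
		}
		if n > sv.MaxValue {
			n = sv.MaxValue
		}
		return strconv.FormatInt(n, 10), nil
	}
	return value, nil // treated as a string: the bounds are never consulted
}

func main() {
	withoutType := sysVar{Name: "demo_concurrency", MinValue: 1, MaxValue: 16}
	withType := sysVar{Name: "demo_concurrency", Type: typeInt, MinValue: 1, MaxValue: 16}

	v1, _ := withoutType.validate("200")
	v2, _ := withType.validate("200")
	fmt.Println("without Type:", v1)
	fmt.Println("with Type:", v2)
}
```

In this model the definition without a Type accepts 200 unchanged even though MaxValue is 16, which is the kind of gap that a basic boundary test on the variable would have caught.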

What can we learn from it?

  1. Action: Please add a detailed description to your PR body. My personal opinion is that we should write the design documents first, before we submit the PR.
  2. Action: Please update the documentation when introducing any configuration or variable. Submit the docs PR before merging the code PR; reviewers should check for this as well.
  3. Action: Do not copy and paste code when trying to improve something. Otherwise, you end up improving one thing but messing up many others.
  4. Action: Do basic testing before you request a code review.
  5. Action: Add useful comments. (hack points)

What is worse?

  1. It has not been properly noticed or evolved

    It's already very messy, so what could be worse? After iterating on this approach for years, we ended up not using these sessions for analysis at all. The original purpose of those sessions was to execute SQL to save analysis results. It did work in the past, at least when it was introduced.

    image

    However, after we improved the SaveTableStatsToStorage API, it no longer takes these sessions as a parameter.

    err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error {
        statsVer, err = SaveTableStatsToStorage(sctx, results, analyzeSnapshot)
        return err
    }, util.FlagWrapTxn)

    In the new API, a separate, unbounded session pool is used to execute this SQL, so the dedicated analysis sessions are no longer used. In the code, we also pass the session context to finishJobWithLog, which is very confusing as well. In finishJobWithLog, we call FinishAnalyzeJob:

    exec := sctx.GetRestrictedSQLExecutor()
    ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
    _, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...)

    You can tell from sqlexec.ExecOptionUseSessionPool that we do not use this session to execute the SQL that updates the job history. In conclusion, the dedicated session is used neither to save analysis results nor to store job history. Its only remaining effect is to impose a hard limit on the saving concurrency, and changing that limit requires users to restart the server.

  2. This design is self-contradictory

    This point is not so easy to understand. Why do I say this design is self-contradictory? If we limit the number of sessions to 16 in order to control the resources used by analyze, the actual result may be the opposite of the desired effect. When analyzing a partitioned table, we first perform concurrent scans from TiKV, and after obtaining the scanned samples, we calculate and build the statistics. Only after these steps are completed do we write the results to the system tables, as discussed in this document. Here is the contradiction: since our concurrency is limited by this configuration, the writing process becomes very slow, and we have to write these results one by one. As a result, the computed results stay in memory longer and use more memory. The smaller this value is, and the more partitioned tables we analyze, the more pronounced this becomes. The changes in https://github.com/pingcap/tidb/pull/47960/files alleviated this issue to some extent because the number of results we can cache in the result channel has been reduced.

How to get rid of it?

The reason I believe we should delete analyze-partition-concurrency-quota is that it makes tidb_analyze_partition_concurrency useless. If you really want to improve the analysis performance for partitioned tables, you need to change this configuration and restart the cluster, which is unacceptable for some users. A value of 16 is also quite small compared to the number of partitions in a partitioned table. So I think we should keep only tidb_analyze_partition_concurrency, with a default value of 2. Even if users issue many ANALYZE statements to the TiDB instance, it shouldn't cause significant issues, because the saving process is relatively simple.