facebookincubator / velox

A C++ vectorized database acceleration library aimed to optimizing query engines and data processing systems.
https://velox-lib.io/
Apache License 2.0
3.28k stars 1.08k forks source link

kMaxCompiledRegexes leads to non-deterministic SparkSQL regexp_replace behavior #8438

Open codyschierbeck opened 5 months ago

codyschierbeck commented 5 months ago

Description

The context of this issue is re-introducing SparkSQL regexp_replace. The current PR to do this is #8333. This issue was discovered after @mbasmanova 's comment on that PR "Would you check if documentation needs updating to clarify the behavior for input NULLs and explain when result might be NULL?" made me realize that there is zero reason to support null arguments in SparkSQL regexp_replace. We do still need to support null return values (to the best of my knowledge) since we throw exceptions. So, the correct call function is 'bool call'.

Summary

The 3 main changes I am proposing are:

  1. Remove the VELOX_USER_CHECK_LT for cache size
  2. Change getCachedRegex to return a std::shared_ptr instead of a raw pointer to avoid seg faults scope closing
  3. Use a SimpleLRUCache in place of a folly:F14FastMap to allow for easy cache eviction.

Common Eval vs Simplified Eval add regexes/patterns to the patternCache_ in different orders, and when the cache size is compared to kMaxCompiledRegexes in https://github.com/facebookincubator/velox/blob/c4ef9edc5ac207f19fd0a9dae27232e82487d810/velox/functions/sparksql/RegexFunctions.cpp#L197-L207 exceptions are thrown at different rows per each Eval method, breaking the fuzzer.

Details

This is because the fuzzer succeeds when a common and simplified eval execution produce the same output. Figuring out this fuzzer bug was the first time that I've had to dive deep into expression evaluation, so I am still new to it. But, from what I understand, common eval will find common sub-expressions and "fold" them to reduce repeated work.

In the context of regexp_replace, this means that even if a query has a set of unique regexes > kMaxCompiledRegexes, the different expression evaluation strategies will provide these regexes in a different order. This leads to non-deterministic behavior in some corner cases of RegexpReplace execution. In the example I will provide below, common eval throws an error during row 16, whereas simplified eval throws during row 87 and 98.

I discovered this while trying to evaluate my 'bool call' implementation against the fuzzer. I ran 4, 30 minute duration, simultaneous instances of the fuzzer with regexp_replace given 1,000 tickets. 2/3 of the fuzzer calls failed about 25 minutes into execution.

In one such case, the input vector is as follows:

I0118 12:15:31.541563 2947888 ExpressionVerifier.cpp:31] 2 vectors as input:
I0118 12:15:31.541568 2947888 ExpressionVerifier.cpp:33]        [FLAT VARCHAR: 100 elements, 10 nulls]
I0118 12:15:31.541581 2947888 ExpressionVerifier.cpp:33]        [DICTIONARY VARBINARY: 100 elements, 16 nulls], [DICTIONARY VARBINARY: 100 elements, 12 nulls], [DICTIONARY VARBINARY: 100 elements, 8 nulls], [FLAT VARBINARY: 100 elements, 7 nulls]
I0118 12:15:31.541592 2947888 ExpressionVerifier.cpp:36] RowVector contents (ROW<c0:VARCHAR,c1:VARBINARY>):

I believe this is saying that is a two column rowVector of a flat VARCHAR and a 3-nested DICTIONARY that maps to VARBINARY.

It ran the following query against this input:

"try"("translate"("regexp_replace"('MiB$?;8Y"%,qT^!vqWU(J<|*r]>LF%~5#)X^ZmFWK{JITh-[nfXZx4', "c0", "c0"), 'noFSI%o/dE}/mm0h,WhvmQy3IeH^84:', "regexp_replace"("md5"("c1"), "md5"("c1"), '-:VmR@n9?is;6[Ty!Q{8DI^h86C')))

but the error occurs in

regexp_replace(md5(c1), md5(c1), '-:VmR@n9?is;6[Ty!Q{8DI^h86C')

Here are the rows that do not match:

I0118 12:01:44.539122 2941041 FuzzerToolkit.cpp:123] At 16 [ null vs MiB$?;Y"@!qT!8qQU(J<|*r]>LV@~5#)XZ6VQK{JRTy-[-fXZx ]
...
I0118 12:01:44.539484 2941041 FuzzerToolkit.cpp:123] At 87 [ MiB$?;Y"@!qT!8qQU(J<|*r]>LV@~5#)XZ6VQK{JRTy-[-fXZx vs null ]
...
I0118 12:01:44.539549 2941041 FuzzerToolkit.cpp:123] At 98 [ MiB$?;Y"@!qT!8qQU(J<|*r]>LV@~5#)XZ6VQK{JRTy-[-fXZx vs null ]

Reproduction

Reproducing is complicated by the fact that I have rebased my branch away from the commit that initially caused it. But I do have a branch on my remote that I believe you can reproduce the input vectors on here: https://github.com/codyschierbeck/velox/tree/md5withSparkExpressionRunner.

If you run the command line in "Error Reproduction", you should eventually run into the error.

I am not sure if GitHub allows it, but I can also provide a .zip or .tar of the fuzzer_repro_path directory if need be.

Once you have reproduced the error, you can use the following branch to test the solutions below: https://github.com/codyschierbeck/velox/tree/testing_bool_regexp_replace.

This branch includes the spark_expression_runner_test I created in #8341 to allow you to attach a debugger of your choice.

Solution

A simple way to solve this is to just remove the check. But, in the case of input vectors that are very large with zero unique regexes, this will lead to a map that stores each one of them. We have to introduce some way of evicting entries.

I decided to use velox/common/caching/SimpleLRUCache to accomplish this. Below is the implementation:


template <typename T>
struct RegexpReplaceFunction {
  VELOX_DEFINE_FUNCTION_TYPES(T);

  using RegexLRUCache = SimpleLRUCache<std::string, std::shared_ptr<re2::RE2>>;

  RegexpReplaceFunction() : cache_(kMaxCompiledRegexes, 1) {}

.....

  std::shared_ptr<re2::RE2> getRegex(const std::string& pattern) const {
    std::shared_ptr<re2::RE2> pReg = getFromCache_(pattern);
    if (pReg != nullptr) {
      return pReg;
    }

    // VELOX_USER_CHECK_LT(
    // regexCache_.size(),
    // kMaxCompiledRegexes,
    // "regexp_replace hit the maximum number of unique regexes: {}",
    // kMaxCompiledRegexes);

    checkForCompatiblePattern(pattern, "regexp_replace");
    std::shared_ptr<re2::RE2> patternRegex =
        std::make_shared<re2::RE2>(pattern);
    checkForBadPattern(*patternRegex.get());

    addToCache_(pattern, patternRegex);
    return patternRegex;
  }

   mutable RegexLRUCache cache_;

The solution branch above also has functions getFromMap and addToMap which you can plug in to test the map based solution. When I create a PR for the solution, I will remove the functions and just have the correct logic within the "getRegex" function.

Error Reproduction

_build/debug/velox/expression/tests/spark_expression_fuzzer_test --repro_persist_path <path_to_your_repro_dir> --duration_sec 1800 --enable_variadic_signatures --lazy_vector_generation_ratio 0.2 --velox_fuzzer_enable_column_reuse --velox_fuzzer_enable_expression_reuse --max_expression_trees_per_step 2 --retry_with_try --enable_dereference --seed 532472015 --logtostderr=1 --minloglevel=0 --assign_function_tickets regexp_replace=1000

Relevant logs

No response

mbasmanova commented 5 months ago

@codyschierbeck Hi Cody, thank you for debugging and sharing your findings. I need to find more time to read your findings carefully as it is not obvious to me what exactly is happening. Are you seeing this failure only with try? Does the error go away if you run fuzzer with --retry-with-try option disabled?

A simple way to solve this is to just remove the check.

I feel that it is important to keep this check. Compiling regular expressions is expensive. We don't want to compile new regex for every row. This will use lots of resources, make queries very slow and users unhappy. It is particularly dangerous as users won't see much trouble testing their queries at a "smaller scale".

Fuzzer is a piece of software and therefore may have bugs and limitations. A fuzzer failure may indicate a legitimate bug in production code or a bug in the Fuzzer itself. We need to figure out which one it is in this case. It might be a bug (or a limitation) in the Fuzzer itself.

Any chance you could illustrate the problem using some simple expressions and data?

CC: @bikramSingh91 @kagamiori @kgpai

codyschierbeck commented 5 months ago

@mbasmanova

I hope that the below explanation illustrates the problem regexp_replace is running into, and clarifies that outside of extreme corner cases, the solution I've provided will not compile a regex for every row. I couldn't craft an example with simple expressions and data, because the corner case itself relies on a uniquely efficient folding of input vectors, to the best of my understanding. I haven't taken a look at the outcome of other input vector foldings, but this case processes a 100 row vector with 26 calls thanks to efficient folding.

On compiling for every row

As for compiling regexes at every row, we still only compile a unique regex ideally once. Now, instead of throwing an error when we've seen more than kMaxCompiledRegexes, we evict the least recently used regex from our SimpleLRUCache and add the new regex. So, in the case where we have a query ran over 1,000,000 rows but with a constant pattern like "regexp_replace(c0, '(\d+)', 100), the regexp "(\d+)" is compiled a single time then retrieved from the cache every subsequent call.

This is how it worked beforehand. As long as the query did not send >20 unique regexes, regexp_replace could theoretically process an infinite amount of rows and only compile each regex once. I included two test cases to ensure that:

https://github.com/facebookincubator/velox/blob/fce32e4d033ef656244e011f6adcee11de2ee68a/velox/functions/sparksql/tests/RegexFunctionsTest.cpp#L532-L570

The first test made sure kMaxCompiledRegexes worked, and the second one made a vector of kMaxCompiledRegex -1, then duplicated it 50000 times to make a df of size 1,000,000 that wouldn't hit the limit. (In hindsight I will change that to *3 because if anyone ever changes kMaxCompiledRegexes to a larger number that could cause a decent delay in CI/CD for other PRs. As it currently is implemented, this test case takes 3 seconds).

Detailed explanation

The problem with the cache size check is that it is dependent not on the input values, but on the state of the query evaluation as a whole and this is because the state of the cache is also query evaluation dependant and not row dependant. I modified my RegexpReplace function to try and illustrate this. Instead of ever throwing an error, I caught all errors and made them the result. I also included function members that tracked which "callNumber" of the function this was, what the size of the cache was at the end of the call, and offset into the result vector we were being written to. Here is an example of common eval compared to simple eval:

image

You'll likely have to open the image in another window as I tried to include as much output as possible. As you can see, the value of row 1 for the common evaluation (left hand side) was calculated during the 5th call, the cache size was also 5, and its "result offset" was 21. Until the cache limit is hit, cache size always matches callNumber for common evaluation

Importantly, though, is row 16 which is where we saw the initial fuzzer failure before I changed the result output.

At 16 [ [16->79] Cache Limit:callNumber: 22, cacheSize: 20, result offset: 79 vs a:callNumber: 11, cacheSize: 7, result offset: 16 ]

The state of the query when it processes the result for row 16 differs. For common eval, it was the 22nd call to regexp_replace and the first time we had a full cache and didn't find the pattern in the cache. For simple eval, it was the 11th call (there were 5 null values before), we had leveraged the cache 4 times to this point, and the result was being directly written to the row being processed.

I believe the reason for this is a mixture of common evaluation sub-expression folding, the way it leverages dictionary encoding, and the query itself.

regexp_replace(md5(c1), md5(c1), '-:VmR@n9?is;6[Ty!Q{8DI^h86C')

Effectively, this call is just "if row x of c1 isn't null, make it -:Vmr...."

While I am still not exactly sure how common evaluation folding works, it appears incredibly efficient with this given input vector and query. The entirety of the output is not shown in the above picture, but common evaluation reduces 100 calls in this case to 27 calls. It hits its first cache miss with a full cache in its 22nd call, as seen above with row 16.

On the other hand, simple evaluation gets its first cache miss with a full cache in call 47 (row 72)

At 72 [ [72->53] a:callNumber: 13, cacheSize: 13, result offset: 53 vs Cache Limit:callNumber: 47, cacheSize: 20, result offset: 72 ]

And from then on any cache miss is a null value.

To summarize, I am confident that the problem is that we are throwing a query state-dependent exception (cache.size > 20) and then comparing the results of different evaluation methods. What I am not confident about is how common this corner case happens. I discovered this a week ago using two different seeds using regexp_replace tickets at 10000. The errors occurred about 25 minutes into a 30-minute execution. Fuzzer seeds that don't see this failure ran regexp_replace upwards of 60k times in those 30 minutes, so I would assume that it took 50k calls of regexp_replace before hitting this corner case.

Both fuzzer runs failed with the exact same query and input vectors. The first column of the input vector is not used in the offending query. The second is a 100-element VARBINARY with 3 dictionaries wrapping it.

Solutions

There may be more options for a solution, but here are some that come to mind, starting with the worst ones.

1) We could ignore it and remove regexp_replace from the spark fuzzer. I don't like this option but Spark Fuzzer already excludes regexp_extract, and this corner case is 1/50,000. But there are better options.

2) Remove the check. I can't think of a way to implement this check without it being query execution state-dependent. As long as the cache can miss while above its limit at different points in query execution, this error will persist. But if we are going to remove the check we need to avoid the Folly::F14FastMap growing in worst case to size O(n).

3) Implement LRU cache eviction. This is what the PR currently implements. When we have a cache miss at our limit, we just evict the least recently used compiled regex. This maintains our ability to efficiently run both constant and non-constant pattern parameters. With constant, we cache once and use that cached value for however many rows it's required. This emulates how every other regexp function works in Velox, without demanding that the parameter is constant. We also allow users to pass columns of regexes. Ideally, they pass it with a column sorted by pattern to guarantee that an evicted pattern isn't used again. In the case they don't the cache still will do its best job to keep the most commonly used patterns and avoid recompilation, but of course, there are still going to be edge cases where we process a large amount of regexes, sometimes multiple times. I think in this case, increasing the size of kMaxRegexsCompiled, to 100 for example, would help avoid recompilation. This would further guarentee that recompilation only occurs when the user has passed a column that already requries a lot of processing.

In the worst case, we are passed a very large column of unique regexes and have to compile each one. I'm not sure how to avoid this on Velox's end without simply checking beforehand how many unique patterns there are. I may be naive, but if its the query the user is asking for I also don't understand why we throw an exception. We could warn them after noticing how many unique regexes they've supplied, but in the end they are asking for compiling thousands of unique regexes.

I'm curious about your thoughts, and if you have any other ideas for how to get around this problem.

mbasmanova commented 5 months ago

@codyschierbeck Hi Cody, I think I understand what's going on. Let me try to explain.

Common expression evaluation is aware of encodings. If input is dictionary-encoded, common eval peels off the dictionary encoding and evaluates the expression on the base vector. If dictionary has lots of repetitions, this reduces the number of evaluations. Simple expression evaluation always flattens the inputs and evaluates expression on each row.

Common eval may process rows out of order. For example, if input is dictionary encoded using indices 10, 9, 8,....0, then evaluation will will happen in "reverse" order. The following simple test case can be used to illustrate that point. Here, we evaluate LIKE on 30 rows with unique patterns each. When evaluating on flat inputs first 20 rows are processed and remaining 10 rows fail. When we wrap that input in a dictionary with indices in reverse we observe that last 20 rows are processed while first 10 rows fail.

TEST_F(Re2FunctionsTest, xxx) {
  auto data = makeRowVector({
      makeFlatVector<std::string>(
          30, [](auto row) { return std::string(row, 'x'); }),
      makeFlatVector<std::string>(
          30,
          [](auto row) {
            return fmt::format(
                "{}%{}", std::string(row, 'x'), std::string(row, 'y'));
          }),
  });

  auto r = evaluate("try(c0 like c1)", data);
  LOG(ERROR) << r->toString();
  LOG(ERROR) << r->toString(0, 30);

  auto indices = makeIndicesInReverse(30);
  auto d2 = makeRowVector({
      wrapInDictionary(indices, data->childAt(0)),
      wrapInDictionary(indices, data->childAt(1)),
  });

  r = evaluate("try(c0 like c1)", d2);
  LOG(ERROR) << r->toString();
  LOG(ERROR) << r->toString(0, 30);

  FAIL();
}

Thus, comparing common and simplified eval is not valid for functions whose results depend on the order of inputs.

The Fuzzer is simply not sophisticated enough to test regexp_xxx functions which have a limit on number of unique regular expressions which makes them dependent on the order of inputs.

It seems the only viable solution would be to skip these functions in the Fuzzer.

In the worst case, we are passed a very large column of unique regexes and have to compile each one. I'm not sure how to avoid this on Velox's end without simply checking beforehand how many unique patterns there are. I may be naive, but if its the query the user is asking for I also don't understand why we throw an exception. We could warn them after noticing how many unique regexes they've supplied, but in the end they are asking for compiling thousands of unique regexes.

Usually, queries are processed on shared clusters and therefore it is important to ensure that no single query uses outsized amount of resources. Also, often queries are processed without a human in attendance, hence, there is no-one to warm about a problematic query. Unless a query fails, the author won't be interested in knowing whether the query is not written correctly. Furthermore, the query may originally be fine and process only a handful of unique regexes, but over time the data may change or a bug may be introduced that causes the number of unique regexes to explode. In this case, failing early and loudly will protect a shared resource and save operating team a lot of time investigating major outage as well as protect users of well-written queries.

Hope this makes sense.

mbasmanova commented 5 months ago

Here is the output of the simple test above:

E0123 17:55:48.550230 1374848 Re2FunctionsTest.cpp:1171] [FLAT BOOLEAN: 30 elements, 9 nulls]
E0123 17:55:48.550406 1374848 Re2FunctionsTest.cpp:1172] 0: true
1: false
2: false
3: false
4: false
5: false
6: false
7: false
8: false
9: false
10: false
11: false
12: false
13: false
14: false
15: false
16: false
17: false
18: false
19: false
20: false
21: null
22: null
23: null
24: null
25: null
26: null
27: null
28: null
29: null
E0123 17:55:48.559844 1374848 Re2FunctionsTest.cpp:1181] [DICTIONARY BOOLEAN: 30 elements, no nulls]
E0123 17:55:48.559906 1374848 Re2FunctionsTest.cpp:1182] 0: [0->29] null
1: [1->28] null
2: [2->27] null
3: [3->26] null
4: [4->25] null
5: [5->24] null
6: [6->23] null
7: [7->22] null
8: [8->21] null
9: [9->20] false
10: [10->19] false
11: [11->18] false
12: [12->17] false
13: [13->16] false
14: [14->15] false
15: [15->14] false
16: [16->13] false
17: [17->12] false
18: [18->11] false
19: [19->10] false
20: [20->9] false
21: [21->8] false
22: [22->7] false
23: [23->6] false
24: [24->5] false
25: [25->4] false
26: [26->3] false
27: [27->2] false
28: [28->1] false
29: [29->0] true
kgpai commented 5 months ago

Hi @codyschierbeck ,

Firstly thank you for the efforts you put in trying to bring this issue to light. Just as an fyi, Expression fuzzers can persist their vectors, plus expression, so you can easily share the repro around . We have documentation here : https://facebookincubator.github.io/velox/develop/testing/fuzzer.html#accurate-on-disk-reproduction - Please try it out and it should help you out a lot more in the future. Secondly please feel free to reach out to me on slack if you need clarifications if you think you see unusual behavior.

mbasmanova commented 5 months ago

One option is to make a limit configurable and set it to some high value in the Fuzzer. However, Fuzzer's coverage for json and regex functions is quite limited. Fuzzer generates random data that has virtually no chance of being valid JSON or regular expression, hence, test coverage doesn't go beyond some sanity checks. It would be great to extend ExpressionFuzzer to allow custom input generators similar to the ones available in AggregationFuzzer.

codyschierbeck commented 5 months ago

@mbasmanova

Thank you for your explanation. That makes a lot of sense. I am returning regexp_replace to use a F14FastMap, the kMaxRegexCompiled check, and then adding regexp_replace to the spark fuzzer skip list. Will have those updates in the PR later today.

My current priorities are getting regexp_replace merged, then string decode/encode and parse url, but once I get those PRs up and healthy I'd love to take a look at how aggregate fuzzer implements custom input and see if we can figure something out for regexp_functions.

As for changing kMaxRegexesCompiled to a large number, I believe just matching


DEFINE_int32(
    batch_size,
    100,
    "The number of elements on each generated vector.");

May do the trick?

mbasmanova commented 5 months ago

May do the trick?

It may... or may not.

Consider an expression with a lambda function: transform(array, x -> f(x)). If we evaluate this expression on 100 rows with 10-element array in each row, we'll need to evaluate f(x) on 10 * 100 = 1000 values.