yugabyte / yugabyte-db

YugabyteDB - the cloud native distributed SQL database for mission-critical applications.
https://www.yugabyte.com
Other
8.9k stars 1.06k forks source link

[CQL] Duplicate rows are returned when the query specifies token ranges #8977

Open tedyu opened 3 years ago

tedyu commented 3 years ago

I was debugging TestJsonUpsert (involving Spark connector) where the table is defined as:

    String createTable = "CREATE TABLE " + tableWithKeysapce + "(id int PRIMARY KEY," + "name text,"
        + "address text," + "phone jsonb" + ");";

Here is the query:

      String selectQuery = "Select id,name,address,phone from mycatalog." + tableWithKeysapce
          + " where id in (2,3) order by id";

There are 9 partitions for the table. Spark connector would send read requests to the partitions, specifying token range. What I observed is that I get one row for id=2. However, for id=3, there are still 9 rows coming back (with same content).

There are 18 lines of log with 'Fetching data for range'.

Below is log snippet beginning with log from Spark connector, including the additional log from eval_where.cc This is the 10th occurrence among the 18 (which should be the first for id=3) I looked at data fetching log lines before and after this, the partition range is present.

2021-06-18 09:07:40,304 (Executor task launch worker for task 3) [DEBUG - com.datastax.spark.connector.util.Logging.logDebug(Logging.scala:39)] Fetching data for range (token("id") >= ? AND token("id") < ?,List(3073143795726942208, 5122563101157228544)) with SELECT "id", "address", "name", "phone" FROM test.person WHERE token("id") >= ? AND token("id") < ? AND "id" IN (?, ?) ALLOW FILTERING with params [3073143795726942208,5122563101157228544,2,3] 3073143795726942208 5122563101157228544
2021-06-18 09:07:40,305 (Executor task launch worker for task 3) [DEBUG - com.yugabyte.oss.driver.internal.core.loadbalancing.PartitionAwarePolicy.getQueryPlan(PartitionAwarePolicy.java:113)] getQueryPlan: keyspace = test, query = SELECT "id", "address", "name", "phone" FROM test.person WHERE token("id") >= ? AND token("id") < ? AND "id" IN (?, ?) ALLOW FILTERING
2021-06-18 09:07:40,305 (Executor task launch worker for task 3) [DEBUG - com.yugabyte.oss.driver.api.core.TableSplitMetadata.getPartitionMetadata(TableSplitMetadata.java:56)] key 49348 -> partition = [43686, 50967) -> [Node(endPoint=127.152.163.8/127.152.163.8:9042, hostId=0eac05a7-8aad-3e89-f144-260bc0694527, hashCode=2829004c), Node(endPoint=127.69.210.86/127.69.210.86:9042, hostId=7f67e561-5f92-f6b0-5148-8f4482d879e4, hashCode=30d7fed), Node(endPoint=127.43.227.58/127.43.227.58:9042, hostId=84469313-14c4-6d88-304b-f855926d61cc, hashCode=40166fd3)]
2021-06-18 09:07:40,305 (Executor task launch worker for task 3) [DEBUG - com.yugabyte.oss.driver.internal.core.loadbalancing.PartitionAwarePolicy.newQueryPlan(PartitionAwarePolicy.java:88)] newQueryPlan: Number of Nodes = 3
ts2|pid2213859|:29642|http://127.43.227.58:17002 E0618 09:07:40.306763 2213991 audit_logger.cc:537] AUDIT: user:anonymous|host:127.43.227.58:9042|source:127.0.0.1|port:55046|timestamp:1624007260306|type:SELECT|category:QUERY|ks:test|scope:person|operation:SELECT "id", "address", "name", "phone" FROM test.person WHERE token("id") >= ? AND token("id") < ? AND "id" IN (?, ?) ALLOW FILTERING
ts2|pid2213859|:29642|http://127.43.227.58:17002 I0618 09:07:40.306836 2213991 eval_where.cc:96] where hash code 43686, 3073143795726942208, op=>=
ts2|pid2213859|:29642|http://127.43.227.58:17002 I0618 09:07:40.306849 2213991 eval_where.cc:96] where hash code 50967, 5122563101157228544, op=<
ts2|pid2213859|:29642|http://127.43.227.58:17002 I0618 09:07:40.306892 2213991 eval_where.cc:198] part option 10, int32_value: 2
ts2|pid2213859|:29642|http://127.43.227.58:17002 I0618 09:07:40.306903 2213991 eval_where.cc:198] part option 10, int32_value: 3
tedyu commented 3 years ago

I included a third id value for the IN clause. Here is snippet with the end of second value and beginning of 3rd value:

2021-06-19 21:45:18,594 (Time-limited test) [INFO - org.yb.loadtest.TestJsonUpsert.existingKeysUpdate(TestJsonUpsert.java:259)] json str 8:[3,ABC,USA,{"Module":"FM","code":"55","phone":"1400"}]
2021-06-19 21:45:18,594 (Time-limited test) [INFO - org.yb.loadtest.TestJsonUpsert.existingKeysUpdate(TestJsonUpsert.java:259)] json str 9:[3,ABC,USA,{"Module":"FM","code":"55","phone":"1400"}]
2021-06-19 21:45:18,594 (Time-limited test) [INFO - org.yb.loadtest.TestJsonUpsert.existingKeysUpdate(TestJsonUpsert.java:259)] json str 10:[5,SAM,INDIA,{"call":"75675655","code":"91"}]
2021-06-19 21:45:18,594 (Time-limited test) [INFO - org.yb.loadtest.TestJsonUpsert.existingKeysUpdate(TestJsonUpsert.java:259)] json str 11:[5,SAM,INDIA,{"call":"75675655","code":"91"}]

Meaning, only the first value has one row returned, subsequent values have duplicate rows returned.

tedyu commented 3 years ago

When there is IN predicate, Executor::WhereClauseToPB() would turn on is_multi_partition (due to multiple values for IN predicate).

Executor::ExecPTNode() in turn would handle unread partitions. At the end of AdvanceToNextPartition():

  req->clear_hash_code();
  req->clear_max_hash_code();

so when handling subsequent partitions, there is no hash code / max hash code set (no corresponding token range).

This can explain why the token range specified by SELECT statement is not effective.