apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

/v2 and /v2/candidates endpoints not respecting broker partition pruning - range partition #16222

Open ColeAtCharter opened 5 months ago

ColeAtCharter commented 5 months ago

This is a report of unexpected behavior where the broker selects more segments than necessary for a query. The identified scenario is one where secondary partition information exists (e.g., for range partitioning) but is not used in conjunction with a query filter that corresponds to the secondary partition dimensions. Failing to fully prune segments at the broker degrades system operations, notably performance and cost.

Affected Version

28

Description

The /druid/v2 and /druid/v2/candidates broker endpoints return segments that should be filtered out based on secondary partition metadata. Returning unneeded segments when planning and executing queries causes unnecessary I/O throughout the system, with avoidable cost and performance impact.
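
For reference, the secondary (range) partitioning in question is the kind produced at ingestion time by a partitionsSpec along these lines (a sketch only; the column names are the illustrative col_1/col_2 used in the test setup below):

{
  "partitionsSpec": {
    "type": "range",
    "partitionDimensions": ["col_1", "col_2"],
    "targetRowsPerSegment": 5000000
  }
}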

Steps to reproduce:
  1. Create segments with range partitioning, mark them as used, and load them onto historicals. Testing requires multiple segments per combination of datasource and time chunk.
  2. Identify the partition column value(s) corresponding to a single segment (within a datasource/time chunk).
  3. Submit /v2 and /v2/candidates requests to the broker for the identified datasource and time chunk, adding a query filter that uses a value identified in step 2 for the first partition column (see the sketch below).
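
As referenced in step 3, a minimal sketch of the kind of native query body posted to both endpoints (the datasource, interval, columns, and filter value are the illustrative ones used in the result examples below):

{
  "queryType": "scan",
  "dataSource": "src1",
  "intervals": ["2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z"],
  "columns": ["__time", "col_1", "col_15"],
  "filter": {
    "type": "selector",
    "dimension": "col_1",
    "value": "value1jkl"
  },
  "resultFormat": "list"
}
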
Expected/observed behavior summary

Testing notes

Representation of the test setup

segments for "src1"
server segments for datasource "src1"
segment metadata - range partition column values
test query
Expected result example - /v2/candidates
[
  {
    "interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
    "version": "2024-04-01T23:59:59.999Z",
    "partitionNumber": 1,
    "size": 999999,
    "locations": [
      {
        "name": "host1:8283",
        "host": null,
        "hostAndTlsPort": "host1:8283",
        "maxSize": 999999999999,
        "type": "historical",
        "tier": "_default_tier",
        "priority": 0
      },
      {
        "name": "host3:8283",
        "host": null,
        "hostAndTlsPort": "host3:8283",
        "maxSize": 999999999999,
        "type": "historical",
        "tier": "tier2",
        "priority": 0
      }
    ]
  }
]

If partition pruning is applied at the broker, it would be unexpected to receive back either of the other two segments (regardless of server/replication).

Unexpected result example 1: /druid/v2 endpoint - extra (but not all) segments are returned
[
    {
        "timestamp": "2024-04-01T09:00:00.000Z",
        "result": {
            "results": [
                {
                    "segmentId": "src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1",
                    "columns": ["__time", "col_1", "col_15"],
                    "events": [
                        {"__time": 1739667600000, "col_1": "value1jkl", "col_15": "hello"},
                        {"__time": 1739668980000, "col_1": "value1jkl", "col_15": "world"},
                        {"__time": 1739668980000, "col_1": "value1jkl", "col_15": "hello"},
                        {"__time": 1739670360000, "col_1": "value1jkl", "col_15": "world"}
                    ],
                    "rowSignature": [
                        {"name": "__time", "type": "LONG"},
                        {"name": "col_1", "type": "STRING"},
                        {"name": "col_15", "type": "STRING"}
                    ]
                }
            ],
            "segment": "src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1",
            "interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z"
        }
    },
    {
        "timestamp": "2024-04-01T09:00:00.000Z",
        "result": {
            "results": [],
            "segment": "src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z",
            "interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z"
        }
    }
]
Unexpected result example 2: /druid/v2/candidates endpoint - all segments are returned
[
  {
    "interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
    "version": "2024-04-01T23:59:59.999Z",
    "partitionNumber": 0,
    "size": 999999,
    "locations": [
      {
        "name": "host1:8283",
        "host": null,
        "hostAndTlsPort": "host1:8283",
        "maxSize": 999999999999,
        "type": "historical",
        "tier": "_default_tier",
        "priority": 0
      },
      {
        "name": "host2:8283",
        "host": null,
        "hostAndTlsPort": "host2:8283",
        "maxSize": 999999999999,
        "type": "historical",
        "tier": "tier2",
        "priority": 0
      }
    ]
  },
  {
    "interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
    "version": "2024-04-01T23:59:59.999Z",
    "partitionNumber": 1,
    "size": 999999,
    "locations": [
      {
        "name": "host1:8283",
        "host": null,
        "hostAndTlsPort": "host1:8283",
        "maxSize": 999999999999,
        "type": "historical",
        "tier": "_default_tier",
        "priority": 0
      },
      {
        "name": "host3:8283",
        "host": null,
        "hostAndTlsPort": "host3:8283",
        "maxSize": 999999999999,
        "type": "historical",
        "tier": "tier2",
        "priority": 0
      }
    ]
  },
  {
    "interval": "2025-02-16T01:00:00.000Z/2025-02-16T02:00:00.000Z",
    "version": "2024-04-01T23:59:59.999Z",
    "partitionNumber": 2,
    "size": 999999,
    "locations": [
      {
        "name": "host2:8283",
        "host": null,
        "hostAndTlsPort": "host2:8283",
        "maxSize": 999999999999,
        "type": "historical",
        "tier": "_default_tier",
        "priority": 0
      },
      {
        "name": "host3:8283",
        "host": null,
        "hostAndTlsPort": "host3:8283",
        "maxSize": 999999999999,
        "type": "historical",
        "tier": "tier2",
        "priority": 0
      }
    ]
  }
]
Summary

The broker should select and/or query only the smallest set of segments matching on datasource, time chunk, and partition dimensions whenever it has enough partition information about the used/loaded segments to prune out segments that don't match on the partition dimensions (in addition to segments that don't match on datasource or time chunk).

Related documentation

gianm commented 3 months ago

Hi @ColeAtCharter. The /druid/v2/candidates API does not currently take into account segment pruning, although /druid/v2/ and /druid/v2/sql/ (the actual query APIs) do. I tried to repro the issue on /druid/v2/ and was not able to. A single segment was queried, as expected.
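
For reference, the SQL API mentioned above accepts a POST to /druid/v2/sql/ with a body like the following sketch (the query text here is illustrative):

{
  "query": "SELECT COUNT(*) AS a0 FROM wikipedia WHERE \"page\" = 'Bailando 2015'"
}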

Could you provide some more details that could be used to repro this issue?

Here's what I tried:

I loaded data with the following SQL and rowsPerSegment: 1500 (to ensure we get multiple segments):

REPLACE INTO "wikipedia" OVERWRITE ALL
WITH "ext" AS (
  SELECT *
  FROM TABLE(
    EXTERN(
      '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz"]}',
      '{"type":"json"}'
    )
  ) EXTEND ("isRobot" VARCHAR, "channel" VARCHAR, "timestamp" VARCHAR, "flags" VARCHAR, "isUnpatrolled" VARCHAR, "page" VARCHAR, "diffUrl" VARCHAR, "added" BIGINT, "comment" VARCHAR, "commentLength" BIGINT, "isNew" VARCHAR, "isMinor" VARCHAR, "delta" BIGINT, "isAnonymous" VARCHAR, "user" VARCHAR, "deltaBucket" BIGINT, "deleted" BIGINT, "namespace" VARCHAR, "cityName" VARCHAR, "countryName" VARCHAR, "regionIsoCode" VARCHAR, "metroCode" BIGINT, "countryIsoCode" VARCHAR, "regionName" VARCHAR)
)
SELECT
  TIME_PARSE("timestamp") AS "__time",
  "isRobot",
  "channel",
  "flags",
  "isUnpatrolled",
  "page",
  "diffUrl",
  "added",
  "comment",
  "commentLength",
  "isNew",
  "isMinor",
  "delta",
  "isAnonymous",
  "user",
  "deltaBucket",
  "deleted",
  "namespace",
  "cityName",
  "countryName",
  "regionIsoCode",
  "metroCode",
  "countryIsoCode",
  "regionName"
FROM "ext"
PARTITIONED BY DAY
CLUSTERED BY "page", "user"

Then ran this query:

{
  "queryType": "timeseries",
  "dataSource": {
    "type": "table",
    "name": "wikipedia"
  },
  "intervals": {
    "type": "intervals",
    "intervals": [
      "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
    ]
  },
  "filter": {
    "type": "selector",
    "dimension": "page",
    "value": "Bailando 2015"
  },
  "granularity": {
    "type": "all"
  },
  "aggregations": [
    {
      "type": "count",
      "name": "a0"
    }
  ],
  "context": {
    "bySegment": true
  }
}

A single segment came back in the reply:

[
  {
    "timestamp": "2016-06-27T00:00:00.000Z",
    "result": {
      "results": [
        {
          "timestamp": "2016-06-27T00:00:34.959Z",
          "result": {
            "a0": 9
          }
        }
      ],
      "segment": "wikipedia_2016-06-27T00:00:00.000Z_2016-06-28T00:00:00.000Z_2024-06-04T17:02:47.206Z_1",
      "interval": "2016-06-27T00:00:00.000Z/2016-06-28T00:00:00.000Z"
    }
  }
]

I also confirmed using the metric query/segment/time that only a single segment was queried.

ColeAtCharter commented 3 months ago

@gianm -- thanks for looking at this. I do think "Unexpected result example 1: /druid/v2 endpoint" should never happen if broker partition pruning is working: Druid returned a segmentId that should have been pruned at the broker. The one hypothetical exception is if the broker were combining the data query results with its own (pre-pruning) segment metadata and including a dummy, empty result for the pruned segment. However, this is a bit of a stretch.

Our team can rerun the test and see if there are additional metrics that clarify the extent to which data queries fan out even with partition pruning enabled.

gianm commented 3 weeks ago

@ColeAtCharter I was working on something recently that reminded me of this report. I realized it's possible to see the "unexpected result example 1" if the filter matches a value that is on the border of two segments (i.e. the value is the end of one segment's range and also the start of the next). This particular behavior is intentional, because it's possible for a value on the border of two segments to appear in both segments.

In the wikipedia example I posted, when I filtered on page Bailando 2015 I only got a single segment in the reply. That particular page was not the border of two segments. When I filtered on another page that was on the border of two segments, I got two segments in the reply. One had a count of 1 and the other had a count of 0.
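
Illustratively (the segment identifiers and boundary value here are made up), a border value is one that appears as the end of one segment's range and the start of the next, so a filter on that exact value cannot be pruned to a single segment:

segment A: partitionDimensions: ["page"], start: null, end: ["Some Border Page"]
segment B: partitionDimensions: ["page"], start: ["Some Border Page"], end: null

A filter like page = 'Some Border Page' may match rows in either segment, so both must be queried.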

It is certainly possible this is what you're seeing. If so, I would expect it to be fairly uncommon, unless your partitioning key is low cardinality relative to the number of segments.

ColeAtCharter commented 2 weeks ago

Thanks, @gianm! Understood. The test example specifically accounts for this scenario by applying an equality filter on the first partition dimension whose value is strictly between the target segment's start and end values for that dimension:

query filter:

{
    "type": "selector",
    "dimension": "col_1",
    "value": "value1jkl"
}

target segment metadata:

src1_2025-02-16T01:00:00.000Z_2025-02-16T02:00:00.000Z_2024-04-01T23:59:59.999Z_1: partitionDimensions: ["col_1","col_2"], start: ["value1ghi","value2jkl"], end: ["value1lmn", "value2mnop"]

So the segment's col_1 start (value1ghi) is strictly less than the query's col_1 equality filter value (value1jkl), which in turn is strictly less than the segment's col_1 end value (value1lmn); the filter value is not on a segment border.

We can look for additional examples of this behavior to get more information about reproducing the issue.