apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

GroupBy and Timeseries queries to realtime segments mis-apply filters when Schema Auto-Discovery is enabled #15191

Open funguy-tech opened 10 months ago

funguy-tech commented 10 months ago

Affected Version

V27.0.0

Impact

This issue can be reliably reproduced by executing a single-dimension, single-filter native Druid query on any string dimension in a Kinesis ingestion task that is derived from a Schema Auto-Discovery spec, as long as the data has not been handed off. The issue resolves after hand-off to Historicals.

Expected Result

GroupBy and Timeseries Queries against actively ingested single dimension values are consistently filtered without regard to data residency (realtime vs fully persisted segment).

Actual Result

GroupBy and Timeseries Queries against actively ingested single dimension values temporarily ignore or mis-apply filters until data segments are persisted, at which point filters are correctly applied.

Description

My team operates multiple large-scale Druid clusters with roughly identical base configurations. Pertinent details are as follows:

As part of Schema Auto-discovery migration, we migrated one of our regions to a new schema in which we only define a few legacy lists (to retain them as MVDs) and aggregations - the rest of our fields are ingested via discovery. In total, we produce records with ~100-150 fields, and the dataTypes do appear to align correctly post-migration.

In the process of migrating, we stumbled across a perplexing issue with GroupBy and Timeseries queries. Whenever we perform a single dimension query that overlaps/involves data on the Middle Managers (in our case, queries that touch the most recent 3 hours), the results received are nonsensical - the filter appears to be either inconsistently applied or not applied at all, resulting in other dimension values 'leaking' into the results despite being ruled out by the filter. This behavior is almost reminiscent of some sort of MVD edge case, but the fields experiencing this issue are strictly singular string values (and, as mentioned further down, the behavior changes between different points of the segment's lifecycle - indicating some sort of discrepancy based on query path / segment state).

Consider the following minimally-reproducible query, a GroupBy that groups and filters by an example_field dimension.

{
  "queryType": "groupBy",
  "dataSource": "Example_Records",
  "granularity": "all",
  "filter": {
      "type": "selector",
      "dimension": "example_field",
      "value": "expected_value"
   },
  "dimensions": ["example_field"],
  "intervals": [
    "2023-10-17T00:00:00+0000/2023-10-17T20:55:00+0000"
  ]
}

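Assuming example_field is guaranteed to be a simple string value, this query should return at most one row - the value expected_value. However, that is not what happens: while the data is still on realtime segments, rows for other example_field values are returned as well.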
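For anyone who wants to script this check, here is a minimal sketch. It assumes a Broker reachable at `http://localhost:8082` (adjust for your cluster) and relies on the native groupBy response shape, a list of rows that each carry an `event` map. The helper names `build_query`, `run_query`, and `leaked_values` are made up for illustration and are not part of Druid.

```python
import json
import urllib.request

# Assumption: Broker address; /druid/v2 is the native query endpoint.
BROKER_URL = "http://localhost:8082/druid/v2"


def build_query(datasource, dimension, value, interval):
    """Build the single-dimension, single-filter groupBy query from the report."""
    return {
        "queryType": "groupBy",
        "dataSource": datasource,
        "granularity": "all",
        "filter": {"type": "selector", "dimension": dimension, "value": value},
        "dimensions": [dimension],
        "intervals": [interval],
    }


def run_query(query, url=BROKER_URL):
    """POST a native query to the Broker and return the parsed JSON rows."""
    request = urllib.request.Request(
        url,
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def leaked_values(rows, dimension, expected):
    """Return the dimension values that the selector filter should have excluded."""
    seen = {row["event"].get(dimension) for row in rows}
    # key=str keeps the sort stable even if a null (None) value shows up.
    return sorted(seen - {expected}, key=str)
```

Calling `leaked_values(run_query(build_query(...)), "example_field", "expected_value")` against an interval that covers realtime segments should return an empty list when the filter is applied correctly; any other value in the list is a leaked row.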

Oddly enough, a modification to the original query appears to fix it. If an additional dimension - even one that doesn't exist - is added to the query (ordering does not matter), it returns the expected result 100% of the time:

{
  "queryType": "groupBy",
  "dataSource": "Example_Records",
  "granularity": "all",
  "filter": {
      "type": "selector",
      "dimension": "example_field",
      "value": "expected_value"
   },
  "dimensions": ["example_field", "oof"],
  "intervals": [
    "2023-10-17T00:00:00+0000/2023-10-17T20:55:00+0000"
  ]
}

The above query will always return one row with an example_field value of expected_value and an oof value of null, somehow avoiding the nonsensical condition of the first query.
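As a stopgap, the extra-dimension trick can be applied mechanically on the client side. The sketch below is only an illustration of that workaround, not a fix: `with_extra_dimension` and `strip_extra_dimension` are hypothetical helper names, and `oof` is simply the nonexistent dimension used in the query above.

```python
import copy


def with_extra_dimension(query, extra="oof"):
    """Return a copy of a groupBy query with one additional (possibly nonexistent) dimension."""
    patched = copy.deepcopy(query)
    if extra not in patched["dimensions"]:
        patched["dimensions"].append(extra)
    return patched


def strip_extra_dimension(rows, extra="oof"):
    """Drop the placeholder column from each result row so callers see the original shape."""
    return [
        {**row, "event": {k: v for k, v in row["event"].items() if k != extra}}
        for row in rows
    ]
```

The original query object is left untouched, so the same query can be sent both with and without the extra dimension to compare results.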

abhishekagarwal87 commented 10 months ago

Thank you for the detailed report, @funguy-tech. This only happens with the dimensions that are auto-discovered, correct?

clintropolis commented 10 months ago

any chance you can provide some sample data and ingestion spec to repro the issue? If not, no big deal and I'll try my best to recreate the conditions from the description

funguy-tech commented 10 months ago

@abhishekagarwal87 From my findings, it appears to be restricted to auto-discovered dimensions, since the issue goes away when reverting to an explicitly defined schema. When introducing the auto-discovery schema in our services, all existing string fields were removed from explicit definition, so I have not yet tested the case of an explicitly defined string field within an auto-discovery enabled spec.

@clintropolis I intend to get a tested plug-and-play spec + data posted in the next 48h, but wanted to get the initial write-up while the incident data was still readily available. Unfortunately the original systems are confidential, so I'm having to do some parallel construction to have a shareable artifact.

funguy-tech commented 10 months ago

@clintropolis

Alright, I lied about the 48h turnaround. Here's something a bit more plug-and-play. I have a Python Lambda set up that generates pseudorandom data and publishes to a Kinesis stream to emulate a Kinesis ingestion setup.

The generated data has the following schema:

{
  "firstName": random.choice(common_first_names),
  "lastName": random.choice(common_last_names),
  "favoriteBrand": random.choice(common_brands),
  "age": random.randint(25, 70),
  "hairColor": random.choice(hair_colors),
  "eyeColor": random.choice(eye_colors),
  "streamId": str(uuid.uuid4()), # This gives us a 'random' id for each record
  "streamTime": get_record_time() # This is a random time between now and 15 minutes ago (truncated to minute)
}

  1. Create a new Kinesis stream (in my case, named "generic-data-test-stream")
    • I made this stream with 10 provisioned shards, but this is likely overkill
  2. Create a Python Lambda in the same region
    • Replace the default code with the code in dummy_data_generator.txt
    • Populate stream_name Environment Variable with the stream name (name only, not ARN)
      • "generic-data-test-stream" if unchanged
    • Populate records_to_generate Environment Variable with number of records to generate per execution
      • 500 is a good starting point
    • Ensure Lambda has permission to write to Kinesis stream
      • Lazy route for test accounts: Managed Policy is AmazonKinesisFullAccess
    • Add an EventBridge trigger with rate(1 minute)
      • Not explicitly required, but gives it a steady stream of data
  3. Add the ingestion spec provided in dummy_ingestion_spec.txt
    • Swap "ioConfig.stream" and "ioConfig.endpoint" as needed for your stream name and AWS region
    • This assumes the cluster already has the proper AWS wiring in place, permissions for kinesis, etc
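For readers without access to dummy_data_generator.txt, the Lambda from step 2 could look roughly like the sketch below. This is not the attached file. The value pools are hypothetical placeholders, the ISO-8601 timestamp format and the use of a random UUID as partition key are assumptions, and only the `stream_name` and `records_to_generate` environment variables come from the steps above.

```python
import json
import os
import random
import uuid
from datetime import datetime, timedelta, timezone

# Hypothetical value pools; the real lists are not shown in this thread.
common_first_names = ["Amy", "Charles", "Logan"]
common_last_names = ["Smith", "Jones", "Garcia"]
common_brands = ["BrandA", "BrandB", "BrandC"]
hair_colors = ["black", "brown", "blond", "red"]
eye_colors = ["brown", "blue", "green"]


def get_record_time(now=None):
    """Random time between now and 15 minutes ago, truncated to the minute (ISO-8601, UTC)."""
    now = now or datetime.now(timezone.utc)
    offset = timedelta(seconds=random.randint(0, 15 * 60))
    return (now - offset).replace(second=0, microsecond=0).isoformat()


def make_record():
    """Generate one pseudorandom record matching the schema described above."""
    return {
        "firstName": random.choice(common_first_names),
        "lastName": random.choice(common_last_names),
        "favoriteBrand": random.choice(common_brands),
        "age": random.randint(25, 70),
        "hairColor": random.choice(hair_colors),
        "eyeColor": random.choice(eye_colors),
        "streamId": str(uuid.uuid4()),
        "streamTime": get_record_time(),
    }


def lambda_handler(event, context):
    # Imported here so the helpers above can be exercised without AWS dependencies.
    import boto3

    stream_name = os.environ["stream_name"]
    count = int(os.environ.get("records_to_generate", "500"))
    kinesis = boto3.client("kinesis")
    records = [
        {"Data": json.dumps(make_record()).encode("utf-8"), "PartitionKey": str(uuid.uuid4())}
        for _ in range(count)
    ]
    # PutRecords accepts at most 500 records per call, so send in batches.
    # Partial failures (FailedRecordCount) are ignored in this sketch.
    for start in range(0, len(records), 500):
        kinesis.put_records(StreamName=stream_name, Records=records[start:start + 500])
    return {"generated": count}
```

Because every record gets a random partition key, the data spreads across all shards, which should give each ingestion task a share of every firstName value.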

With this data ingestion setup, I am able to trivially reproduce the problem with the following query (make sure to adjust the interval so that it covers realtime segments at the time you execute it):

{
  "queryType": "groupBy",
  "dataSource": "generic-data-test-stream",
  "granularity": "all",
  "filter": {
    "type": "selector",
    "dimension": "firstName",
    "value": "Amy"
  },
  "dimensions": ["firstName"],
  "intervals": [
    "2023-10-19T00:00:00+0000/2023-10-21T00:00:00+0000"
  ]
}

Instead of the expected response of one single value:

firstName
Amy

I am instead returned a response of multiple firstName values:

firstName
Amy
Charles
Logan

If the same query is executed later, once the interval covers only finalized (handed-off) segments, the result is the expected value (only Amy).

clintropolis commented 10 months ago

awesome, thanks, I'll have a look soon to try to get to the bottom of what is going on here