apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

thetaSketch(with sketches-core-0.13.1) in groupBy always return value no more than 16384 #7607

Closed pzhdfy closed 5 years ago

pzhdfy commented 5 years ago

Affected Version

The Druid version with sketches-core-0.13.1

Description

We updated to sketches-core-0.13.1 because it includes a bug fix for Quantiles sketches in direct mode. We then found that thetaSketch in groupBy queries never returns a value greater than 16384 (the sketch size). If the size is set to another value, such as 32768, thetaSketch returns a value <= 32768. thetaSketch in topN and timeseries queries returns the expected results.

When we rolled back to sketches-core-0.12.0, thetaSketch worked correctly, but that version still has the Quantiles sketch bug.

@AlexanderSaydakov

AlexanderSaydakov commented 5 years ago

Could you be more specific please? What exactly do you do? What is the Druid version? How did you update sketches-core?

leerho commented 5 years ago

@pzhdfy This is very strange. Can you reproduce this in a smaller environment? We already have unit tests where N >> K and that show no problem. We need much more detail in order to help you troubleshoot this.

leerho commented 5 years ago

@pzhdfy

Just to double check I have run characterization tests against the Theta Union sketch (0.13.1) using both datum updates as well as sketch updates, both on-heap and off-heap.

These tests exercise a 4K sketch up to a million uniques, and all four test suites produce the same accuracy pitchfork plot:

[Figure: ThetaUnionAccuracy1, accuracy pitchfork plot for the Theta Union sketch]

The plot looks a little noisy because I am only running 1024 trials at each point. This is an absolutely normal accuracy characterization plot and well within accuracy specifications.

Unless you can provide more clarity and information as to how you are using the sketch there is nothing else we can do.

pzhdfy commented 5 years ago

@AlexanderSaydakov

1. Use the newest Druid version (compiled from the master branch, with sketches-core-0.13.1).

2. Use this Python script to generate the test data:

import json

# Write 100000 rows, all on the same date, each with a distinct user_id.
with open('test_theta_data', 'w') as f:
    for i in range(100000):
        # date_id, user_id
        row = {'date_id': '20190407', 'user_id': i}
        f.write('%s\n' % json.dumps(row))

3. Use this ingestion spec:

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "test_theta",
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "json",
          "timestampSpec": {
            "column": "date_id",
            "format": "yyyyMMdd",
            "missingValue": 0
          },
          "dimensionsSpec": {
            "dimensionExclusions": [],
            "dimensions": [
              "date_id",
              "user_id"
            ]
          }
        }
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "DAY",
        "rollup": true,
        "intervals": [
          "2019-04-07/2019-04-08"
        ]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "children": [
          {
            "paths": "/home/druid/druid_test_histogram/test_theta_data",
            "type": "static"
          }
        ],
        "type": "multi"
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "jobProperties": {
        "mapreduce.am.push.alter.metrics": "druid_indexer_config#druid.dataSource.name#druid.hive.table#mapreduce.input.multipleinputs.dir.formats:druidIndexerConfig#druidDataSource#druidHiveTable#multipleinputsDir",
        "mapreduce.job.classloader": "true",
        "mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.",
        "mapreduce.job.queuename": "root.offline.hdp.hdp.L_0",
        "mapreduce.map.java.opts": "-Xmx6G -Djava.net.preferIPv4Stack=true",
        "mapreduce.map.memory.mb": "10240",
        "mapreduce.reduce.java.opts": "-Xmx6G -Djava.net.preferIPv4Stack=true",
        "mapreduce.reduce.memory.mb": "10240",
        "mapreduce.reduce.speculative": "false"
      }
    }
  },
  "context": {
    "druid.indexer.fork.property.druid.processing.buffer.sizeBytes": 1024,
    "druid.indexer.fork.property.druid.processing.numMergeBuffers": 1,
    "druid.indexer.fork.property.druid.processing.numThreads": 1
  }
}

4. Run these groupBy and topN queries:

{
  "dimensions": [
    "date_id"
  ],
  "aggregations": [
    {
      "type": "thetaSketch",
      "fieldName": "user_id",
      "name": "count_distinct__user_id"
    }
  ],
  "intervals": "2019-04-07/2019-04-08",
  "limitSpec": {
    "limit": 5000,
    "type": "default",
    "columns": [
      {
        "direction": "descending",
        "dimension": "count_distinct__user_id"
      }
    ]
  },
  "granularity": "all",
  "queryType": "groupBy",
  "dataSource": "test_theta"
}

{
  "dimension": "date_id",
  "aggregations": [
    {
      "type": "thetaSketch",
      "fieldName": "user_id",
      "name": "count_distinct__user_id"
    }
  ],
  "intervals": "2019-04-07/2019-04-08",
  "metric": "count_distinct__user_id",
  "threshold": 5000,
  "granularity": "all",
  "queryType": "topN",
  "dataSource": "test_theta"
}

The results for groupBy and topN: [screenshot] [screenshot]
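The size experiment from the description can be scripted. The sketch below is only an illustration: `build_group_by` is a hypothetical helper, and it assumes the thetaSketch aggregator's `size` field is the value that was changed from 16384 to 32768. Everything else is copied from the groupBy query above.

```python
import json


def build_group_by(size=16384):
    """Return the groupBy query from this report with an explicit sketch size.

    Hypothetical helper for illustration only. `size` is assumed to be the
    thetaSketch aggregator setting that was changed in the description.
    """
    return {
        "queryType": "groupBy",
        "dataSource": "test_theta",
        "granularity": "all",
        "intervals": "2019-04-07/2019-04-08",
        "dimensions": ["date_id"],
        "aggregations": [
            {
                "type": "thetaSketch",
                "fieldName": "user_id",
                "name": "count_distinct__user_id",
                "size": size,
            }
        ],
        "limitSpec": {
            "type": "default",
            "limit": 5000,
            "columns": [
                {
                    "direction": "descending",
                    "dimension": "count_distinct__user_id",
                }
            ],
        },
    }


if __name__ == "__main__":
    # Print one query per size so the two results can be compared.
    for size in (16384, 32768):
        print(json.dumps(build_group_by(size), indent=2))
```

With the regression present, the reported count would be expected to track `size` instead of the true 100000 distinct users.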

gianm commented 5 years ago

In groupBy vs topN, as far as aggregators are concerned, one major difference is that groupBy uses relocate and topN does not. However, since you have just one date_id, I don't think it's likely that relocate would be called. So that's probably not it.

leerho commented 5 years ago

@pzhdfy 1) Please try sketches release 0.13.0. This will narrow down the possible changes that might be causing this. 2) At the points where the sketch reports the good result and the bad result, print out the sketch summary using toString(). This might provide some clues.

gianm commented 5 years ago

I was able to reproduce this as well. Downgrading to sketches-core-0.13.0 fixed the problem. I also noticed that adding a limit to the groupBy fixed it as well. I'm not sure why - it does change the code paths, however. In Druid SQL, this query exhibits the issue:

SELECT 'beep', APPROX_COUNT_DISTINCT_DS_THETA("user_id") FROM test_theta GROUP BY 1

And this one doesn't:

SELECT 'beep', APPROX_COUNT_DISTINCT_DS_THETA("user_id") FROM test_theta GROUP BY 1 LIMIT 1

(The 'beep' is to force the SQL planner to use a groupBy rather than timeseries query type.)

gianm commented 5 years ago

Thank you, @pzhdfy, for the detailed instructions on how to reproduce this problem.

leerho commented 5 years ago

Very puzzling. We need to simplify the problem environment to where I can reproduce the problem outside Druid. I suspect that somehow theta is being reset to 1.0, which would cause this.
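To illustrate why a theta of 1.0 would cap the result at the sketch size, here is a toy model of a theta sketch in Python. It is not DataSketches code: `Sketch`, `build_sketch`, `union`, and `union_dropping_theta` are hypothetical names, SHA-256 stands in for the real hash function, and `union_dropping_theta` only mimics the suspected symptom, not the actual root cause.

```python
import hashlib
from collections import namedtuple

K = 16384  # nominal entries, matching the default size in this report

# Toy sketch: theta is the sampling threshold in (0, 1],
# hashes are the retained hash values below theta.
Sketch = namedtuple("Sketch", ["theta", "hashes"])
EMPTY = Sketch(1.0, [])


def hash01(item):
    """Map an item to a pseudo-uniform float in [0, 1)."""
    digest = hashlib.sha256(str(item).encode()).digest()
    return int.from_bytes(digest[:8], "big") / 2**64


def build_sketch(items, k=K):
    """Keep the k smallest hashes; theta becomes the (k+1)-th smallest."""
    hashes = sorted({hash01(x) for x in items})
    if len(hashes) <= k:
        return Sketch(1.0, hashes)
    return Sketch(hashes[k], hashes[:k])


def estimate(sketch):
    """Estimated distinct count: retained entries divided by theta."""
    return len(sketch.hashes) / sketch.theta


def union(a, b, k=K):
    """Correct merge: take the smaller theta, keep hashes below it."""
    theta = min(a.theta, b.theta)
    merged = sorted({h for h in a.hashes + b.hashes if h < theta})
    if len(merged) > k:
        theta, merged = merged[k], merged[:k]
    return Sketch(theta, merged)


def union_dropping_theta(a, b, k=K):
    """Faulty merge for illustration: entries survive, theta resets to 1.0."""
    return Sketch(1.0, union(a, b, k).hashes)


if __name__ == "__main__":
    segment = build_sketch(range(100000))
    print(estimate(union(EMPTY, segment)))                 # close to 100000
    print(estimate(union_dropping_theta(EMPTY, segment)))  # 16384.0
```

In this model the faulty merge can never report more than `k`, because the estimate degenerates to the number of retained entries.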

gianm commented 5 years ago

@leerho, please let me know if the following is helpful, or if I could do anything else to help.

What the Druid query is doing is something like this:

  1. Iterating over all rows in a Druid segment, and building up a theta sketch object. This object looks fine.
  2. Taking that object and merging it into the 'merge buffer', which starts off initialized to an empty sketch. This is where it goes off the rails.

I scattered a bunch of sketch toStrings around the code and found that in step (2) they look like this:

The object built up from the segment scan,

### HeapCompactOrderedSketch SUMMARY:
   Estimate                : 100086.81001356241
   Upper Bound, 95% conf   : 101530.78009013624
   Lower Bound, 95% conf   : 98663.31883662633
   Theta (double)          : 0.16369789383615946
   Theta (long)            : 1509846576500454824
   Theta (long) hex        : 14f40d7639a635a8
   EstMode?                : true
   Empty?                  : false
   Array Size Entries      : 16384
   Retained Entries        : 16384
   Seed Hash               : 93cc | 37836
### END SKETCH SUMMARY

The initial state of the sketch in the merge buffer (should be empty),

### HeapCompactOrderedSketch SUMMARY:
   Estimate                : 0.0
   Upper Bound, 95% conf   : 0.0
   Lower Bound, 95% conf   : 0.0
   Theta (double)          : 1.0
   Theta (long)            : 9223372036854775807
   Theta (long) hex        : 7fffffffffffffff
   EstMode?                : false
   Empty?                  : true
   Array Size Entries      : 0
   Retained Entries        : 0
   Seed Hash               : 93cc | 37836
### END SKETCH SUMMARY

The final state of the sketch in the merge buffer (should match the original sketch from the segment scan),

### HeapCompactOrderedSketch SUMMARY:
   Estimate                : 16384.0
   Upper Bound, 95% conf   : 16384.0
   Lower Bound, 95% conf   : 16384.0
   Theta (double)          : 1.0
   Theta (long)            : 9223372036854775807
   Theta (long) hex        : 7fffffffffffffff
   EstMode?                : false
   Empty?                  : false
   Array Size Entries      : 16384
   Retained Entries        : 16384
   Seed Hash               : 93cc | 37836
### END SKETCH SUMMARY

It's changed a bit, but doesn't match up.
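The numbers in these summaries can be cross-checked by hand. The snippet below is a small sketch that assumes the usual theta sketch relations: theta (double) is theta (long) divided by 2^63, and the estimate is the retained entry count divided by theta. The constants are copied from the summaries above; `estimate` is an illustrative helper, not a library call.

```python
# Values copied from the sketch summaries above.
THETA_LONG = 1509846576500454824   # segment-scan sketch
THETA_HEX = "14f40d7639a635a8"     # same value in hex
MAX_THETA_LONG = 9223372036854775807  # merge-buffer sketch (2**63 - 1)
RETAINED = 16384


def theta_as_double(theta_long):
    """Assumed conversion: scale the long threshold into (0, 1]."""
    return theta_long / 2**63


def estimate(retained, theta):
    """Assumed estimator: retained entries divided by theta."""
    return retained / theta


if __name__ == "__main__":
    # The hex and decimal forms of theta agree.
    print(int(THETA_HEX, 16) == THETA_LONG)
    # Segment-scan sketch: theta is about 0.1637, estimate about 100087.
    segment_theta = theta_as_double(THETA_LONG)
    print(segment_theta, estimate(RETAINED, segment_theta))
    # Merge-buffer sketch: theta rounds to 1.0, so the estimate is 16384.0.
    buffer_theta = theta_as_double(MAX_THETA_LONG)
    print(buffer_theta, estimate(RETAINED, buffer_theta))
```

Under these assumptions the final merge-buffer sketch holds the same 16384 entries as the segment sketch but has lost the threshold, so the estimate collapses from about 100087 to exactly 16384.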

The code that printed this was the aggregate method in SketchBufferAggregator, which looks like this after the debugging code I added:

  @Override
  public void aggregate(ByteBuffer buf, int position)
  {
    Object update = selector.getObject();
    if (update == null) {
      return;
    }

    Union union = getOrCreateUnion(buf, position);
    final String initialUnionResult = update instanceof SketchHolder ? union.getResult().toString() : null;

    SketchAggregator.updateUnion(union, update);

    if (update instanceof SketchHolder) {
      log.info(
          "Aggregate called with buffer[%s], position[%s], update = %s, union starts as = %s, union ends as = %s",
          System.identityHashCode(buf),
          position,
          ((SketchHolder) update).getSketch(),
          initialUnionResult,
          union.getResult()
      );
    }
  }

leerho commented 5 years ago

Thank you!! We have been able to reproduce the problem. Now I can dig in to see what went wrong.

AlexanderSaydakov commented 5 years ago

This is a regression in Theta sketch code. So I would think you don't want to approve the 0.14.1 release candidate as it is now. We will fix the sketches-core shortly.

leerho commented 5 years ago

@pzhdfy @gianm DataSketches sketches-core 0.13.3 is now released to Maven Central with the fix. Thank you @pzhdfy and @gianm for your help in finding this!!

AlexanderSaydakov commented 5 years ago

pull request: https://github.com/apache/incubator-druid/pull/7619

clintropolis commented 5 years ago

@AlexanderSaydakov wrote: "This is a regression in Theta sketch code. So I would think you don't want to approve the 0.14.1 release candidate as it is now. We will fix the sketches-core shortly."

0.14.1 is too far gone: the artifacts have already propagated to Maven and the Apache mirrors, so I'm going to go ahead and do the release anyway. I've modified the release notes to warn about upgrading if relying on theta sketches.

This issue does seem severe enough to go ahead and do a 0.14.2, since we can probably drive that through a lot quicker than we can wrap up and validate 0.15.0. I will create an RC and start a vote as soon as possible.

clintropolis commented 5 years ago

Fixed by #7619