apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Hadoop-based batch ingestion fails with “java.lang.NullPointerException” #5216

Closed. Llinjing closed this issue 5 years ago

Llinjing commented 6 years ago

error message:

Exception running child : io.druid.java.util.common.RE: Failure on row[3b81dc53ba66\x10,,\xe0\xcf\xe97c3b7e4b519103 123 abc false   2018-01-02 01:57:44]
    at io.druid.indexer.HadoopDruidIndexerMapper.map(HadoopDruidIndexerMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:155)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:805)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Caused by: java.lang.NullPointerException
    at io.druid.query.aggregation.datasketches.theta.SketchAggregator.updateUnion(SketchAggregator.java:107)
    at io.druid.query.aggregation.datasketches.theta.SketchAggregator.aggregate(SketchAggregator.java:51)
    at io.druid.indexer.InputRowSerde.toBytes(InputRowSerde.java:98)
    at io.druid.indexer.IndexGeneratorJob$IndexGeneratorMapper.innerMap(IndexGeneratorJob.java:300)
    at io.druid.indexer.HadoopDruidIndexerMapper.map(HadoopDruidIndexerMapper.java:87)
    ... 8 more

data:

3b81dc53ba66\x10,,\xe0\xcf\xe97c3b7e4b519103    123 abc false   2018-01-02 01:57:44
3b81dc53ba66\x10,,\xe0\xcf\xe97c3b7e4b519103    456 ccc false   2018-01-02 02:57:44
3b81dc53ba667c3b7e4b519103  456 ccc true    2018-01-02 03:57:44

json file:

{
    "type":"index_hadoop",
    "spec":{
        "ioConfig":{
            "type":"hadoop",
            "inputSpec":{
                "type":"static",
                "paths":".../AbtestExperimentExec.txt"
            }
        },
        "dataSchema":{
            "dataSource":"valid_llj",
            "granularitySpec":{
                "type":"uniform",
                "segmentGranularity":"day",
                "queryGranularity":"day",
                "intervals":[
                    "2018-01-02/2018-01-03"
                ]
            },
            "parser":{
                "type":"hadoopyString",
                "parseSpec":{
                    "format":"tsv",
                    "listDelimiter":",",
                    "dimensionsSpec":{
                        "dimensions":[
                            "rid",
                            "sdkver",
                            "sid"
                        ]
                    },
                    "columns":[
                        "uid",
                        "rid",
                        "sdkver",
                        "sid",
                        "acc_time"
                    ],
                    "delimiter":"\t",
                    "timestampSpec":{
                        "format":"yyyy-MM-dd HH:mm:ss",
                        "column":"acc_time"
                    }
                }
            },
            "metricsSpec":[
                {
                    "name":"r",
                    "type":"count"
                },
                {
                    "name":"m2",
                    "type":"thetaSketch",
                    "fieldName":"uid"
                }
            ]
        },
        "tuningConfig":{
            "type":"hadoop",
            "partitionsSpec":{
                "type":"hashed",
                "targetPartitionSize":5000000
            },
            "jobProperties":{
                "mapreduce.map.memory.mb":"1000",
                "mapreduce.reduce.memory.mb":"12000",
                "mapreduce.job.queuename":"druid",
                "mapreduce.job.classloader":true
            }
        }
    }
}

code:

static void updateUnion(Union union, Object update)
{
  if (update instanceof SketchHolder) {
    ((SketchHolder) update).updateUnion(union);
  } else if (update instanceof String) {
    union.update((String) update);
  } else if (update instanceof byte[]) {
    union.update((byte[]) update);
  } else if (update instanceof Double) {
    union.update(((Double) update));
  } else if (update instanceof Integer || update instanceof Long) {
    union.update(((Number) update).longValue());
  } else if (update instanceof int[]) {
    union.update((int[]) update);
  } else if (update instanceof long[]) {
    union.update((long[]) update);
  } else if (update instanceof List) {
    for (Object entry : (List) update) {
      union.update(entry.toString()); // Should there be a null check on entry here?
    }
  } else {
    throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
  }
}

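The first column (uid) of the first two rows contains two consecutive commas. Because listDelimiter is ",", the value is split into a list, and the empty element between the two commas ends up as null. In the List branch of updateUnion, entry.toString() then throws the NullPointerException. A null check needs to be added there.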
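
A minimal sketch of the kind of null check I mean, not the actual Druid code. StringSink is a hypothetical stand-in for the sketch Union so the example runs without the DataSketches library, and splitMultiValue only imitates what I assume the TSV parser does with listDelimiter (empty elements become null). The sample value is simplified: the non-printable bytes from the real row are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class NullSafeListUpdate {
    /** Hypothetical stand-in for the theta sketch Union; it only receives strings. */
    interface StringSink {
        void update(String value);
    }

    /**
     * Assumption: imitates the parser splitting a multi-value column on
     * listDelimiter and turning empty elements into null.
     */
    static List<String> splitMultiValue(String raw, String listDelimiter) {
        List<String> out = new ArrayList<>();
        // limit -1 keeps trailing empty elements instead of dropping them
        for (String part : raw.split(Pattern.quote(listDelimiter), -1)) {
            out.add(part.isEmpty() ? null : part);
        }
        return out;
    }

    /** List branch with a null check; returns how many entries were skipped. */
    static int updateFromList(StringSink sink, List<?> update) {
        int skipped = 0;
        for (Object entry : update) {
            if (entry == null) {
                skipped++; // without this check, entry.toString() throws the NPE
                continue;
            }
            sink.update(entry.toString());
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Two consecutive commas, as in the uid column of the failing rows
        List<String> parts = splitMultiValue("3b81dc53ba66,,7c3b7e4b519103", ",");
        List<String> accepted = new ArrayList<>();
        int skipped = updateFromList(accepted::add, parts);
        System.out.println(accepted + " skipped=" + skipped);
    }
}
```

Skipping null entries is only one option. Whether they should be skipped or fed to the sketch as an empty string is a design decision for the maintainers.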

stale[bot] commented 5 years ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 2 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

stale[bot] commented 5 years ago

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.