fullcontact / hadoop-sstable

Splittable Input Format for Reading Cassandra SSTables Directly
Apache License 2.0

Double-quoted field in CQL causes JsonColumnParser NPE #27

Closed: zqhxuyuan closed this issue 8 years ago

zqhxuyuan commented 8 years ago

Hi, I'm using Hadoop 2.4.1 and Cassandra 2.0.15. Running SSTableIndexIndexer works fine, but running SimpleExample has a problem:

15/11/07 17:28:41 INFO example.SimpleExample: Setting initial input paths to /user/qihuang.zheng/velocity_backup_1107/226_1105/1/forseti/velocity
15/11/07 17:28:45 INFO example.SimpleExample: Setting initial output paths to /user/qihuang.zheng/velocity_test
15/11/07 17:28:47 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
15/11/07 17:28:49 INFO input.FileInputFormat: Total input paths to process : 70
15/11/07 17:28:50 INFO mapreduce.JobSubmitter: number of splits:10
15/11/07 17:28:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1446657831952_0012
15/11/07 17:28:51 INFO impl.YarnClientImpl: Submitted application application_1446657831952_0012
15/11/07 17:28:51 INFO mapreduce.Job: The url to track the job: http://spark047216:23188/proxy/application_1446657831952_0012/
15/11/07 17:28:51 INFO mapreduce.Job: Running job: job_1446657831952_0012
15/11/07 17:29:01 INFO mapreduce.Job: Job job_1446657831952_0012 running in uber mode : false
15/11/07 17:29:01 INFO mapreduce.Job:  map 0% reduce 0%
15/11/07 17:29:01 INFO mapreduce.Job: Job job_1446657831952_0012 failed with state FAILED due to: Application application_1446657831952_0012 failed 2 times due to AM Container for appattempt_1446657831952_0012_000002 exited with  exitCode: 1 due to: Exception from container-launch: org.apache.hadoop.util.Shell$ExitCodeException:
org.apache.hadoop.util.Shell$ExitCodeException:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:505)
    at org.apache.hadoop.util.Shell.run(Shell.java:418)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:300)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Container exited with a non-zero exit code 1
.Failing this attempt.. Failing the application.
15/11/07 17:29:01 INFO mapreduce.Job: Counters: 0
15/11/07 17:29:01 INFO example.SimpleExample: Total runtime: 21s

My Hadoop environment is fine, because examples like wordcount run OK. The log shows map 0% and reduce 0%, which means the mapper was never called at all. But why? I really don't know.
When I run the command without -D hadoop.sstable.cql, the "Failed CQL create statement empty" exception never happens, though it should, since SimpleExampleMapper throws it when the CQL is empty:

    if (cql == null || cql.trim().isEmpty()) {
        throw new RuntimeException("Failed CQL create statement empty");
    }
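
For context, here is a minimal sketch of where this check runs, assuming it lives in the mapper's setup() and that the CQL arrives via the hadoop.sstable.cql property used in the command below (the setup() signature is the standard Hadoop Mapper API; none of this is copied from the repo):

    // Sketch (assumption): SimpleExampleMapper reads the CQL create statement
    // from the job configuration in setup() and fails fast when it is missing,
    // which is why running without -D hadoop.sstable.cql should surface this
    // RuntimeException.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        String cql = context.getConfiguration().get("hadoop.sstable.cql");
        if (cql == null || cql.trim().isEmpty()) {
            throw new RuntimeException("Failed CQL create statement empty");
        }
    }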

And debugging the code, I found that reading the input via SSTableRowInputFormat is normal:

15/11/07 17:43:43 DEBUG ipc.ProtobufRpcEngine: Call: getListing took 5ms
15/11/07 17:43:43 DEBUG input.FileInputFormat: Time taken to get FileStatuses: 136
15/11/07 17:43:43 INFO input.FileInputFormat: Total input paths to process : 70
15/11/07 17:43:43 DEBUG mapreduce.SSTableInputFormat: Initial file list: 70 [LocatedFileStatus{path=hdfs://tdhdfs/user/qihuang.zheng/velocity_backup_1107/226_1105/1/forseti/velocity/forseti-velocity-jb-102234-CompressionInfo.db; isDirectory=false; length=87211; replication=3; blocksize=134217728; modification_time=1446861739953; access_time=1446861739918; owner=qihuang.zheng; group=supergroup; permission=rw-r--r--; isSymlink=false},
15/11/07 17:43:43 DEBUG mapreduce.SSTableInputFormat: Removing non-sstable file: hdfs://tdhdfs/user/qihuang.zheng/velocity_backup_1107/226_1105/1/forseti/velocity/forseti-velocity-jb-102234-CompressionInfo.db
15/11/07 17:43:43 DEBUG mapreduce.SSTableInputFormat: Reading index file for sstable file: hdfs://tdhdfs/user/qihuang.zheng/velocity_backup_1107/226_1105/1/forseti/velocity/forseti-velocity-jb-102234-Data.db
15/11/07 17:43:43 DEBUG mapreduce.SSTableInputFormat: Reading index file: hdfs://tdhdfs/user/qihuang.zheng/velocity_backup_1107/226_1105/1/forseti/velocity/forseti-velocity-jb-102234-Index.db
15/11/07 17:43:43 DEBUG mapreduce.SSTableInputFormat: Final file list: 10 [LocatedFileStatus{path=hdfs://tdhdfs/user/qihuang.zheng/velocity_backup_1107/226_1105/1/forseti/velocity/forseti-velocity-jb-102234-Data.db; isDirectory=false; length=172873282; replication=3; blocksize=134217728;
15/11/07 17:43:43 DEBUG mapreduce.SSTableInputFormat: Splits calculated: 10 [SSTableSplit{dataStart=0, dataEnd=0, idxStart=0, length=8472466, idxEnd=8472466, dataFile=hdfs://tdhdfs/user/qihuang.zheng/velocity_backup_1107/226_1105/1/forseti/velocity/forseti-velocity-jb-102234-Data.db,

PS: To work around the classpath problem of running together with Cassandra, I export the classpath and then run hadoop jar:

export HADOOP_CLASSPATH=/usr/install/cassandra/lib/*:cassandra-all-2.0.15.jar:$HADOOP_CLASSPATH

/usr/install/hadoop/bin/hadoop jar hadoop-sstable-2.0.0.jar com.fullcontact.sstable.example.SimpleExample \
    -D hadoop.sstable.cql="CREATE TABLE velocity (attribute text,partner_code text,app_name text,type text,"timestamp" bigint,event text,sequence_id text,PRIMARY KEY ((attribute), partner_code, app_name, type, "timestamp")) WITH compression={'sstable_compression': 'LZ4Compressor'}" \
    -D mapred.task.timeout=21600000 \
    -D mapred.map.tasks.speculative.execution=false \
    -D mapred.job.reuse.jvm.num.tasks=1 \
    -D io.sort.mb=1000 \
    -D io.sort.factor=100 \
    -D mapred.reduce.tasks=512 \
    -D hadoop.sstable.split.mb=1024 \
    -D mapred.child.java.opts="-Xmx2G -XX:MaxPermSize=256m" \
    /user/qihuang.zheng/velocity_backup_1107/226_1105/1/forseti/velocity /user/qihuang.zheng/velocity_test
zqhxuyuan commented 8 years ago

After using yarn logs to see what was happening inside, it turned out to be a classpath problem with Cassandra and Guava. Once I fixed that, I finally found that our CQL has a double-quoted field, timestamp:

CREATE TABLE velocity (attribute text,partner_code text,app_name text,type text,"timestamp" bigint,event text,sequence_id text,PRIMARY KEY ((attribute), partner_code, app_name, type, "timestamp")) WITH compression={'sstable_compression': 'LZ4Compressor'}

I tried using backslashes, \"timestamp\", but an exception happened:

15/11/08 19:13:50 INFO mapreduce.Job: Task Id : attempt_1446657831952_0037_m_000005_0, Status : FAILED
Error: java.lang.NullPointerException
    at com.fullcontact.sstable.example.JsonColumnParser.getColumnValueConvertor(JsonColumnParser.java:55)
    at com.fullcontact.sstable.example.JsonColumnParser.serializeColumns(JsonColumnParser.java:87)
    at com.fullcontact.sstable.example.JsonColumnParser.getJson(JsonColumnParser.java:37)
    at com.fullcontact.sstable.example.SimpleExampleMapper.map(SimpleExampleMapper.java:57)
    at com.fullcontact.sstable.example.SimpleExampleMapper.map(SimpleExampleMapper.java:21)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
zqhxuyuan commented 8 years ago

I found that the null object is the CFDefinition, so I added a rebuild() call before getCfDef():

    public JsonColumnParser(final CFMetaData cfMetaData) {
        // Force the CFDefinition to be (re)built before fetching it;
        // without this, getCfDef() returns null here.
        cfMetaData.rebuild();
        this.cfd = cfMetaData.getCfDef();
        System.out.println("CFDefinition:" + cfd);
        this.columnNameConverter = cfMetaData.comparator;
    }
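
(My reading of the Cassandra 2.0.x source, so treat this as an assumption: getCfDef() just returns a cached CFDefinition field that rebuild() populates. When the CFMetaData is reconstructed from the CQL string on the task side, nothing has called rebuild() yet, hence the null.)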

Now cfd is not null:

2015-11-09 10:43:56,968 INFO [main] com.fullcontact.sstable.example.JsonColumnParser: CFDefinition:attribute, partner_code, app_name, type, timestamp => {event, sequence_id}

and reading is OK:

2015-11-09 10:43:56,998 INFO [main] com.fullcontact.sstable.example.JsonColumnParser: 
...
2015-11-09 10:43:56,998 INFO [main] com.fullcontact.sstable.example.JsonColumnParser: columnName:event,colId:event,cfd:attribute, partner_code, app_name, type, timestamp => {event, sequence_id}
2015-11-09 10:43:56,999 INFO [main] com.fullcontact.sstable.example.JsonColumnParser: columnName:sequence_id,colId:sequence_id,cfd:attribute, partner_code, app_name, type, timestamp => {event, sequence_id}

but some rows have a problem:

2015-11-09 10:43:57,880 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.cassandra.serializers.MarshalException: String didn't validate.
        at org.apache.cassandra.serializers.UTF8Serializer.validate(UTF8Serializer.java:35)
        at org.apache.cassandra.db.marshal.AbstractType.getString(AbstractType.java:154)
        at com.fullcontact.sstable.example.JsonColumnParser.serializeColumns(JsonColumnParser.java:93)
bvanberg commented 8 years ago

@zqhxuyuan Which branch of hadoop-sstable are you working from?

eentzel commented 8 years ago

@zqhxuyuan in your hadoop.sstable.cql CREATE TABLE statement, I believe you want just timestamp, with no quotes at all.

zqhxuyuan commented 8 years ago

I'm using the cassandra-2.0.x branch. After making a few small changes, I solved the problem:

1. Double-quoted keyword: timestamp.
Since timestamp is a keyword in C*, a CREATE TABLE without the double quotes will fail, so I use another special character, $, in hadoop.sstable.cql ($timestamp$), and in the mapper setup restore the quotes with cql = cql.replace("$", "\""). This works, as sketched below.
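
Sketched out, with the property name taken from the command earlier in this thread (the surrounding setup() plumbing is an assumption; only the replace call is the actual change):

    // Submit side (shell): write $timestamp$ in the -D hadoop.sstable.cql value
    // to avoid nesting double quotes inside a double-quoted shell argument.
    // Mapper side: restore the quotes before handing the CQL to the parser.
    String cql = context.getConfiguration().get("hadoop.sstable.cql");
    cql = cql.replace("$", "\"");   // $timestamp$ -> "timestamp"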

2. UTF8 validation.
In JsonColumnParser.serializeColumns, getting the column value through the C* type conversion causes the validation exception, so I just convert the ByteBuffer to a String directly, which works:

    // Bypass AbstractType.getString() and its UTF8 validation:
    ByteBuffer buffer = column.value();
    String content = byteBufferToString(buffer);
    String json = JSONObject.escape(content);
    LOG.info("JSON: {}", json);
    sb.append(json);

    public static String byteBufferToString(ByteBuffer buffer) {
        try {
            // Decode as UTF-8, silently dropping malformed byte sequences
            // instead of letting them fail validation.
            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            decoder.onMalformedInput(CodingErrorAction.IGNORE);
            CharBuffer charBuffer = decoder.decode(buffer);
            buffer.flip(); // rewind the buffer so the caller can read it again
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

Of course, this assumes the charset in our system is UTF-8. Note that CodingErrorAction.IGNORE silently drops any malformed bytes instead of raising an error, so affected rows are serialized with those bytes removed.

3. CFDefinition NPE.
As I posted in a previous comment, I added cfMetaData.rebuild() in the JsonColumnParser constructor, before cfMetaData.getCfDef().