fullcontact / hadoop-sstable

Splittable Input Format for Reading Cassandra SSTables Directly
Apache License 2.0
49 stars 14 forks source link

Too many MapTasks same number as Data.db files #28

Open zqhxuyuan opened 8 years ago

zqhxuyuan commented 8 years ago

Recenty we want to migration data from C to HDFS, and Here is the Mapper:

        protected void map(ByteBuffer key, SSTableIdentityIterator value, Context context) throws IOException, InterruptedException {
            final ByteBuffer newBuffer = key.slice();
            String partitionKey = UTF8Type.instance.getString(newBuffer);

            StringBuffer sb = null;
            int i=0;
            while (value.hasNext()) {
                OnDiskAtom atom = value.next();
                if (atom instanceof Column) {
                    Column column = (Column) atom;
                    String cn = CQLUtil.getColumnName(column.name(), columnNameConverter);
                    String cv = CQLUtil.byteBufferToString(column.value());

                    if(i%3==0 && "".equals(cv)){
                        sb = new StringBuffer(partitionKey);
                        sb.append(":").append(cn);
                    }else if(i%3==1 && cn.substring(cn.lastIndexOf(":")+1).equals("event")){
                        sb.append(cv);
                    }else if(i%3==2 && cn.substring(cn.lastIndexOf(":")+1).equals("sequence_id")){
                        sb.append(":" + cv);
                        context.write(new Text(sb.toString()), null);
                    }
                }
                i++;
            }
        }

Because we have 2 regular column(event and sequence_id). so mapper output column like this:

PartitionKey
        cluster-key-values: 
        cluster-key-values:regularColumn1 
        cluster-key-values:regularColumn2

And we aggregation One Row by : PartitionKey:cluster-key-values:regularColumn1Value:regularColumn2Value In this way, one row like this looks more like CQL result or DBMS one row.

Our one SSTable file size almost 160M from Cassandra, put to HDFS(Block size=128MB):
160m

have 1674 Data.db files(Almost 300G data from C*):

[qihuang.zheng@spark047219 ~]$ /usr/install/hadoop/bin/hadoop fs -ls -R /user/qihuang.zheng/velocity_backup_1107/226_1105 | grep "Data" | wc -l
1674

After running in cluster(11Nodes), I see MapTask number is the same as Data.db files:
1674tasks

And ofcourse this job will take many time. I set -D mapred.map.tasks=180, but map task number still keep 1674.
I guess map task number can't assign, As it read from HDFS InputSplit.

Is there any way to run MR job quickly? Or Does Yarn application will decrease running time?

bvanberg commented 8 years ago

What compaction are you using? Size tiered, leveled?

As configured your splits are bigger than your sstables which is why you are seeing a 1:1 between sstable and mapper. With files this small it's almost not worth splitting further. Aggregating the small files into larger ones to get fewer mappers is not something that is currently possible with hadoop-sstable as it was designed to solve the opposite problem, very large sstables. Map reduce is generally not well suited to lots of small files. If your job is running slow is there any possibility of running with more nodes?

zqhxuyuan commented 8 years ago

Leveled compaction in C. most file size is 160M, as generate by C and can't change it. default -D hadoop.sstable.split.mb=1000. So sstable size large than 1G may be better to do it. I have found sstablesplit tool which split large sstable to small, but not found merge from small to large. More sadly, now even add nodes, it can't be too much more than 50 nodes. So I'll figure out other way.

bvanberg commented 8 years ago

Yeah, FTR we have jobs that run against sstables with leveled compaction. Which results in many files and many mappers but runs in a matter of hours. I don't know what your environment is but ours look like:

AWS EMR 64 nodes m2.4xlarge

Move those over to SSD instances and things go very fast.