fatkun opened this issue 10 years ago
The cause is that org.apache.hadoop.hive.ql.io.HiveInputFormat caches the InputFormat instance (for performance), so under concurrency multiple threads all end up using the same InputFormat object.
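To see why a single shared instance is dangerous, here is a rough sketch of that caching pattern (class and method names are hypothetical, not the actual Hive source): one InputFormat object per class is kept in a process-wide map, so every query thread calls listStatus()/getSplits() on the same object.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

// Illustration only: the names are made up, but the pattern matches the
// caching behaviour described above.
class InputFormatCacheSketch {
  // One InputFormat instance per class, shared by the whole process.
  private static final Map<Class<?>, InputFormat<?, ?>> CACHE =
      new HashMap<Class<?>, InputFormat<?, ?>>();

  static synchronized InputFormat<?, ?> get(Class<? extends InputFormat<?, ?>> cls, JobConf job) {
    InputFormat<?, ?> instance = CACHE.get(cls);
    if (instance == null) {
      instance = ReflectionUtils.newInstance(cls, job);
      CACHE.put(cls, instance);
    }
    // Every caller, on every thread, gets this same object back, so any
    // mutable state inside the InputFormat (such as
    // DeprecatedLzoTextInputFormat's index map) is effectively shared state.
    return instance;
  }
}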
Possible fixes:
1. Modify com.hadoop.mapred.DeprecatedLzoTextInputFormat to keep the HashMap in a ThreadLocal, or switch it to a ConcurrentHashMap.
2. Modify org.apache.hadoop.hive.ql.io.HiveInputFormat so that it either stops caching the InputFormat or caches it per thread.
We went with the first approach (ThreadLocal), which also reduces the memory leak.
After modifying DeprecatedLzoTextInputFormat, remember to replace Hadoop's original lzo jar with the patched one. The patched class looks like this:
package com.hadoop.mapred;
import com.hadoop.compression.lzo.LzoIndex;
import com.hadoop.compression.lzo.LzoInputFormatCommon;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
public class DeprecatedLzoTextInputFormat extends TextInputFormat {

  // HiveInputFormat shares a single instance of this class across query
  // threads, so a plain HashMap field is not thread-safe. Keeping the index
  // cache in a ThreadLocal gives each thread its own map.
  private final ThreadLocal<Map<Path, LzoIndex>> indexes =
      new ThreadLocal<Map<Path, LzoIndex>>() {
        @Override
        protected Map<Path, LzoIndex> initialValue() {
          return new HashMap<Path, LzoIndex>();
        }
      };
  @Override
  protected FileStatus[] listStatus(JobConf conf) throws IOException {
    List<FileStatus> files = new ArrayList<FileStatus>(Arrays.asList(super.listStatus(conf)));
    boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);

    Iterator<FileStatus> it = files.iterator();
    while (it.hasNext()) {
      FileStatus fileStatus = it.next();
      Path file = fileStatus.getPath();
      if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
        // Drop non-LZO files (if configured to ignore them) and .lzo.index files.
        if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
          it.remove();
        }
      } else {
        // Read the LZO index for the file and cache it in this thread's map.
        FileSystem fs = file.getFileSystem(conf);
        LzoIndex index = LzoIndex.readIndex(fs, file);
        indexes.get().put(file, index);
      }
    }

    return files.toArray(new FileStatus[files.size()]);
  }
  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
      // An LZO file is only splittable if it has a non-empty index.
      LzoIndex index = indexes.get().get(filename);
      return !index.isEmpty();
    }
    return super.isSplitable(fs, filename);
  }
  @Override
  public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
    FileSplit[] splits = (FileSplit[]) super.getSplits(conf, numSplits);
    List<FileSplit> result = new ArrayList<FileSplit>();

    for (FileSplit fileSplit : splits) {
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
        // Non-LZO files keep the splits computed by TextInputFormat.
        result.add(fileSplit);
      } else {
        LzoIndex index = indexes.get().get(file);
        if (index == null) {
          throw new IOException("Index not found for " + file);
        }
        if (index.isEmpty()) {
          // Unindexed LZO file: it is not splittable, keep the whole-file split.
          result.add(fileSplit);
        } else {
          // Realign the split boundaries to the LZO block boundaries recorded
          // in the index.
          long start = fileSplit.getStart();
          long end = start + fileSplit.getLength();
          long lzoStart = index.alignSliceStartToIndex(start, end);
          long lzoEnd = index.alignSliceEndToIndex(end, fs.getFileStatus(file).getLen());
          if (lzoStart != -1L && lzoEnd != -1L) {
            result.add(new FileSplit(file, lzoStart, lzoEnd - lzoStart, fileSplit.getLocations()));
          }
        }
      }
    }

    return result.toArray(new FileSplit[result.size()]);
  }
  @Override
  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf,
      Reporter reporter) throws IOException {
    FileSplit fileSplit = (FileSplit) split;
    if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
      reporter.setStatus(split.toString());
      return new DeprecatedLzoLineRecordReader(conf, fileSplit);
    }
    // Non-LZO files fall back to the plain TextInputFormat line reader.
    return super.getRecordReader(split, conf, reporter);
  }
}
With the unpatched class, jstack shows many threads stuck inside HashMap.put (concurrent puts into an unsynchronized HashMap can corrupt its internal structure and make threads loop forever).
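For reference, a minimal sketch of the other option in fix 1, using a ConcurrentHashMap instead of a ThreadLocal (we did not deploy this, and the class name ConcurrentLzoTextInputFormat is made up for illustration):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

import com.hadoop.compression.lzo.LzoIndex;

public class ConcurrentLzoTextInputFormat extends TextInputFormat {
  // Thread-safe shared cache: concurrent put/get calls from multiple Hive
  // threads can no longer corrupt the map or hang inside HashMap.put.
  private final Map<Path, LzoIndex> indexes = new ConcurrentHashMap<Path, LzoIndex>();

  // listStatus(), isSplitable() and getSplits() would be identical to the
  // listing above, except that they read and write this.indexes directly
  // instead of going through indexes.get().
}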