xuchuanyin / workbench

0 stars 0 forks source link

2018-07-27 bloom #54

Open xuchuanyin opened 6 years ago

xuchuanyin commented 6 years ago

private List intersectFilteredBlocklets(CarbonTable carbonTable, List defaultDataMapPrunedBlocklets, List otherDataMapPrunedBlocklets) { List prunedBlocklets = null; if (BlockletDataMapUtil .isCacheLevelBlock(carbonTable, BlockletDataMapFactory.CACHE_LEVEL_BLOCKLET)) { prunedBlocklets = new ArrayList<>(otherDataMapPrunedBlocklets); // add blocklets from default dataMap that are not filtered by other dataMaps for (ExtendedBlocklet defaultBlocklet : defaultDataMapPrunedBlocklets) { if (!otherDataMapPrunedBlocklets.contains(defaultBlocklet)) { prunedBlocklets.add(defaultBlocklet); } } } else { prunedBlocklets = (List) CollectionUtils .intersection(otherDataMapPrunedBlocklets, defaultDataMapPrunedBlocklets); } return prunedBlocklets; }

xuchuanyin commented 6 years ago

/**
 * Serializes a list of short counts into a byte array, two bytes per entry, in list order.
 *
 * <p>The bytes are written through a freshly allocated {@link ByteBuffer}, so each short is
 * encoded in the buffer's default (big-endian) byte order.
 *
 * @param blockletCountInEachBlock counts to serialize; must not contain {@code null} elements
 * @return a new array of length {@code 2 * blockletCountInEachBlock.size()}
 */
private byte[] convertRowCountFromShortToByteArray(List<Short> blockletCountInEachBlock) {
  // each short occupies exactly 2 bytes
  int bufferSize = blockletCountInEachBlock.size() * 2;
  ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
  for (short blockletCount : blockletCountInEachBlock) {
    byteBuffer.putShort(blockletCount);
  }
  // array() exposes the whole backing array regardless of position, so no rewind is needed
  return byteBuffer.array();
}

xuchuanyin commented 6 years ago

// Writes 100000 generated rows into carbon table `test_rcd`, then runs the same equality
// filter on `city` before and after creating a bloomfilter datamap on that column.
// The results are only printed via show(); nothing is asserted.
test("test by modify rcd2") {
  // minimum per page is 2000 rows
  CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "2000")
  // minimum per blocklet is 16MB
  CarbonProperties.getInstance().addProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, "16")
  try {
    // these lines will result in 3 blocklets in one block and bloom will hit at least 2 of them
    val lines = 100000
    // drop the table this test actually writes to (the original dropped `testrcd`)
    sql("drop table if exists test_rcd").collect()
    val r = new Random()
    import sqlContext.implicits._
    val df = sqlContext.sparkContext.parallelize(1 to lines)
      .map(x => ("No." + r.nextInt(10000), "country" + x % 10000, "city" + x % 10000, x % 10000,
        UUID.randomUUID().toString, UUID.randomUUID().toString, UUID.randomUUID().toString,
        UUID.randomUUID().toString, UUID.randomUUID().toString, UUID.randomUUID().toString,
        UUID.randomUUID().toString, UUID.randomUUID().toString, UUID.randomUUID().toString,
        UUID.randomUUID().toString, UUID.randomUUID().toString, UUID.randomUUID().toString))
      .toDF("ID", "country", "city", "population",
        "random1", "random2", "random3", "random4", "random5", "random6",
        "random7", "random8", "random9", "random10", "random11", "random12")
    df.write
      .format("carbondata")
      .option("tableName", "test_rcd")
      .option("SORT_COLUMNS", "id")
      .option("SORT_SCOPE", "LOCAL_SORT")
      .mode(SaveMode.Overwrite)
      .save()

    // query once without the datamap ...
    sql("select count(*) from test_rcd where city = 'city40'").show(numRows = Integer.MAX_VALUE - 1)
    sql("CREATE DATAMAP dm_rcd ON TABLE test_rcd " +
        "USING 'bloomfilter' DMPROPERTIES " +
        "('INDEX_COLUMNS' = 'city', 'BLOOM_SIZE'='640000', 'BLOOM_FPP'='0.00001')")
    // ... and once more with the bloomfilter datamap in place
    sql("select count(*) from test_rcd where city = 'city40'").show(numRows = Integer.MAX_VALUE - 1)
    sql("drop table if exists test_rcd").collect()
  } finally {
    // restore the global properties even when a statement above fails,
    // so that other tests are not affected by this test's settings
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
      CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
    CarbonProperties.getInstance().addProperty(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB,
      CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB_DEFAULT_VALUE)
    // NOTE(review): this test never changes LOCAL_DICTIONARY_ENABLE; the reset is kept from the
    // original - confirm whether it is still needed
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "true")
  }
}