xuchuanyin opened 6 years ago
package org.apache.carbondata.integration.spark.testsuite.dataload
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
/**
* Copyright ©, 1988-2017, Huawei Tech.Co., Ltd.
*
* @Name: TestLoadDataWithUnsafeMemory
* @Description: Tests data loading when unsafe memory is enabled for sort
* @Author: x00251853
* @Date: 3/12/18
* @Others:
*/
class TestLoadDataWithUnsafeMemory extends QueryTest
with BeforeAndAfterEach with BeforeAndAfterAll {
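// Capture the current values of the unsafe-sort related properties so they can be restored
// once the whole suite has finished.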
val originUnsafeSortStatus: String = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
val originUnsafeMemForSort: String = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT)
val originUnsafeMemForWorking: String = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
val originUnsafeSizeForChunk: String = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB,
CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT)
val targetTable = "table_unsafe_memory"
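// Drop the target table before and after each test so every run starts from a clean state.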
override def beforeEach(): Unit = {
sql(s"drop table if exists $targetTable ")
}
override def afterEach(): Unit = {
sql(s"drop table if exists $targetTable ")
}
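// Enable unsafe sort and configure the memory sizes: the working memory (512 MB) is set
// equal to the off-heap sort chunk size (512 MB), which is the scenario under test.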
override protected def beforeAll(): Unit = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, "1024")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "512")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB, "512")
}
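// Restore the original property values so other test suites are not affected.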
override def afterAll(): Unit = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, originUnsafeSortStatus)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB, originUnsafeMemForSort)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, originUnsafeMemForWorking)
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB, originUnsafeSizeForChunk)
}
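// Loads enough rows to spill across several unsafe row pages, then verifies the load
// with count and filter queries.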
private def testSimpleTable(): Unit = {
// This number is chosen to reproduce issue CARBONDATA-2246. It is picked on purpose so that
// the records kept in memory consume about two more unsafe row pages and the last one
// exhausts the working memory.
val lineNum: Int = 70002
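// Build a 7-column DataFrame; the rows are generated in reverse order, so the data is not
// already sorted on the sort columns.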
val df = {
import sqlContext.implicits._
sqlContext.sparkContext.parallelize((1 to lineNum).reverse)
.map(x => (s"a$x", s"b$x", s"c$x", 12.3 + x, x, System.currentTimeMillis(), s"d$x"))
.toDF("c1", "c2", "c3", "c4", "c5", "c6", "c7")
}
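// Write the DataFrame as a CarbonData table with c1 and c3 as sort columns, so the load
// goes through the unsafe sort path configured above.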
df.write
.format("carbondata")
.option("tableName", targetTable)
.option("SORT_COLUMNS", "c1,c3")
.save()
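// Verify the total row count and a filtered count on the integer column c5 to confirm
// that all rows survived the load.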
checkAnswer(sql(s"select count(*) from $targetTable"), Row(lineNum))
checkAnswer(sql(s"select count(*) from $targetTable where c5 > 5000"), Row(lineNum - 5000))
}
test("unsafe sort with chunk size equal to working memory") {
testSimpleTable()
}
}