Please share how you generate the data. Do you use the TPC-DS kit from https://github.com/databricks/tpcds-kit and follow https://github.com/databricks/spark-sql-perf#setup-a-benchmark ?
@juliuszsompolski Thanks for your response. I did use the TPC-DS kit and followed the instructions under "Setup a benchmark". I have attached the commands that I ran. I noticed this because I tried to write the data to a Phoenix table and ended up with fewer rows than expected. I will rerun the benchmark and see if I still see the issue, or try to repro it in a test.
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
// Set:
val rootDir = "/tpch-ds-data"
val databaseName = "TPCH"
val scaleFactor = "1"
val format = "parquet"
// Run:
val tables = new TPCDSTables(sqlContext,
dsdgenDir = "/tmp/tpcds-kit/tools", // location of dsdgen
scaleFactor = scaleFactor,
useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
useStringForDate = false) // true to replace DateType with StringType
tables.genData(
location = rootDir,
format = format,
overwrite = true, // overwrite the data that is already there
partitionTables = true, // create the partitioned fact tables
clusterByPartitionColumns = true, // shuffle to get partitions coalesced into single files.
filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
tableFilter = "", // "" means generate all tables
numPartitions = 100) // how many dsdgen partitions to run - number of input tasks.
// Create the specified database
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(rootDir, "parquet", databaseName, overwrite = true, discoverPartitions = true)
// Or, if you want to create temporary tables
// tables.createTemporaryTables(location, format)
// For CBO only, gather statistics on all columns:
tables.analyzeTables(databaseName, analyzeColumns = true)
// write to phoenix table
val df = sql("SELECT * FROM store_sales")
df.write.format("org.apache.phoenix.spark").mode("overwrite").option("table", "store_sales").option("zkUrl", "m1:2181").save()
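To confirm whether rows are actually being dropped on the Phoenix side, a quick check is to read the table back through the phoenix-spark connector and compare counts on both sides (a sketch, assuming the same table name and zkUrl as above):
// Sketch: read the table back from Phoenix and compare row counts.
val phoenixDf = sqlContext.read
  .format("org.apache.phoenix.spark")
  .option("table", "store_sales")
  .option("zkUrl", "m1:2181")
  .load()
val sparkCount = sql("SELECT count(*) FROM store_sales").first().getLong(0)
val phoenixCount = phoenixDf.count()
println(s"spark: $sparkCount, phoenix: $phoenixCount")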
@juliuszsompolski
The store_sales table defines a composite primary key of (ss_item_sk, ss_ticket_number). The data was indeed generated correctly. Sorry for the noise.
scala> val res = sql("SELECT distinct count(ss_item_sk, ss_ticket_number) FROM store_sales")
res: org.apache.spark.sql.DataFrame = [count(ss_item_sk, ss_ticket_number): bigint]
scala> res.show
+-----------------------------------+
|count(ss_item_sk, ss_ticket_number)|
+-----------------------------------+
| 2879789|
+-----------------------------------+
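A more explicit version of the same check compares the total row count against the number of distinct (ss_item_sk, ss_ticket_number) combinations (a sketch on the same table; if the composite key is unique and non-null, the two numbers match):
// Sketch: total rows vs. distinct composite-key combinations.
val totalRows = sql("SELECT count(*) FROM store_sales").first().getLong(0)
val distinctKeys = sql(
  "SELECT count(*) FROM (SELECT DISTINCT ss_item_sk, ss_ticket_number FROM store_sales) t"
).first().getLong(0)
println(s"total: $totalRows, distinct composite keys: $distinctKeys")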
Right, I did not look fully awake at your issue yesterday :-). ss_item_sk is the foreign key to the item table. store_sales is a denormalized orders-lineitems table, where ss_ticket_number is the order key and there can be multiple items on an order, hence the item key and order key together form the primary key.
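This is easy to see on the generated data by grouping on the item key alone and then on the composite key (a sketch; the second query should come back empty if the composite key is unique):
// ss_item_sk alone repeats: the same item can appear on many tickets.
sql("""
  SELECT ss_item_sk, count(*) AS cnt
  FROM store_sales
  GROUP BY ss_item_sk
  HAVING count(*) > 1
""").show(5)

// The (ss_item_sk, ss_ticket_number) pair should not repeat.
sql("""
  SELECT ss_item_sk, ss_ticket_number, count(*) AS cnt
  FROM store_sales
  GROUP BY ss_item_sk, ss_ticket_number
  HAVING count(*) > 1
""").show(5)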
The TPC-DS spec defines primary key constraints for tables (Section 2.2.1.2). However, it looks like this constraint isn't enforced while generating data. For example, if I run the following two queries on the store_sales table, I get a different number of rows even though ss_item_sk is defined as the primary key.
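The two queries aren't reproduced in this thread; a comparison along these lines (illustrative only) would show the difference between the total row count and the number of distinct ss_item_sk values:
// Illustrative only: total rows vs. distinct ss_item_sk values.
sql("SELECT count(*) FROM store_sales").show()
sql("SELECT count(DISTINCT ss_item_sk) FROM store_sales").show()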