I can commit an example, but here is a quick one (this is all just a thin wrapper job on top of JdbcRDD):
runLocal -- true runs it in local mode on Spark; false runs it against the Spark master.
jdbcConnection -- JDBC connection details.
parquetOutputPath -- where on HDFS, S3, or the local fs you want to write this Parquet dump.
table -- table name in SQL.
partitions -- number of partitions of the Parquet files.
indexColumn -- an index column that you will be using to load contiguous selections from the source table (should be an auto-incrementing id column).
minIndex -- start loading from this index.
maxIndex -- load up to this index.
schema -- list of (input column name, type, [optional output column name]) entries used to read columns of specified types from SQL and write them into Parquet. Each entry has:
    readColumn -- column name to read from SQL.
    writeType -- Parquet type to extract from the read column. Choose from: Short, Int, String, Long, and Epoch. Epoch is really just Long, but is useful for reading timestamp and datetime values into milliseconds. When Spark fixes writes of Parquet's Timestamp data type, this should be replaced.
    writeColumn -- optional name of the output column to write to. Defaults to readColumn.
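To make the knobs above concrete, here is a minimal, hypothetical sketch of what a thin JdbcRDD-to-Parquet wrapper roughly does with them. It is not the actual jdbc2parquet source; the connection URL, table, columns, and bounds are simply the values from the example config further down.

// Illustrative sketch only (not the real jdbc2parquet code): read a
// contiguous indexColumn range with JdbcRDD, then write it out as Parquet.
import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object Jdbc2ParquetSketch {
  def main(args: Array[String]): Unit = {
    // For runLocal=true you would add .setMaster("local[*]") here.
    val sc = new SparkContext(new SparkConf().setAppName("jdbc2parquet-sketch"))
    val sqlContext = new SQLContext(sc)

    // These mirror minIndex / maxIndex / partitions from the config.
    val (minIndex, maxIndex, partitions) = (0L, 30000000000L, 10000)

    // JdbcRDD requires exactly two '?' placeholders in the query; they are
    // bound to each partition's slice of the indexColumn range.
    val rows = new JdbcRDD(
      sc,
      () => DriverManager.getConnection(
        "jdbc:mysql://mysql/db?autoReconnect=true", "user", "password"),
      "SELECT id, user_id, item_id, time_stamp FROM clickstream WHERE id >= ? AND id <= ?",
      minIndex, maxIndex, partitions,
      rs => Row(rs.getLong(1), rs.getString(2), rs.getString(3),
                rs.getTimestamp(4).getTime) // "Epoch": timestamp as millis
    )

    // Output schema: writeColumn renames time_stamp to ts, Epoch becomes Long.
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("user_id", StringType),
      StructField("item_id", StringType),
      StructField("ts", LongType)
    ))

    // Spark 1.4+; older releases use saveAsParquetFile instead of write.parquet.
    sqlContext.createDataFrame(rows, schema).write.parquet("hdfs://hdfs/clickstream")
  }
}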
Run the job with:
spark-submit \
--class com.imgur.spark.jdbc2parquet.JDBC2Parquet \
--master $SPARK_MASTER \
--driver-memory 3072m \
--executor-memory 3072m \
--conf "spark.cores.max=20" \
/path/to/jdbc2parquet/target/jdbc2parquet-0.0.1-jar-with-dependencies.jar \
-c configs/clickstream_table_etl_config.json
The config file is passed to the job with the -c flag.
E.g.
clickstream_table_etl_config.json
{
"runLocal" : false,
"jdbcConnection" : {
"driverClass" : "com.mysql.jdbc.Driver",
"connectionPath" : "jdbc:mysql://mysql/db?autoReconnect=true",
"user" : "user",
"password" : "password"
},
"parquetOutputPath" : "hdfs://hdfs/clickstream",
"table": "clickstream",
"partitions" : 10000,
"indexColumn" : "id",
"minIndex" : 0,
"maxIndex" : 30000000000,
"schema" : [
{
"readColumn" : "id",
"writeType" : "Long"
},
{
"readColumn" : "user_id",
"writeType" : "String"
},
{
"readColumn" : "item_id",
"writeType" : "String"
},
{
"readColumn" : "time_stamp",
"writeType" : "Epoch",
"writeColumn" : "ts"
}
]
}
If you are using this, you should probably update the pom.xml with your version of Spark. Some things might break in newer versions of Spark, but they should be easy to fix (read up on how Spark and Spark SQL work with Parquet).
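For example, if the project pulls Spark in through Maven, the piece to bump would look roughly like the snippet below; the artifact suffix and version here are placeholders, so match them to your cluster's Scala and Spark versions.

<dependency>
  <groupId>org.apache.spark</groupId>
  <!-- placeholder: match the _2.xx suffix to your Scala version -->
  <artifactId>spark-sql_2.10</artifactId>
  <!-- placeholder: match your cluster's Spark version -->
  <version>1.6.3</version>
</dependency>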
Sure, thanks. I'll give it a try.
@shashir this did indeed work, although I had to make some changes on my cluster. Do you know how to do this for an entire DB, rather than manually specifying the schema for every table?
Could you upload what a sample configuration looks like?