allwefantasy / spark-binlog

A library for querying binlogs with Apache Spark Structured Streaming, for Spark SQL, DataFrames and [MLSQL](https://www.mlsql.tech).
Apache License 2.0

How does spark-binlog initialize a Delta table? #11

zhengqiangtan closed this issue 4 years ago

zhengqiangtan commented 4 years ago

Can I first sync all of the MySQL table's data into a Delta table in one pass, and then handle subsequent changes with a real-time spark-binlog job?

allwefantasy commented 4 years ago

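Yes, for now that is required. You first need to do one full sync, and only after that can you use spark-binlog. Concretely, you can read the MySQL table with Spark's JDBC source and then write it into Delta.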
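A toy model shows why the full sync has to come first. The binlog only carries changes, so rows that were never modified after the snapshot point would otherwise never reach the Delta table. The sketch below is a plain-Scala illustration under that assumption. `ChangeEvent`, `Insert`, `Update`, `Delete` and `applyChanges` are hypothetical names, not spark-binlog's API, and a `Map` stands in for the Delta table.

```scala
// Hypothetical toy model, NOT spark-binlog's API: a table is a Map from primary key to row value.
sealed trait ChangeEvent
final case class Insert(id: Long, row: String) extends ChangeEvent
final case class Update(id: Long, row: String) extends ChangeEvent
final case class Delete(id: Long) extends ChangeEvent

// Replay binlog-style change events, in order, on top of a starting table state.
def applyChanges(start: Map[Long, String], events: Seq[ChangeEvent]): Map[Long, String] =
  events.foldLeft(start) {
    case (table, Insert(id, row)) => table.updated(id, row)
    case (table, Update(id, row)) => table.updated(id, row)
    case (table, Delete(id))      => table - id
  }

// Step 1: one-off full snapshot (in practice: a Spark JDBC read written to Delta).
val snapshot = Map(1L -> "a", 2L -> "b", 3L -> "c")
// Step 2: changes captured from the binlog after the snapshot was taken.
val events = Seq(Update(1L, "a2"), Insert(4L, "d"), Delete(2L))

val withSnapshot = applyChanges(snapshot, events)
// Without the snapshot, row 3 (never touched by any binlog event) is lost.
val withoutSnapshot = applyChanges(Map.empty, events)
println(s"with snapshot:    $withSnapshot")
println(s"without snapshot: $withoutSnapshot")
```

Every event is applied as an upsert or delete keyed by the primary key. Because of this, replaying a few events that the snapshot already reflects does not change the final state, as long as the events are applied in binlog order.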

zhengqiangtan commented 4 years ago

many thanks~

bebee4java commented 4 years ago

With Spark (Scala DataFrame API), you can:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("wow")
  .getOrCreate()

// JDBC options for the source MySQL table
val mysqlConf = Map(
  "url" -> "jdbc:mysql://localhost:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "root",
  "password" -> "123456",
  "dbtable" -> "script_file"
)

// One-off full read of the MySQL table, split into 2 partitions by ranges of `id`
val df = spark.read.format("jdbc").options(mysqlConf).load()
  .repartitionByRange(2, col("id"))

// Write the snapshot as a Delta table through MLSQL's Delta data source
df.write
  .format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource")
  .mode("overwrite")
  .save("/tmp/datahouse/mlsql_console/script_file")

spark.close()
```

You then get a Delta table in HDFS. The `_delta_log` directory holds the transaction log, and the two Parquet part files correspond to the two range partitions:

```
➜ ~ hadoop fs -ls /tmp/datahouse/mlsql_console/script_file
Found 3 items
drwxr-xr-x - sgr supergroup 0 2019-12-10 17:24 /tmp/datahouse/mlsql_console/script_file/_delta_log
-rw-r--r-- 1 sgr supergroup 5971 2019-12-10 17:24 /tmp/datahouse/mlsql_console/script_file/part-00000-5b3eca04-0746-4cbf-811f-ae8795cc56fd-c000.snappy.parquet
-rw-r--r-- 1 sgr supergroup 8293 2019-12-10 17:24 /tmp/datahouse/mlsql_console/script_file/part-00001-fa092e01-cf92-4558-be8d-a1d83f128a8d-c000.snappy.parquet
```
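The two part files come from `repartitionByRange(2, col("id"))`. It shuffles the rows into 2 partitions, each holding a contiguous range of `id` values, and here each partition is written as one Parquet file.

Spark estimates the range boundaries by sampling the data. The plain-Scala sketch below is a simplification that instead computes exact boundaries from the sorted keys. `rangeBounds` and `partitionFor` are hypothetical helpers, not Spark APIs.

```scala
// Simplified range partitioning (hypothetical helpers, not Spark's RangePartitioner):
// choose numPartitions - 1 upper bounds from the sorted keys, then route each key
// to the first partition whose upper bound is >= the key.
def rangeBounds(keys: Seq[Long], numPartitions: Int): Vector[Long] = {
  require(keys.nonEmpty && numPartitions > 0, "need keys and at least one partition")
  val sorted = keys.sorted.toVector
  (1 until numPartitions).map { i =>
    // Upper bound of partition i - 1: the key at the i-th quantile position
    sorted(math.max(0, i * sorted.size / numPartitions - 1))
  }.toVector
}

def partitionFor(key: Long, bounds: Vector[Long]): Int = {
  val idx = bounds.indexWhere(bound => key <= bound)
  // Keys above every bound go to the last partition
  if (idx == -1) bounds.size else idx
}

val ids = (1L to 10L).toVector
val bounds = rangeBounds(ids, 2)
val byPartition = ids.groupBy(partitionFor(_, bounds))
byPartition.toSeq.sortBy(_._1).foreach { case (p, ks) =>
  println(s"partition $p: ${ks.mkString(",")}")
}
```

Range partitioning on `id` keeps neighbouring keys together. This is why the `TableRepartition` call in the MLSQL version also uses `partitionType="range"` with `partitionCols="id"`.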

In MLSQL, it's even simpler:

```sql
connect jdbc where
 url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
 and driver="com.mysql.jdbc.Driver"
 and user="xxxxx"
 and password="xxxx"
 as db_cool;

load jdbc.`db_cool.script_file` as script_file;

run script_file as TableRepartition.`` where partitionNum="2" and partitionType="range" and partitionCols="id"
as rep_script_file;

save overwrite rep_script_file as delta.`mysql_mlsql_console.script_file`;
```