allwefantasy / spark-binlog

A library for querying Binlog with Apache Spark structure streaming, for Spark SQL , DataFrames and [MLSQL](https://www.mlsql.tech).
Apache License 2.0

Is MySQL 5.6.x unsupported? I can connect locally, but not from the program. #32

Open liangrui1988 opened 3 years ago

liangrui1988 commented 3 years ago

`show master status;` returns:

```
File: mysql-bin-changelog.139198   Position: 5292
```

`telnet 10.40.0.109 3306` also succeeds from my local machine. What could be the cause?

```
20/12/08 15:51:13 WARN BinaryLogClient: Failed to establish connection in 1665ms. Forcing disconnect.
Exception in thread "connect mysql(10.40.0.109, 3306) " java.io.EOFException
	at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)
	at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)
	at com.github.shyiko.mysql.binlog.network.protocol.PacketChannel.read(PacketChannel.java:59)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.confirmSupportOfChecksum(BinaryLogClient.java:882)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:533)
	at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.org$apache$spark$sql$mlsql$sources$mysql$binlog$BinLogSocketServerInExecutor$$_connectMySQL(BinLogSocketServerInExecutor.scala:261)
	at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$$anon$1.run(BinLogSocketServerInExecutor.scala:289)
```

liangrui1988 commented 3 years ago

```scala
val db = "delta_db"
val table = "dlta_tab"

val df = spark.readStream.
  format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
  option("host", host).
  option("port", prot).
  option("userName", user).
  option("password", passwd).
  option("bingLogNamePrefix", "mysql-bin-changelog").
  option("binlogIndex", "139198").
  option("binlogFileOffset", "5292").
  option("databaseNamePattern", db).
  option("tableNamePattern", table).
  load()
```
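For context, `readStream.load()` alone does not open the binlog connection; Structured Streaming sources only connect once a sink is attached and the query is started. A minimal sketch of that continuation, assuming the `df` from the snippet above (the console sink and checkpoint path are illustrative values, not from the original post):

```scala
// Hypothetical continuation: start the streaming query so the binlog
// source actually attempts to connect to MySQL.
val query = df.writeStream.
  format("console").                                // example sink for debugging
  option("checkpointLocation", "/tmp/binlog-ckpt"). // example checkpoint path
  outputMode("append").
  start()

query.awaitTermination() // block until the query stops or fails
```

Any connection failure like the EOFException above will surface when `start()` spins up the source, not at `load()`.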

Batch queries work fine:

```scala
val mysqlConf = Map(
  "url" -> url,
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> user,
  "password" -> passwd,
  "dbtable" -> "(SELECT * FROM dlta_tab limit 10) as dlta_tab_tmp"
)

import org.apache.spark.sql.functions.col

var df = spark.read.format("jdbc").options(mysqlConf).load()
df = df.repartitionByRange(2, col("id"))
df.show()
df.write.
  format("delta").
  mode("overwrite").
  save("/tmp/datahouse/delta/batch_dlta_tab")
spark.close()
```
liangrui1988 commented 3 years ago

(screenshot) Because we reach MySQL through a proxy, the connection is slow. With a plain client, setting a longer connection timeout made it work. Can that timeout be set here as an option, or does it require changing the source code? We keep hitting problems and the library is proving hard to use.
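If the proxied handshake exceeds the client's connect timeout, one hedged workaround is to raise the timeout on the underlying shyiko `BinaryLogClient` (the library spark-binlog wraps, per the stack trace above). Whether spark-binlog exposes this as a data source option would need checking in its source; the sketch below uses the binlog client directly, with host and credentials as placeholders:

```scala
import com.github.shyiko.mysql.binlog.BinaryLogClient

// Placeholder connection details taken from the issue; adjust as needed.
val client = new BinaryLogClient("10.40.0.109", 3306, "user", "passwd")

// Give a slow proxied connection more time to complete the handshake
// (the default connect timeout is only a few seconds).
client.setConnectTimeout(30000L) // milliseconds

client.connect() // blocks; run on a dedicated thread in real use
```

If this direct connection succeeds where spark-binlog fails, that points at the timeout (rather than MySQL 5.6.x support) as the root cause.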