Open lw-lin opened 7 years ago
当使用structured streaming的foreach sink时,如果希望对partition内的value进行批量输出时,有什么好的办法么。
val query = words.writeStream.foreach(new ForeachWriter[String] {
val arr = scala.collection.mutable.ArrayBuffer[String]()
var conn: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
conn = DBUtil.getConn
true
}
override def close(errorOrNull: Throwable) = {
DBUtil.insert(conn,arr.toArray)
conn.close()
}
override def process(value: String) = {
arr += value
}
}).start()
目前采用的是在 process 中把数据添加到集合,再在close方法内进行批量写入。有其他更为优雅的写法么,谢谢
@Dreamtecher
非常好的问题 —— 但目前的版本(as of 2017.09, Spark 2.2.0)我没有想到更好的写法。
@lw-lin 好的,谢谢您的回复~
如需要贴代码,请复制以下内容并修改:
谢谢!