Spark SQL caches Parquet metadata for better performance.
When Hive metastore Parquet table conversion is enabled, metadata of those converted tables are also cached.
If these tables are updated by Hive or other external tools, you need to refresh them manually to ensure consistent metadata.
//org.apache.spark.sql.catalog.Catalog
@transient lazy val catalog: Catalog = new CatalogImpl(self)
这是个接口,我们看子类实现的refreshTable:
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
// Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent) //核心关键
...
}
//用来cacheLogicalPlan的
val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
.internal()
.doc("The maximum size of the cache that maps qualified table names to table relation plans.")
.intConf
.checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
.createWithDefault(1000)
SparkConf:
def tableRelationCacheSize: Int =
getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
SessionCatalog:
val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
val cacheSize = conf.tableRelationCacheSize
CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
}
def refreshTable(name: TableIdentifier): Unit = synchronized {
val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
val tableName = formatTableName(name.table)
// Go through temporary tables and invalidate them.
// If the database is defined, this may be a global temporary view.
// If the database is not defined, there is a good chance this is a temp table.
if (name.database.isEmpty) {
tempTables.get(tableName).foreach(_.refresh())
} else if (dbName == globalTempViewManager.database) {
globalTempViewManager.get(tableName).foreach(_.refresh())
}
// Also invalidate the table relation cache.
val qualifiedTableName = QualifiedTableName(dbName, tableName)
tableRelationCache.invalidate(qualifiedTableName)
}
spark metadata cache
背景
最近一直忙着搞apm,也没时间写博客,眼看5月已经过半了,赶紧写一篇压压惊,先描述下背景:
我们将sparkSession封装在actor中,每个actor都有自己独占的sparkSession,有些sql是保存数据到hive和hdfs上,但由于是一个多线程模型,如果不加任何干预的情况下,actor1跑出来的数据通过actor2读的时候会抛出以下异常:
源码分析
报错之后去官网找了下,由于报错信息中的keyword是refresh,搜了下spark sql的官方文档,找到了相关介绍
spark docs
大意就是为了性能对元数据做了缓存,如果外部系统跑出来的,在sparkSql中需要去
refreshTable
相关方法是
的确,在我们线程模型中,由于actor互相不可见,且没有共用sessionCatalog,所以a actor也相当于b actor的外部系统,很早之前写了一篇讲升级spark2踩过的的sparkSession的坑,就是讲这个问题,相关文章链接如下
spark2.0踩过的SparkSession的坑
那么我们就顺藤摸瓜看看这个方法能不能被hack
首先sparkSession中的catalog如下:
这是个接口,我们看子类实现的
refreshTable
:里面用的
sessionCatalog
其实是sparkSession.sessionState.catalog
由于调用链路还是比较长的,所以我们下面只会讲核心方法的调用,这里调用到了
org.apache.spark.sql.catalyst.catalog.SessionCatalog
的refreshTable
,由于我们是enableHive,所以最终用到的是这个类的子类org.apache.spark.sql.hive.HiveSessionCatalog
,这个方法里面还调用了一下metastoreCatalog.refreshTable(name)
对,我们已经离真相不远了,最终被调用到的就是
org.apache.spark.sql.hive.HiveMetastoreCatalog
:对的,里面就是用了个
guava
的LoadingCache
,并且还坑爹的hard code了maximumSize
,好吧,这下缓存禁用也不行了,我们只能hack了hack过程
我们在调用过程中发现HiveSessionCatalog中有一个
invalidateCache
就是用来清除这个loadingCache的:由于是包可见的类,所以写了个小工具:
现在在我需要清除缓存的时候,用这个工具清理下就可以
后记
最新版的spark中已经统一了这一块的代码,而且也不需要这样hack了,缓存也可以禁用了
StaticSQLConf:
SparkConf:
SessionCatalog: