override def receive: Receive = {
case CreateView(limit) ⇒
sparkSession.createDataFrame(getBeans(limit)).createTempView("test1")
case s: String ⇒
log.info(s"exec $s")
sparkSession.sql(s).show(1000)
case e: Any ⇒ println(e)
}
The difference between these two methods is explained quite clearly in their source comments:
/**
* Creates a temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @throws AnalysisException if the view name already exists
*
* @group basic
* @since 2.0.0
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = withPlan {
val tableDesc = CatalogTable(
identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
tableType = CatalogTableType.VIEW,
schema = Seq.empty[CatalogColumn],
storage = CatalogStorageFormat.empty)
CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = false,
isTemporary = true)
}
/**
* Creates a temporary view using the given name. The lifetime of this
* temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
*
* @group basic
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = withPlan {
val tableDesc = CatalogTable(
identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
tableType = CatalogTableType.VIEW,
schema = Seq.empty[CatalogColumn],
storage = CatalogStorageFormat.empty)
CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = true,
isTemporary = true)
}
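A minimal standalone sketch of that difference (a local session; the view name test1 mirrors the actor code above, everything else is illustrative):

import org.apache.spark.sql.SparkSession

object TempViewDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("temp-view-demo").getOrCreate()
  val df = spark.range(10).toDF("id")

  df.createOrReplaceTempView("test1") // creates the view
  df.createOrReplaceTempView("test1") // fine: replace = true, the existing view is overwritten
  df.createTempView("test1")          // throws: allowExisting = false, replace = false
}

With createTempView in the actor code above, the run fails exactly like this: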
[ERROR] [11/17/2016 20:13:34.499] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/actor1] Temporary table 'test1' already exists;
org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary table 'test1' already exists;
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTempView(SessionCatalog.scala:341)
at org.apache.spark.sql.execution.command.CreateViewCommand.createTemporaryView(views.scala:146)
at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:97)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
at org.apache.spark.sql.Dataset.createTempView(Dataset.scala:2398)
at com.ximalaya.xql.datatask.TestActor$$anonfun$receive$1.applyOrElse(TestActor.scala:43)
at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
at com.ximalaya.xql.datatask.TestActor.aroundReceive(TestActor.scala:30)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = sparkSession.sessionState.executePlan(child)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
if (tableDesc.schema != Nil && tableDesc.schema.length != analyzedPlan.output.length) {
throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
s"specified by CREATE VIEW (num: `${tableDesc.schema.length}`).")
}
val sessionState = sparkSession.sessionState
if (isTemporary) {
createTemporaryView(tableDesc.identifier, sparkSession, analyzedPlan)
} else {
// Adds default database for permanent table if it doesn't exist, so that tableExists()
// only check permanent tables.
val database = tableDesc.identifier.database.getOrElse(
sessionState.catalog.getCurrentDatabase)
val tableIdentifier = tableDesc.identifier.copy(database = Option(database))
if (sessionState.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
throw new AnalysisException(
s"View $tableIdentifier already exists. If you want to update the view definition, " +
"please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
}
} else {
// Create the view if it doesn't exist.
sessionState.catalog.createTable(
prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
}
}
Seq.empty[Row]
}
run mainly works on the sessionState of the SparkSession that is passed in:
val sessionState = sparkSession.sessionState
and through it, it goes on to use the SessionCatalog class:
/**
* Internal catalog for managing table and database states.
*/
lazy val catalog = new SessionCatalog(
sparkSession.sharedState.externalCatalog,
functionResourceLoader,
functionRegistry,
conf,
newHadoopConf())
SessionCatalog holds the map in which temporary tables are stored:
/** List of temporary tables, mapping from table name to their logical plan. */
@GuardedBy("this")
protected val tempTables = new mutable.HashMap[String, LogicalPlan]
/**
* Create a temporary table.
*/
def createTempView(
name: String,
tableDefinition: LogicalPlan,
overrideIfExists: Boolean): Unit = synchronized {
val table = formatTableName(name)
if (tempTables.contains(table) && !overrideIfExists) {
throw new TempTableAlreadyExistsException(name)
}
tempTables.put(table, tableDefinition)
}
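Since tempTables lives inside the session's SessionCatalog, every piece of code that ends up holding the same SparkSession also shares this map. A small sketch of that sharing outside the actor setup (session and view names are illustrative):

import org.apache.spark.sql.SparkSession

object SharedCatalogDemo extends App {
  val sessionA = SparkSession.builder().master("local[*]").appName("shared-catalog").getOrCreate()
  // Without clearing anything, a second getOrCreate() hands back the very same instance.
  val sessionB = SparkSession.builder().getOrCreate()
  println(sessionA eq sessionB) // true

  // One SessionCatalog, one tempTables map: a view registered through sessionA is
  // visible through sessionB, and a second createTempView("test1") would collide.
  sessionA.range(5).createOrReplaceTempView("test1")
  sessionB.sql("select count(*) from test1").show()
}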
/**
* Gets an existing [[SparkSession]] or, if there is no existing one, creates a new
* one based on the options set in this builder.
*
* This method first checks whether there is a valid thread-local SparkSession,
* and if yes, return that one. It then checks whether there is a valid global
* default SparkSession, and if yes, return that one. If no valid global default
* SparkSession exists, the method creates a new SparkSession and assigns the
* newly created SparkSession as the global default.
*
* In case an existing SparkSession is returned, the config options specified in
* this builder will be applied to the existing SparkSession.
*
* @since 2.0.0
*/
def getOrCreate(): SparkSession = synchronized {
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.conf.set(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
return session
}
// Global synchronization so we will only set the default session once.
SparkSession.synchronized {
// If the current thread does not have an active session, get it from the global session.
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.conf.set(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
return session
}
// No active nor global default session. Create a new one.
val sparkContext = userSuppliedContext.getOrElse {
// set app name if not given
val randomAppName = java.util.UUID.randomUUID().toString
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }
if (!sparkConf.contains("spark.app.name")) {
sparkConf.setAppName(randomAppName)
}
val sc = SparkContext.getOrCreate(sparkConf)
// maybe this is an existing SparkContext, update its SparkConf which maybe used
// by SparkSession
options.foreach { case (k, v) => sc.conf.set(k, v) }
if (!sc.conf.contains("spark.app.name")) {
sc.conf.setAppName(randomAppName)
}
sc
}
session = new SparkSession(sparkContext)
options.foreach { case (k, v) => session.conf.set(k, v) }
defaultSession.set(session)
// Register a successfully instantiated context to the singleton. This should be at the
// end of the class definition so that the singleton is updated only if there is no
// exception in the construction of the instance.
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
defaultSession.set(null)
sqlListener.set(null)
}
})
}
return session
}
}
defaultSession is an AtomicReference:
/** Reference to the root SparkSession. */
private val defaultSession = new AtomicReference[SparkSession]
To keep getOrCreate from falling into the "Global synchronization so we will only set the default session once." branch and handing back that shared default session, we need to clear this reference after creating each session.
Setting a breakpoint there confirmed my guess: with 4 actors, the breakpoint was hit 3 times.
/**
* Clears the default SparkSession that is returned by the builder.
*
* @since 2.0.0
*/
def clearDefaultSession(): Unit = {
defaultSession.set(null)
}
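A standalone sketch of what clearing the default changes, under Spark 2.0.x as quoted above (later versions also track a thread-local active session, so the exact behavior differs; the names below are illustrative):

import org.apache.spark.sql.SparkSession

object ClearDefaultDemo extends App {
  val first = SparkSession.builder().master("local[*]").appName("clear-default").getOrCreate()

  // The default session is still set, so the builder returns the same instance.
  val reused = SparkSession.builder().getOrCreate()
  println(reused eq first) // true

  // After clearing the default, getOrCreate() builds a fresh session on top of the
  // existing SparkContext, with its own SessionState and its own temp views.
  SparkSession.clearDefaultSession()
  val fresh = SparkSession.builder().getOrCreate()
  println(fresh eq first) // false

  first.range(3).createOrReplaceTempView("test1")
  fresh.range(3).createTempView("test1") // no collision: separate SessionCatalog
}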
Pitfalls of SparkSession in Spark 2.0
SparkSession, the replacement for SQLContext (and HiveContext)
Background
My server-side logic runs inside actors, but I found that while several actors were executing, they could access temporary views that had been registered in another actor's session.
The code, abstracted
The actor logic can roughly be abstracted into the receive block quoted at the top of this post.
The driving test class
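The original driver snippet is not reproduced here; purely as a hypothetical sketch (TestActor and CreateView are from the code above, everything else is made up), it might look roughly like this:

import akka.actor.{ActorSystem, Props}

object TestDriver extends App {
  val system = ActorSystem("testSystem")
  // Several actors, each of which is expected to work against its own session.
  val actors = (1 to 4).map(i => system.actorOf(Props[TestActor], s"actor$i"))
  actors.foreach { actor =>
    actor ! CreateView(1000)      // register a temp view named "test1"
    actor ! "select * from test1" // then query it
  }
}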
Locating the problem: first pass
The symptom was that several different queries produced exactly the same result. With most of the INFO lines filtered out, the log made this easy to see.
Narrowing the problem down
We noticed a few WARN lines in that log, and they looked like the real reason several actors ended up hitting one shared sparkSession. To firm up that judgment, we swapped createOrReplaceTempView for the stricter createTempView. The difference between the two methods is explained clearly enough in the source comments quoted above: createTempView throws if a view with the given name already exists. Running again and checking the log output (the TempTableAlreadyExistsException stack trace above), it is exactly what we guessed.
Digging into the source
Following createOrReplaceTempView leads to the run method of CreateViewCommand (full class name org.apache.spark.sql.execution.command.CreateViewCommand), quoted above. run mainly works on the sessionState of the SparkSession that is passed in, and through it on the SessionCatalog class. At this point we understand about 80% of the problem: all the actors share one sessionState, and therefore one catalog, and that is what causes the problem.
Back to how SparkSession is created
The problem is that once the first session has been created, a defaultSession exists, and defaultSession is an AtomicReference. To keep later calls from falling into the "Global synchronization so we will only set the default session once." branch and returning that shared default, we need to clear this reference after creating each session. Setting a breakpoint confirmed the guess: with 4 actors, the breakpoint was hit 3 times.
The final fix
With the default session cleared after each session is created, re-running with the breakpoint set shows it is never entered any more, and the problem is solved.
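A sketch of where that fix lands, assuming each actor builds its session when it is constructed (the actor skeleton is hypothetical; clearDefaultSession is the builder API quoted earlier, and the builder configuration is omitted):

import akka.actor.{Actor, ActorLogging}
import org.apache.spark.sql.SparkSession

class QueryActor extends Actor with ActorLogging {
  // Each actor builds its own session and immediately clears the global default,
  // so the next actor's getOrCreate() cannot pick this one up as the shared default.
  private val sparkSession: SparkSession = {
    val s = SparkSession.builder().getOrCreate()
    SparkSession.clearDefaultSession()
    s
  }

  override def receive: Receive = {
    case sql: String =>
      log.info(s"exec $sql")
      sparkSession.sql(sql).show(1000)
  }
}

Note that if several actors can be constructed at the same time, the build-and-clear step can still race (two getOrCreate() calls may observe the same default before either clears it), so serializing session creation is worth considering.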