Another SparkSession pitfall #54

Open cjuexuan opened 6 years ago

cjuexuan commented 6 years ago

Background

A teammate reported that the util we use to register UDFs had stopped taking effect after running for a while.

The log looked like this:

[recsys-spark]00:10:55 262  INFO (org.apache.spark.sql.execution.SparkSqlParser:54) - Parsing command: create temporary function hll_create as 'com.ximalaya.dw.hllp.udaf.HllCreate'
[recsys-spark]00:10:57 602 ERROR (org.apache.spark.sql.udf.UDFUtils$:83) - in register hive udaf,throw
java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead.
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeFunctionBuilder(SessionCatalog.scala:1079)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog$$anonfun$39.apply(SessionCatalog.scala:1103)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog$$anonfun$39.apply(SessionCatalog.scala:1103)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.registerFunction(SessionCatalog.scala:1103)
    at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:63)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
    at org.apache.spark.sql.udf.UDFUtils$.org$apache$spark$sql$udf$UDFUtils$$registerHiveUDFAndUDAF(UDFUtils.scala:80)
    at org.apache.spark.sql.udf.UDFUtils$$anonfun$doRegister$1.apply(UDFUtils.scala:57)
    at org.apache.spark.sql.udf.UDFUtils$$anonfun$doRegister$1.apply(UDFUtils.scala:49)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.sql.udf.UDFUtils$.doRegister(UDFUtils.scala:49)
    at com.ximalaya.recsys.xql.Utils$.getXqlControl(Utils.scala:26)
    at com.ximalaya.recsys.task.intl.Tool.XqlUtil$.queryXql(XqlUtil.scala:36)

They asked us to help track it down.

Investigation

Reviewing our own code

  private val logger = org.slf4j.LoggerFactory.getLogger(this.getClass)

  def registerUDF(sparkSession: SparkSession): Unit = {
    Option(new File(UDF_PROPERTIES))
      .filter(_.exists())
      .map { input ⇒
        val properties = new Properties()
        properties.load(new FileReader(input))
        properties
      }.foreach(props ⇒ doRegister(props, sparkSession))
  }

  def doRegister(properties: Properties, sparkSession: SparkSession): Unit = {
    properties.asScala.foreach {
      case (name, className) ⇒
        val clazz = Class.forName(className)
        if (classOf[UserDefinedAggregateFunction].isAssignableFrom(clazz)) {
          logger.info("register udaf {}", s"$className#$name")
          sparkSession.udf.register(name, clazz.newInstance().asInstanceOf[UserDefinedAggregateFunction])
        } else if (classOf[UDAF].isAssignableFrom(clazz) || classOf[UDF].isAssignableFrom(clazz)) {
          registerHiveUDFAndUDAF(className, name, sparkSession)
        } else {
          logger.info("register udf {}", s"$className#$name")
          try {
            sparkSession.udf.registerJava(name, className, null)
          } catch {
            case NonFatal(e) ⇒
              logger.error("in register java udf,throw", e)
          }
        }
    }
  }

  def checkProps(properties: Properties): Unit = {
    properties.keys().asScala.map(_.toString).toList.groupBy(x ⇒ x)
      .mapValues(_.size)

  }

  //create temporary function groupAdd as 'com.ximalaya.data.hyperloglog.udaf.GroupAdd'
  private def registerHiveUDFAndUDAF(className: String, funcName: String, sparkSession: SparkSession) = {
    try {
      val sqlText = s"create temporary function $funcName as '$className'"
      sparkSession.sql(sqlText)
    } catch {
      case NonFatal(e) ⇒
        logger.error("in register hive udaf,throw", e)
    }
  }

Using the line numbers in the stack trace we located the failing code in the registerHiveUDFAndUDAF method, so the control logic in our util itself looked fine.

Analyzing the Spark source

  protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
    // TODO: at least support UDAFs here
    throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
  }

  /**
   * Loads resources such as JARs and Files for a function. Every resource is represented
   * by a tuple (resource type, resource uri).
   */
  def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
    resources.foreach(functionResourceLoader.loadResource)
  }

  /**
   * Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
   */
  def registerFunction(
      funcDefinition: CatalogFunction,
      ignoreIfExists: Boolean,
      functionBuilder: Option[FunctionBuilder] = None): Unit = {
    val func = funcDefinition.identifier
    if (functionRegistry.functionExists(func.unquotedString) && !ignoreIfExists) {
      throw new AnalysisException(s"Function $func already exists")
    }
    val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
    val builder =
      functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className))
    functionRegistry.registerFunction(func.unquotedString, info, builder)
  }

Following the stack trace we arrive at SessionCatalog. The exception is thrown on the functionBuilder.getOrElse line, and the IDE shows that no caller of registerFunction ever passes in a Some(functionBuilder), so execution always falls through to makeFunctionBuilder, which unconditionally throws UnsupportedOperationException. That logic is fine as well. For Hive UDFs to be supported at this point, the Hive-flavoured session catalog has to be in use, because makeFunctionBuilder is overridden in HiveSessionCatalog:

  override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
    makeFunctionBuilder(funcName, Utils.classForName(className))
  }

  /**
   * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
   */
  private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
    // When we instantiate hive UDF wrapper class, we may throw exception if the input
    // expressions don't satisfy the hive UDF, such as type mismatch, input number
    // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
    (children: Seq[Expression]) => {
      try {
        if (classOf[UDF].isAssignableFrom(clazz)) {
          val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
          udf.dataType // Force it to check input data types.
          udf
        } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
          val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
          udf.dataType // Force it to check input data types.
          udf
        } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
          val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
          udaf.dataType // Force it to check input data types.
          udaf
        } else if (classOf[UDAF].isAssignableFrom(clazz)) {
          val udaf = HiveUDAFFunction(
            name,
            new HiveFunctionWrapper(clazz.getName),
            children,
            isUDAFBridgeRequired = true)
          udaf.dataType  // Force it to check input data types.
          udaf
        } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
          val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
          udtf.elementSchema // Force it to check input data types.
          udtf
        } else {
          throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
        }
      } catch {
        case ae: AnalysisException =>
          throw ae
        case NonFatal(e) =>
          val analysisException =
            new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
          analysisException.setStackTrace(e.getStackTrace)
          throw analysisException
      }
    }
  }

At this point, I naturally asked the business team whether the root of the problem was that they had not called enableHiveSupport.
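
A quick way to check which catalog implementation a given session actually ended up with (a small diagnostic sketch, not part of the original util; the helper name is made up and can be pasted into spark-shell):

import org.apache.spark.sql.SparkSession

def isHiveBacked(spark: SparkSession): Boolean = {
  // The SessionState (and with it the catalog class) is chosen from the
  // SparkContext's conf, so that is the value worth inspecting here.
  spark.sparkContext.getConf.get("spark.sql.catalogImplementation", "in-memory") == "hive"
}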

Checking the business team's code


  def getXqlControl(sparkConf: SparkConf) = {
    ...
    val xqlContext = new XQLContext(hadoopConf, engineConf)
    val sparkSession = SparkSession.builder().enableHiveSupport()
      .config(sparkConf).getOrCreate()
    UDFUtils.doRegister(dwUdf,sparkSession)
    ...
    new BatchXQLControl(sparkSession, xqlContext)
  }

  val dwUdf = {
    val properties = new Properties()
    ...
    properties.put("hll_create", "com.ximalaya.dw.hllp.udaf.HllCreate")
    ....
    properties
  }

The business team told me that they do call enableHiveSupport here, which conflicts with the analysis above: with Hive support enabled, the session should be backed by HiveSessionCatalog rather than the plain SessionCatalog, unless something went wrong while this SparkSession was being created.

Root cause

And something had indeed gone wrong: the SparkSession really was the problem. It is obtained through getOrCreate, so if some other piece of code builds a session without enableHiveSupport before this util runs, the util is simply handed that existing, non-Hive session. Looking a few lines further up in the business team's log, the expected warning was indeed there:

 WARN (org.apache.spark.sql.SparkSession$Builder:66) - Using an existing SparkSession; some configuration may not take effect.
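
The pitfall is easy to reproduce (a minimal, self-contained sketch; the object name and app name are made up):

import org.apache.spark.sql.SparkSession

object GetOrCreatePitfall {
  def main(args: Array[String]): Unit = {
    // Somewhere else in the job, a plain session is created first.
    val first = SparkSession.builder()
      .master("local[*]")
      .appName("first-without-hive")
      .getOrCreate()

    // Our util later asks for a Hive-enabled session, but getOrCreate simply
    // hands back the existing one and logs the warning shown above.
    val second = SparkSession.builder().enableHiveSupport().getOrCreate()

    println(second eq first) // true: the very same session, still without Hive support
  }
}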

Solutions

  1. Clear the already-registered session before this SparkSession is created; this was analyzed before in the earlier note spark 2.0 踩过的SparkSession的坑
  2. Use a singleton SparkSessionUtil so the Hive-enabled session is built in exactly one place (a rough sketch follows below)
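
A rough sketch of option 2 (the object name and its shape are illustrative, not the actual util): build the Hive-enabled session in one place, as early as possible, and make every caller go through it. SparkSession.clearActiveSession() / SparkSession.clearDefaultSession() are the calls option 1 relies on for discarding a session that was registered too early.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSessionUtil {

  @volatile private var instance: SparkSession = _

  def get(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          // Option 1 folded in: drop any session someone else registered first,
          // so the builder does not silently hand it back.
          SparkSession.clearActiveSession()
          SparkSession.clearDefaultSession()
          instance = SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()
            .getOrCreate()
        }
      }
    }
    instance
  }
}

Note that if a non-Hive SparkContext already exists in the JVM, clearing the sessions alone may not be enough on every Spark version, since the catalog implementation is picked from the SparkContext's conf; the safest cure is making sure the Hive-enabled session is the first one ever created.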