cjuexuan / mynote

237 stars 34 forks source link

spark3升级后es-spark不能正常工作的分析 #72

Open cjuexuan opened 4 years ago

cjuexuan commented 4 years ago

背景

最近在忙着升级spark3,我们自己改的代码基本都已经搞定了,但是外部数据源es还有些问题,这篇文章主要说一下存在的问题和如何修复

现象

我们升级spark3之后,集成测试有些索引是能正常工作的,有些索引却不能读取了,主要的异常信息如下:

scala.None$ is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_3$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:211)
    ... 36 common frames omitted

分析

对于一个问题的分析有多种方式,一种是正向跟踪源代码,找到完全出问题的地方,这种方式更加严谨,但是如果链路很长的时候,跟起来比较累,所以正向跟踪的我等下会在最后补充一下,这里我们说另一种排查的方式,那就是多次实验,观察现象,大胆假设,小心验证,接下来我们先用这种方式排查解决下该问题

大胆假设,小心验证

首先,我们从测试结果看到,并不是所有的索引都不能读了,而是部分索引不能读,我们的第一感觉就是是不是某种类型不支持?如果我们能找到那个不能读的类型,就能针对性的修复了

接下来,我们就来找一下相关代码,首先,我们看看能不能让es少读几个字段,这样我们就可以通过二分,很快的找到有问题的字段了 我们知道,如果用户不指定schema的情况下,es会通过index的mapping信息获取到index的schema 这部分逻辑具体可以看下ElasticsearchRelationlazySchema,这里不展开,而如果用户提供了schema就会用用户的schema,所以我们只要把schema塞到es的dataframe里即可, 至于塞schema的逻辑,也非常简单,我们只需要copy出EsSparkSQLesDF代码,然后增加schema即可

                sparkSession.read.format("org.elasticsearch.spark.sql")
                    .options(esConf.asProperties.asScala.toMap)
                .schema(StructType.fromDDL("这里对index的所有字段做二分")) //新增自定义schema逻辑
                    .load

然后,我们就找到那个有问题的字段,并且看一下这个字段和其他字段的区别,这里略过排查过程,给出现象,那就是有问题的字段是包含空值,而没有问题的字段是不包含空值的,这也能印证我们刚看到的异常信息 scala.None$ is not a valid external type for schema of string

接着,我们找一下es-spark里什么时候会用到None,这里直接文本搜索一下,基本就可以找到了,es-spark包里用None的地方一共有五处,其中ScalaValueWriterRowValueReader以及DefaultSource里都是用在条件判断上,真正赋予None的是ScalaValueReadernullValue方法

  def nullValue() = { None }

至于调用的地方,主要是ScalaValueReadercheckNull方法

  def checkNull(converter: (String, Parser) => Any, value: String, parser: Parser) = {
    if (value != null) {
      if (!StringUtils.hasText(value) && emptyAsNull) {
        nullValue()
      }
      else {
        converter(value, parser).asInstanceOf[AnyRef]
      }
    }
    else {
      nullValue()
    }
  }

这也符合我们刚看到的现象,当出现空值的时候,es-spark会给他赋值成None,但是None在spark3上不能用来填充那些空值,所以出现了上述异常,那么我们就把这一行代码改成

  def nullValue() = { null }

然后编译发包,验证通过

更近一步,正向分析

到这里,问题是解决了,但是有好奇心的小伙伴肯定想到了,为什么这份代码能在spark2.4正常运行,到了spark3就不行了呢,我们还是希望知道到底spark哪里改了,导致这部分代码的行为改变了

spark3

首先,我们找到errorMsg的地方:ValidateExternalType,这里可以通过文本搜索找到

  private lazy val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"

  private lazy val checkType: (Any) => Boolean = expected match {
    case _: DecimalType =>
      (value: Any) => {
        value.isInstanceOf[java.math.BigDecimal] || value.isInstanceOf[scala.math.BigDecimal] ||
          value.isInstanceOf[Decimal]
      }
    case _: ArrayType =>
      (value: Any) => {
        value.getClass.isArray || value.isInstanceOf[Seq[_]]
      }
    case _ =>
    // 这里抛出了异常,因为
      val dataTypeClazz = ScalaReflection.javaBoxedType(dataType)
      (value: Any) => {
        dataTypeClazz.isInstance(value)
      }
  }

  override def eval(input: InternalRow): Any = {
    val result = child.eval(input)
    if (checkType(result)) {
      result
    } else {
      throw new RuntimeException(s"${result.getClass.getName}$errMsg")
    }
  }

RowEncoder代码:

        val convertedField = if (field.nullable) {
          If(
            Invoke(inputObject, "isNullAt", BooleanType, Literal(index) :: Nil),
            // Because we strip UDTs, `field.dataType` can be different from `fieldValue.dataType`.
            // We should use `fieldValue.dataType` here.
            Literal.create(null, fieldValue.dataType),
            fieldValue
          )
        } else {
          fieldValue
        }

最终生成的代码是

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, diagnostics), StringType), true, false) AS diagnostics#50

也就是如果数据是null的话,则返回null,否则的话,当成UTF8String去处理,进入校验环节,然后抛出了我们刚才看到异常

调用的入口在DataSourceStrategy

  private[sql] def toCatalystRDD(
      relation: BaseRelation,
      output: Seq[Attribute],
      rdd: RDD[Row]): RDD[InternalRow] = {
    if (relation.needConversion) {
      val toRow = RowEncoder(StructType.fromAttributes(output)).createSerializer()
      rdd.mapPartitions { iterator =>
        iterator.map(toRow)
      }
    } else {
      rdd.asInstanceOf[RDD[InternalRow]]
    }
  }

我们这里梳理下3的逻辑

  1. DatasourceStrategy里面要将RDD[Row]转换到RDD[InternalRow],以满足spark sql的要求
  2. toRow方法通过RowEncoder.createSerializer实现对row的转换
  3. rowEncoder里面会判断Field是否nullable,如果nullable并且row里的数据也是null的,将返回null,否则会读取row的数据,转换,校验类型
  4. 由于es-spark对于null字段返回了None,所以走到校验逻辑中去了,由于None不是UTF8String,所以就报错了

spark2.4

但是spark2.4没有报错,那么我们从入口看一下spark2.4的逻辑

  private[this] def toCatalystRDD(
      relation: LogicalRelation,
      output: Seq[Attribute],
      rdd: RDD[Row]): RDD[InternalRow] = {
    if (relation.relation.needConversion) {
      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
    } else {
      rdd.asInstanceOf[RDD[InternalRow]]
    }
  }

    /**
   * Convert the objects inside Row into the types Catalyst expected.
   */
  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
    data.mapPartitions { iterator =>
      val numColumns = outputTypes.length
      val mutableRow = new GenericInternalRow(numColumns)
      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
      iterator.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = converters(i)(r(i))
          i += 1
        }

        mutableRow
      }
    }
  }

    def createToCatalystConverter(dataType: DataType): Any => Any = {
    if (isPrimitive(dataType)) {
      // Although the `else` branch here is capable of handling inbound conversion of primitives,
      // we add some special-case handling for those types here. The motivation for this relates to
      // Java method invocation costs: if we have rows that consist entirely of primitive columns,
      // then returning the same conversion function for all of the columns means that the call site
      // will be monomorphic instead of polymorphic. In microbenchmarks, this actually resulted in
      // a measurable performance impact. Note that this optimization will be unnecessary if we
      // use code generation to construct Scala Row -> Catalyst Row converters.
      def convert(maybeScalaValue: Any): Any = {
        if (maybeScalaValue.isInstanceOf[Option[Any]]) {
          maybeScalaValue.asInstanceOf[Option[Any]].orNull
        } else {
          maybeScalaValue
        }
      }
      convert
    } else {
      getConverterForType(dataType).toCatalyst
    }
  }

关键就是createToCatalystConverter中的toCatalyst

    final def toCatalyst(@Nullable maybeScalaValue: Any): CatalystType = {
      if (maybeScalaValue == null) {
        null.asInstanceOf[CatalystType]
      } else if (maybeScalaValue.isInstanceOf[Option[ScalaInputType]]) {
        val opt = maybeScalaValue.asInstanceOf[Option[ScalaInputType]]
        if (opt.isDefined) {
          toCatalystImpl(opt.get)
        } else {
          null.asInstanceOf[CatalystType] //这里处理了相关逻辑
        }
      } else {
        toCatalystImpl(maybeScalaValue.asInstanceOf[ScalaInputType])
      }
    }

在这里成功的处理了Option的情况

找了下git log,应该是23262 出于性能的考虑,把rowToRowRdd换成了RowEncoder,感兴趣的小伙伴可以自行跟踪下相关逻辑

scxwhite commented 4 years ago

大佬 牛逼,我也正在升级spark3.0遇见了你这个问题,根据你的方案解决了。请问你的es-spark 3.0是自己编译的吗? 我用的这个 https://github.com/elastic/elasticsearch-hadoop/pull/1498

cjuexuan commented 4 years ago

@scxwhite 是我们自己编译的

One-Big-Bug commented 2 years ago

您们都是大佬,学习了