apache / incubator-graphar

An open source, standard data file format for graph data storage and retrieval.
https://graphar.apache.org/
Apache License 2.0

feat(c++,spark): support json payload file format #488

Closed amygbAI closed 2 weeks ago

amygbAI commented 1 month ago

Reason for this PR

Fixes issue #170: adds JSON format read & write support for Scala (Spark) and read support for C++, since Arrow does not yet support a JSON writer.
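For context, here is a minimal sketch (plain Spark API, not GraphAr's datasource; the paths and app name are placeholders) of the built-in Spark JSON capability this PR wires into GraphAr on the Spark side:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("json-roundtrip").getOrCreate()

    // Spark's built-in JSON datasource already supports both read and write;
    // the PR exposes it as a GraphAr payload file format.
    val df = spark.read.format("json").load("/tmp/vertices.json")
    df.write.mode("overwrite").format("json").save("/tmp/vertices_out")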

What changes are included in this PR?

Changes to the following files:

  1. C++ changes in /cpp/include/gar/fwd.h and /cpp/src/filesystem.cc
  2. Changes in /spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala and /spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarTable.scala
  3. Added a json folder containing the new file JSONWriterBuilder.scala

Are these changes tested?

Yes, some tests are included in the testing@170 folder.

Are there any user-facing changes?

Not sure about this one. Is the ability to add a new file format a user-facing change or a backend one?

SemyonSinchenko commented 1 month ago

May we add it here also?

acezen commented 1 month ago

May we add it here also?

Yes. Hi @amygbAI, could you help add the JSON file type to the pyspark library?

BTW, the CI seems to be blocked by the code format check; you can fix the formatting with the following commands:

C++:

make graphar-clformat

Spark:

mvn spotless:apply

amygbAI commented 1 month ago

Hey folks, I have added the change required for pyspark and also ran the code formatting for Scala and C++. Two questions: a) is there something similar to run for pyspark as well? b) do I need to open another pull request?

acezen commented 1 month ago

Hey folks, I have added the change required for pyspark and also ran the code formatting for Scala and C++. Two questions: a) is there something similar to run for pyspark as well? b) do I need to open another pull request?


1) Yes, there are tests for pyspark; you can refer to https://github.com/apache/incubator-graphar/tree/main/pyspark/tests 2) No need to open another pull request; just updating this PR is fine.

amygbAI commented 1 month ago

tested .. please carry on

acezen commented 1 month ago

The Spark CI got some errors like:

Error: ] /home/runner/work/incubator-graphar/incubator-graphar/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarTable.scala:42: object JSONWriteBuilder is not a member of package org.apache.graphar.datasources.json
Error: ] /home/runner/work/incubator-graphar/incubator-graphar/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarTable.scala:87: class JSONOptions in package json cannot be accessed in package org.apache.spark.sql.catalyst.json

Could you double-check the access to JSONOptions in package org.apache.spark.sql.catalyst.json?

Note that we run the Spark CI on Spark 3.2.x and 3.3.x, so code written for Spark 3.4.x and above may not work. https://github.com/apache/incubator-graphar/blob/87d5eeb6a7f86574328da26f08d950e3c5bc3de8/.github/workflows/spark.yaml#L47-L52

amygbAI commented 1 month ago

Thanks for the feedback! Let me get to this by the end of the week.


SemyonSinchenko commented 1 month ago

[screenshot of the relevant Spark source] It is private in 3.2, but it is public in the current master.
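For illustration, a minimal self-contained sketch (toy package names, not the real Spark or GraphAr ones) of why a private[sql]-style class cannot be used from GraphAr's own package, while code living under the qualifying package can use it:

    // A class visible only inside the org.example.sql package subtree,
    // analogous to JSONOptions being private[sql] in Spark 3.2.
    package org.example.sql {
      private[sql] class Hidden
    }

    package org.example.graphar {
      object Outside {
        // val h = new org.example.sql.Hidden  // does not compile: Hidden is private[sql]
      }
    }

    package org.example.sql.inside {
      object Inside {
        val h = new org.example.sql.Hidden // fine: this package sits under org.example.sql
      }
    }

This is also why moving the datasource code under the qualifying package (the package rename discussed below for #493) makes such members reachable.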

SemyonSinchenko commented 1 month ago

Resolving #493 should resolve the access problem. And it is quite a quick task (just rename the packages and fix the pom). I can do it fast. @acezen, what do you think?

acezen commented 1 month ago

Resolving #493 should resolve the access problem. And it is quite a quick task (just rename the packages and fix the pom). I can do it fast. @acezen, what do you think?

A tricky workaround would be to borrow the JSONOptions implementation code from Spark directly if it's private.

amygbAI commented 1 month ago

Hi @acezen, how would you like to proceed with the JSONOptions case? Should I simply take the 3.4 version from Spark directly?

acezen commented 1 month ago

Hi @acezen, how would you like to proceed with the JSONOptions case? Should I simply take the 3.4 version from Spark directly?

How about just borrowing the code directly from Spark 3.2 and 3.3 as a temporary solution, since we currently only test Spark 3.2 and 3.3? Once #493 is solved, it's OK to remove the borrowed code.
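A minimal sketch of what the borrow approach could look like on the writer-builder side, assuming the copied class lives at org.apache.graphar.datasources.json.JSONOptions (the helper buildParsedOptions and the writeOptions map are illustrative names, not GraphAr code):

    import java.util.TimeZone

    import org.apache.spark.sql.SparkSession
    import org.apache.graphar.datasources.json.JSONOptions // the locally copied class

    // Construct the borrowed JSONOptions roughly the way Spark's own JSON
    // datasource does: options map, session time zone, corrupt-record column name.
    def buildParsedOptions(
        spark: SparkSession,
        writeOptions: Map[String, String]): JSONOptions = {
      val timeZone =
        spark.conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID)
      new JSONOptions(writeOptions, timeZone, "_corrupt_record")
    }

As the later comments show, this only solves half the problem: downstream classes such as JsonOutputWriter still expect Spark's own catalyst JSONOptions type.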

amygbAI commented 1 month ago

cool .. let me do that and get back to you folks

amygbAI commented 3 weeks ago

Hi @acezen, this is proving to be a lot trickier than just copying the implementation locally and making it public. Here's what I have tried so far (in both Spark 3.2 and 3.3; same issues):

package org.apache.graphar.datasources.json

import java.nio.charset.{Charset, StandardCharsets}
import java.time.ZoneId
import java.util.Locale

import com.fasterxml.jackson.core.{JsonFactory, JsonFactoryBuilder}
import com.fasterxml.jackson.core.json.JsonReadFeature

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

/**
 * Options for parsing JSON data into Spark SQL rows.
 *
 * Most of these map directly to Jackson's internal options, specified in [[JsonReadFeature]].
 */
class JSONOptions(
    @transient val parameters: CaseInsensitiveMap[String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String)
  extends Logging with Serializable  {

  def this(
    parameters: Map[String, String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String = "") = {
      this(
        CaseInsensitiveMap(parameters),
        defaultTimeZoneId,
        defaultColumnNameOfCorruptRecord)
  }

  val samplingRatio =
    parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
  val primitivesAsString =
    parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
  val prefersDecimal =
    parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
  val allowComments =
    parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
  val allowUnquotedFieldNames =
    parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false)
  val allowSingleQuotes =
    parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true)
  val allowNumericLeadingZeros =
    parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false)
  val allowNonNumericNumbers =
    parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
  val allowBackslashEscapingAnyCharacter =
    parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
  private val allowUnquotedControlChars =
    parameters.get("allowUnquotedControlChars").map(_.toBoolean).getOrElse(false)
  val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
  val parseMode: ParseMode =
    parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
  val columnNameOfCorruptRecord =
    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

  // Whether to ignore column of all null values or empty array/struct during schema inference
  val dropFieldIfAllNull = parameters.get("dropFieldIfAllNull").map(_.toBoolean).getOrElse(false)

  // Whether to ignore null fields during json generating
  val ignoreNullFields = parameters.get("ignoreNullFields").map(_.toBoolean)
    .getOrElse(SQLConf.get.jsonGeneratorIgnoreNullFields)

  // A language tag in IETF BCP 47 format
  val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

  val zoneId: ZoneId = DateTimeUtils.getZoneId(
    parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

  val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)

  val timestampFormat: String = parameters.getOrElse("timestampFormat",
    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
    } else {
      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
    })

  val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
  val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
    require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
    sep
  }

  protected def checkedEncoding(enc: String): String = enc

  /**
   * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
   * If the encoding is not specified (None) in read, it will be detected automatically
   * when the multiLine option is set to `true`. If encoding is not specified in write,
   * UTF-8 is used by default.
   */
  val encoding: Option[String] = parameters.get("encoding")
    .orElse(parameters.get("charset")).map(checkedEncoding)

  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
    lineSep.getBytes(encoding.getOrElse(StandardCharsets.UTF_8.name()))
  }
  val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")

  /**
   * Generating JSON strings in pretty representation if the parameter is enabled.
   */
  val pretty: Boolean = parameters.get("pretty").map(_.toBoolean).getOrElse(false)
  val inferTimestamp: Boolean = parameters.get("inferTimestamp").map(_.toBoolean).getOrElse(false)

  /**
   * Generating \u0000 style codepoints for non-ASCII characters if the parameter is enabled.
   */
  val writeNonAsciiCharacterAsCodePoint: Boolean =
    parameters.get("writeNonAsciiCharacterAsCodePoint").map(_.toBoolean).getOrElse(false)

  /** Build a Jackson [[JsonFactory]] using JSON options. */
  def buildJsonFactory(): JsonFactory = {
    new JsonFactoryBuilder()
      .configure(JsonReadFeature.ALLOW_JAVA_COMMENTS, allowComments)
      .configure(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
      .configure(JsonReadFeature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
      .configure(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS, allowNumericLeadingZeros)
      .configure(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
      .configure(
        JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
        allowBackslashEscapingAnyCharacter)
      .configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS, allowUnquotedControlChars)
      .build()
  }
}

class JSONOptionsInRead(
    @transient override val parameters: CaseInsensitiveMap[String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String)
  extends JSONOptions(parameters, defaultTimeZoneId, defaultColumnNameOfCorruptRecord) {
  def this(
    parameters: Map[String, String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String = "") = {
    this(
      CaseInsensitiveMap(parameters),
      defaultTimeZoneId,
      defaultColumnNameOfCorruptRecord)
  }

  protected override def checkedEncoding(enc: String): String = {
    val isDenied = JSONOptionsInRead.denyList.contains(Charset.forName(enc))
    require(multiLine || !isDenied,
      s"""The $enc encoding must not be included in the denyList when multiLine is disabled:
         |denylist: ${JSONOptionsInRead.denyList.mkString(", ")}""".stripMargin)

    val isLineSepRequired =
        multiLine || Charset.forName(enc) == StandardCharsets.UTF_8 || lineSeparator.nonEmpty
    require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")

    enc
  }
}

object JSONOptionsInRead {
  // The following encodings are not supported in per-line mode (multiline is false)
  // because they cause some problems in reading files with BOM which is supposed to
  // present in the files with such encodings. After splitting input files by lines,
  // only the first lines will have the BOM which leads to impossibility for reading
  // the rest lines. Besides of that, the lineSep option must have the BOM in such
  // encodings which can never present between lines.
  val denyList = Seq(
    Charset.forName("UTF-16"),
    Charset.forName("UTF-32")
  )
}

So basically, wherever private was used I removed it in the local copy, BUT then I get the following error from the JSONWriterBuilder.scala file I had written (similar to CSVWriterBuilder.scala):

    new OutputWriterFactory {
      override def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext
      ): OutputWriter = {
        new JsonOutputWriter(path, parsedOptions, dataSchema, context)
      }

      override def getFileExtension(context: TaskAttemptContext): String = {
        ".json" + CodecStreams.getCompressionExtension(context)
      }
    }

[ERROR] [Error] /datadrive/GRAPH_AR/act_gitclone/incubator-graphar/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/json/JSONWriterBuilder.scala:64: type mismatch; found : JSONOptions (in org.apache.graphar.datasources.json) required: JSONOptions (in org.apache.spark.sql.catalyst.json)

So I tried overriding the JsonOutputWriter with a custom JsonOutputWriter:

package org.apache.graphar.datasources.json

import java.nio.charset.{Charset, StandardCharsets}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JacksonGenerator
import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
import org.apache.spark.sql.types.StructType

class CustomJsonOutputWriter(
    path: String,
    options: JSONOptions,
    dataSchema: StructType,
    context: TaskAttemptContext
) extends OutputWriter {

  // Output stream for the target file, honoring any configured encoding and
  // compression codec (mirrors Spark's own JsonOutputWriter).
  private val encoding: Charset =
    options.encoding.map(Charset.forName).getOrElse(StandardCharsets.UTF_8)
  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)

  // Does not compile from this package: JacksonGenerator is private[sql], and it
  // expects Spark's catalyst JSONOptions rather than the locally borrowed copy.
  private val generator = new JacksonGenerator(dataSchema, writer, options)

  override def write(row: InternalRow): Unit = {
    generator.write(row)
    generator.writeLineEnding()
  }

  override def close(): Unit = {
    generator.close()
    writer.close()
  }
}

But this runs into further issues with JacksonGenerator being private[sql] (and with the borrowed JSONOptions not matching the type Spark's writer classes expect).

It would be nice to see what the community feels about this. Or do we just keep JSON read-only until we support Spark 3.5?

SemyonSinchenko commented 3 weeks ago

Let's just wait for #514 and the problem with private[sql] will be resolved.

acezen commented 3 weeks ago

Let's just wait for #514 and the problem with private[sql] will be resolved.

#514 has been merged. Hi @amygbAI, you can rebase onto main and check whether the private[sql] problem is resolved or not.

amygbAI commented 3 weeks ago

Hi @acezen, after rebasing, the compilation with my changes for datasources-32 (Spark 3.2) worked perfectly! No more trouble with JSONOptions.

But then I tried to compile datasources-33 (Spark 3.3.4) and it ran into several issues, none related to the changes I made. I went ahead and checked a few of them and found that the symbols it complains about don't exist on the Spark 3.3 branch BUT are present from 3.4 onwards. The issue is that if I continue without making changes to datasources-33, the PR will be incomplete; please advise.

[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala:141: one more argument than can be applied to method apply: (sqlConf: org.apache.spark.sql.internal.SQLConf, broadcastedConf: org.apache.spark.broadcast.Broadcast[org.apache.spark.util.SerializableConfiguration], dataSchema: org.apache.spark.sql.types.StructType, readDataSchema: org.apache.spark.sql.types.StructType, partitionSchema: org.apache.spark.sql.types.StructType, filters: Array[org.apache.spark.sql.sources.Filter])org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory in object OrcPartitionReaderFactory
Note that 'aggregation' is not a parameter name of the invoked method.
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala:182: value LEGACY_PARQUET_NANOS_AS_LONG is not a member of object org.apache.spark.sql.internal.SQLConf
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala:183: value legacyParquetNanosAsLong is not a member of org.apache.spark.sql.internal.SQLConf
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala:186: value PARQUET_FIELD_ID_READ_ENABLED is not a member of object org.apache.spark.sql.internal.SQLConf
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala:187: value parquetFieldIdReadEnabled is not a member of org.apache.spark.sql.internal.SQLConf
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala:202: one more argument than can be applied to method apply: (sqlConf: org.apache.spark.sql.internal.SQLConf, broadcastedConf: org.apache.spark.broadcast.Broadcast[org.apache.spark.util.SerializableConfiguration], dataSchema: org.apache.spark.sql.types.StructType, readDataSchema: org.apache.spark.sql.types.StructType, partitionSchema: org.apache.spark.sql.types.StructType, filters: Array[org.apache.spark.sql.sources.Filter], parquetOptions: org.apache.spark.sql.execution.datasources.parquet.ParquetOptions)org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory in object ParquetPartitionReaderFactory
Note that 'aggregation' is not a parameter name of the invoked method.
[WARNING] [Warn] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala:244: parameter value sparkSession in method getFilePartitions is never used
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala:68: value pushDataFilters is not a member of org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala:79: value pushDataFilters is not a member of org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala:103: not found: value pushedDataFilters
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala:162: not found: value bucketSpec
[WARNING] [Warn] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala:107: local val pathName in method validateInputs is never used
[WARNING] [Warn] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala:97: parameter value caseSensitiveAnalysis in method validateInputs is never used
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala:114: value PARQUET_FIELD_ID_WRITE_ENABLED is not a member of object org.apache.spark.sql.internal.SQLConf
[ERROR] [Error] /datadrive/APACHE_GRAPHAR/incubator-graphar/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala:115: value parquetFieldIdWriteEnabled is not a member of org.apache.spark.sql.internal.SQLConf

SemyonSinchenko commented 2 weeks ago

@amygbAI Could you push all the latest changes to the branch, please?

amygbAI commented 2 weeks ago

Done. Rebased onto main (post #514), moved over all prior changes, then compiled with Spark 3.2 and 3.3 and tested.
