vertica / spark-connector

This component acts as a bridge between Spark and Vertica, allowing the user to either retrieve data from Vertica for processing in Spark, or store processed data from Spark into Vertica.
Apache License 2.0

Add parquet export support for complex types as of Vertica 12.0.2 #517

Closed · ai-bq closed this 1 year ago

ai-bq commented 1 year ago

Summary

Vertica 12.0.2 adds full support for Parquet export of complex types. The Spark Connector needs to be updated so that:

  1. JSON export is no longer forced when a complex type column is detected on the initial scan
  2. Users on earlier Vertica versions still get JSON export as the default for complex types

Description

The VerticaScanBuilder runs before the Spark read pipe and includes a JSON check that returns true if complex type columns are found in the table schema. That function stays the same, except for a new conditional on the Vertica version (12.0.2 and later no longer need the complex type check).

To make the Vertica version available to this check, the version is added as a field of VerticaReadMetadata and carried along when VerticaScanBuilder fetches the Vertica metadata during the JSON check.
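A minimal sketch of that plumbing, with illustrative field names and types (VerticaVersion here is a stand-in for whatever version type the connector actually uses; only the idea of carrying the version on the metadata object comes from this description):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Illustrative version type; the connector's VerticaVersionUtils defines the real one.
final case class VerticaVersion(major: Int, minor: Int = 0, servicePack: Int = 0)

// The read metadata carries the server version next to the table schema, so the
// scan builder can decide between Parquet and JSON export without another query.
final case class VerticaReadMetadata(schema: StructType, version: VerticaVersion)

object MetadataSketch extends App {
  // Pretend this came back from the initial metadata scan against a 12.0.2 server.
  val meta = VerticaReadMetadata(
    schema = StructType(Seq(StructField("id", IntegerType))),
    version = VerticaVersion(12, 0, 2))
  println(s"Server version reported by the metadata scan: ${meta.version}")
}
```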

Related Issue

Closes #499.

Additional Reviewers

@alexey-temnikov @alexr-bq @jonathanl-bq @jeremyp-bq

codecov[bot] commented 1 year ago

Codecov Report

Merging #517 (66b066f) into main (02e2dd8) will increase coverage by 0.05%. The diff coverage is 88.23%.

@@            Coverage Diff             @@
##             main     #517      +/-   ##
==========================================
+ Coverage   87.52%   87.58%   +0.05%     
==========================================
  Files          44       44              
  Lines        1996     2005       +9     
  Branches      124      122       -2     
==========================================
+ Hits         1747     1756       +9     
  Misses        249      249              
| Flag | Coverage Δ |
| --- | --- |
| unittests | 87.58% <88.23%> (+0.05%) :arrow_up: |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ |
| --- | --- |
| .../vertica/spark/datasource/core/DSConfigSetup.scala | 88.07% <0.00%> (ø) |
| ...ce/core/VerticaDistributedFilesystemReadPipe.scala | 89.75% <100.00%> (ø) |
| ...ertica/spark/datasource/json/VerticaJsonScan.scala | 91.66% <100.00%> (ø) |
| .../spark/datasource/v2/VerticaDatasourceV2Read.scala | 86.51% <100.00%> (+0.47%) :arrow_up: |
| ...a/com/vertica/spark/util/error/ErrorHandling.scala | 79.39% <100.00%> (+0.08%) :arrow_up: |
| ...rtica/spark/util/version/VerticaVersionUtils.scala | 98.03% <100.00%> (+0.21%) :arrow_up: |


ai-bq commented 1 year ago

> The changes I'm seeing mostly relate to detecting version and using the right file format. There were no further changes necessary for using parquet for complex types?

The connector typically defaults to Parquet unless JSON is specified. JSON is used in one of two cases: (1) the user passes the json param set to true through the config, or (2) our JSON check finds complex type columns in the schema.

The solution was to fetch the version and apply check (2) only when the Vertica version is below 12.0.2:

  // Decide whether this read should go through the JSON export path.
  private def useJson(cfg: ReadConfig): Boolean = {
    cfg match {
      case config: DistributedFilesystemReadConfig =>
        (readConfigSetup.getTableMeta(config), config.getRequiredSchema) match {
          case (Right(metadata), requiredSchema) =>
            // Prefer the pruned schema requested by Spark; otherwise use the full table schema.
            val schema: StructType = if (requiredSchema.nonEmpty) {
              requiredSchema
            } else {
              metadata.schema
            }
            // Case 1: the user explicitly requested JSON export.
            if (config.useJson) { true }
            // Case 2: before Vertica 12.0.2, complex type columns still require JSON export.
            else if (metadata.version < VerticaVersionUtils.VERTICA_12_0_2) {
              ctTools.filterComplexTypeColumns(schema).nonEmpty
            }
            // Vertica 12.0.2 and later export complex types as Parquet.
            else false
          case (Left(err), _) => ErrorHandling.logAndThrowError(logger, err)
        }
      case _ => false
    }
  }
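For the `metadata.version < VerticaVersionUtils.VERTICA_12_0_2` comparison above to work, the version type has to be ordered and the utils object has to expose a 12.0.2 constant. A self-contained sketch of what that could look like, with the field names and Ordered implementation assumed rather than taken from the connector:

```scala
// Sketch only: a comparable version type and the 12.0.2 threshold used above.
final case class VerticaVersion(major: Int, minor: Int = 0, servicePack: Int = 0)
    extends Ordered[VerticaVersion] {
  def compare(that: VerticaVersion): Int =
    Ordering[(Int, Int, Int)].compare(
      (major, minor, servicePack),
      (that.major, that.minor, that.servicePack))
}

object VerticaVersionUtils {
  // First release with full Parquet export support for complex types.
  val VERTICA_12_0_2: VerticaVersion = VerticaVersion(12, 0, 2)
}

object VersionCheckDemo extends App {
  // Before 12.0.2: complex type columns still force the JSON export path.
  println(VerticaVersion(11, 1) < VerticaVersionUtils.VERTICA_12_0_2)    // true
  // On 12.0.2 and later: Parquet export is used even with complex types.
  println(VerticaVersion(12, 0, 2) < VerticaVersionUtils.VERTICA_12_0_2) // false
}
```

Comparing structured version components like this avoids reparsing version strings at every check and keeps the 12.0.2 threshold defined in one place.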