GoogleCloudDataproc / spark-bigquery-connector

BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables.
Apache License 2.0
375 stars 196 forks source link

Writing to BigQuery JSON column using Scala Spark is not working #882

Closed sarath-mec closed 1 year ago

sarath-mec commented 1 year ago

Hi,

According to the README (Data types → JSON):

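Spark has no JSON type. The values are read as String. In order to write JSON back to BigQuery, the following conditions are REQUIRED:
- Use the INDIRECT write method.
- Use the AVRO intermediate format.
- The DataFrame field MUST be of type String and have an entry of sqlType=JSON in its metadata.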

Based on the help I got in #880, I am using the following code to write to a JSON column in BigQuery.

I believe I am using the latest JAR:

spark-bigquery-with-dependencies_2.12-0.28.0.jar

My Scala version is:

// Evaluated in the REPL to show the running Scala version; the REPL's echo of the result follows.
scala.tools.nsc.Properties.versionString
res15: String = version 2.12.14

Note: the following data is a sample of the Elasticsearch demo data (the Kibana e-commerce sample index).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}

// Local Spark configuration for this reproduction.
val conf = new SparkConf().setAppName("Test").setMaster("local[*]")
// Connector settings. SparkConf.set mutates `conf` in place, so the return values are ignored.
conf.set("temporaryGcsBucket", "temp-bucket")
conf.set("spark.datasource.bigquery.intermediateFormat", "avro")

// Build the session from `conf` so that its master and settings are the ones actually used.
// Previously a SparkContext was created first and the builder was then asked for
// master("local"). getOrCreate reuses an already-running context, so that master setting
// could never take effect, and it contradicted the "local[*]" in `conf`.
val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
// Take the context from the session instead of constructing a second one.
val sc: SparkContext = spark.sparkContext

// Five sample orders from the Kibana e-commerce demo index, as (document id, JSON source) pairs.
// The JSON stays a plain String here; only the schema applied later tags the column as JSON.
val RDD = sc.parallelize(
  List(
    "OPCNpoQB_z8XYSv3AfQC" -> """{"category":["Men's Clothing"],"currency":"EUR","customer_first_name":"Eddie","customer_full_name":"Eddie Underwood","customer_gender":"MALE","customer_id":38,"customer_last_name":"Underwood","customer_phone":"","day_of_week":"Monday","day_of_week_i":0,"email":"eddie@underwood-family.zzz","manufacturer":["Elitelligence","Oceanavigations"],"order_date":"2022-12-05T09:28:48+00:00","order_id":584677,"products":[{"base_price":11.99,"discount_percentage":0,"quantity":1,"manufacturer":"Elitelligence","tax_amount":0,"product_id":6283,"category":"Men's Clothing","sku":"ZO0549605496","taxless_price":11.99,"unit_discount_amount":0,"min_price":6.35,"_id":"sold_product_584677_6283","discount_amount":0,"created_on":"2016-12-26T09:28:48+00:00","product_name":"Basic T-shirt - dark blue/white","price":11.99,"taxful_price":11.99,"base_unit_price":11.99},{"base_price":24.99,"discount_percentage":0,"quantity":1,"manufacturer":"Oceanavigations","tax_amount":0,"product_id":19400,"category":"Men's Clothing","sku":"ZO0299602996","taxless_price":24.99,"unit_discount_amount":0,"min_price":11.75,"_id":"sold_product_584677_19400","discount_amount":0,"created_on":"2016-12-26T09:28:48+00:00","product_name":"Sweatshirt - grey multicolor","price":24.99,"taxful_price":24.99,"base_unit_price":24.99}],"sku":["ZO0549605496","ZO0299602996"],"taxful_total_price":36.98,"taxless_total_price":36.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"eddie","geoip":{"country_iso_code":"EG","location":{"lon":31.3,"lat":30.1},"region_name":"Cairo Governorate","continent_name":"Africa","city_name":"Cairo"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"OPCNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[0]}}""",
    "OfCNpoQB_z8XYSv3AfQC" -> """{"category":["Women's Clothing"],"currency":"EUR","customer_first_name":"Mary","customer_full_name":"Mary Bailey","customer_gender":"FEMALE","customer_id":20,"customer_last_name":"Bailey","customer_phone":"","day_of_week":"Sunday","day_of_week_i":6,"email":"mary@bailey-family.zzz","manufacturer":["Champion Arts","Pyramidustries"],"order_date":"2022-12-04T21:59:02+00:00","order_id":584021,"products":[{"base_price":24.99,"discount_percentage":0,"quantity":1,"manufacturer":"Champion Arts","tax_amount":0,"product_id":11238,"category":"Women's Clothing","sku":"ZO0489604896","taxless_price":24.99,"unit_discount_amount":0,"min_price":11.75,"_id":"sold_product_584021_11238","discount_amount":0,"created_on":"2016-12-25T21:59:02+00:00","product_name":"Denim dress - black denim","price":24.99,"taxful_price":24.99,"base_unit_price":24.99},{"base_price":28.99,"discount_percentage":0,"quantity":1,"manufacturer":"Pyramidustries","tax_amount":0,"product_id":20149,"category":"Women's Clothing","sku":"ZO0185501855","taxless_price":28.99,"unit_discount_amount":0,"min_price":15.65,"_id":"sold_product_584021_20149","discount_amount":0,"created_on":"2016-12-25T21:59:02+00:00","product_name":"Shorts - black","price":28.99,"taxful_price":28.99,"base_unit_price":28.99}],"sku":["ZO0489604896","ZO0185501855"],"taxful_total_price":53.98,"taxless_total_price":53.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"mary","geoip":{"country_iso_code":"AE","location":{"lon":55.3,"lat":25.3},"region_name":"Dubai","continent_name":"Asia","city_name":"Dubai"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"OfCNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[1]}}""",
    "OvCNpoQB_z8XYSv3AfQC" -> """{"category":["Women's Shoes","Women's Clothing"],"currency":"EUR","customer_first_name":"Gwen","customer_full_name":"Gwen Butler","customer_gender":"FEMALE","customer_id":26,"customer_last_name":"Butler","customer_phone":"","day_of_week":"Sunday","day_of_week_i":6,"email":"gwen@butler-family.zzz","manufacturer":["Low Tide Media","Oceanavigations"],"order_date":"2022-12-04T22:32:10+00:00","order_id":584058,"products":[{"base_price":99.99,"discount_percentage":0,"quantity":1,"manufacturer":"Low Tide Media","tax_amount":0,"product_id":22794,"category":"Women's Shoes","sku":"ZO0374603746","taxless_price":99.99,"unit_discount_amount":0,"min_price":46.01,"_id":"sold_product_584058_22794","discount_amount":0,"created_on":"2016-12-25T22:32:10+00:00","product_name":"Boots - Midnight Blue","price":99.99,"taxful_price":99.99,"base_unit_price":99.99},{"base_price":99.99,"discount_percentage":0,"quantity":1,"manufacturer":"Oceanavigations","tax_amount":0,"product_id":23386,"category":"Women's Clothing","sku":"ZO0272202722","taxless_price":99.99,"unit_discount_amount":0,"min_price":53.99,"_id":"sold_product_584058_23386","discount_amount":0,"created_on":"2016-12-25T22:32:10+00:00","product_name":"Short coat - white/black","price":99.99,"taxful_price":99.99,"base_unit_price":99.99}],"sku":["ZO0374603746","ZO0272202722"],"taxful_total_price":199.98,"taxless_total_price":199.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"gwen","geoip":{"country_iso_code":"US","location":{"lon":-118.2,"lat":34.1},"region_name":"California","continent_name":"North America","city_name":"Los Angeles"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"OvCNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[2]}}""",
    "O_CNpoQB_z8XYSv3AfQC" -> """{"category":["Women's Shoes","Women's Clothing"],"currency":"EUR","customer_first_name":"Diane","customer_full_name":"Diane Chandler","customer_gender":"FEMALE","customer_id":22,"customer_last_name":"Chandler","customer_phone":"","day_of_week":"Sunday","day_of_week_i":6,"email":"diane@chandler-family.zzz","manufacturer":["Primemaster","Oceanavigations"],"order_date":"2022-12-04T22:58:05+00:00","order_id":584093,"products":[{"base_price":74.99,"discount_percentage":0,"quantity":1,"manufacturer":"Primemaster","tax_amount":0,"product_id":12304,"category":"Women's Shoes","sku":"ZO0360303603","taxless_price":74.99,"unit_discount_amount":0,"min_price":34.5,"_id":"sold_product_584093_12304","discount_amount":0,"created_on":"2016-12-25T22:58:05+00:00","product_name":"High heeled sandals - argento","price":74.99,"taxful_price":74.99,"base_unit_price":74.99},{"base_price":99.99,"discount_percentage":0,"quantity":1,"manufacturer":"Oceanavigations","tax_amount":0,"product_id":19587,"category":"Women's Clothing","sku":"ZO0272002720","taxless_price":99.99,"unit_discount_amount":0,"min_price":47.01,"_id":"sold_product_584093_19587","discount_amount":0,"created_on":"2016-12-25T22:58:05+00:00","product_name":"Classic coat - black","price":99.99,"taxful_price":99.99,"base_unit_price":99.99}],"sku":["ZO0360303603","ZO0272002720"],"taxful_total_price":174.98,"taxless_total_price":174.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"diane","geoip":{"country_iso_code":"GB","location":{"lon":-0.1,"lat":51.5},"continent_name":"Europe"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"O_CNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[3]}}""",
    "PPCNpoQB_z8XYSv3AfQC" -> """{"category":["Men's Clothing","Men's Accessories"],"currency":"EUR","customer_first_name":"Eddie","customer_full_name":"Eddie Weber","customer_gender":"MALE","customer_id":38,"customer_last_name":"Weber","customer_phone":"","day_of_week":"Monday","day_of_week_i":0,"email":"eddie@weber-family.zzz","manufacturer":["Elitelligence"],"order_date":"2022-11-28T03:48:58+00:00","order_id":574916,"products":[{"base_price":59.99,"discount_percentage":0,"quantity":1,"manufacturer":"Elitelligence","tax_amount":0,"product_id":11262,"category":"Men's Clothing","sku":"ZO0542505425","taxless_price":59.99,"unit_discount_amount":0,"min_price":28.2,"_id":"sold_product_574916_11262","discount_amount":0,"created_on":"2016-12-19T03:48:58+00:00","product_name":"Winter jacket - black","price":59.99,"taxful_price":59.99,"base_unit_price":59.99},{"base_price":20.99,"discount_percentage":0,"quantity":1,"manufacturer":"Elitelligence","tax_amount":0,"product_id":15713,"category":"Men's Accessories","sku":"ZO0601306013","taxless_price":20.99,"unit_discount_amount":0,"min_price":10.7,"_id":"sold_product_574916_15713","discount_amount":0,"created_on":"2016-12-19T03:48:58+00:00","product_name":"Watch - green","price":20.99,"taxful_price":20.99,"base_unit_price":20.99}],"sku":["ZO0542505425","ZO0601306013"],"taxful_total_price":80.98,"taxless_total_price":80.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"eddie","geoip":{"country_iso_code":"EG","location":{"lon":31.3,"lat":30.1},"region_name":"Cairo Governorate","continent_name":"Africa","city_name":"Cairo"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"PPCNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[4]}}"""
  )
)

// Turn every (id, source) pair into a two-field Row, in the column order of `table_schema`.
val rowRDD = RDD.map { case (id, source) => Row(id, source) }

// Two-column schema: `id` is a required String, and `source` is a nullable String whose metadata
// carries sqlType=JSON, the README-documented way to request a BigQuery JSON column.
val table_schema = StructType(
  Seq(
    StructField("id", StringType, nullable = false),
    StructField("source", StringType, nullable = true, metadata = Metadata.fromJson("""{"sqlType":"JSON"}"""))
  )
)

val df = spark.createDataFrame(rowRDD, table_schema)

// Indirect write: rows are staged in the GCS bucket as Avro and then loaded by a BigQuery load
// job. No save mode is set, so Spark's default (ErrorIfExists) applies.
df.write
  .format("bigquery")
  .options(Map(
    "temporaryGcsBucket" -> "temp-bucket",
    "writeMethod" -> "indirect",
    "intermediateFormat" -> "avro",
    "table" -> "DS.scala_created"))
  .save()

Overwrite Mode

The created table has the following structure (note that `source` was created as STRING, not JSON):

CREATE TABLE `DS.scala_created` ( id STRING NOT NULL, source STRING );

My SparkContext has the following configuration at runtime:

// Dump every (key, value) pair of the active SparkContext configuration, one per line.
for (entry <- sc.getConf.getAll) println(entry)

(spark.eventLog.enabled,true)
(spark.dynamicAllocation.minExecutors,1)
(spark.app.name,Test)
(spark.ui.proxyBase,/proxy/application_XXXXX)
(spark.dataproc.sql.joinConditionReorder.enabled,true)
(spark.driver.memory,1g)
(spark.es.query,?q=*:*)
(spark.dataproc.sql.local.rank.pushdown.enabled,true)
(spark.webui.yarn.useProxy,false)
(spark.yarn.unmanagedAM.enabled,true)
(spark.master,local[*])
(spark.es.nodes.wan.only,true)
(spark.sql.warehouse.dir,file:/var/lib/zeppelin/spark-warehouse)
(spark.metrics.namespace,app_name:${spark.app.name}.app_id:${spark.app.id})
(spark.dataproc.sql.optimizer.leftsemijoin.conversion.enabled,true)
(spark.es.resource,kibana_sample_data_ecommerce)
(spark.executor.id,driver)
(spark.hadoop.hive.execution.engine,mr)
(spark.app.id,local-XXXX)
(spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version,2)
(spark.dynamicAllocation.maxExecutors,10000)
(spark.jars.packages,org.apache.spark:spark-avro_2.12:3.1.3)
(spark.sql.catalogImplementation,hive)
(spark.app.startTime,1674313386693)
(spark.es.port,9200)
(spark.repl.local.jars,file:///var/lib/zeppelin/.ivy2/jars/org.apache.spark_spark-avro_2.12-3.1.3.jar,file:///var/lib/zeppelin/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar)
(spark.yarn.historyServer.address,cluster-XXX-m:18080)
(spark.executorEnv.OPENBLAS_NUM_THREADS,1)
(spark.sql.cbo.enabled,true)
(spark.sql.autoBroadcastJoinThreshold,46m)
(spark.dataproc.sql.parquet.enableFooterCache,true)
(spark.driver.maxResultSize,2048m)
(spark.es.read.metadata,true)
(spark.yarn.am.memory,640m)
(spark.yarn.dist.archives,file:/usr/lib/spark/R/lib/sparkr.zip#sparkr)
(spark.executor.instances,2)
(spark.history.fs.logDirectory,gs://XXXXX/spark-job-history)
(spark.submit.deployMode,client)
(spark.extraListeners,com.google.cloud.spark.performance.DataprocMetricsListener)
(spark.driver.cores,1)
(spark.sql.cbo.joinReorder.enabled,true)
(spark.eventLog.dir,gs://XXXXX)
(spark.shuffle.service.enabled,true)
(spark.jars,file:/usr/lib/zeppelin/interpreter/spark/spark-interpreter-0.9.1-SNAPSHOT.jar)
(spark.app.initial.jar.urls,spark://XXXXXX.internal:40757/jars/spark-interpreter-0.9.1-SNAPSHOT.jar)
(spark.datasource.bigquery.intermediateFormat,avro)
(spark.scheduler.mode,FAIR)
(spark.sql.adaptive.enabled,true)
(spark.yarn.jars,local:/usr/lib/spark/jars/*)
(spark.scheduler.minRegisteredResourcesRatio,0.0)
(spark.driver.port,40757)
(spark.driver.extraJavaOptions, -Dfile.encoding=UTF-8 -Dlog4j.configuration=file:///etc/zeppelin/conf/log4j.properties -Dlog4j.configurationFile=file:///etc/zeppelin/conf/log4j2.properties -Dzeppelin.log.file=/var/log/zeppelin/zeppelin-interpreter-spark-shared_process-zeppelin-cluster-XXXX-m.log)
(spark.ui.port,0)
(spark.es.nodes,XXXXX)
(spark.rpc.message.maxSize,512)
(spark.executor.memory,6157m)
(spark.driver.host,XXXXX.internal)
(spark.submit.pyFiles,)
(spark.dynamicAllocation.enabled,true)
(spark.driver.extraClassPath,:/usr/lib/zeppelin/local-repo/spark/*:/usr/lib/zeppelin/interpreter/spark/*:::/usr/lib/zeppelin/interpreter/zeppelin-interpreter-shaded-0.9.1-SNAPSHOT.jar:/usr/lib/zeppelin/interpreter/spark/spark-interpreter-0.9.1-SNAPSHOT.jar:/etc/hadoop/conf)
(spark.yarn.isPython,true)
(spark.executor.cores,1)
(temporaryGcsBucket,temp-bucket)
(spark.yarn.dist.jars,file:///var/lib/zeppelin/.ivy2/jars/org.apache.spark_spark-avro_2.12-3.1.3.jar,file:///var/lib/zeppelin/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar)
FINISHED   
Took 0 sec. Last updated by anonymous at January 21 2023, 9:18:28 AM.

Append Mode

Then I tried to append to a table with a JSON column, created with the following DDL:

CREATE TABLE `DS.append_table` ( id STRING NOT NULL, source JSON );

using the following code:

// Append into the pre-created table whose `source` column is JSON, using the same indirect/Avro
// settings. With connector 0.28.0 this load is rejected with
// "Field source has changed type from JSON to STRING".
df.write
  .format("bigquery")
  .mode("append")
  .options(Map(
    "temporaryGcsBucket" -> "temp-bucket",
    "writeMethod" -> "indirect",
    "intermediateFormat" -> "avro",
    "table" -> "DS.append_table"))
  .save()

It fails with the following error:

Field source has changed type from JSON to STRING

com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
  at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:110)
  at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
  at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51)
  at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:106)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
  ... 56 elided
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Provided Schema does not match Table XXXXX:DS.append_table. Field source has changed type from JSON to STRING
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.reload(Job.java:419)
  at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:252)
  at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:333)
  at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:323)
  at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:553)
  at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.java:130)
  at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:107)
  ... 80 more
sarath-mec commented 1 year ago

Related to https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/844

sarath-mec commented 1 year ago

@davidrabinowitz It seems this works fine if both columns are JSON columns:

// The same five sample orders, but each id is wrapped in a one-element JSON array so that it is
// valid JSON for an `id` column tagged sqlType=JSON. The third id previously read
// [""OvCNpoQB_z8XYSv3AfQC"] (a stray extra quote), which is not valid JSON and would be rejected
// when loaded into a JSON column.
val RDD = sc.parallelize(
  Seq(
      ("""["OPCNpoQB_z8XYSv3AfQC"]""", """{"category":["Men's Clothing"],"currency":"EUR","customer_first_name":"Eddie","customer_full_name":"Eddie Underwood","customer_gender":"MALE","customer_id":38,"customer_last_name":"Underwood","customer_phone":"","day_of_week":"Monday","day_of_week_i":0,"email":"eddie@underwood-family.zzz","manufacturer":["Elitelligence","Oceanavigations"],"order_date":"2022-12-05T09:28:48+00:00","order_id":584677,"products":[{"base_price":11.99,"discount_percentage":0,"quantity":1,"manufacturer":"Elitelligence","tax_amount":0,"product_id":6283,"category":"Men's Clothing","sku":"ZO0549605496","taxless_price":11.99,"unit_discount_amount":0,"min_price":6.35,"_id":"sold_product_584677_6283","discount_amount":0,"created_on":"2016-12-26T09:28:48+00:00","product_name":"Basic T-shirt - dark blue/white","price":11.99,"taxful_price":11.99,"base_unit_price":11.99},{"base_price":24.99,"discount_percentage":0,"quantity":1,"manufacturer":"Oceanavigations","tax_amount":0,"product_id":19400,"category":"Men's Clothing","sku":"ZO0299602996","taxless_price":24.99,"unit_discount_amount":0,"min_price":11.75,"_id":"sold_product_584677_19400","discount_amount":0,"created_on":"2016-12-26T09:28:48+00:00","product_name":"Sweatshirt - grey multicolor","price":24.99,"taxful_price":24.99,"base_unit_price":24.99}],"sku":["ZO0549605496","ZO0299602996"],"taxful_total_price":36.98,"taxless_total_price":36.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"eddie","geoip":{"country_iso_code":"EG","location":{"lon":31.3,"lat":30.1},"region_name":"Cairo Governorate","continent_name":"Africa","city_name":"Cairo"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"OPCNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[0]}}"""),
      ("""["OfCNpoQB_z8XYSv3AfQC"]""", """{"category":["Women's Clothing"],"currency":"EUR","customer_first_name":"Mary","customer_full_name":"Mary Bailey","customer_gender":"FEMALE","customer_id":20,"customer_last_name":"Bailey","customer_phone":"","day_of_week":"Sunday","day_of_week_i":6,"email":"mary@bailey-family.zzz","manufacturer":["Champion Arts","Pyramidustries"],"order_date":"2022-12-04T21:59:02+00:00","order_id":584021,"products":[{"base_price":24.99,"discount_percentage":0,"quantity":1,"manufacturer":"Champion Arts","tax_amount":0,"product_id":11238,"category":"Women's Clothing","sku":"ZO0489604896","taxless_price":24.99,"unit_discount_amount":0,"min_price":11.75,"_id":"sold_product_584021_11238","discount_amount":0,"created_on":"2016-12-25T21:59:02+00:00","product_name":"Denim dress - black denim","price":24.99,"taxful_price":24.99,"base_unit_price":24.99},{"base_price":28.99,"discount_percentage":0,"quantity":1,"manufacturer":"Pyramidustries","tax_amount":0,"product_id":20149,"category":"Women's Clothing","sku":"ZO0185501855","taxless_price":28.99,"unit_discount_amount":0,"min_price":15.65,"_id":"sold_product_584021_20149","discount_amount":0,"created_on":"2016-12-25T21:59:02+00:00","product_name":"Shorts - black","price":28.99,"taxful_price":28.99,"base_unit_price":28.99}],"sku":["ZO0489604896","ZO0185501855"],"taxful_total_price":53.98,"taxless_total_price":53.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"mary","geoip":{"country_iso_code":"AE","location":{"lon":55.3,"lat":25.3},"region_name":"Dubai","continent_name":"Asia","city_name":"Dubai"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"OfCNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[1]}}"""),
      ("""["OvCNpoQB_z8XYSv3AfQC"]""", """{"category":["Women's Shoes","Women's Clothing"],"currency":"EUR","customer_first_name":"Gwen","customer_full_name":"Gwen Butler","customer_gender":"FEMALE","customer_id":26,"customer_last_name":"Butler","customer_phone":"","day_of_week":"Sunday","day_of_week_i":6,"email":"gwen@butler-family.zzz","manufacturer":["Low Tide Media","Oceanavigations"],"order_date":"2022-12-04T22:32:10+00:00","order_id":584058,"products":[{"base_price":99.99,"discount_percentage":0,"quantity":1,"manufacturer":"Low Tide Media","tax_amount":0,"product_id":22794,"category":"Women's Shoes","sku":"ZO0374603746","taxless_price":99.99,"unit_discount_amount":0,"min_price":46.01,"_id":"sold_product_584058_22794","discount_amount":0,"created_on":"2016-12-25T22:32:10+00:00","product_name":"Boots - Midnight Blue","price":99.99,"taxful_price":99.99,"base_unit_price":99.99},{"base_price":99.99,"discount_percentage":0,"quantity":1,"manufacturer":"Oceanavigations","tax_amount":0,"product_id":23386,"category":"Women's Clothing","sku":"ZO0272202722","taxless_price":99.99,"unit_discount_amount":0,"min_price":53.99,"_id":"sold_product_584058_23386","discount_amount":0,"created_on":"2016-12-25T22:32:10+00:00","product_name":"Short coat - white/black","price":99.99,"taxful_price":99.99,"base_unit_price":99.99}],"sku":["ZO0374603746","ZO0272202722"],"taxful_total_price":199.98,"taxless_total_price":199.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"gwen","geoip":{"country_iso_code":"US","location":{"lon":-118.2,"lat":34.1},"region_name":"California","continent_name":"North America","city_name":"Los Angeles"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"OvCNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[2]}}"""),
      ("""["O_CNpoQB_z8XYSv3AfQC"]""", """{"category":["Women's Shoes","Women's Clothing"],"currency":"EUR","customer_first_name":"Diane","customer_full_name":"Diane Chandler","customer_gender":"FEMALE","customer_id":22,"customer_last_name":"Chandler","customer_phone":"","day_of_week":"Sunday","day_of_week_i":6,"email":"diane@chandler-family.zzz","manufacturer":["Primemaster","Oceanavigations"],"order_date":"2022-12-04T22:58:05+00:00","order_id":584093,"products":[{"base_price":74.99,"discount_percentage":0,"quantity":1,"manufacturer":"Primemaster","tax_amount":0,"product_id":12304,"category":"Women's Shoes","sku":"ZO0360303603","taxless_price":74.99,"unit_discount_amount":0,"min_price":34.5,"_id":"sold_product_584093_12304","discount_amount":0,"created_on":"2016-12-25T22:58:05+00:00","product_name":"High heeled sandals - argento","price":74.99,"taxful_price":74.99,"base_unit_price":74.99},{"base_price":99.99,"discount_percentage":0,"quantity":1,"manufacturer":"Oceanavigations","tax_amount":0,"product_id":19587,"category":"Women's Clothing","sku":"ZO0272002720","taxless_price":99.99,"unit_discount_amount":0,"min_price":47.01,"_id":"sold_product_584093_19587","discount_amount":0,"created_on":"2016-12-25T22:58:05+00:00","product_name":"Classic coat - black","price":99.99,"taxful_price":99.99,"base_unit_price":99.99}],"sku":["ZO0360303603","ZO0272002720"],"taxful_total_price":174.98,"taxless_total_price":174.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"diane","geoip":{"country_iso_code":"GB","location":{"lon":-0.1,"lat":51.5},"continent_name":"Europe"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"O_CNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[3]}}"""),
      ("""["PPCNpoQB_z8XYSv3AfQC"]""", """{"category":["Men's Clothing","Men's Accessories"],"currency":"EUR","customer_first_name":"Eddie","customer_full_name":"Eddie Weber","customer_gender":"MALE","customer_id":38,"customer_last_name":"Weber","customer_phone":"","day_of_week":"Monday","day_of_week_i":0,"email":"eddie@weber-family.zzz","manufacturer":["Elitelligence"],"order_date":"2022-11-28T03:48:58+00:00","order_id":574916,"products":[{"base_price":59.99,"discount_percentage":0,"quantity":1,"manufacturer":"Elitelligence","tax_amount":0,"product_id":11262,"category":"Men's Clothing","sku":"ZO0542505425","taxless_price":59.99,"unit_discount_amount":0,"min_price":28.2,"_id":"sold_product_574916_11262","discount_amount":0,"created_on":"2016-12-19T03:48:58+00:00","product_name":"Winter jacket - black","price":59.99,"taxful_price":59.99,"base_unit_price":59.99},{"base_price":20.99,"discount_percentage":0,"quantity":1,"manufacturer":"Elitelligence","tax_amount":0,"product_id":15713,"category":"Men's Accessories","sku":"ZO0601306013","taxless_price":20.99,"unit_discount_amount":0,"min_price":10.7,"_id":"sold_product_574916_15713","discount_amount":0,"created_on":"2016-12-19T03:48:58+00:00","product_name":"Watch - green","price":20.99,"taxful_price":20.99,"base_unit_price":20.99}],"sku":["ZO0542505425","ZO0601306013"],"taxful_total_price":80.98,"taxless_total_price":80.98,"total_quantity":2,"total_unique_products":2,"type":"order","user":"eddie","geoip":{"country_iso_code":"EG","location":{"lon":31.3,"lat":30.1},"region_name":"Cairo Governorate","continent_name":"Africa","city_name":"Cairo"},"event":{"dataset":"sample_ecommerce"},"_metadata":{"_index":"kibana_sample_data_ecommerce","_type":"_doc","_id":"PPCNpoQB_z8XYSv3AfQC","_score":1.0,"sort":[4]}}""")
  )
)

And this schema definition:

// Both columns are Strings tagged sqlType=JSON: `id` is required, `source` is nullable.
val table_schema = {
  // Shared column metadata that marks a String field as a BigQuery JSON column.
  val jsonColumn = Metadata.fromJson("""{"sqlType":"JSON"}""")
  StructType(Seq(
    StructField("id", StringType, nullable = false, metadata = jsonColumn),
    StructField("source", StringType, nullable = true, metadata = jsonColumn)))
}