airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[destination-databricks] Cannot sync properties with complex/struct types #31334

leo-schick closed this issue 3 months ago

leo-schick commented 1 year ago

Connector Name

destination-databricks

Connector Version

1.1.0

What step the error happened?

During the sync

Relevant information

I tried to synchronize data from a new source, built with the Airbyte UI builder, to the Databricks Lakehouse destination. The stream has the following structure for an entity:

{
  "job_id": 612849589603272,
  "creator_user_name": "lschick@****",
  "settings": {
    "name": "Vacuum tables",
    "email_notifications": {
      "on_failure": [
        "leonhard.schick@****"
      ],
      "no_alert_for_skipped_runs": false
    },
    "notification_settings": {
      "no_alert_for_skipped_runs": false,
      "no_alert_for_canceled_runs": false
    },
    "timeout_seconds": 0,
    "schedule": {
      "quartz_cron_expression": "7 0 21 ? * Sun",
      "timezone_id": "Europe/Amsterdam",
      "pause_status": "UNPAUSED"
    },
    "max_concurrent_runs": 1,
    "format": "MULTI_TASK"
  },
  "created_time": 1696581543414
}

When I try to sync this, the Databricks destination fails because the property settings has the unsupported data type object. It looks like complex JSON data types are not mapped to a supported Databricks data type. I would suggest mapping the JSON type object to the Databricks data type string. That way I could easily analyze the data with the JSON functions in SQL.
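As a minimal sketch of what that mapping would enable, assuming the settings property were loaded into the workflow_jobs table as a plain string column holding the raw JSON (schema and table names taken from the log below), a query like the following Databricks SQL would work. This is only an illustration of the suggestion, not current connector behavior:

-- Hypothetical: `settings` stored as a STRING column containing the raw JSON document.
-- Databricks SQL can then extract nested fields with the ':' path syntax
-- or with get_json_object().
SELECT
  job_id,
  settings:name AS job_name,
  get_json_object(settings, '$.schedule.quartz_cron_expression') AS cron_expression
FROM source_databricks.workflow_jobs
WHERE settings:format = 'MULTI_TASK';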

Relevant log output

2023-10-12 09:39:04 destination > INFO i.a.i.b.IntegrationCliParser(parseOptions):126 integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2023-10-12 09:39:04 destination > INFO i.a.i.b.IntegrationRunner(runInternal):106 Running integration: io.airbyte.integrations.destination.databricks.DatabricksDestination
2023-10-12 09:39:04 destination > INFO i.a.i.b.IntegrationRunner(runInternal):107 Command: WRITE
2023-10-12 09:39:04 destination > INFO i.a.i.b.IntegrationRunner(runInternal):108 Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2023-10-12 09:39:05 INFO i.a.w.g.ReplicationWorkerHelper(endOfSource):172 - Total records read: 19 (5 KB)
2023-10-12 09:39:05 INFO i.a.w.i.FieldSelector(reportMetrics):122 - Schema validation was performed to a max of 10 records with errors per stream.
2023-10-12 09:39:05 INFO i.a.w.g.BufferedReplicationWorker(readFromSource):356 - readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2023-10-12 09:39:05 INFO i.a.w.i.HeartbeatTimeoutChaperone(runWithHeartbeatThread):115 - thread status... heartbeat thread: false , replication thread: true
2023-10-12 09:39:05 INFO i.a.w.g.BufferedReplicationWorker(writeToDestination):428 - writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2023-10-12 09:39:05 destination > WARN c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-12 09:39:05 destination > WARN c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2023-10-12 09:39:05 destination > INFO i.a.i.d.j.c.SwitchingDestination(getConsumer):65 Using destination type: AZURE_BLOB_STORAGE
2023-10-12 09:39:05 destination > INFO i.a.i.d.d.DatabricksStorageConfigProvider(getDatabricksStorageConfig):22 Databricks storage type config: "AZURE_BLOB_STORAGE"
2023-10-12 09:39:05 destination > INFO i.a.i.d.d.DatabricksStorageConfigProvider(getDatabricksStorageConfig):22 Databricks storage type config: "AZURE_BLOB_STORAGE"
2023-10-12 09:39:05 destination > INFO c.z.h.HikariDataSource(<init>):80 HikariPool-1 - Starting...
2023-10-12 09:39:05 destination > INFO c.z.h.HikariDataSource(<init>):82 HikariPool-1 - Start completed.
2023-10-12 09:39:05 destination > INFO i.a.i.d.d.DatabricksStreamCopier(<init>):73 [Stream Workflow Jobs] Database schema: source_databricks
2023-10-12 09:39:05 destination > INFO i.a.i.d.d.a.DatabricksAzureBlobStorageStreamCopier(<init>):74 [Stream Workflow Jobs] Tmp table _airbyte_tmp_xds_workflow_jobs location: abfss://source-databricks@****.blob.core.windows.net/b673d79c-007a-499d-873b-117099d20a2e/source_databricks/_airbyte_tmp_xds_workflow_jobs/
2023-10-12 09:39:05 destination > INFO i.a.i.d.d.a.DatabricksAzureBlobStorageStreamCopier(<init>):75 [Stream Workflow Jobs] Data table workflow_jobs location: abfss://source-databricks@****.dfs.core.windows.net/source_databricks/Workflow Jobs/
2023-10-12 09:39:05 destination > INFO i.a.i.d.d.DatabricksStreamCopier(<init>):73 [Stream Workflow Job runs] Database schema: source_databricks
2023-10-12 09:39:05 destination > INFO i.a.i.d.d.a.DatabricksAzureBlobStorageStreamCopier(<init>):74 [Stream Workflow Job runs] Tmp table _airbyte_tmp_axe_workflow_job_runs location: abfss://source-databricks@****.blob.core.windows.net/b673d79c-007a-499d-873b-117099d20a2e/source_databricks/_airbyte_tmp_axe_workflow_job_runs/
2023-10-12 09:39:05 destination > INFO i.a.i.d.d.a.DatabricksAzureBlobStorageStreamCopier(<init>):75 [Stream Workflow Job runs] Data table workflow_job_runs location: abfss://source-databricks@****.dfs.core.windows.net/source_databricks/Workflow Job runs/
2023-10-12 09:39:05 destination > INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):144 class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2023-10-12 09:39:05 destination > INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):80 Airbyte message consumer: succeeded.
2023-10-12 09:39:05 destination > INFO i.a.i.d.b.BufferedStreamConsumer(close):255 executing on success close procedure.
2023-10-12 09:39:05 destination > INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAllBuffers):85 Flushing Workflow Jobs: 13 records (25 KB)
2023-10-12 09:39:05 destination > INFO c.a.c.u.l.ClientLogger(info):222 Package versions: jackson-core=2.14.1, jackson-databind=2.14.1, jackson-dataformat-xml=2.14.1, jackson-datatype-jsr310=2.14.1, azure-core=1.35.0, Troubleshooting version conflicts: https://aka.ms/azsdk/java/dependency/troubleshoot
2023-10-12 09:39:07 destination > WARN c.a.c.u.l.ClientLogger(performLogging):500 'Content-Type' not found. Returning default encoding: JSON
2023-10-12 09:39:08 destination > INFO i.a.i.d.r.InMemoryRecordBufferingStrategy(flushAllBuffers):91 Flushing completed for Workflow Jobs
2023-10-12 09:39:08 INFO i.a.w.i.s.SyncPersistenceImpl(startBackgroundFlushStateTask):188 - starting state flush thread for connectionId 956d69e6-ff60-4de5-8432-cec7b853a2fa
2023-10-12 09:39:08 destination > INFO i.a.i.d.d.a.DatabricksAzureBlobStorageStreamCopier(closeStagingUploader):137 Uploading remaining data for Workflow Jobs stream.
2023-10-12 09:39:08 destination > INFO i.a.i.d.a.w.BaseAzureBlobStorageWriter(close):56 Uploading remaining data for stream 'Workflow Jobs'.
2023-10-12 09:39:08 destination > INFO i.a.i.d.a.c.AzureBlobStorageCsvWriter(closeWhenSucceed):60 Closing csvPrinter when succeed
2023-10-12 09:39:08 destination > INFO i.a.i.d.a.w.BaseAzureBlobStorageWriter(close):58 Upload completed for stream 'Workflow Jobs'.
2023-10-12 09:39:08 destination > INFO i.a.i.d.d.a.DatabricksAzureBlobStorageStreamCopier(closeStagingUploader):141 All data for Workflow Jobs stream uploaded.
2023-10-12 09:39:08 destination > INFO i.a.i.d.d.DatabricksStreamCopier(createDestinationSchema):82 [Stream Workflow Jobs] Creating database schema if it does not exist: source_databricks
2023-10-12 09:39:08 destination > INFO c.z.h.p.PoolBase(getAndSetNetworkTimeout):536 HikariPool-1 - Driver does not support get/set network timeout for connections. ([Databricks][JDBC](10220) Driver does not support this optional feature.)
2023-10-12 09:39:08 destination > INFO i.a.i.d.d.DatabricksStreamCopier(createTemporaryTable):88 [Stream Workflow Jobs] Creating tmp table _airbyte_tmp_xds_workflow_jobs from staging file: abfss://source-databricks@****.blob.core.windows.net/b673d79c-007a-499d-873b-117099d20a2e/source_databricks/_airbyte_tmp_xds_workflow_jobs/
2023-10-12 09:39:08 destination > INFO i.a.i.d.d.a.DatabricksAzureBlobStorageStreamCopier(getCreateTempTableStatement):147 Json schema for stream Workflow Jobs: {"$schema":"http://json-schema.org/schema#","type":"object","properties":{"settings":{"type":"object","properties":{"schedule":{"type":"object","properties":{"timezone_id":{"type":"string"},"pause_status":{"type":"string"},"quartz_cron_expression":{"type":"string"}}},"webhook_notifications":{"type":"object","properties":{"on_failure":{"type":"array","items":{"type":"object","properties":{"id":{"type":"string"}}}},"on_start":{"type":"array","items":{"type":"object","properties":{"id":{"type":"string"}}}},"on_success":{"type":"array","items":{"type":"object","properties":{"id":{"type":"string"}}}}}},"notification_settings":{"type":"object","properties":{"no_alert_for_skipped_runs":{"type":"boolean"},"no_alert_for_canceled_runs":{"type":"boolean"}}},"max_concurrent_runs":{"type":"number"},"name":{"type":"string"},"format":{"type":"string"},"email_notifications":{"type":"object","properties":{"on_failure":{"type":"array","items":{"type":"string"}},"no_alert_for_skipped_runs":{"type":"boolean"},"on_success":{"type":"array","items":{"type":"string"}}}},"timeout_seconds":{"type":"number"}}},"created_time":{"type":"number"},"creator_user_name":{"type":"string"},"job_id":{"type":"number"}}}
2023-10-12 09:39:08 destination > INFO i.a.i.d.d.a.DatabricksAzureBlobStorageStreamCopier(getCreateTempTableStatement):151 [Stream Workflow Jobs] tmp table schema: _airbyte_ab_id string, _airbyte_emitted_at string, `created_time` double, `creator_user_name` string, `job_id` double, `settings` object
2023-10-12 09:39:08 destination > INFO i.a.i.d.d.DatabricksStreamCopier(createTemporaryTable):94 CREATE TABLE source_databricks._airbyte_tmp_xds_workflow_jobs (_airbyte_ab_id string, _airbyte_emitted_at string, `created_time` double, `creator_user_name` string, `job_id` double, `settings` object) USING csv LOCATION 'abfss://source-databricks@****.dfs.core.windows.net/b673d79c-007a-499d-873b-117099d20a2e/source_databricks/_airbyte_tmp_xds_workflow_jobs/' options ("header" = "true", "multiLine" = "true") ;
2023-10-12 09:39:08 destination > ERROR i.a.i.d.j.c.CopyConsumerFactory(closeAsOneTransaction):156 Failed to finalize copy to temp table due to: java.sql.SQLException: [Databricks][DatabricksJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.catalyst.parser.ParseException: 
DataType object is not supported.(line 1, pos 193)
== SQL ==
CREATE TABLE source_databricks._airbyte_tmp_xds_workflow_jobs (_airbyte_ab_id string, _airbyte_emitted_at string, `created_time` double, `creator_user_name` string, `job_id` double, `settings` object) USING csv LOCATION 'abfss://source-databricks@****.dfs.core.windows.net/b673d79c-007a-499d-873b-117099d20a2e/source_databricks/_airbyte_tmp_xds_workflow_jobs/' options ("header" = "true", "multiLine" = "true")
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^
        at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:48)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$execute$1(SparkExecuteStatementOperation.scala:498)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:124)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:410)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:321)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties(ThriftLocalProperties.scala:149)
        at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties$(ThriftLocalProperties.scala:49)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:54)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:299)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:284)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:333)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.catalyst.parser.ParseException: 
DataType object is not supported.(line 1, pos 193)

== SQL ==
CREATE TABLE source_databricks._airbyte_tmp_xds_workflow_jobs (_airbyte_ab_id string, _airbyte_emitted_at string, `created_time` double, `creator_user_name` string, `job_id` double, `settings` object) USING csv LOCATION 'abfss://source-databricks@****.dfs.core.windows.net/b673d79c-007a-499d-873b-117099d20a2e/source_databricks/_airbyte_tmp_xds_workflow_jobs/' options ("header" = "true", "multiLine" = "true")
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^

[...]

evantahler commented 3 months ago

Closing - this destination has been updated significantly since this issue was posted.