airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Destination Databricks: Datatypes missing in SQL CREATE-statement #21193

Closed MoritzLampert closed 1 year ago

MoritzLampert commented 1 year ago

Environment

Current Behavior

The sync job fails with the error:

== SQL ==
CREATE TABLE public._airbyte_tmp_zpw_output (_airbyte_ab_id string, _airbyte_emitted_at string, `ActualElapsedTime` , `AirTime` , `Airline` , `ArrDel15` , `ArrDelay` , `ArrDelayMinutes` , `ArrTime` , `ArrTimeBlk` , `ArrivalDelayGroups` , `CRSArrTime` , `CRSDepTime` , `CRSElapsedTime` , `Cancelled` , `DOT_ID_Marketing_Airline` , `DOT_ID_Operating_Airline` , `DayOfWeek` , `DayofMonth` , `DepDel15` , `DepDelay` , `DepDelayMinutes` , `DepTime` , `DepTimeBlk` , `DepartureDelayGroups` , `Dest` , `DestAirportID` , `DestAirportSeqID` , `DestCityMarketID` , `DestCityName` , `DestState` , `DestStateFips` , `DestStateName` , `DestWac` , `Distance` , `DistanceGroup` , `DivAirportLandings` , `Diverted` , `FlightDate` , `Flight_Number_Marketing_Airline` , `Flight_Number_Operating_Airline` , `IATA_Code_Marketing_Airline` , `IATA_Code_Operating_Airline` , `Marketing_Airline_Network` , `Month` , `Operated_or_Branded_Code_Share_Partners` , `Operating_Airline` , `Origin` , `OriginAirportID` , `OriginAirportSeqID` , `OriginCityMarketID` , `OriginCityName` , `OriginState` , `OriginStateFips` , `OriginStateName` , `OriginWac` , `Quarter` , `Tail_Number` , `TaxiIn` , `TaxiOut` , `WheelsOff` , `WheelsOn` , `Year` , `_ab_additional_properties` object, `_ab_source_file_last_modified` string, `_ab_source_file_url` string) USING csv LOCATION 'abfss:REDACTED_LOCAL_PART@dbtsparkstorageaccount.dfs.core.windows.net/71270d3d-06af-4f7d-bb70-8dad60c0660d/public/_airbyte_tmp_zpw_output/' options ("header" = "true", "multiLine" = "true")
--------------------------------------------------------------------------------------------------------------------^^^

    at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:48)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$execute$1(SparkExecuteStatementOperation.scala:498)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.unity.EmptyHandle$.runWith(UCSHandle.scala:103)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:410)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:321)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties(ThriftLocalProperties.scala:149)
    at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties$(ThriftLocalProperties.scala:49)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:54)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:299)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:284)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:333)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.catalyst.parser.ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near ',': extra input ','(line 1, pos 116)

Expected Behavior

The sync job should succeed.

Logs

airbyte_log_s3_to_databricks.txt

Steps to Reproduce

  1. Set up S3 as the source (I am using parts of a dataset from Kaggle for testing purposes: flight_data)
  2. Set up Databricks as the destination
  3. Try to sync

Are you willing to submit a PR?

Not sure

Additional Information

I think the problem is that the schema returned by the S3 source uses arrays as data types (see airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py line 183), whereas the Databricks destination expects plain strings (see airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksAzureBlobStorageStreamCopier.java line 164). As a result, no data type at all is inserted into the SQL statement, which produces the syntax error above: the caret at position 116 points at the comma immediately after `ActualElapsedTime`, the first column that is missing a type. The source schema output from the logs is below; a minimal sketch of how the type arrays could be collapsed follows the log excerpt.

2023-01-10 09:58:55 destination > Json schema for stream output: {"type":"object","properties":{"Dest":{"type":["null","string"]},"Year":{"type":["null","integer"]},"Month":{"type":["null","integer"]},"Origin":{"type":["null","string"]},"TaxiIn":{"type":["null","number"]},"AirTime":{"type":["null","number"]},"Airline":{"type":["null","string"]},"ArrTime":{"type":["null","number"]},"DepTime":{"type":["null","number"]},"DestWac":{"type":["null","integer"]},"Quarter":{"type":["null","integer"]},"TaxiOut":{"type":["null","number"]},"ArrDel15":{"type":["null","number"]},"ArrDelay":{"type":["null","number"]},"DepDel15":{"type":["null","number"]},"DepDelay":{"type":["null","number"]},"Distance":{"type":["null","number"]},"Diverted":{"type":["null","boolean"]},"WheelsOn":{"type":["null","number"]},"Cancelled":{"type":["null","boolean"]},"DayOfWeek":{"type":["null","integer"]},"DestState":{"type":["null","string"]},"OriginWac":{"type":["null","integer"]},"WheelsOff":{"type":["null","number"]},"ArrTimeBlk":{"type":["null","string"]},"CRSArrTime":{"type":["null","integer"]},"CRSDepTime":{"type":["null","integer"]},"DayofMonth":{"type":["null","integer"]},"DepTimeBlk":{"type":["null","string"]},"FlightDate":{"type":["null","string"]},"OriginState":{"type":["null","string"]},"Tail_Number":{"type":["null","string"]},"DestCityName":{"type":["null","string"]},"DestAirportID":{"type":["null","integer"]},"DestStateFips":{"type":["null","integer"]},"DestStateName":{"type":["null","string"]},"DistanceGroup":{"type":["null","integer"]},"CRSElapsedTime":{"type":["null","number"]},"OriginCityName":{"type":["null","string"]},"ArrDelayMinutes":{"type":["null","number"]},"DepDelayMinutes":{"type":["null","number"]},"OriginAirportID":{"type":["null","integer"]},"OriginStateFips":{"type":["null","integer"]},"OriginStateName":{"type":["null","string"]},"DestAirportSeqID":{"type":["null","integer"]},"DestCityMarketID":{"type":["null","integer"]},"ActualElapsedTime":{"type":["null","number"]},"Operating_Airline":{"type":["null","string"]},"ArrivalDelayGroups":{"type":["null","number"]},"DivAirportLandings":{"type":["null","number"]},"OriginAirportSeqID":{"type":["null","integer"]},"OriginCityMarketID":{"type":["null","integer"]},"_ab_source_file_url":{"type":"string"},"DepartureDelayGroups":{"type":["null","number"]},"DOT_ID_Marketing_Airline":{"type":["null","integer"]},"DOT_ID_Operating_Airline":{"type":["null","integer"]},"Marketing_Airline_Network":{"type":["null","string"]},"_ab_additional_properties":{"type":"object"},"IATA_Code_Marketing_Airline":{"type":["null","string"]},"IATA_Code_Operating_Airline":{"type":["null","string"]},"_ab_source_file_last_modified":{"type":"string","format":"date-time"},"Flight_Number_Marketing_Airline":{"type":["null","integer"]},"Flight_Number_Operating_Airline":{"type":["null","integer"]},"Operated_or_Branded_Code_Share_Partners":{"type":["null","string"]}}}
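For illustration, here is a minimal sketch of the suspected fix: collapse a JSON-schema type array such as `["null", "number"]` into a single type string before looking up the Databricks SQL type. This is not the connector's actual code; the names `extractJsonSchemaType` and `toDatabricksType` and the concrete type mapping are hypothetical, and only Jackson (which the Java connectors already use) is assumed.

```java
import com.fasterxml.jackson.databind.JsonNode;

// Hypothetical helper: the destination presumably keys its SQL-type lookup on a
// plain string such as "string", but the S3 source emits "type": ["null", "string"].
// Collapsing the array to its non-null member before the lookup would avoid the
// empty type slot in the CREATE statement.
public final class JsonSchemaTypeHelper {

  private JsonSchemaTypeHelper() {}

  /** Returns the single non-null JSON-schema type, whether "type" is a string or an array. */
  static String extractJsonSchemaType(final JsonNode typeNode) {
    if (typeNode.isTextual()) {
      return typeNode.asText();
    }
    if (typeNode.isArray()) {
      for (final JsonNode candidate : typeNode) {
        if (!"null".equals(candidate.asText())) {
          return candidate.asText();
        }
      }
    }
    return "string"; // fall back to string rather than emitting no type at all
  }

  /** Hypothetical mapping from JSON-schema types to Databricks SQL types. */
  static String toDatabricksType(final String jsonSchemaType) {
    switch (jsonSchemaType) {
      case "integer": return "bigint";
      case "number":  return "double";
      case "boolean": return "boolean";
      default:        return "string";
    }
  }
}
```

With the schema above, a field typed `["null", "number"]` would resolve to "number" and map to double, so the generated DDL would read `` `ActualElapsedTime` double `` instead of the empty `` `ActualElapsedTime` , `` that trips the parser.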
MoritzLampert commented 1 year ago

I just noticed that the same error occurs with the RKI Covid source and the states_history_deaths stream.

StevenReitsma commented 1 year ago

Can confirm; I've also noticed this. It only occurs when using the Azure Blob Storage backend of the Databricks connector, not the S3 backend.

One other, loosely related problem arises when a source schema contains the object data type. Databricks has no such data type, yet it is very prevalent in Airbyte sources. These fields should ideally be flattened into separate columns (I think Databricks even offers functionality for this) or, if that is not possible, converted into a JSON dump and inserted with the string data type, to be normalized later on.
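As a sketch of the JSON-dump fallback mentioned above (assuming Jackson; this illustrates the suggestion only and is not the connector's implementation):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;

// Hypothetical fallback: a field whose value is a JSON object is serialized to
// its JSON text and stored in a string column, so the destination never has to
// emit the unsupported object type. Flattening/normalization happens later.
public final class ObjectFieldFallback {

  private ObjectFieldFallback() {}

  /** If the value is a JSON object, replace it with a string node holding its JSON text. */
  static JsonNode dumpObjectAsString(final JsonNode value) {
    if (value != null && value.isObject()) {
      return TextNode.valueOf(value.toString());
    }
    return value;
  }
}
```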

IgSaf commented 1 year ago

I have created a PR for the Azure part of this issue: https://github.com/airbytehq/airbyte/pull/21238

evantahler commented 1 year ago

Closing because https://github.com/airbytehq/airbyte/pull/21238 is closed.