monai closed this issue 2 years ago.
@alafanechere, do you think https://github.com/airbytehq/airbyte/issues/8330 is the same problem?
We don't support $ref / $defs in Airbyte JSON schemas, so normalization does not support them either.
The source should resolve the references and inline the full JSON schema into the catalog.
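To illustrate what "resolve and inline" means for same-document references, here is a minimal sketch. It assumes every $ref is a local JSON pointer such as #/$defs/address, that definitions are objects, and that no definition refers to itself. The name `inline_local_refs` is hypothetical and not part of Airbyte or the CDK; merging sibling keywords into the inlined body is a simplification.

```python
import copy
from typing import Any


def inline_local_refs(schema: dict) -> dict:
    """Return a copy of `schema` with local "$ref" pointers replaced by their targets.

    Hypothetical helper: handles "#/..." JSON pointers only (e.g. "#/$defs/address")
    and assumes definitions are objects and are not recursive.
    """
    root = copy.deepcopy(schema)

    def resolve_pointer(ref: str) -> Any:
        if not ref.startswith("#/"):
            raise ValueError(f"only local refs are supported: {ref}")
        node: Any = root
        for token in ref[2:].split("/"):
            # Unescape JSON Pointer tokens (RFC 6901): "~1" -> "/", then "~0" -> "~".
            node = node[token.replace("~1", "/").replace("~0", "~")]
        return node

    def walk(node: Any) -> Any:
        if isinstance(node, dict):
            if "$ref" in node:
                # Inline the target, resolving any refs nested inside it as well.
                target = walk(resolve_pointer(node["$ref"]))
                siblings = {k: walk(v) for k, v in node.items() if k != "$ref"}
                return {**target, **siblings}
            return {k: walk(v) for k, v in node.items()}
        if isinstance(node, list):
            return [walk(item) for item in node]
        return node

    inlined = walk(root)
    # Once everything is inlined, the definitions are no longer needed.
    inlined.pop("$defs", None)
    inlined.pop("definitions", None)
    return inlined


schema = {
    "type": "object",
    "properties": {
        "billing_address": {"$ref": "#/$defs/address"},
        "shipping_address": {"$ref": "#/$defs/address"},
    },
    "$defs": {
        "address": {"type": "object", "properties": {"city": {"type": "string"}}},
    },
}
print(inline_local_refs(schema)["properties"]["shipping_address"])
# {'type': 'object', 'properties': {'city': {'type': 'string'}}}
```

The output duplicates the address definition under both properties, which is exactly the "inline everything" shape a catalog is expected to carry.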
Well, actually, Airbyte does support $refs to other files. Referenced schemas are inlined under definitions, and the $refs point to #/definitions/{filename}.
See the documentation: https://docs.airbyte.io/connector-development/cdk-python/schemas#static-schemas
Important note: any objects referenced via $ref should be placed in the shared/ directory in their own .json files.
Sorry for the misunderstanding!
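The shared/ convention can be pictured with the following sketch, which replaces a file reference with the parsed contents of that file. It assumes each non-local $ref is a bare file name inside a single shared directory and that shared files do not reference each other in a cycle. `inline_shared_refs` is a hypothetical helper, not the CDK's actual schema loader.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def inline_shared_refs(node: Any, shared_dir: Path) -> Any:
    """Recursively replace {"$ref": "<file>.json"} with the contents of shared_dir/<file>.json.

    Hypothetical helper: local refs (starting with "#") are left untouched.
    """
    if isinstance(node, dict):
        ref = node.get("$ref")
        if isinstance(ref, str) and not ref.startswith("#"):
            loaded = json.loads((shared_dir / ref).read_text())
            # The shared file may contain file refs of its own, so resolve it too.
            resolved = inline_shared_refs(loaded, shared_dir)
            # Keep sibling keywords such as "description" (a simplification).
            siblings = {k: inline_shared_refs(v, shared_dir) for k, v in node.items() if k != "$ref"}
            return {**resolved, **siblings}
        return {k: inline_shared_refs(v, shared_dir) for k, v in node.items()}
    if isinstance(node, list):
        return [inline_shared_refs(item, shared_dir) for item in node]
    return node


# Usage: write one shared schema to a temporary directory and inline it.
with tempfile.TemporaryDirectory() as tmp:
    shared = Path(tmp)
    address = {"type": "object", "properties": {"city": {"type": "string"}}}
    (shared / "address.json").write_text(json.dumps(address))
    stream_schema = {"type": "object", "properties": {"shipping": {"$ref": "address.json"}}}
    print(inline_shared_refs(stream_schema, shared))
    # {'type': 'object', 'properties': {'shipping': {'type': 'object', 'properties': {'city': {'type': 'string'}}}}}
```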
That documentation page is about the CDK, which eases the implementation of source connectors in Airbyte.
But once the source connector produces the catalog.json in Airbyte's protocol, there should be no $ref left, because the CDK/source connector should already have inlined every $ref into a full JSON schema object.
The separate YAML/JSON files used by the refs are not transmitted through the protocol (only one full JSON object, the catalog, is), so normalization would not be able to resolve the references and stitch the different JSON objects back together. That is why I said it is "not supported".
See this issue https://github.com/airbytehq/airbyte/issues/7966 and its associated PRs, where we are adding Source Acceptance Tests (SAT) to make sure all $refs are resolved by the source connector before "materializing" the catalog.json that is used downstream by normalization.
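A check in the spirit of those acceptance tests could walk every stream schema in a discovered catalog and report any $ref that survived. This is a hypothetical sketch, not the actual SAT code; it assumes the catalog is a dict with a "streams" list whose entries carry "name" and "json_schema", and the function names are illustrative.

```python
from typing import Any, Iterator, List


def find_refs(node: Any, path: str = "") -> Iterator[str]:
    """Yield the JSON path of every "$ref" keyword found under `node`."""
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{path}/{key}"
            if key == "$ref":
                yield child
            else:
                yield from find_refs(value, child)
    elif isinstance(node, list):
        for index, item in enumerate(node):
            yield from find_refs(item, f"{path}/{index}")


def unresolved_refs(catalog: dict) -> List[str]:
    """List "<stream>: <path>" for each $ref left in the catalog's stream schemas."""
    problems = []
    for stream in catalog.get("streams", []):
        for ref_path in find_refs(stream.get("json_schema", {})):
            problems.append(f"{stream['name']}: {ref_path}")
    return problems


catalog = {
    "streams": [
        {
            "name": "processed_orders",
            "json_schema": {
                "type": "object",
                "properties": {"shipping": {"$ref": "#/$defs/address"}},
            },
        }
    ]
}
print(unresolved_refs(catalog))
# ['processed_orders: /properties/shipping/$ref']
```

An empty list means the catalog is self-contained; anything else points at the exact spot the source failed to inline.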
Environment
Current Behavior
Basic normalization doesn't extract nested objects when they are referenced using $ref and defined using $defs in the same schema. The current workaround is to inline every object definition, which usually means duplicating them.
Example schema that uses $ref/$defs: https://github.com/airbytehq/airbyte/pull/8177/commits/5303b0b848732f3c79643e465bee8d3c79808b0b
Expected Behavior
I expect to be able to deduplicate schemas with $ref/$defs and still get all the benefits of basic normalization's nested object extraction.
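One way to see why the extraction is skipped: assume, as a simplified model rather than normalization's real code, that a property is unnested only when its own schema declares type "object" (or it is an array whose items do). A property consisting solely of a $ref declares no type, so under this model it would not be unnested. `nested_tables` and `is_object_schema` are hypothetical names.

```python
from typing import Any, List


def is_object_schema(schema: Any) -> bool:
    """True when a schema dict declares "object" in its "type" keyword."""
    if not isinstance(schema, dict):
        return False
    declared = schema.get("type")
    types = declared if isinstance(declared, list) else [declared]
    return "object" in types


def nested_tables(schema: dict, prefix: str = "") -> List[str]:
    """Return the property paths a type-driven extractor would unnest.

    Simplified, hypothetical model: only properties whose own schema declares
    type "object" (or arrays whose "items" do) are treated as nested.
    """
    found = []
    for name, prop in schema.get("properties", {}).items():
        path = f"{prefix}{name}"
        if is_object_schema(prop):
            found.append(path)
            found.extend(nested_tables(prop, f"{path}."))
        elif isinstance(prop, dict) and is_object_schema(prop.get("items")):
            found.append(path)
            found.extend(nested_tables(prop["items"], f"{path}."))
    return found


address = {"type": "object", "properties": {"city": {"type": "string"}}}
with_refs = {
    "type": "object",
    "properties": {"shipping": {"$ref": "#/$defs/address"}},
    "$defs": {"address": address},
}
inlined = {"type": "object", "properties": {"shipping": address}}
print(nested_tables(with_refs))  # []
print(nested_tables(inlined))    # ['shipping']
```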
Logs
Failed nested object extraction:
```
2021-11-22 13:40:15 INFO () WorkerRun(call):49 - Executing worker wrapper. Airbyte version: 0.32.0-alpha
2021-11-22 13:40:16 INFO () TemporalAttemptExecution(get):116 - Executing worker wrapper. Airbyte version: 0.32.0-alpha
2021-11-22 13:40:16 WARN () Databases(createPostgresDatabaseWithRetry):41 - Waiting for database to become available...
2021-11-22 13:40:16 INFO () JobsDatabaseInstance(lambda$static$2):25 - Testing if jobs database is ready...
2021-11-22 13:40:16 INFO () Databases(createPostgresDatabaseWithRetry):58 - Database available!
2021-11-22 13:40:16 WARN () JsonMetaSchema(newValidator):338 - Unknown keyword example - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2021-11-22 13:40:16 INFO () DefaultReplicationWorker(run):82 - start sync worker. job id: 232 attempt id: 0
2021-11-22 13:40:16 INFO () DefaultReplicationWorker(run):91 - configured sync modes: {null.stock_items=full_refresh - append, null.processed_order_details=full_refresh - append, null.processed_orders=full_refresh - append, null.stock_locations=full_refresh - append}
2021-11-22 13:40:16 INFO () DefaultAirbyteDestination(start):64 - Running destination...
2021-11-22 13:40:16 INFO () LineGobbler(voidCall):82 - Checking if airbyte/destination-postgres:0.3.12 exists...
2021-11-22 13:40:16 INFO () LineGobbler(voidCall):82 - airbyte/destination-postgres:0.3.12 not found locally. Attempting to pull the image...
2021-11-22 13:40:36 INFO () LineGobbler(voidCall):82 - Pulled airbyte/destination-postgres:0.3.12 from remote.
2021-11-22 13:40:36 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/232/0 --network host --log-driver none airbyte/destination-postgres:0.3.12 write --config destination_config.json --catalog destination_catalog.json
2021-11-22 13:40:36 INFO () LineGobbler(voidCall):82 - Checking if airbyte/source-linnworks:dev exists...
2021-11-22 13:40:36 INFO () LineGobbler(voidCall):82 - airbyte/source-linnworks:dev was found locally.
2021-11-22 13:40:36 INFO () DockerProcessFactory(create):127 - Preparing command: docker run --rm --init -i -v airbyte_workspace:/data -v /tmp/airbyte_local:/local -w /data/232/0 --network host --log-driver none airbyte/source-linnworks:dev read --config source_config.json --catalog source_catalog.json
2021-11-22 13:40:36 INFO () DefaultReplicationWorker(lambda$getDestinationOutputRunnable$3):226 - Destination output thread started.
2021-11-22 13:40:36 INFO () DefaultReplicationWorker(run):119 - Waiting for source thread to join.
2021-11-22 13:40:36 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):190 - Replication thread started.
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 INFO i.a.i.d.p.PostgresDestination(main):69 - {} - starting destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 INFO i.a.i.b.IntegrationRunner(run):76 - {} - Running integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 INFO i.a.i.b.IntegrationCliParser(parseOptions):118 - {} - integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 INFO i.a.i.b.IntegrationRunner(run):80 - {} - Command: WRITE
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 INFO i.a.i.b.IntegrationRunner(run):81 - {} - Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-22 13:40:42 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:42 WARN c.n.s.JsonMetaSchema(newValidator):338 - {} - Unknown keyword multiline - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
destination - 2021-11-22 13:40:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:43 INFO i.a.i.b.s.SshTunnel(getInstance):170 - {} - Starting connection with method: NO_TUNNEL
destination - 2021-11-22 13:40:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):96 - {} - Write config: WriteConfig{streamName=processed_order_details, namespace=null, outputSchemaName=linnworks, tmpTableName=_airbyte_tmp_ctb_processed_order_details, outputTableName=_airbyte_raw_processed_order_details, syncMode=append}
destination - 2021-11-22 13:40:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):96 - {} - Write config: WriteConfig{streamName=processed_orders, namespace=null, outputSchemaName=linnworks, tmpTableName=_airbyte_tmp_dok_processed_orders, outputTableName=_airbyte_raw_processed_orders, syncMode=append}
destination - 2021-11-22 13:40:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):96 - {} - Write config: WriteConfig{streamName=stock_items, namespace=null, outputSchemaName=linnworks, tmpTableName=_airbyte_tmp_yan_stock_items, outputTableName=_airbyte_raw_stock_items, syncMode=append}
destination - 2021-11-22 13:40:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$toWriteConfig$0):96 - {} - Write config: WriteConfig{streamName=stock_locations, namespace=null, outputSchemaName=linnworks, tmpTableName=_airbyte_tmp_trv_stock_locations, outputTableName=_airbyte_raw_stock_locations, syncMode=append}
destination - 2021-11-22 13:40:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:43 INFO i.a.i.d.b.BufferedStreamConsumer(startTracked):124 - {} - class io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
destination - 2021-11-22 13:40:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):121 - {} - Preparing tmp tables in destination started for 4 streams
destination - 2021-11-22 13:40:43 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:43 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):125 - {} - Preparing tmp table in destination started for stream processed_order_details. schema: linnworks, tmp table name: _airbyte_tmp_ctb_processed_order_details
destination - 2021-11-22 13:40:44 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:44 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):125 - {} - Preparing tmp table in destination started for stream processed_orders. schema: linnworks, tmp table name: _airbyte_tmp_dok_processed_orders
destination - 2021-11-22 13:40:44 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:44 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):125 - {} - Preparing tmp table in destination started for stream stock_items. schema: linnworks, tmp table name: _airbyte_tmp_yan_stock_items
destination - 2021-11-22 13:40:44 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:44 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):125 - {} - Preparing tmp table in destination started for stream stock_locations. schema: linnworks, tmp table name: _airbyte_tmp_trv_stock_locations
destination - 2021-11-22 13:40:44 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:40:44 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onStartFunction$1):131 - {} - Preparing tables in destination completed.
2021-11-22 13:40:51 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 1000
2021-11-22 13:41:03 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 2000
2021-11-22 13:41:18 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 3000
2021-11-22 13:41:30 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 4000
2021-11-22 13:41:53 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 5000
2021-11-22 13:42:08 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 6000
2021-11-22 13:42:32 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 7000
2021-11-22 13:42:57 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 8000
2021-11-22 13:43:27 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 9000
2021-11-22 13:43:49 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 10000
2021-11-22 13:44:23 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 11000
2021-11-22 13:45:08 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 12000
2021-11-22 13:45:45 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 13000
2021-11-22 13:46:14 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 14000
2021-11-22 13:46:26 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 15000
2021-11-22 13:46:48 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 16000
2021-11-22 13:47:10 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 17000
2021-11-22 13:47:23 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 18000
2021-11-22 13:47:41 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 19000
2021-11-22 13:48:01 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 20000
2021-11-22 13:48:20 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 21000
2021-11-22 13:48:36 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 22000
2021-11-22 13:48:47 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 23000
2021-11-22 13:49:04 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 24000
2021-11-22 13:49:23 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 25000
2021-11-22 13:49:47 INFO () DefaultReplicationWorker(lambda$getReplicationRunnable$2):203 - Records read: 26000
2021-11-22 13:50:06 INFO () DefaultReplicationWorker(run):121 - Source thread complete.
2021-11-22 13:50:06 INFO () DefaultReplicationWorker(run):122 - Waiting for destination thread to join.
destination - 2021-11-22 13:50:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:06 INFO i.a.i.b.FailureTrackingAirbyteMessageConsumer(close):60 - {} - Airbyte message consumer: succeeded.
destination - 2021-11-22 13:50:06 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:06 INFO i.a.i.d.b.BufferedStreamConsumer(close):199 - {} - executing on success close procedure.
destination - 2021-11-22 13:50:23 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:23 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):160 - {} - Finalizing tables in destination started for 4 streams
destination - 2021-11-22 13:50:23 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:23 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):165 - {} - Finalizing stream processed_order_details. schema linnworks, tmp table _airbyte_tmp_ctb_processed_order_details, final table _airbyte_raw_processed_order_details
destination - 2021-11-22 13:50:23 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:23 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):165 - {} - Finalizing stream processed_orders. schema linnworks, tmp table _airbyte_tmp_dok_processed_orders, final table _airbyte_raw_processed_orders
destination - 2021-11-22 13:50:23 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:23 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):165 - {} - Finalizing stream stock_items. schema linnworks, tmp table _airbyte_tmp_yan_stock_items, final table _airbyte_raw_stock_items
destination - 2021-11-22 13:50:23 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:23 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):165 - {} - Finalizing stream stock_locations. schema linnworks, tmp table _airbyte_tmp_trv_stock_locations, final table _airbyte_raw_stock_locations
destination - 2021-11-22 13:50:23 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:23 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):178 - {} - Executing finalization of tables.
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):180 - {} - Finalizing tables in destination completed.
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):183 - {} - Cleaning tmp tables in destination started for 4 streams
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):187 - {} - Cleaning tmp table in destination started for stream processed_order_details. schema linnworks, tmp table name: _airbyte_tmp_ctb_processed_order_details
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):187 - {} - Cleaning tmp table in destination started for stream processed_orders. schema linnworks, tmp table name: _airbyte_tmp_dok_processed_orders
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):187 - {} - Cleaning tmp table in destination started for stream stock_items. schema linnworks, tmp table name: _airbyte_tmp_yan_stock_items
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):187 - {} - Cleaning tmp table in destination started for stream stock_locations. schema linnworks, tmp table name: _airbyte_tmp_trv_stock_locations
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.d.j.JdbcBufferedConsumerFactory(lambda$onCloseFunction$3):192 - {} - Cleaning tmp tables in destination completed.
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.b.IntegrationRunner(run):133 - {} - Completed integration: io.airbyte.integrations.base.ssh.SshWrappedDestination
destination - 2021-11-22 13:50:27 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-22 13:50:27 INFO i.a.i.d.p.PostgresDestination(main):71 - {} - completed destination: class io.airbyte.integrations.destination.postgres.PostgresDestination
2021-11-22 13:50:28 INFO () DefaultReplicationWorker(run):124 - Destination thread complete.
2021-11-22 13:50:28 INFO () DefaultReplicationWorker(run):152 - sync summary: io.airbyte.config.ReplicationAttemptSummary@52540356[status=completed,recordsSynced=26782,bytesSynced=66088631,startTime=1637588416788,endTime=1637589028283]
2021-11-22 13:50:28 INFO () DefaultReplicationWorker(run):161 - Source did not output any state messages
2021-11-22 13:50:28 WARN () DefaultReplicationWorker(run):172 - State capture: No state retained.
2021-11-22 13:50:28 INFO () TemporalAttemptExecution(get):137 - Stopping cancellation check scheduling...
2021-11-22 13:50:28 INFO () ReplicationActivityImpl(replicate):121 - sync summary: io.airbyte.config.StandardSyncOutput@59e5da9f[standardSyncSummary=io.airbyte.config.StandardSyncSummary@15220a54[status=completed,recordsSynced=26782,bytesSynced=66088631,startTime=1637588416788,endTime=1637589028283],state=
```
Steps to Reproduce
Use $ref/$defs in a stream schema, e.g. https://github.com/airbytehq/airbyte/pull/8177/commits/5303b0b848732f3c79643e465bee8d3c79808b0b
Are you willing to submit a PR?
Maybe.