airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

šŸ›source-railz Incremental sync fails due to NestedStateCartesianProductStreamSlicer.stream_slices() missing arguments #42416

Open zh4ngx opened 3 months ago

zh4ngx commented 3 months ago

Connector Name

source-railz

Connector Version

0.1.3

What step the error happened?

During the sync

Relevant information

When running a source-railz sync, the base syncs succeed, but incremental syncs that use base_incremental_service_stream fail. Looking at the logs (attached below), the cause appears to be NestedStateCartesianProductStreamSlicer.stream_slices(sync_mode, stream_state).

NestedStateCartesianProductStreamSlicer subclasses CartesianProductStreamSlicer, which takes stream_slicers: List[PartitionRouter]. In the YAML file, two stream slicers are passed in: SubstreamPartitionRouter, which subclasses PartitionRouter, and DatetimeBasedCursor, which does not. Both classes implement the .stream_slices() method.

When an incremental sync runs, SimpleRetriever calls self.stream_slicer.stream_slices() with no arguments, which fails because NestedStateCartesianProductStreamSlicer.stream_slices(...) requires two positional arguments.

Two things stand out to me:

  1. SimpleRetriever expects a consistent StreamSlicer.stream_slices() method signature.
  2. NestedStateCartesianProductStreamSlicer introduces two method arguments to StreamSlicer that aren't supported elsewhere.
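To make the mismatch concrete, here's a minimal, self-contained sketch that reproduces the TypeError from the logs. The classes are simplified stand-ins, not the actual CDK source:

```python
from typing import Any, Iterable, Mapping


class StreamSlicer:
    """Stand-in for the current CDK interface: stream_slices() takes no arguments."""

    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        yield {}


class NestedStateCartesianProductStreamSlicer(StreamSlicer):
    """Stand-in for the source-railz custom component, which still
    implements the older two-argument signature."""

    def stream_slices(
        self, sync_mode: str, stream_state: Mapping[str, Any]
    ) -> Iterable[Mapping[str, Any]]:
        yield {}


# SimpleRetriever.stream_slices() effectively does this (see the traceback below):
slicer = NestedStateCartesianProductStreamSlicer()
slicer.stream_slices()
# TypeError: stream_slices() missing 2 required positional arguments:
# 'sync_mode' and 'stream_state'
```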

Relevant log output

2024-07-22 22:05:54 platform > Retry State: RetryManager(completeFailureBackoffPolicy=BackoffPolicy(minInterval=PT10S, maxInterval=PT30M, base=3), partialFailureBackoffPolicy=null, successiveCompleteFailureLimit=5, totalCompleteFailureLimit=10, successivePartialFailureLimit=1000, totalPartialFailureLimit=20, successiveCompleteFailures=4, totalCompleteFailures=4, successivePartialFailures=0, totalPartialFailures=0)
2024-07-22 22:05:54 platform > Backing off for: 4 minutes 30 seconds.
2024-07-22 22:10:24 platform > Docker volume job log path: /tmp/workspace/10/4/logs.log
2024-07-22 22:10:24 platform > Executing worker wrapper. Airbyte version: 0.63.8
2024-07-22 22:10:24 platform > 
2024-07-22 22:10:24 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-07-22 22:10:24 platform > ----- START CHECK -----
2024-07-22 22:10:24 platform > 
2024-07-22 22:10:24 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-07-22 22:10:24 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-07-22 22:10:24 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-07-22 22:10:24 platform > Checking if airbyte/source-railz:dev exists...
2024-07-22 22:10:24 platform > airbyte/source-railz:dev was found locally.
2024-07-22 22:10:24 platform > Creating docker container = source-railz-check-10-4-kxaid with resources io.airbyte.config.ResourceRequirements@5f78386[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=,additionalProperties={}] and allowedHosts null
2024-07-22 22:10:24 platform > Preparing command: docker run --rm --init -i -w /data/10/4 --log-driver none --name source-railz-check-10-4-kxaid --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-railz:dev -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE=dev -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=4 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.8 -e WORKER_JOB_ID=10 airbyte/source-railz:dev check --config source_config.json
2024-07-22 22:10:24 platform > Reading messages from protocol version 0.2.0
2024-07-22 22:10:26 platform > Check succeeded
2024-07-22 22:10:27 platform > Check connection job received output: io.airbyte.config.StandardCheckConnectionOutput@701f45da[status=succeeded,message=<null>,additionalProperties={}]
2024-07-22 22:10:27 platform > 
2024-07-22 22:10:27 platform > ----- END CHECK -----
2024-07-22 22:10:27 platform > 
2024-07-22 22:10:27 platform > Docker volume job log path: /tmp/workspace/10/4/logs.log
2024-07-22 22:10:27 platform > Executing worker wrapper. Airbyte version: 0.63.8
2024-07-22 22:10:27 platform > 
2024-07-22 22:10:27 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-07-22 22:10:27 platform > ----- START CHECK -----
2024-07-22 22:10:27 platform > 
2024-07-22 22:10:27 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-07-22 22:10:27 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-07-22 22:10:27 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-07-22 22:10:27 platform > Checking if airbyte/destination-bigquery:2.8.3 exists...
2024-07-22 22:10:27 platform > airbyte/destination-bigquery:2.8.3 was found locally.
2024-07-22 22:10:27 platform > Creating docker container = destination-bigquery-check-10-4-vkphx with resources io.airbyte.config.ResourceRequirements@1df11ab8[cpuRequest=,cpuLimit=,memoryRequest=,memoryLimit=,additionalProperties={}] and allowedHosts null
2024-07-22 22:10:27 platform > Preparing command: docker run --rm --init -i -w /data/10/4 --log-driver none --name destination-bigquery-check-10-4-vkphx --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/destination-bigquery:2.8.3 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE=dev -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=4 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.8 -e WORKER_JOB_ID=10 airbyte/destination-bigquery:2.8.3 check --config source_config.json
2024-07-22 22:10:27 platform > Reading messages from protocol version 0.2.0
2024-07-22 22:10:27 platform > INFO main i.a.i.d.b.BigQueryDestinationKt(main):385 Starting Destination : class io.airbyte.integrations.destination.bigquery.BigQueryDestination
2024-07-22 22:10:27 platform > INFO main i.a.c.i.b.IntegrationCliParser$Companion(parseOptions):144 integration args: {check=null, config=source_config.json}
2024-07-22 22:10:27 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):124 Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination
2024-07-22 22:10:27 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):125 Command: CHECK
2024-07-22 22:10:27 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):126 Integration config: IntegrationConfig{command=CHECK, configPath='source_config.json', catalogPath='null', statePath='null'}
2024-07-22 22:10:27 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword groups - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:27 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword group - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:27 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:27 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword display_type - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:27 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:27 platform > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword always_show - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:27 platform > INFO main i.a.i.d.b.BigQueryUtils(getLoadingMethod):233 Selected loading method is set to: STANDARD
2024-07-22 22:10:30 platform > INFO main i.a.c.i.b.IntegrationRunner(runInternal):269 Completed integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination
2024-07-22 22:10:30 platform > INFO main i.a.i.d.b.BigQueryDestinationKt(main):387 Completed Destination : class io.airbyte.integrations.destination.bigquery.BigQueryDestination
2024-07-22 22:10:30 platform > Check connection job received output: io.airbyte.config.StandardCheckConnectionOutput@2d3cfd2a[status=succeeded,message=<null>,additionalProperties={}]
2024-07-22 22:10:30 platform > 
2024-07-22 22:10:30 platform > ----- END CHECK -----
2024-07-22 22:10:30 platform > 
2024-07-22 22:10:32 platform > Docker volume job log path: /tmp/workspace/10/4/logs.log
2024-07-22 22:10:32 platform > Executing worker wrapper. Airbyte version: 0.63.8
2024-07-22 22:10:32 platform > start sync worker. job id: 10 attempt id: 4
2024-07-22 22:10:32 platform > 
2024-07-22 22:10:32 platform > ----- START REPLICATION -----
2024-07-22 22:10:32 platform > 
2024-07-22 22:10:32 platform > Number of Resumed Full Refresh Streams: {0}
2024-07-22 22:10:32 platform > Running destination...
2024-07-22 22:10:32 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-07-22 22:10:32 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-07-22 22:10:32 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-07-22 22:10:32 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-07-22 22:10:32 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-07-22 22:10:32 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-07-22 22:10:32 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-07-22 22:10:32 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-07-22 22:10:32 platform > Checking if airbyte/source-railz:dev exists...
2024-07-22 22:10:32 platform > Checking if airbyte/destination-bigquery:2.8.3 exists...
2024-07-22 22:10:32 platform > airbyte/destination-bigquery:2.8.3 was found locally.
2024-07-22 22:10:32 platform > Creating docker container = destination-bigquery-write-10-4-behjt with resources io.airbyte.config.ResourceRequirements@514a87f5[cpuRequest=0.5,cpuLimit=1,memoryRequest=1Gi,memoryLimit=2Gi,additionalProperties={}] and allowedHosts null
2024-07-22 22:10:32 platform > Preparing command: docker run --rm --init -i -w /data/10/4 --log-driver none --name destination-bigquery-write-10-4-behjt --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/destination-bigquery:2.8.3 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE=dev -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=4 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.8 -e WORKER_JOB_ID=10 --cpus=1 --memory-reservation=1Gi --memory=2Gi airbyte/destination-bigquery:2.8.3 write --config destination_config.json --catalog destination_catalog.json
2024-07-22 22:10:32 platform > airbyte/source-railz:dev was found locally.
2024-07-22 22:10:32 platform > Creating docker container = source-railz-read-10-4-kncuv with resources io.airbyte.config.ResourceRequirements@7b210982[cpuRequest=0.5,cpuLimit=1,memoryRequest=1Gi,memoryLimit=2Gi,additionalProperties={}] and allowedHosts null
2024-07-22 22:10:32 platform > Preparing command: docker run --rm --init -i -w /data/10/4 --log-driver none --name source-railz-read-10-4-kncuv -e CONCURRENT_SOURCE_STREAM_READ=false --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-railz:dev -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE=dev -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=4 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.63.8 -e WORKER_JOB_ID=10 --cpus=1 --memory-reservation=1Gi --memory=2Gi airbyte/source-railz:dev read --config source_config.json --catalog source_catalog.json
2024-07-22 22:10:32 platform > Writing messages to protocol version 0.2.0
2024-07-22 22:10:32 platform > Reading messages from protocol version 0.2.0
2024-07-22 22:10:32 platform > Reading messages from protocol version 0.2.0
2024-07-22 22:10:32 platform > readFromSource: start
2024-07-22 22:10:32 platform > Starting source heartbeat check. Will check threshold of 10800 seconds, every 1 minutes.
2024-07-22 22:10:32 platform > processMessage: start
2024-07-22 22:10:32 platform > writeToDestination: start
2024-07-22 22:10:32 platform > readFromDestination: start
2024-07-22 22:10:33 destination > INFO main i.a.i.d.b.BigQueryDestinationKt(main):385 Starting Destination : class io.airbyte.integrations.destination.bigquery.BigQueryDestination
2024-07-22 22:10:33 source > Starting syncing SourceRailz
2024-07-22 22:10:33 destination > INFO main i.a.c.i.b.IntegrationCliParser$Companion(parseOptions):144 integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2024-07-22 22:10:33 destination > INFO main i.a.c.i.b.IntegrationRunner(runInternal):124 Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination
2024-07-22 22:10:33 destination > INFO main i.a.c.i.b.IntegrationRunner(runInternal):125 Command: WRITE
2024-07-22 22:10:33 destination > INFO main i.a.c.i.b.IntegrationRunner(runInternal):126 Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2024-07-22 22:10:33 source > Marking stream invoices as STARTED
2024-07-22 22:10:33 source > Setting state of SourceRailz stream to {}
2024-07-22 22:10:33 platform > Stream status TRACE received of status: STARTED for stream invoices
2024-07-22 22:10:33 source > Syncing stream: invoices 
2024-07-22 22:10:33 platform > Sending update for invoices - null -> RUNNING
2024-07-22 22:10:33 platform > Stream Status Update Received: invoices - RUNNING
2024-07-22 22:10:33 platform > Creating status: invoices - RUNNING
2024-07-22 22:10:33 source > Encountered an exception while reading stream invoices
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 135, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 230, in _read_stream
    for record_data_or_message in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 154, in read
    checkpoint_reader = self._get_checkpoint_reader(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 398, in _get_checkpoint_reader
    slices = self.stream_slices(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 148, in stream_slices
    return self.retriever.stream_slices()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 378, in stream_slices
    return self.stream_slicer.stream_slices()
TypeError: stream_slices() missing 2 required positional arguments: 'sync_mode' and 'stream_state'
2024-07-22 22:10:33 source > Marking stream invoices as STOPPED
2024-07-22 22:10:33 source > Finished syncing invoices
2024-07-22 22:10:33 source > SourceRailz runtimes:
Syncing stream invoices 0:00:00.001468
2024-07-22 22:10:33 source > During the sync, the following streams did not sync successfully: invoices: AirbyteTracedException("stream_slices() missing 2 required positional arguments: 'sync_mode' and 'stream_state'")
2024-07-22 22:10:33 source > None
Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 8, in <module>
    run()
  File "/airbyte/integration_code/source_railz/run.py", line 14, in run
    launch(source, sys.argv[1:])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 235, in launch
    for message in source_entrypoint.run(parsed_args):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 122, in run
    yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.read(source_spec, config, config_catalog, state))
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 166, in read
    for message in self.source.read(self.logger, config, catalog, state):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py", line 167, in read
    yield from super().read(logger, config, catalog, state)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 184, in read
    raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)
airbyte_cdk.utils.traced_exception.AirbyteTracedException: None
2024-07-22 22:10:33 platform > Stream status TRACE received of status: INCOMPLETE for stream invoices
2024-07-22 22:10:33 platform > Sending update for invoices - RUNNING -> INCOMPLETE
2024-07-22 22:10:33 platform > Stream Status Update Received: invoices - INCOMPLETE
2024-07-22 22:10:33 platform > Updating status: invoices - INCOMPLETE
2024-07-22 22:10:33 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword groups - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:33 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword group - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:33 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:33 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword display_type - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:33 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:33 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword always_show - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-22 22:10:33 destination > INFO main i.a.i.d.b.BigQueryUtils(getLoadingMethod):233 Selected loading method is set to: STANDARD
2024-07-22 22:10:33 platform > readFromSource: source exception
io.airbyte.workers.internal.exception.SourceException: Source process exited with non-zero exit code 1
    at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:378) ~[io.airbyte-airbyte-commons-worker-0.63.8.jar:?]
    at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242) ~[io.airbyte-airbyte-commons-worker-0.63.8.jar:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-07-22 22:10:33 platform > readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2024-07-22 22:10:33 platform > processMessage: done. (fromSource.isDone:true, forDest.isClosed:false)
2024-07-22 22:10:33 platform > writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2024-07-22 22:10:33 platform > thread status... timeout thread: false , replication thread: true
2024-07-22 22:10:33 destination > INFO main i.a.i.b.d.t.CatalogParser(parseCatalog):132 Running sync with stream configs: [StreamConfig(id=StreamId(finalNamespace=quickbooks20240722, finalName=source_defined_invoices, rawNamespace=airbyte_internal, rawName=quickbooks20240722_raw__stream_source_defined_invoices, originalNamespace=quickbooks20240722, originalName=source_defined_invoices), destinationSyncMode=append_dedup, primaryKey=[ColumnId(name=id, originalName=id, canonicalName=id)], cursor=Optional[ColumnId(name=postedDate, originalName=postedDate, canonicalName=posteddate)], columns={ColumnId(name=id, originalName=id, canonicalName=id)=Union(options=[STRING, INTEGER]), ColumnId(name=memo, originalName=memo, canonicalName=memo)=STRING, ColumnId(name=lines, originalName=lines, canonicalName=lines)=Array(items=Struct(properties={quantity=INTEGER, subTotal=NUMBER, taxAmount=NUMBER, accountRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING}), taxRateRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING, effectiveTaxRate=NUMBER}), unitAmount=NUMBER, description=STRING, locationRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING}), totalAmount=Union(options=[NUMBER, STRING]), inventoryRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING}), discountAmount=NUMBER, discountPercentage=Union(options=[NUMBER, STRING]), trackingCategoryRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING, option=STRING, optionId=STRING})})), ColumnId(name=status, originalName=status, canonicalName=status)=STRING, ColumnId(name=dueDate, originalName=dueDate, canonicalName=duedate)=TIMESTAMP_WITH_TIMEZONE, ColumnId(name=payments, originalName=payments, canonicalName=payments)=Array(items=Struct(properties={date=TIMESTAMP_WITH_TIMEZONE, amount=NUMBER, paymentId=STRING})), ColumnId(name=subTotal, originalName=subTotal, canonicalName=subtotal)=NUMBER, ColumnId(name=amountDue, originalName=amountDue, canonicalName=amountdue)=NUMBER, ColumnId(name=taxAmount, originalName=taxAmount, canonicalName=taxamount)=NUMBER, ColumnId(name=postedDate, originalName=postedDate, canonicalName=posteddate)=TIMESTAMP_WITH_TIMEZONE, ColumnId(name=currencyRef, originalName=currencyRef, canonicalName=currencyref)=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING, symbol=STRING}), ColumnId(name=customerRef, originalName=customerRef, canonicalName=customerref)=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING}), ColumnId(name=serviceName, originalName=serviceName, canonicalName=servicename)=STRING, ColumnId(name=totalAmount, originalName=totalAmount, canonicalName=totalamount)=Union(options=[NUMBER, STRING]), ColumnId(name=businessName, originalName=businessName, canonicalName=businessname)=STRING, ColumnId(name=currencyRate, originalName=currencyRate, canonicalName=currencyrate)=NUMBER, ColumnId(name=invoiceNumber, originalName=invoiceNumber, canonicalName=invoicenumber)=STRING, ColumnId(name=totalDiscount, originalName=totalDiscount, canonicalName=totaldiscount)=NUMBER, ColumnId(name=subsidiaryRefs, originalName=subsidiaryRefs, canonicalName=subsidiaryrefs)=Array(items=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING})), ColumnId(name=discountPercentage, originalName=discountPercentage, canonicalName=discountpercentage)=NUMBER, ColumnId(name=sourceModifiedDate, originalName=sourceModifiedDate, canonicalName=sourcemodifieddate)=TIMESTAMP_WITH_TIMEZONE}, generationId=0, minimumGenerationId=0, 
syncId=10)]
2024-07-22 22:10:33 destination > INFO main i.a.i.b.d.o.DefaultSyncOperation(createPerStreamOpClients):52 Preparing required schemas and tables for all streams
2024-07-22 22:10:36 destination > INFO main i.a.i.d.b.t.BigQueryDestinationHandler(existingSchemaMatchesStreamConfig):263 Alter Table Report [] [] []; Clustering true; Partitioning true
2024-07-22 22:10:36 destination > INFO sync-operations-1 i.a.i.b.d.t.TyperDeduperUtil(runMigrationsAsync$lambda$12):165 Maybe executing BigQueryDV2Migration migration for stream quickbooks20240722.source_defined_invoices.
2024-07-22 22:10:36 destination > INFO sync-operations-1 i.a.i.d.b.m.BigQueryDV2Migration(migrateIfNecessary):27 Initializing DV2 Migration check
2024-07-22 22:10:36 destination > INFO sync-operations-1 i.a.i.b.d.t.BaseDestinationV1V2Migrator(migrateIfNecessary):20 Assessing whether migration is necessary for stream source_defined_invoices
2024-07-22 22:10:36 destination > INFO sync-operations-1 i.a.i.b.d.t.BaseDestinationV1V2Migrator(shouldMigrate):44 Checking whether v1 raw table _airbyte_raw_source_defined_invoices in dataset quickbooks20240722 exists
2024-07-22 22:10:37 destination > INFO sync-operations-1 i.a.i.b.d.t.BaseDestinationV1V2Migrator(shouldMigrate):52 Migration Info: Required for Sync mode: true, No existing v2 raw tables: false, A v1 raw table exists: false
2024-07-22 22:10:37 destination > INFO sync-operations-1 i.a.i.b.d.t.BaseDestinationV1V2Migrator(migrateIfNecessary):31 No Migration Required for stream: source_defined_invoices
2024-07-22 22:10:37 destination > INFO main i.a.i.b.d.t.TyperDeduperUtil(executeRawTableMigrations):66 Refetching initial state for streams: [StreamId(finalNamespace=quickbooks20240722, finalName=source_defined_invoices, rawNamespace=airbyte_internal, rawName=quickbooks20240722_raw__stream_source_defined_invoices, originalNamespace=quickbooks20240722, originalName=source_defined_invoices)]
2024-07-22 22:10:40 destination > INFO main i.a.i.d.b.t.BigQueryDestinationHandler(existingSchemaMatchesStreamConfig):263 Alter Table Report [] [] []; Clustering true; Partitioning true
2024-07-22 22:10:40 destination > INFO main i.a.i.b.d.t.TyperDeduperUtil(executeRawTableMigrations):73 Updated states: [DestinationInitialStatus(streamConfig=StreamConfig(id=StreamId(finalNamespace=quickbooks20240722, finalName=source_defined_invoices, rawNamespace=airbyte_internal, rawName=quickbooks20240722_raw__stream_source_defined_invoices, originalNamespace=quickbooks20240722, originalName=source_defined_invoices), destinationSyncMode=append_dedup, primaryKey=[ColumnId(name=id, originalName=id, canonicalName=id)], cursor=Optional[ColumnId(name=postedDate, originalName=postedDate, canonicalName=posteddate)], columns={ColumnId(name=id, originalName=id, canonicalName=id)=Union(options=[STRING, INTEGER]), ColumnId(name=memo, originalName=memo, canonicalName=memo)=STRING, ColumnId(name=lines, originalName=lines, canonicalName=lines)=Array(items=Struct(properties={quantity=INTEGER, subTotal=NUMBER, taxAmount=NUMBER, accountRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING}), taxRateRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING, effectiveTaxRate=NUMBER}), unitAmount=NUMBER, description=STRING, locationRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING}), totalAmount=Union(options=[NUMBER, STRING]), inventoryRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING}), discountAmount=NUMBER, discountPercentage=Union(options=[NUMBER, STRING]), trackingCategoryRef=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING, option=STRING, optionId=STRING})})), ColumnId(name=status, originalName=status, canonicalName=status)=STRING, ColumnId(name=dueDate, originalName=dueDate, canonicalName=duedate)=TIMESTAMP_WITH_TIMEZONE, ColumnId(name=payments, originalName=payments, canonicalName=payments)=Array(items=Struct(properties={date=TIMESTAMP_WITH_TIMEZONE, amount=NUMBER, paymentId=STRING})), ColumnId(name=subTotal, originalName=subTotal, canonicalName=subtotal)=NUMBER, ColumnId(name=amountDue, originalName=amountDue, canonicalName=amountdue)=NUMBER, ColumnId(name=taxAmount, originalName=taxAmount, canonicalName=taxamount)=NUMBER, ColumnId(name=postedDate, originalName=postedDate, canonicalName=posteddate)=TIMESTAMP_WITH_TIMEZONE, ColumnId(name=currencyRef, originalName=currencyRef, canonicalName=currencyref)=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING, symbol=STRING}), ColumnId(name=customerRef, originalName=customerRef, canonicalName=customerref)=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING}), ColumnId(name=serviceName, originalName=serviceName, canonicalName=servicename)=STRING, ColumnId(name=totalAmount, originalName=totalAmount, canonicalName=totalamount)=Union(options=[NUMBER, STRING]), ColumnId(name=businessName, originalName=businessName, canonicalName=businessname)=STRING, ColumnId(name=currencyRate, originalName=currencyRate, canonicalName=currencyrate)=NUMBER, ColumnId(name=invoiceNumber, originalName=invoiceNumber, canonicalName=invoicenumber)=STRING, ColumnId(name=totalDiscount, originalName=totalDiscount, canonicalName=totaldiscount)=NUMBER, ColumnId(name=subsidiaryRefs, originalName=subsidiaryRefs, canonicalName=subsidiaryrefs)=Array(items=Struct(properties={id=Union(options=[STRING, INTEGER]), name=STRING})), ColumnId(name=discountPercentage, originalName=discountPercentage, canonicalName=discountpercentage)=NUMBER, ColumnId(name=sourceModifiedDate, originalName=sourceModifiedDate, canonicalName=sourcemodifieddate)=TIMESTAMP_WITH_TIMEZONE}, 
generationId=0, minimumGenerationId=0, syncId=10), isFinalTablePresent=true, initialRawTableStatus=InitialRawTableStatus(rawTableExists=true, hasUnprocessedRecords=false, maxProcessedTimestamp=Optional.empty), initialTempRawTableStatus=InitialRawTableStatus(rawTableExists=false, hasUnprocessedRecords=false, maxProcessedTimestamp=Optional.empty), isSchemaMismatch=false, isFinalTableEmpty=true, destinationState=BigQueryDestinationState(needsSoftReset=false))]
2024-07-22 22:10:40 destination > INFO sync-operations-2 i.a.i.b.d.t.TyperDeduperUtil(runMigrationsAsync$lambda$12):165 Maybe executing BigqueryAirbyteMetaAndGenerationIdMigration migration for stream quickbooks20240722.source_defined_invoices.
2024-07-22 22:10:40 destination > INFO sync-operations-2 i.a.i.d.b.m.BigqueryAirbyteMetaAndGenerationIdMigration(migrateIfNecessary):43 Skipping airbyte_meta/generation_id migration for quickbooks20240722.source_defined_invoices because the table already has the columns
2024-07-22 22:10:40 destination > INFO main i.a.i.d.b.t.BigQueryDestinationHandler(createDataset):359 Creating dataset if not present airbyte_internal
2024-07-22 22:10:41 destination > INFO main i.a.i.d.b.t.BigQueryDestinationHandler(createDataset):359 Creating dataset if not present quickbooks20240722
2024-07-22 22:10:42 destination > INFO sync-operations-3 i.a.i.b.d.o.AbstractStreamOperation(prepareStageForNormalSync):80 quickbooks20240722.source_defined_invoices: non-truncate sync. Creating raw table if not exists.
2024-07-22 22:10:42 destination > INFO sync-operations-3 i.a.i.d.b.BigQueryUtils(createPartitionedTableIfNotExists):131 Partitioned table ALREADY EXISTS: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=airbyte_internal, tableId=quickbooks20240722_raw__stream_source_defined_invoices}}
2024-07-22 22:10:42 destination > INFO sync-operations-3 i.a.i.b.d.o.AbstractStreamOperation(prepareStageForNormalSync):126 quickbooks20240722.source_defined_invoices: non-truncate sync and no temp raw table. Initial raw table status is null.
2024-07-22 22:10:42 destination > INFO sync-operations-3 i.a.i.b.d.o.AbstractStreamOperation(prepareFinalTable):188 Final Table exists for stream source_defined_invoices
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.b.BufferManager(<init>):48 Max 'memory' available for buffer allocation 742 MB
2024-07-22 22:10:42 destination > INFO main i.a.c.i.b.IntegrationRunner$Companion(consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core):424 Starting buffered read of input stream
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.FlushWorkers(start):73 Start async buffer supervisor
2024-07-22 22:10:42 destination > INFO pool-3-thread-1 i.a.c.i.d.a.b.BufferManager(printQueueInfo):94 [ASYNC QUEUE INFO] Global: max: 742.41 MB, allocated: 10 MB (10.0 MB), %% used: 0.013469714189502041 | State Manager memory usage: Allocated: 10 MB, Used: 0 bytes, percentage Used 0.0
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.AsyncStreamConsumer(start):89 class io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer started.
2024-07-22 22:10:42 destination > INFO pool-6-thread-1 i.a.c.i.d.a.FlushWorkers(printWorkerInfo):127 [ASYNC WORKER INFO] Pool queue size: 0, Active threads: 0
2024-07-22 22:10:42 destination > INFO main i.a.c.i.b.IntegrationRunner$Companion(consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core):446 Finished buffered read of input stream
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):193 Closing flush workers -- waiting for all buffers to flush
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):230 Closing flush workers -- all buffers flushed
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.s.GlobalAsyncStateManager(flushStates):153 Flushing states
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.s.GlobalAsyncStateManager(flushStates):207 Flushing states complete
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.GlobalMemoryManager(free):78 Freeing 0 bytes..
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):238 Closing flush workers -- supervisor shut down
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):240 Closing flush workers -- Starting worker pool shutdown..
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.FlushWorkers(close):245 Closing flush workers  -- workers shut down
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.b.BufferManager(close):73 Buffers cleared..
2024-07-22 22:10:42 destination > INFO sync-operations-4 i.a.i.d.b.o.BigQueryStorageOperation(cleanupStage):121 Nothing to cleanup in stage for Streaming inserts
2024-07-22 22:10:42 destination > INFO sync-operations-4 i.a.i.b.d.o.AbstractStreamOperation(finalizeTable):267 Not overwriting raw table for quickbooks20240722.source_defined_invoices. Truncate sync: false; stream success: false
2024-07-22 22:10:42 destination > INFO sync-operations-4 i.a.i.b.d.o.AbstractStreamOperation(finalizeTable):288 Skipping typing and deduping for stream quickbooks20240722.source_defined_invoices because it had no records during this sync and no unprocessed records from a previous sync.
2024-07-22 22:10:42 destination > INFO sync-operations-4 i.a.i.b.d.o.AbstractStreamOperation(finalizeTable):327 Not overwriting final table for quickbooks20240722.source_defined_invoices. Truncate sync: false; stream success: false; final table suffix not blank: false
2024-07-22 22:10:42 destination > INFO main i.a.i.b.d.o.DefaultSyncOperation(finalizeStreams):150 Cleaning up sync operation thread pools
2024-07-22 22:10:42 destination > INFO main i.a.c.i.d.a.AsyncStreamConsumer(close):195 class io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer closed
2024-07-22 22:10:42 destination > ERROR main i.a.c.i.b.IntegrationRunner(runInternal):216 caught exception! io.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error: [io.airbyte.protocol.models.v0.StreamDescriptor@f6de586[name=source_defined_invoices,namespace=<null>,additionalProperties={}]]
    at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.close(AsyncStreamConsumer.kt:205) ~[airbyte-cdk-core-0.41.2.jar:?]
    at kotlin.jdk7.AutoCloseableKt.closeFinally(AutoCloseableJVM.kt:46) ~[kotlin-stdlib-1.9.23.jar:1.9.23-release-779]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:209) [airbyte-cdk-core-0.41.2.jar:?]
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116) [airbyte-cdk-core-0.41.2.jar:?]
    at io.airbyte.integrations.destination.bigquery.BigQueryDestinationKt.main(BigQueryDestination.kt:386) [io.airbyte.airbyte-integrations.connectors-destination-bigquery.jar:?]

Stack Trace: io.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error: [io.airbyte.protocol.models.v0.StreamDescriptor@f6de586[name=source_defined_invoices,namespace=<null>,additionalProperties={}]]
    at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.close(AsyncStreamConsumer.kt:205)
    at kotlin.jdk7.AutoCloseableKt.closeFinally(AutoCloseableJVM.kt:46)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:209)
    at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116)
    at io.airbyte.integrations.destination.bigquery.BigQueryDestinationKt.main(BigQueryDestination.kt:386)

2024-07-22 22:10:43 platform > readFromDestination: exception caught
io.airbyte.workers.internal.exception.DestinationException: Destination process exited with non-zero exit code 1
    at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:493) ~[io.airbyte-airbyte-commons-worker-0.63.8.jar:?]
    at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:235) ~[io.airbyte-airbyte-commons-worker-0.63.8.jar:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-07-22 22:10:43 platform > readFromDestination: done. (writeToDestFailed:false, dest.isFinished:true)
2024-07-22 22:10:43 platform > Closing StateCheckSumCountEventHandler
2024-07-22 22:10:43 platform > sync summary: {
  "status" : "failed",
  "startTime" : 1721686232135,
  "endTime" : 1721686243040,
  "totalStats" : {
    "bytesEmitted" : 0,
    "destinationStateMessagesEmitted" : 0,
    "destinationWriteEndTime" : 0,
    "destinationWriteStartTime" : 1721686232138,
    "meanSecondsBeforeSourceStateMessageEmitted" : 0,
    "maxSecondsBeforeSourceStateMessageEmitted" : 0,
    "meanSecondsBetweenStateMessageEmittedandCommitted" : 0,
    "recordsEmitted" : 0,
    "replicationEndTime" : 1721686243040,
    "replicationStartTime" : 1721686232135,
    "sourceReadEndTime" : 0,
    "sourceReadStartTime" : 1721686232139,
    "sourceStateMessagesEmitted" : 0
  },
  "streamStats" : [ ],
  "performanceMetrics" : {
    "processFromSource" : {
      "elapsedTimeInNanos" : 17760458,
      "executionCount" : 4,
      "avgExecTimeInNanos" : 4440114.5
    },
    "readFromSource" : {
      "elapsedTimeInNanos" : 1261151005,
      "executionCount" : 254,
      "avgExecTimeInNanos" : 4965161.437007874
    },
    "processFromDest" : {
      "elapsedTimeInNanos" : 21834,
      "executionCount" : 1,
      "avgExecTimeInNanos" : 21834.0
    },
    "writeToDest" : {
      "elapsedTimeInNanos" : 0,
      "executionCount" : 0,
      "avgExecTimeInNanos" : "NaN"
    },
    "readFromDest" : {
      "elapsedTimeInNanos" : 10872174978,
      "executionCount" : 875,
      "avgExecTimeInNanos" : 1.2425342832E7
    }
  }
}
2024-07-22 22:10:43 platform > failures: [ {
  "failureOrigin" : "source",
  "failureType" : "system_error",
  "internalMessage" : "stream_slices() missing 2 required positional arguments: 'sync_mode' and 'stream_state'",
  "externalMessage" : "Something went wrong in the connector. See the logs for more details.",
  "metadata" : {
    "attemptNumber" : 4,
    "jobId" : 10,
    "from_trace_message" : true,
    "connector_command" : "read"
  },
  "stacktrace" : "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 135, in read\n    yield from self._read_stream(\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 230, in _read_stream\n    for record_data_or_message in record_iterator:\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py\", line 154, in read\n    checkpoint_reader = self._get_checkpoint_reader(\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py\", line 398, in _get_checkpoint_reader\n    slices = self.stream_slices(\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py\", line 148, in stream_slices\n    return self.retriever.stream_slices()\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 378, in stream_slices\n    return self.stream_slicer.stream_slices()\nTypeError: stream_slices() missing 2 required positional arguments: 'sync_mode' and 'stream_state'\n",
  "timestamp" : 1721686233296,
  "streamDescriptor" : {
    "name" : "invoices"
  }
}, {
  "failureOrigin" : "source",
  "failureType" : "config_error",
  "externalMessage" : "During the sync, the following streams did not sync successfully: invoices: AirbyteTracedException(\"stream_slices() missing 2 required positional arguments: 'sync_mode' and 'stream_state'\")",
  "metadata" : {
    "attemptNumber" : 4,
    "jobId" : 10,
    "from_trace_message" : true,
    "connector_command" : "read"
  },
  "stacktrace" : "Traceback (most recent call last):\n  File \"/airbyte/integration_code/main.py\", line 8, in <module>\n    run()\n  File \"/airbyte/integration_code/source_railz/run.py\", line 14, in run\n    launch(source, sys.argv[1:])\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 235, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run\n    yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.read(source_spec, config, config_catalog, state))\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 166, in read\n    for message in self.source.read(self.logger, config, catalog, state):\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py\", line 167, in read\n    yield from super().read(logger, config, catalog, state)\n  File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 184, in read\n    raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)\nairbyte_cdk.utils.traced_exception.AirbyteTracedException: None\n",
  "timestamp" : 1721686233297
}, {
  "failureOrigin" : "destination",
  "failureType" : "transient_error",
  "internalMessage" : "io.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error: [io.airbyte.protocol.models.v0.StreamDescriptor@f6de586[name=source_defined_invoices,namespace=<null>,additionalProperties={}]]",
  "externalMessage" : "Some streams were unsuccessful due to a source error: [io.airbyte.protocol.models.v0.StreamDescriptor@f6de586[name=source_defined_invoices,namespace=<null>,additionalProperties={}]]",
  "metadata" : {
    "attemptNumber" : 4,
    "jobId" : 10,
    "from_trace_message" : true,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.commons.exceptions.TransientErrorException: Some streams were unsuccessful due to a source error: [io.airbyte.protocol.models.v0.StreamDescriptor@f6de586[name=source_defined_invoices,namespace=<null>,additionalProperties={}]]\n\tat io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.close(AsyncStreamConsumer.kt:205)\n\tat kotlin.jdk7.AutoCloseableKt.closeFinally(AutoCloseableJVM.kt:46)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:209)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116)\n\tat io.airbyte.integrations.destination.bigquery.BigQueryDestinationKt.main(BigQueryDestination.kt:386)\n",
  "timestamp" : 1721686242939
}, {
  "failureOrigin" : "source",
  "internalMessage" : "Source process exited with non-zero exit code 1",
  "externalMessage" : "Something went wrong within the source connector",
  "metadata" : {
    "attemptNumber" : 4,
    "jobId" : 10,
    "connector_command" : "read"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.SourceException: Source process exited with non-zero exit code 1\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:378)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n",
  "timestamp" : 1721686233427
}, {
  "failureOrigin" : "destination",
  "internalMessage" : "Destination process exited with non-zero exit code 1",
  "externalMessage" : "Something went wrong within the destination connector",
  "metadata" : {
    "attemptNumber" : 4,
    "jobId" : 10,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.DestinationException: Destination process exited with non-zero exit code 1\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:493)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:235)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n",
  "timestamp" : 1721686243039
}, {
  "failureOrigin" : "replication",
  "internalMessage" : "io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.",
  "externalMessage" : "Something went wrong during replication",
  "metadata" : {
    "attemptNumber" : 4,
    "jobId" : 10
  },
  "stacktrace" : "java.lang.RuntimeException: io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:538)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:263)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.\n\tat io.airbyte.workers.internal.DefaultAirbyteDestination.close(DefaultAirbyteDestination.java:178)\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:536)\n\t... 5 more\n",
  "timestamp" : 1721686243040
} ]
2024-07-22 22:10:43 platform > 
2024-07-22 22:10:43 platform > ----- END REPLICATION -----
2024-07-22 22:10:43 platform > 
2024-07-22 22:10:43 platform > Retry State: RetryManager(completeFailureBackoffPolicy=BackoffPolicy(minInterval=PT10S, maxInterval=PT30M, base=3), partialFailureBackoffPolicy=null, successiveCompleteFailureLimit=5, totalCompleteFailureLimit=10, successivePartialFailureLimit=1000, totalPartialFailureLimit=20, successiveCompleteFailures=5, totalCompleteFailures=5, successivePartialFailures=0, totalPartialFailures=0)
 Backoff before next attempt: 13 minutes 30 seconds
2024-07-22 22:10:43 platform > Failing job: 10, reason: Job failed after too many retries for connection 5f1b5108-876f-489f-99ca-8d419c343fdf


marcosmarxm commented 3 months ago

Thanks for reporting the issue, @zh4ngx. Railz is a community connector and it isn't on the current roadmap for improvements. If you want to contribute a fix for the issue, please reach out to me in Slack so I can provide you with instructions for making the contribution.

zh4ngx commented 3 months ago

Thanks @marcosmarxm. Quick question: can NestedStateCartesianProductStreamSlicer be migrated to the low-code framework, as #22873 suggests? Are there instructions on how to do that?

zh4ngx commented 3 months ago

I found the issue: https://github.com/airbytehq/airbyte/commit/a45a1e33413eacccb2b8a83450d1638c48fd2042

The StreamSlicer API was updated in that commit, but stream_slices() in this source still uses the old signature with sync_mode and stream_state. Will send in a PR.
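For reference, here's a minimal sketch of the direction such a fix could take, assuming the component keeps subclassing CartesianProductStreamSlicer (a stand-in parent is used here; the real import lives in the connector's components.py):

```python
from typing import Any, Iterable, Mapping


class CartesianProductStreamSlicer:
    """Stand-in for the CDK parent class; the real one composes the
    stream slicers declared in the YAML manifest."""

    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        yield {}


class NestedStateCartesianProductStreamSlicer(CartesianProductStreamSlicer):
    # Before (old API, now broken):
    #   def stream_slices(self, sync_mode, stream_state): ...

    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        # Match the new no-argument interface; any state the slices need
        # has to come from the composed cursors/routers rather than from
        # arguments passed in by the retriever.
        yield from super().stream_slices()
```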

natikgadzhi commented 2 months ago

Adding this to our community board; we'll take a look. I see you're interested, so I'll ask folks to post their journey. I think you're right that we should replace the whole custom component with a declarative approach.

zh4ngx commented 2 months ago

Much appreciated. I've been struggling to get this working, and it's a serious blocker for our pipeline.