airbytehq / airbyte


[source-mailchimp] "lists" is missing data #41988

Open aiman-al-masoud opened 4 months ago

aiman-al-masoud commented 4 months ago

Connector Name

source-mailchimp

Connector Version

2.0.5

What step the error happened?

No visible error

Relevant information

In short, the stream called "lists" is missing data. I've tried using the official Mailchimp API directly as in the code below, and it fetches more records than the ones downloaded by Airbyte.

import mailchimp_marketing as MailchimpMarketing

# Configure the official Mailchimp Marketing client.
client = MailchimpMarketing.Client()
client.set_config({
  "api_key": "MY API KEY...",
})

# Fetch the lists and print how many were returned.
response = client.lists.get_all_lists()
lists = response['lists']
print(len(lists))

I am not sure if this is a bug or if I'm just making wrong assumptions about the workings of the connector. Also, I've tried multiple versions of the connector (2.0.11, 2.0.5, 2.0.3, 1.1.0) and they all behave the same.

There are no errors in the logs (taken during a run with version 2.0.5). At line 65 the logs say "Total records read: 7 (2 KB)", yet only 2 records are committed, as shown in the sync summary at the end of the logs. According to the code above (the official Mailchimp Python client), the actual number of records in my case should be 10.
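One thing worth ruling out when comparing counts: as far as I know, Mailchimp's list endpoints return only 10 records per call by default (the `count` parameter) and report the real total in `total_items`, so a result of exactly 10 could itself be a truncated page. Below is a minimal sketch of paging with `count`/`offset` until `total_items` is reached. The helper `fetch_all_lists` and the stand-in `fake_get_page` are hypothetical names used only for illustration; this is not the connector's code.

```python
from typing import Any, Callable, Dict, List

Page = Dict[str, Any]


def fetch_all_lists(get_page: Callable[[int, int], Page], page_size: int = 100) -> List[dict]:
    """Collect every list by paging with count/offset until total_items is reached."""
    collected: List[dict] = []
    offset = 0
    while True:
        page = get_page(page_size, offset)
        batch = page.get("lists", [])
        collected.extend(batch)
        offset += len(batch)
        # Fall back to the running offset if the response carries no total.
        total = page.get("total_items", offset)
        if not batch or offset >= total:
            return collected


def fake_get_page(count: int, offset: int) -> Page:
    """Hypothetical stand-in for the API: 23 lists served in slices."""
    all_lists = [{"id": f"list-{i}"} for i in range(23)]
    return {"lists": all_lists[offset:offset + count], "total_items": len(all_lists)}


if __name__ == "__main__":
    print(len(fetch_all_lists(fake_get_page, page_size=10)))
```

With the real client, the same helper could presumably be called as `fetch_all_lists(lambda count, offset: client.lists.get_all_lists(count=count, offset=offset))`, assuming `get_all_lists` accepts `count` and `offset` keyword arguments as documented.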

I've already tried deleting the data on the destination, deleting the connection and syncing everything anew. I am using "Full refresh | Overwrite" for the lists stream.

Thank you in advance for any kind suggestions.

Relevant log output

2024-07-16 13:35:59 platform > Docker volume job log path: /tmp/workspace/48/0/logs.log
2024-07-16 13:35:59 platform > Executing worker wrapper. Airbyte version: 0.60.1
2024-07-16 13:35:59 platform > Attempt 0 to save workflow id for cancellation
2024-07-16 13:35:59 platform > start sync worker. job id: 48 attempt id: 0
2024-07-16 13:35:59 platform > 
2024-07-16 13:35:59 platform > ----- START REPLICATION -----
2024-07-16 13:35:59 platform > 
2024-07-16 13:35:59 platform > Running destination...
2024-07-16 13:35:59 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-07-16 13:35:59 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-07-16 13:35:59 platform > Using default value for environment variable SIDECAR_KUBE_CPU_LIMIT: '2.0'
2024-07-16 13:35:59 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-07-16 13:35:59 platform > Using default value for environment variable SOCAT_KUBE_CPU_LIMIT: '2.0'
2024-07-16 13:35:59 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-07-16 13:35:59 platform > Using default value for environment variable SIDECAR_KUBE_CPU_REQUEST: '0.1'
2024-07-16 13:35:59 platform > Using default value for environment variable SOCAT_KUBE_CPU_REQUEST: '0.1'
2024-07-16 13:35:59 platform > Checking if airbyte/source-mailchimp:2.0.5 exists...
2024-07-16 13:35:59 platform > Checking if airbyte/destination-s3:0.6.2 exists...
2024-07-16 13:35:59 platform > airbyte/destination-s3:0.6.2 was found locally.
2024-07-16 13:35:59 platform > Creating docker container = destination-s3-write-48-0-goigx with resources io.airbyte.config.ResourceRequirements@718d203c[cpuRequest=1,cpuLimit=1,memoryRequest=1g,memoryLimit=1g,additionalProperties={}] and allowedHosts null
2024-07-16 13:35:59 platform > Preparing command: docker run --rm --init -i -w /data/48/0 --log-driver none --name destination-s3-write-48-0-goigx --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/destination-s3:0.6.2 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE=dev -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.60.1 -e WORKER_JOB_ID=48 --cpus=1 --memory-reservation=1g --memory=1g airbyte/destination-s3:0.6.2 write --config destination_config.json --catalog destination_catalog.json
2024-07-16 13:35:59 platform > airbyte/source-mailchimp:2.0.5 was found locally.
2024-07-16 13:35:59 platform > Creating docker container = source-mailchimp-read-48-0-zgvjc with resources io.airbyte.config.ResourceRequirements@59c6564b[cpuRequest=1,cpuLimit=1,memoryRequest=1g,memoryLimit=1g,additionalProperties={}] and allowedHosts io.airbyte.config.AllowedHosts@3f3131f9[hosts=[*.api.mailchimp.com, login.mailchimp.com, *.datadoghq.com, *.datadoghq.eu, *.sentry.io],additionalProperties={}]
2024-07-16 13:35:59 platform > Preparing command: docker run --rm --init -i -w /data/48/0 --log-driver none --name source-mailchimp-read-48-0-zgvjc -e CONCURRENT_SOURCE_STREAM_READ=false --network host -v airbyte_workspace:/data -v oss_local_root:/local -e DEPLOYMENT_MODE=OSS -e WORKER_CONNECTOR_IMAGE=airbyte/source-mailchimp:2.0.5 -e AUTO_DETECT_SCHEMA=true -e LAUNCHDARKLY_KEY= -e SOCAT_KUBE_CPU_REQUEST=0.1 -e SOCAT_KUBE_CPU_LIMIT=2.0 -e FIELD_SELECTION_WORKSPACES= -e USE_STREAM_CAPABLE_STATE=true -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE=dev -e APPLY_FIELD_SELECTION=false -e WORKER_JOB_ATTEMPT=0 -e OTEL_COLLECTOR_ENDPOINT=http://host.docker.internal:4317 -e FEATURE_FLAG_CLIENT=config -e AIRBYTE_VERSION=0.60.1 -e WORKER_JOB_ID=48 --cpus=1 --memory-reservation=1g --memory=1g airbyte/source-mailchimp:2.0.5 read --config source_config.json --catalog source_catalog.json --state input_state.json
2024-07-16 13:35:59 platform > Writing messages to protocol version 0.2.0
2024-07-16 13:35:59 platform > Reading messages from protocol version 0.2.0
2024-07-16 13:35:59 platform > Reading messages from protocol version 0.2.0
2024-07-16 13:35:59 platform > readFromSource: start
2024-07-16 13:35:59 platform > Starting source heartbeat check. Will check threshold of 10800 seconds, every 1 minutes.
2024-07-16 13:35:59 platform > processMessage: start
2024-07-16 13:35:59 platform > writeToDestination: start
2024-07-16 13:35:59 platform > readFromDestination: start
2024-07-16 13:36:02 destination > INFO main i.a.c.i.b.a.AdaptiveDestinationRunner$Runner(getDestination):55 Running destination under deployment mode: OSS
2024-07-16 13:36:02 destination > INFO main i.a.c.i.b.a.AdaptiveDestinationRunner$Runner(run):68 Starting destination: io.airbyte.integrations.destination.s3.S3Destination
2024-07-16 13:36:02 destination > INFO main i.a.c.i.b.IntegrationCliParser$Companion(parseOptions):145 integration args: {catalog=destination_catalog.json, write=null, config=destination_config.json}
2024-07-16 13:36:02 destination > INFO main i.a.c.i.b.IntegrationRunner(runInternal):124 Running integration: io.airbyte.integrations.destination.s3.S3Destination
2024-07-16 13:36:02 destination > INFO main i.a.c.i.b.IntegrationRunner(runInternal):125 Command: WRITE
2024-07-16 13:36:02 destination > INFO main i.a.c.i.b.IntegrationRunner(runInternal):126 Integration config: IntegrationConfig{command=WRITE, configPath='destination_config.json', catalogPath='destination_catalog.json', statePath='null'}
2024-07-16 13:36:03 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword airbyte_secret - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-16 13:36:03 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword always_show - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-16 13:36:03 destination > WARN main c.n.s.JsonMetaSchema(newValidator):278 Unknown keyword order - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2024-07-16 13:36:03 destination > INFO main i.a.c.i.d.s.UploadFormatConfigFactory(getUploadFormatConfig):20 File upload format config: {"format_type":"Parquet","page_size_kb":1024,"block_size_mb":128,"compression_codec":"UNCOMPRESSED","dictionary_encoding":true,"max_padding_size_mb":8,"dictionary_page_size_kb":1024}
2024-07-16 13:36:03 destination > INFO main i.a.c.i.d.s.S3DestinationConfig(createS3Client):112 Creating S3 client...
2024-07-16 13:36:03 source > Starting syncing SourceMailchimp
2024-07-16 13:36:03 platform > Attempt 0 to get source
2024-07-16 13:36:03 platform > Attempt 0 to update source
2024-07-16 13:36:03 platform > Persisted updated configuration for source b2ed316e-570e-4797-ae88-f5910e8c0fc3. New config hash: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.
2024-07-16 13:36:04 source > Marking stream lists as STARTED
2024-07-16 13:36:04 source > Setting state of SourceMailchimp stream to {}
2024-07-16 13:36:04 source > Syncing stream: lists 
2024-07-16 13:36:04 destination > INFO main i.a.c.i.d.s.SerializedBufferFactory$Companion(getCreateFunction):51 S3 format config: UploadParquetFormatConfig{compressionCodec=UNCOMPRESSED, blockSize=134217728, maxPaddingSize=8388608, pageSize=1048576, dictionaryPageSize=1048576, dictionaryEncoding=true, }
2024-07-16 13:36:04 destination > INFO main i.a.c.i.d.s.S3ConsumerFactory$Companion(toWriteConfig$lambda$0):205 Write config: WriteConfig{streamName=lists, namespace=null, outputBucketPath=main, pathFormat=main/${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_, fullOutputPath=main/lists/2024_07_16_1721136963326_, syncMode=overwrite}
2024-07-16 13:36:04 destination > INFO main i.a.c.i.b.IntegrationRunner$Companion(consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core):423 Starting buffered read of input stream
2024-07-16 13:36:04 destination > INFO main i.a.c.i.d.b.BufferedStreamConsumer(startTracked):138 class io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer started.
2024-07-16 13:36:04 destination > INFO main i.a.c.i.d.s.S3ConsumerFactory(onStartFunction$lambda$1):57 Preparing bucket in destination started for 1 streams
2024-07-16 13:36:04 destination > INFO main i.a.c.i.d.s.S3ConsumerFactory(onStartFunction$lambda$1):64 Clearing storage area in destination started for namespace null stream lists bucketObject main pathFormat main/${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_
2024-07-16 13:36:05 source > Marking stream lists as RUNNING
2024-07-16 13:36:05 platform > Source state message checksum is valid for stream _lists.
2024-07-16 13:36:05 source > Read 2 records from lists stream
2024-07-16 13:36:05 source > Marking stream lists as STOPPED
2024-07-16 13:36:05 source > Finished syncing lists
2024-07-16 13:36:05 source > SourceMailchimp runtimes:
Syncing stream lists 0:00:01.005800
2024-07-16 13:36:05 source > Finished syncing SourceMailchimp
2024-07-16 13:36:05 platform > Total records read: 7 (2 KB)
2024-07-16 13:36:05 platform > Schema validation was performed to a max of 10 records with errors per stream.
2024-07-16 13:36:05 platform > readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2024-07-16 13:36:05 platform > thread status... heartbeat thread: false , replication thread: true
2024-07-16 13:36:05 platform > processMessage: done. (fromSource.isDone:true, forDest.isClosed:false)
2024-07-16 13:36:05 platform > writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2024-07-16 13:36:05 platform > thread status... timeout thread: false , replication thread: true
2024-07-16 13:36:05 destination > INFO main i.a.c.i.d.s.S3StorageOperations(cleanUpObjects):396 Deleting objects main/lists/2024_07_16_1721135580197_0.parquet
2024-07-16 13:36:05 destination > INFO main i.a.c.i.d.s.S3StorageOperations(cleanUpBucketObject):328 Storage bucket main has been cleaned-up (1 objects matching main/lists/[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]+_.* were deleted)...
2024-07-16 13:36:05 destination > INFO main i.a.c.i.d.s.S3ConsumerFactory(onStartFunction$lambda$1):77 Clearing storage area in destination completed for namespace null stream lists bucketObject main
2024-07-16 13:36:05 destination > INFO main i.a.c.i.d.s.S3ConsumerFactory(onStartFunction$lambda$1):85 Preparing storage area in destination completed.
2024-07-16 13:36:05 destination > INFO main i.a.c.i.d.r.SerializedBufferingStrategy$getOrCreateBuffer$1(invoke):93 Starting a new buffer for stream lists (current state: 0 bytes in 0 buffers)
2024-07-16 13:36:06 destination > WARN main o.a.h.u.NativeCodeLoader(<clinit>):60 Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-07-16 13:36:06 destination > INFO main i.a.c.i.b.IntegrationRunner$Companion(consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core):445 Finished buffered read of input stream
2024-07-16 13:36:06 destination > INFO main i.a.c.i.b.FailureTrackingAirbyteMessageConsumer(close):74 Airbyte message consumer: succeeded.
2024-07-16 13:36:06 destination > INFO main i.a.c.i.d.b.BufferedStreamConsumer(close):262 executing on success close procedure.
2024-07-16 13:36:06 destination > INFO main i.a.c.i.d.r.SerializedBufferingStrategy(flushAllBuffers):126 Flushing all 1 current buffers (1 KB in total)
2024-07-16 13:36:06 destination > INFO main i.a.c.i.d.r.SerializedBufferingStrategy(flushAllBuffers):132 Flushing buffer of stream lists (1 KB)
2024-07-16 13:36:06 destination > INFO main i.a.c.i.d.s.S3ConsumerFactory(flushBufferFunction$lambda$5):107 Flushing buffer for stream lists (1 KB) to storage
2024-07-16 13:36:07 destination > INFO main i.a.c.i.d.s.p.ParquetSerializedBuffer(flush):137 Finished writing data to df0653ec-8bf2-4a86-83b3-4a2cbdde92c412949829383334143014.parquet (1 KB)
2024-07-16 13:36:07 destination > INFO main a.m.s.StreamTransferManager(getMultiPartOutputStreams):329 Initiated multipart upload to immobilsarda-mailchimp/main/lists/2024_07_16_1721136963326_0.parquet with full ID 0yTVTTk7rLvO3Gp9ynTvWXjl9GKvKwUj0K7T0MdhJy5U7v40OfiFW7VznEv7TsRrXIxtfuhDs1.Dh9z7RAQpx_DxNQ92JoPVgQsCtII2HnjWDmbKQNS_p.ABfdCoJSqy
2024-07-16 13:36:07 destination > INFO main a.m.s.MultiPartOutputStream(close):158 Called close() on [MultipartOutputStream for parts 1 - 10000]
2024-07-16 13:36:07 destination > INFO main a.m.s.StreamTransferManager(complete):367 [Manager uploading to immobilsarda-mailchimp/main/lists/2024_07_16_1721136963326_0.parquet with id 0yTVTTk7r...BfdCoJSqy]: Uploading leftover stream [Part number 1 containing 0.02 MB]
2024-07-16 13:36:07 destination > INFO main a.m.s.StreamTransferManager(uploadStreamPart):560 [Manager uploading to immobilsarda-mailchimp/main/lists/2024_07_16_1721136963326_0.parquet with id 0yTVTTk7r...BfdCoJSqy]: Finished uploading [Part number 1 containing 0.02 MB]
2024-07-16 13:36:07 destination > INFO main a.m.s.StreamTransferManager(complete):397 [Manager uploading to immobilsarda-mailchimp/main/lists/2024_07_16_1721136963326_0.parquet with id 0yTVTTk7r...BfdCoJSqy]: Completed
2024-07-16 13:36:07 destination > INFO main i.a.c.i.d.s.S3StorageOperations(loadDataIntoBucket):234 Uploaded buffer file to storage: df0653ec-8bf2-4a86-83b3-4a2cbdde92c412949829383334143014.parquet -> main/lists/2024_07_16_1721136963326_0.parquet (filename: 2024_07_16_1721136963326_0.parquet)
2024-07-16 13:36:07 destination > INFO main i.a.c.i.d.s.S3StorageOperations(uploadRecordsToBucket):130 Successfully loaded records to stage main/lists/2024_07_16_1721136963326_ with 0 re-attempt(s)
2024-07-16 13:36:07 destination > INFO main i.a.c.i.d.r.SerializedBufferingStrategy(flushAllBuffers):138 Flushing completed for lists
2024-07-16 13:36:07 destination > INFO main i.a.c.i.d.r.SerializedBufferingStrategy(close):156 Closing buffer for stream lists
2024-07-16 13:36:07 platform > starting state flush thread for connectionId bb9588ee-9eb9-4439-82df-81b59276b24c
2024-07-16 13:36:07 destination > INFO main i.a.c.i.b.IntegrationRunner(runInternal):267 Completed integration: io.airbyte.integrations.destination.s3.S3Destination
2024-07-16 13:36:07 destination > INFO main i.a.c.i.b.a.AdaptiveDestinationRunner$Runner(run):70 Completed destination: io.airbyte.integrations.destination.s3.S3Destination
2024-07-16 13:36:07 platform > readFromDestination: done. (writeToDestFailed:false, dest.isFinished:true)
2024-07-16 13:36:07 platform > thread status... timeout thread: false , replication thread: true
2024-07-16 13:36:07 platform > sync summary: {
  "status" : "completed",
  "recordsSynced" : 2,
  "bytesSynced" : 2631,
  "startTime" : 1721136959360,
  "endTime" : 1721136967728,
  "totalStats" : {
    "bytesCommitted" : 2631,
    "bytesEmitted" : 2631,
    "destinationStateMessagesEmitted" : 1,
    "destinationWriteEndTime" : 1721136967727,
    "destinationWriteStartTime" : 1721136959363,
    "meanSecondsBeforeSourceStateMessageEmitted" : 0,
    "maxSecondsBeforeSourceStateMessageEmitted" : 1,
    "maxSecondsBetweenStateMessageEmittedandCommitted" : 2,
    "meanSecondsBetweenStateMessageEmittedandCommitted" : 2,
    "recordsEmitted" : 2,
    "recordsCommitted" : 2,
    "replicationEndTime" : 1721136967728,
    "replicationStartTime" : 1721136959360,
    "sourceReadEndTime" : 1721136965439,
    "sourceReadStartTime" : 1721136959364,
    "sourceStateMessagesEmitted" : 1
  },
  "streamStats" : [ {
    "streamName" : "lists",
    "stats" : {
      "bytesCommitted" : 2631,
      "bytesEmitted" : 2631,
      "recordsEmitted" : 2,
      "recordsCommitted" : 2
    }
  } ],
  "performanceMetrics" : {
    "processFromSource" : {
      "elapsedTimeInNanos" : 137283209,
      "executionCount" : 7,
      "avgExecTimeInNanos" : 1.9611887E7
    },
    "readFromSource" : {
      "elapsedTimeInNanos" : 5981739222,
      "executionCount" : 1422,
      "avgExecTimeInNanos" : 4206567.666666667
    },
    "processFromDest" : {
      "elapsedTimeInNanos" : 334288,
      "executionCount" : 1,
      "avgExecTimeInNanos" : 334288.0
    },
    "writeToDest" : {
      "elapsedTimeInNanos" : 87128,
      "executionCount" : 3,
      "avgExecTimeInNanos" : 29042.666666666668
    },
    "readFromDest" : {
      "elapsedTimeInNanos" : 8218262643,
      "executionCount" : 115,
      "avgExecTimeInNanos" : 7.14631534173913E7
    }
  }
}
2024-07-16 13:36:07 platform > failures: [ ]
2024-07-16 13:36:07 platform > 
2024-07-16 13:36:07 platform > ----- END REPLICATION -----
2024-07-16 13:36:07 platform >
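
The sync summary above reports `recordsEmitted` and `recordsCommitted` per stream, and both are 2 for `lists`, which suggests the destination wrote everything the source emitted. A rough sketch for comparing those figures against an expected count follows. The function name `stream_gaps` is hypothetical, the JSON is trimmed to the fields used, and the expected value of 10 is taken from the report above.

```python
import json

# Trimmed copy of the "streamStats" section from the sync summary in the log.
SUMMARY_JSON = """
{
  "streamStats": [
    {"streamName": "lists",
     "stats": {"recordsEmitted": 2, "recordsCommitted": 2}}
  ]
}
"""


def stream_gaps(summary: dict, expected: dict) -> dict:
    """Return, per stream, emitted/committed counts and the gap to the expected count."""
    gaps = {}
    for entry in summary.get("streamStats", []):
        name = entry["streamName"]
        stats = entry["stats"]
        committed = stats["recordsCommitted"]
        gaps[name] = {
            "emitted": stats["recordsEmitted"],
            "committed": committed,
            # Streams without an expectation are treated as having no gap.
            "missing_vs_expected": expected.get(name, committed) - committed,
        }
    return gaps


if __name__ == "__main__":
    summary = json.loads(SUMMARY_JSON)
    print(stream_gaps(summary, {"lists": 10}))
```

Here emitted equals committed, so the gap of 8 records against the expected 10 would have to originate on the source side rather than in the destination.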

marcosmarxm commented 4 months ago

Thanks for reporting the issue, @aiman-al-masoud. I added it to the connector team backlog.

aiman-al-masoud commented 2 months ago

Hi there, any news on this?

marcosmarxm commented 2 months ago

@aiman-al-masoud there isn't an ETA for solving the issue. You're welcome to contribute a fix :)