
[destination-elasticsearch] Duplicate data on incremental append sync using Connector Builder #34570

Closed. Opened by arghaffari; closed 5 days ago.

arghaffari commented 7 months ago

Connector Name

destination-elasticsearch

Connector Version

0.50.34

What step the error happened?

During the sync

Relevant information

I am using Airbyte's Connector Builder to fetch data from an API and write it to Elasticsearch. Despite setting the sync mode to 'incremental append', each sync results in duplicate data being written to Elasticsearch. This issue is occurring consistently.

Data Sample:

{
    "valueNum": 0.75,
    "language": "fi",
    "questionType": "Button",
    "questionId": "5fd879a1243f7eb0",
    "surveyId": "5fd879a1265043f7eb6",
    "createdAt": 1608024549,
    "createdAtDate": "2020-12-15T09:29:09.474Z",
    "_id": "5fd881e510aaf51a6425a45a",
    "question": "Millainen tunne sinulle jäi asioidessasi verkkokaupassamme?  ",
    "responseChainId": "5fd881efa90593",
    "teamId": "5fd358ef5f85",
    "teamName": "Sto",
    "touchpointId": "5fd87b2238112a6a2977c409",
    "browser": "Windows Chrome 87.0.4280.88",
    "answers": "0.75",
    "response_id": "5fd881e5425a45a",
    "load_date_time": "2024-01-26 13:03:50.984099+00:00"
  }

YAML Configuration:

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - start_date
      - api_key
    properties:
      api_key:
        type: string
        order: 1
        title: API Key
        airbyte_secret: true
      start_date:
        type: string
        order: 0
        title: Start date
        format: date-time
        pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$
    additionalProperties: true
type: DeclarativeSource
check:
  type: CheckStream
  stream_names:
    - responses
streams:
  - name: responses
    type: DeclarativeStream
    retriever:
      type: SimpleRetriever
      paginator:
        type: DefaultPaginator
        page_token_option:
          type: RequestOption
          field_name: skip
          inject_into: request_parameter
        pagination_strategy:
          type: OffsetIncrement
          page_size: 200
      requester:
        path: responses
        type: HttpRequester
        url_base: https://api.feedbackly.com/v5.0.3/
        http_method: GET
        authenticator:
          type: ApiKeyAuthenticator
          api_token: '{{ config[''api_key''] }}'
          inject_into:
            type: RequestOption
            field_name: Authorization
            inject_into: header
        error_handler:
          type: CompositeErrorHandler
          error_handlers:
            - type: DefaultErrorHandler
              backoff_strategies:
                - type: ConstantBackoffStrategy
                  backoff_time_in_seconds: 10
        request_headers: {}
        request_body_json: {}
        request_parameters:
          preferredLanguage: en
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path:
            - data
    primary_key:
      - response_id
    schema_loader:
      type: InlineSchemaLoader
      schema:
        type: object
        $schema: http://json-schema.org/schema#
        properties:
          _id:
            type: string
          tags:
            type: array
            items:
              type: string
          teamId:
            type: string
          answers:
            type: string
          browser:
            type: string
          fieldId:
            type: string
          language:
            type: string
          metadata:
            type: object
            properties:
              __url:
                type: string
          question:
            type: string
          surveyId:
            type: string
          teamName:
            type: string
          valueNum:
            type: number
          createdAt:
            type: number
          fieldName:
            type: string
          questionId:
            type: string
          surveyName:
            type: string
          valueArray:
            type: array
            items:
              type: string
          response_id:
            type: string
          valueString:
            type: string
          questionType:
            type: string
          touchpointId:
            type: string
          createdAtDate:
            type: string
          load_date_time:
            type: string
          touchpointName:
            type: string
          responseChainId:
            type: string
    transformations:
      - type: AddFields
        fields:
          - path:
              - answers
            value: '"{{ record["answer"]}}"'
      - type: RemoveFields
        field_pointers:
          - - answer
      - type: AddFields
        fields:
          - path:
              - response_id
            value: '{{ record["_id"]}}'
      - type: AddFields
        fields:
          - path:
              - load_date_time
            value: '{{ now_utc() }}'
    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: createdAtDate
      start_datetime:
        type: MinMaxDatetime
        datetime: '{{ config[''start_date''] }}'
        datetime_format: '%Y-%m-%dT%H:%M:%SZ'
      datetime_format: '%Y-%m-%dT%H:%M:%S.%fZ'
      cursor_datetime_formats:
        - '%Y-%m-%dT%H:%M:%S.%fZ'
version: 0.51.41
metadata:
  autoImportSchema:
    responses: true
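
A note on the configuration above (an observation, not a confirmed fix): the DatetimeBasedCursor defines a cursor field but no start_time_option / end_time_option, so the cursor value is never injected into the API request; unless the API filters by date on its own, every sync re-requests the full dataset from start_date onward. If the Feedbackly API supports date filtering, the incremental_sync block could be extended roughly as sketched below. The startDate and endDate parameter names are hypothetical and must be replaced with whatever the API actually accepts.

incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: createdAtDate
  datetime_format: '%Y-%m-%dT%H:%M:%S.%fZ'
  cursor_datetime_formats:
    - '%Y-%m-%dT%H:%M:%S.%fZ'
  start_datetime:
    type: MinMaxDatetime
    datetime: '{{ config[''start_date''] }}'
    datetime_format: '%Y-%m-%dT%H:%M:%SZ'
  end_datetime:
    type: MinMaxDatetime
    datetime: '{{ now_utc().strftime(''%Y-%m-%dT%H:%M:%S.%fZ'') }}'
    datetime_format: '%Y-%m-%dT%H:%M:%S.%fZ'
  # Hypothetical parameter names; adjust to the API's actual date filters.
  start_time_option:
    type: RequestOption
    field_name: startDate
    inject_into: request_parameter
  end_time_option:
    type: RequestOption
    field_name: endDate
    inject_into: request_parameter

With the request actually filtered by the cursor, and once state is being persisted, each sync should only fetch records newer than the last recorded createdAtDate.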

Expected Behavior: With the 'incremental append' sync mode, only records created or updated since the last sync should be written to Elasticsearch, so previously synced records are not duplicated.

Actual Behavior: Every sync writes the full set of records again, so already-synced records are duplicated regardless of whether they are new or were updated since the last sync.

Steps to Reproduce:

1. Set up the connector using the provided YAML configuration.
2. Run a sync in 'incremental append' mode.
3. Observe that duplicate data is written to Elasticsearch.

Relevant log output

"bytesEmitted" : 23584211,
      "recordsEmitted" : 28398,
      "recordsCommitted" : 0
    }
  } ],
  "performanceMetrics" : {
    "processFromSource" : {
      "elapsedTimeInNanos" : 367752493,
      "executionCount" : 28400,
      "avgExecTimeInNanos" : 12949.031443661972
    },
    "readFromSource" : {
      "elapsedTimeInNanos" : 301821288533,
      "executionCount" : 28400,
      "avgExecTimeInNanos" : 1.0627510159612676E7
    },
    "processFromDest" : {
      "elapsedTimeInNanos" : 0,
      "executionCount" : 0,
      "avgExecTimeInNanos" : "NaN"
    },
    "writeToDest" : {
      "elapsedTimeInNanos" : 380605557,
      "executionCount" : 28398,
      "avgExecTimeInNanos" : 13402.547961124023
    },
    "readFromDest" : {
      "elapsedTimeInNanos" : 302588267325,
      "executionCount" : 173,
      "avgExecTimeInNanos" : 1.7490651290462427E9
    }
  }
}
2024-01-26 16:41:38 INFO i.a.w.g.ReplicationWorkerHelper(getReplicationOutput):245 - failures: [ {
  "failureOrigin" : "source",
  "internalMessage" : "Source process read attempt failed",
  "externalMessage" : "Something went wrong within the source connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 29,
    "connector_command" : "read"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.SourceException: Source process read attempt failed\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:380)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:243)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1589)\nCaused by: java.lang.IllegalStateException: Source process is still alive, cannot retrieve exit value.\n\tat com.google.common.base.Preconditions.checkState(Preconditions.java:502)\n\tat io.airbyte.workers.internal.DefaultAirbyteSource.getExitValue(DefaultAirbyteSource.java:126)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:366)\n\t... 5 more\n",
  "timestamp" : 1706287297862
} ]
2024-01-26 16:41:38 INFO i.a.c.i.LineGobbler(voidCall):149 - 
2024-01-26 16:41:38 INFO i.a.c.i.LineGobbler(voidCall):149 - ----- END REPLICATION -----
2024-01-26 16:41:38 INFO i.a.c.i.LineGobbler(voidCall):149 -

marcosmarxm commented 7 months ago

Hello @arghaffari, thanks for reporting the issue. Did you test the connection with another destination to confirm that the problem is specific to Elasticsearch?

arghaffari commented 7 months ago

Hi @marcosmarxm, I have since tested with different destinations and the problem persisted every time, so the issue is not specific to Elasticsearch but rather in how Airbyte handles incremental syncs. It looks like the last state of the chosen cursor field is not retained for the next sync.

marcosmarxm commented 7 months ago

Can you check the state after a sync in the Settings tab of the Connection page? That would help us understand whether your custom connector is saving state correctly.
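
For reference, a non-empty per-stream state for this connector would carry the latest createdAtDate seen in the previous sync. A rough sketch only (the exact shape varies across Airbyte versions, and the UI displays it as JSON):

- streamDescriptor:
    name: responses
  streamState:
    createdAtDate: '2024-01-26T16:41:00.000Z'  # example: the max createdAtDate from the last successful sync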

arghaffari commented 7 months ago

@marcosmarxm It's empty. Any thoughts on what could cause that?

octavia-squidington-iii commented 3 weeks ago

At Airbyte, we seek to be clear about the project priorities and roadmap. This issue has not had any activity for 180 days, suggesting that it's not as critical as others. It's possible it has already been fixed. It is being marked as stale and will be closed in 20 days if there is no activity. To keep it open, please comment to let us know why it is important to you and if it is still reproducible on recent versions of Airbyte.

octavia-squidington-iii commented 5 days ago

This issue was closed because it has been inactive for 20 days since being marked as stale.