airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[connector-builder] Substream Only Fetches Records from the First Page of Parent Stream #40734

Closed avirajsingh7 closed 2 weeks ago

avirajsingh7 commented 3 weeks ago

Topic

Connector-Builder-Ui

Relevant information

We're encountering an issue with source connectors configured through the UI: substreams only make API calls for records fetched from the first page of their parent streams. Even if the parent stream has a significant number of records (e.g., 23k in this case), the substream only processes the parent_id values from the first page of the parent stream (e.g., 546 records).

I have verified the records in the DB: the last record of the substream comes from the last parent record fetched by the first API call.
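To make the expected versus observed behavior concrete, here is a minimal, self-contained sketch in plain Python. It does not use the Airbyte CDK; the in-memory parent endpoint, PAGE_SIZE, and the helper names are all hypothetical. The expected behavior builds one child partition per parent record across every page, while the buggy path stops after page one, so the last child partition matches the last record of the first parent page, which is exactly what was observed in the DB.

```python
from typing import Dict, Iterator, List, Optional

# Hypothetical in-memory stand-in for a paginated parent endpoint:
# 23 parent records served 5 per page.
PARENT_RECORDS = [{"id": i} for i in range(23)]
PAGE_SIZE = 5


def fetch_parent_page(page: int) -> Dict[str, object]:
    """Return one page of parent records plus the next page number (or None)."""
    start = (page - 1) * PAGE_SIZE
    records = PARENT_RECORDS[start:start + PAGE_SIZE]
    next_page = page + 1 if start + PAGE_SIZE < len(PARENT_RECORDS) else None
    return {"records": records, "next_page": next_page}


def read_parent(all_pages: bool = True) -> Iterator[dict]:
    """Yield parent records; all_pages=False mimics the reported bug."""
    page: Optional[int] = 1
    while page is not None:
        response = fetch_parent_page(page)
        yield from response["records"]
        if not all_pages:
            break  # the symptom in this issue: stop after the first page
        page = response["next_page"]


def substream_partitions(parent_key: str, partition_field: str,
                         all_pages: bool = True) -> List[dict]:
    """Build one child partition per parent record that was read."""
    return [{partition_field: r[parent_key]} for r in read_parent(all_pages)]


expected = substream_partitions("id", "parent_id")
buggy = substream_partitions("id", "parent_id", all_pages=False)
print(len(expected), len(buggy))  # 23 5
```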

Here's the manifest.yaml file. I configured it to fetch one record per API call in the substream (for debugging purposes).

marcosmarxm commented 3 weeks ago

@airbytehq/dev-marketplace-contributions can someone take a look at this issue?

Stockotaco commented 3 weeks ago

I have the same issue. My substream runs for every item in the parent stream, but only for the first page of the parent stream.

natikgadzhi commented 3 weeks ago

Mentioned on Community Slack: https://airbytehq-team.slack.com/archives/C021JANJ6TY/p1720192514494299

We're looking, but no ETA yet.

natikgadzhi commented 3 weeks ago

@Stockotaco @avirajsingh7 I have a hunch — does it matter if the substream OR the parent stream are full refresh or incremental? If incremental is possible and you switch to incremental, does the problem go away?

Stockotaco commented 3 weeks ago

In my case both parent and sub-streams are full refresh syncs, not incremental.

Incremental won't work in my case.

avirajsingh7 commented 3 weeks ago

@natikgadzhi both are full refresh. #40573 is the same issue; looks like a bug.

natikgadzhi commented 3 weeks ago

Oh, I think we have a fix for this here: https://github.com/airbytehq/airbyte/pull/40671

natikgadzhi commented 3 weeks ago

@ChristoGrab, can you please test Builder locally with CDK with this patch applied, and confirm this works for Builder as well? If yes, also review and approve @brianjlai's pull request, and let's make sure to ship it on Monday.

killthekitten commented 1 week ago

@natikgadzhi @brianjlai this doesn't seem to be fixed in 0.64.8.

There's a pagination limit and a record filter to help with the issue, but some of the data randomly doesn't get synced. I assume the filter is applied after the pagination?

```yaml
request_parameters:
  count: '100'
  active: 'true'
  organization: '{{ config["organization_url"] }}'
...
record_filter:
  type: RecordFilter
  condition: '{{ record["profile"]["type"] == "Team" }}'
```
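On the question of whether the filter is applied after pagination: if the record filter runs on each page after that page is fetched, it should only change which records are emitted, not how far pagination goes, because the next token is read from the raw response body. The sketch below models that ordering in plain Python. It assumes the response shape implied by the cursor_value and stop_condition in the full manifest in this comment, uses made-up sample data, and is not the CDK's actual implementation.

```python
from typing import Dict, Iterator, Optional

# Hypothetical paginated responses keyed by page token (None = first page),
# shaped like {"collection": [...], "pagination": {"next_page_token": ...}}.
RESPONSES: Dict[Optional[str], dict] = {
    None: {"collection": [{"uri": "a", "profile": {"type": "User"}},
                          {"uri": "b", "profile": {"type": "Team"}}],
           "pagination": {"next_page_token": "p2"}},
    "p2": {"collection": [{"uri": "c", "profile": {"type": "User"}}],
           "pagination": {"next_page_token": "p3"}},
    "p3": {"collection": [{"uri": "d", "profile": {"type": "Team"}}],
           "pagination": {"next_page_token": None}},
}


def is_team(record: dict) -> bool:
    # Python equivalent of the condition: record["profile"]["type"] == "Team"
    return record["profile"]["type"] == "Team"


def read_filtered() -> Iterator[dict]:
    token: Optional[str] = None
    while True:
        response = RESPONSES[token]
        # The filter only drops records from what is emitted...
        yield from filter(is_team, response["collection"])
        # ...while the next token comes from the raw response, so a page whose
        # records are all filtered out (page "p2") does not end pagination.
        token = response.get("pagination", {}).get("next_page_token")
        if not token:  # mirrors the manifest's stop_condition
            break


uris = [r["uri"] for r in read_filtered()]
print(uris)  # ['b', 'd']
```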

Also, I have a feeling that the child stream always gets synced first. Not sure how that is possible without syncing the parent stream.

Here is the full connector YAML:

```yaml
version: 2.0.0
type: DeclarativeSource
check:
  type: CheckStream
  stream_names:
    - users
definitions:
  streams:
    users:
      type: DeclarativeStream
      name: users
      primary_key:
        - uri
      retriever:
        type: SimpleRetriever
        requester:
          $ref: '#/definitions/base_requester'
          path: /organization_memberships
          http_method: GET
          request_parameters:
            organization: '{{ config[''organization_url''] }}'
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path:
              - collection
              - '*'
              - user
        paginator:
          type: DefaultPaginator
          page_token_option:
            type: RequestOption
            inject_into: request_parameter
            field_name: page_token
          pagination_strategy:
            type: CursorPagination
            cursor_value: '{{ response.get("pagination", {}).get("next_page_token", {}) }}'
            stop_condition: >-
              {{ not response.get("pagination", {}).get("next_page_token", {}) }}
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: '#/schemas/users'
    event_types:
      type: DeclarativeStream
      name: event_types
      primary_key:
        - uri
      retriever:
        type: SimpleRetriever
        requester:
          $ref: '#/definitions/base_requester'
          path: /event_types
          http_method: GET
          request_parameters:
            count: '100'
            active: 'true'
            organization: '{{ config["organization_url"] }}'
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path:
              - collection
          record_filter:
            type: RecordFilter
            condition: '{{ record["profile"]["type"] == "Team" }}'
        paginator:
          type: DefaultPaginator
          page_token_option:
            type: RequestOption
            inject_into: request_parameter
            field_name: page_token
          pagination_strategy:
            type: CursorPagination
            cursor_value: '{{ response.get("pagination", {}).get("next_page_token", {}) }}'
            stop_condition: >-
              {{ not response.get("pagination", {}).get("next_page_token", {}) }}
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: '#/schemas/event_types'
    available_times_by_event_type:
      type: DeclarativeStream
      name: available_times_by_event_type
      primary_key:
        - scheduling_url
        - event_type_uri
      retriever:
        type: SimpleRetriever
        requester:
          $ref: '#/definitions/base_requester'
          path: /event_type_available_times
          http_method: GET
          request_body_json:
            end_time: '{{ day_delta(6, ''%Y-%m-%d'') }}T24:00:00Z'
            event_type: '{{ stream_partition.event_type_uri }}'
            start_time: >-
              {{ (now_utc() + duration('PT1M')).strftime('%Y-%m-%dT%H:%M:%SZ') }}
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path:
              - collection
        partition_router:
          type: SubstreamPartitionRouter
          parent_stream_configs:
            - type: ParentStreamConfig
              parent_key: uri
              partition_field: event_type_uri
              stream:
                $ref: '#/definitions/streams/event_types'
      transformations:
        - type: AddFields
          fields:
            - path:
                - event_type_uri
              value: '{{ stream_partition.event_type_uri }}'
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: '#/schemas/available_times_by_event_type'
  base_requester:
    type: HttpRequester
    url_base: https://api.calendly.com
    authenticator:
      type: BearerAuthenticator
      api_token: '{{ config["api_key"] }}'
streams:
  - $ref: '#/definitions/streams/users'
  - $ref: '#/definitions/streams/event_types'
  - $ref: '#/definitions/streams/available_times_by_event_type'
spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - api_key
      - organization_url
    properties:
      api_key:
        type: string
        order: 0
        title: API Key
        airbyte_secret: true
      organization_url:
        type: string
        order: 1
        title: Organization URL
    additionalProperties: true
metadata:
  autoImportSchema:
    users: true
    event_types: true
    available_times_by_event_type: true
schemas:
  users:
    type: object
    $schema: http://json-schema.org/schema#
    additionalProperties: true
    properties:
      avatar_url:
        type:
          - string
          - 'null'
      created_at:
        type:
          - string
          - 'null'
      email:
        type:
          - string
          - 'null'
      name:
        type:
          - string
          - 'null'
      scheduling_url:
        type:
          - string
          - 'null'
      slug:
        type:
          - string
          - 'null'
      timezone:
        type:
          - string
          - 'null'
      updated_at:
        type:
          - string
          - 'null'
      uri:
        type: string
    required:
      - uri
  event_types:
    type: object
    $schema: http://json-schema.org/schema#
    additionalProperties: true
    properties:
      type:
        type:
          - string
          - 'null'
      active:
        type:
          - boolean
          - 'null'
      admin_managed:
        type:
          - boolean
          - 'null'
      booking_method:
        type:
          - string
          - 'null'
      color:
        type:
          - string
          - 'null'
      created_at:
        type:
          - string
          - 'null'
      custom_questions:
        type:
          - array
          - 'null'
        items:
          type:
            - object
            - 'null'
          properties:
            type:
              type:
                - string
                - 'null'
            answer_choices:
              type:
                - array
                - 'null'
              items:
                type:
                  - string
                  - 'null'
            enabled:
              type:
                - boolean
                - 'null'
            include_other:
              type:
                - boolean
                - 'null'
            name:
              type:
                - string
                - 'null'
            position:
              type:
                - number
                - 'null'
            required:
              type:
                - boolean
                - 'null'
      description_html:
        type:
          - string
          - 'null'
      description_plain:
        type:
          - string
          - 'null'
      duration:
        type:
          - number
          - 'null'
      internal_note:
        type:
          - string
          - 'null'
      kind:
        type:
          - string
          - 'null'
      name:
        type:
          - string
          - 'null'
      pooling_type:
        type:
          - string
          - 'null'
      position:
        type:
          - number
          - 'null'
      profile:
        type:
          - object
          - 'null'
        properties:
          type:
            type:
              - string
              - 'null'
          name:
            type:
              - string
              - 'null'
          owner:
            type:
              - string
              - 'null'
      scheduling_url:
        type:
          - string
          - 'null'
      secret:
        type:
          - boolean
          - 'null'
      slug:
        type:
          - string
          - 'null'
      updated_at:
        type:
          - string
          - 'null'
      uri:
        type: string
    required:
      - uri
  available_times_by_event_type:
    type: object
    $schema: http://json-schema.org/schema#
    additionalProperties: true
    properties:
      event_type_uri:
        type: string
      invitees_remaining:
        type:
          - number
          - 'null'
      scheduling_url:
        type: string
      start_time:
        type:
          - string
          - 'null'
      status:
        type:
          - string
          - 'null'
    required:
      - scheduling_url
      - event_type_uri
```

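A possible explanation for the child stream appearing to sync first: as I understand the declarative CDK, a SubstreamPartitionRouter reads the parent stream on its own to build partitions, so the child does not depend on the parent stream having been synced earlier in the same job. The plain-Python sketch below models that, plus the ParentStreamConfig mapping (parent_key uri to partition_field event_type_uri) and the AddFields step from the manifest. The parent data, fake_fetch, and all helper names are hypothetical; this is not CDK code.

```python
from typing import Callable, Dict, Iterable, Iterator, List

# Hypothetical parent records, shaped like what the event_types stream emits.
EVENT_TYPES: List[Dict[str, str]] = [
    {"uri": "event_types/A"},
    {"uri": "event_types/B"},
]


def read_event_types() -> Iterator[dict]:
    # Stand-in for reading the parent stream (all pages, in the fixed behavior).
    yield from EVENT_TYPES


def stream_partitions(read_parent: Callable[[], Iterable[dict]],
                      parent_key: str, partition_field: str) -> Iterator[dict]:
    # One partition per parent record: {partition_field: parent[parent_key]}.
    # The child pulls parent records itself instead of waiting for the
    # parent stream's own sync.
    for parent in read_parent():
        yield {partition_field: parent[parent_key]}


def read_available_times(fetch: Callable[[str], List[dict]]) -> Iterator[dict]:
    for partition in stream_partitions(read_event_types, "uri", "event_type_uri"):
        for record in fetch(partition["event_type_uri"]):
            # AddFields: path [event_type_uri], value {{ stream_partition.event_type_uri }}
            yield {**record, "event_type_uri": partition["event_type_uri"]}


def fake_fetch(event_type_uri: str) -> List[dict]:
    # Hypothetical child API: one available slot per event type.
    return [{"scheduling_url": event_type_uri + "/slot", "status": "available"}]


rows = list(read_available_times(fake_fetch))
for row in rows:
    print(row["event_type_uri"], row["status"])
```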
natikgadzhi commented 1 week ago

Try again: AFAIK, we released a fresh Cloud Builder update yesterday.

killthekitten commented 5 days ago

@natikgadzhi thanks a bunch, it works correctly now!

natikgadzhi commented 5 days ago

Phew! You’re very welcome ;-)

Nazaniiin commented 2 days ago

I'm still seeing this issue with Airbyte version 0.63.9. I have four sub-streams that all pull data from the same parent stream. During synchronization, the sub-streams only retrieve records from the first page of the parent stream. However, when syncing the parent stream directly, it successfully fetches records from all pages. It seems that the sub-streams are limited to retrieving only the first page's worth of data.

I'm not sure if I am missing something in my configurations. Here is my manifest.yml:

version: 3.8.2

type: DeclarativeSource

check:
  type: CheckStream
  stream_names:
    - end_users

definitions:
  streams:
    end_users:
      type: DeclarativeStream
      name: end_users
      primary_key:
        - end_user_id
      retriever:
        type: SimpleRetriever
        requester:
          $ref: '#/definitions/base_requester'
          path: api/end_users
          http_method: GET
          request_parameters:
            limit: '100'
            order_by: last_updated_desc
          error_handler:
            type: CompositeErrorHandler
            error_handlers:
              - type: DefaultErrorHandler
                max_retries: 10
                backoff_strategies:
                  - type: ExponentialBackoffStrategy
                    factor: 2
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path:
              - end_users
        paginator:
          type: DefaultPaginator
          page_token_option:
            type: RequestOption
            inject_into: request_parameter
            field_name: page
          pagination_strategy:
            type: PageIncrement
            start_from_page: 1
      transformations:
        - type: RemoveFields
          field_pointers:
            - - profit_and_loss_layout
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: '#/schemas/end_users'
    profit_and_loss:
      type: DeclarativeStream
      name: profit_and_loss
      primary_key:
        - end_user_id
      retriever:
        type: SimpleRetriever
        requester:
          $ref: '#/definitions/base_requester'
          path: >-
            api/end_users/{{ stream_partition.end_user_id_or_heron_id
            }}/profit_and_loss
          http_method: GET
          error_handler:
            type: CompositeErrorHandler
            error_handlers:
              - type: DefaultErrorHandler
                max_retries: 10
                backoff_strategies:
                  - type: ExponentialBackoffStrategy
                    factor: 2
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
        partition_router:
          type: SubstreamPartitionRouter
          parent_stream_configs:
            - type: ParentStreamConfig
              parent_key: end_user_id
              partition_field: end_user_id_or_heron_id
              stream:
                $ref: '#/definitions/streams/end_users'
      transformations:
        - type: AddFields
          fields:
            - path:
                - end_user_id
              value: '{{ stream_partition.end_user_id_or_heron_id }}'
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: '#/schemas/profit_and_loss'
    bank_statement_summary_by_month:
      type: DeclarativeStream
      name: bank_statement_summary_by_month
      primary_key:
        - end_user_id
      retriever:
        type: SimpleRetriever
        requester:
          $ref: '#/definitions/base_requester'
          path: >-
            api/end_users/{{ stream_partition.end_user_id_or_heron_id
            }}/bank_statement_summary
          http_method: GET
          request_parameters:
            grouping: by_month
          error_handler:
            type: CompositeErrorHandler
            error_handlers:
              - type: DefaultErrorHandler
                max_retries: 10
                backoff_strategies:
                  - type: ExponentialBackoffStrategy
                    factor: 2
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
        partition_router:
          type: SubstreamPartitionRouter
          parent_stream_configs:
            - type: ParentStreamConfig
              parent_key: end_user_id
              partition_field: end_user_id_or_heron_id
              stream:
                $ref: '#/definitions/streams/end_users'
      transformations:
        - type: AddFields
          fields:
            - path:
                - end_user_id
              value: '{{ stream_partition.end_user_id_or_heron_id }}'
        - type: AddFields
          fields:
            - path:
                - grouping
              value: '{{ record[''grouping''] }}'
        - type: AddFields
          fields:
            - path:
                - months
              value: '{{ record[''by_month''] }}'
        - type: RemoveFields
          field_pointers:
            - - average
        - type: RemoveFields
          field_pointers:
            - - total
        - type: RemoveFields
          field_pointers:
            - - by_month
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: '#/schemas/bank_statement_summary_by_month'
    scorecard_metrics:
      type: DeclarativeStream
      name: scorecard_metrics
      retriever:
        type: SimpleRetriever
        requester:
          $ref: '#/definitions/base_requester'
          path: >-
            api/end_users/{{ stream_partition.end_user_id_or_heron_id
            }}/scorecard
          http_method: GET
          error_handler:
            type: CompositeErrorHandler
            error_handlers:
              - type: DefaultErrorHandler
                max_retries: 10
                backoff_strategies:
                  - type: ExponentialBackoffStrategy
                    factor: 2
              - type: DefaultErrorHandler
                response_filters:
                  - type: HttpResponseFilter
                    action: IGNORE
                    predicate: '{{ response.code == 400 }}'
                    http_codes:
                      - 400
                    error_message: End user hasn't been successfully enriched yet, skip
                    error_message_contains: End user hasn't been successfully enriched yet
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path:
              - metrics
        partition_router:
          type: SubstreamPartitionRouter
          parent_stream_configs:
            - type: ParentStreamConfig
              parent_key: end_user_id
              partition_field: end_user_id_or_heron_id
              stream:
                $ref: '#/definitions/streams/end_users'
      transformations:
        - type: AddFields
          fields:
            - path:
                - end_user_id
              value: '{{ stream_partition.end_user_id_or_heron_id }}'
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: '#/schemas/scorecard_metrics'
    bank_statement_summary_by_data_source:
      type: DeclarativeStream
      name: bank_statement_summary_by_data_source
      primary_key:
        - end_user_id
      retriever:
        type: SimpleRetriever
        requester:
          $ref: '#/definitions/base_requester'
          path: >-
            api/end_users/{{ stream_partition.end_user_id_or_heron_id
            }}/bank_statement_summary
          http_method: GET
          request_parameters:
            grouping: by_data_source_account_heron_id
          error_handler:
            type: CompositeErrorHandler
            error_handlers:
              - type: DefaultErrorHandler
                max_retries: 10
                backoff_strategies:
                  - type: ExponentialBackoffStrategy
                    factor: 2
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: []
        partition_router:
          type: SubstreamPartitionRouter
          parent_stream_configs:
            - type: ParentStreamConfig
              parent_key: end_user_id
              partition_field: end_user_id_or_heron_id
              stream:
                $ref: '#/definitions/streams/end_users'
      transformations:
        - type: AddFields
          fields:
            - path:
                - end_user_id
              value: '{{ stream_partition.end_user_id_or_heron_id }}'
        - type: AddFields
          fields:
            - path:
                - grouping
              value: '{{ record[''grouping''] }}'
        - type: AddFields
          fields:
            - path:
                - data_sources
              value: '{{ record[''by_data_source_account_heron_id''] }}'
        - type: RemoveFields
          field_pointers:
            - - average
        - type: RemoveFields
          field_pointers:
            - - total
        - type: RemoveFields
          field_pointers:
            - - by_data_source_account_heron_id
      schema_loader:
        type: InlineSchemaLoader
        schema:
          $ref: '#/schemas/bank_statement_summary_by_data_source'
  base_requester:
    type: HttpRequester
    url_base: https://app.herondata.io
    authenticator:
      type: BasicHttpAuthenticator
      password: '{{ config["password"] }}'
      username: '{{ config["username"] }}'

streams:
  - $ref: '#/definitions/streams/end_users'
  - $ref: '#/definitions/streams/profit_and_loss'
  - $ref: '#/definitions/streams/bank_statement_summary_by_month'
  - $ref: '#/definitions/streams/scorecard_metrics'
  - $ref: '#/definitions/streams/bank_statement_summary_by_data_source'

spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - username
    properties:
      username:
        type: string
        order: 0
        title: Username
      password:
        type: string
        order: 1
        title: Password
        always_show: true
        airbyte_secret: true
    additionalProperties: true

metadata:
  autoImportSchema:
    end_users: false
    profit_and_loss: false
    bank_statement_summary_by_month: false
    scorecard_metrics: true
    bank_statement_summary_by_data_source: false
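For comparison with the symptom above, here is a minimal plain-Python sketch of what each of the four sub-streams should receive from this parent: one partition per end user across every page. It mirrors the manifest's limit of 100 and start_from_page of 1, but the parent data and helper names are hypothetical. The manifest's PageIncrement block sets no page size, so the "stop on a short or empty page" rule here is an assumption for the sketch, not a description of how the CDK decides when to stop.

```python
from typing import Iterator, List

LIMIT = 100  # mirrors request_parameters.limit on the end_users stream
END_USER_IDS = [f"eu_{i}" for i in range(250)]  # hypothetical parent data


def get_end_users(page: int, limit: int = LIMIT) -> List[dict]:
    # Hypothetical stand-in for GET api/end_users?limit=...&page=...
    start = (page - 1) * limit
    return [{"end_user_id": i} for i in END_USER_IDS[start:start + limit]]


def read_end_users(start_from_page: int = 1) -> Iterator[dict]:
    page = start_from_page
    while True:
        records = get_end_users(page)
        yield from records
        # Assumed stop rule: a short (or empty) page means no next page.
        if len(records) < LIMIT:
            break
        page += 1


def partitions() -> List[dict]:
    # parent_key end_user_id -> partition_field end_user_id_or_heron_id
    return [{"end_user_id_or_heron_id": r["end_user_id"]} for r in read_end_users()]


SUBSTREAMS = ["profit_and_loss", "bank_statement_summary_by_month",
              "scorecard_metrics", "bank_statement_summary_by_data_source"]
per_stream = {name: partitions() for name in SUBSTREAMS}
for name, parts in per_stream.items():
    print(name, len(parts))  # each sub-stream should see all 250 parents
```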
killthekitten commented 2 days ago

I wonder whether re-releasing the connector after the upgrade could help? Somehow it didn't work for me from the first try either

natikgadzhi commented 2 days ago

@killthekitten weird as it is, if you have an OSS / Cloud connector published to your workspace, try republishing, and tell me if that worked.

Nazaniiin commented 1 day ago

> I wonder whether re-releasing the connector after the upgrade could help? Somehow it didn't work for me from the first try either

Re-releasing the connector did the trick! Thank you for the tip!