airbytehq / airbyte


Source Pipedrive: incremental sync misses data #27502

Open nicklubbers opened 1 year ago

nicklubbers commented 1 year ago

Connector Name

source-pipedrive

Connector Version

0.1.18

What step the error happened?

During the sync

Relevant information

Airbyte version: 0.44.12
Pipedrive connector version: 0.1.18

We are missing data from Pipedrive, specifically organizations and persons, but it will likely also affect other streams. Below is a step-by-step description showing that there are more than 500 organizations in Pipedrive which were updated after 2023-06-20 03:14:34, but only 300 of them are synced by Airbyte. We strongly suspect this has something to do with pagination and/or the cursor field.

  1. Stream state in Airbyte:

    {
      "streamDescriptor": {
        "name": "organizations"
      },
      "streamState": {
        "update_time": "2023-06-20 03:14:34"
      }
    }
  2. Call via Postman with since_timestamp=2023-06-20 03:14:34:

    https://api.pipedrive.com/v1/recents?since_timestamp=2023-06-20 03:14:34&items=organization&start=0&limit=500
  3. Response includes 500 organizations and the following metadata:

"additional_data": {
    "since_timestamp": "2023-06-20 03:14:34",
    "last_timestamp_on_page": "2023-06-20 09:09:29",
    "pagination": {
        "start": 0,
        "limit": 500,
        "more_items_in_collection": true,
        "next_start": 500
    }
}

Thus there are more than 500 organizations. And indeed, when calling with `start=500&limit=500`, we get more organizations.
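For context, correct client-side pagination over this endpoint would follow `more_items_in_collection` / `next_start` from the `additional_data.pagination` metadata shown above until everything is drained. A minimal sketch (this is our own illustration, not the connector's actual code; `fetch_page` is a hypothetical stand-in for the HTTP call, simulated below):

```python
def paginate(fetch_page, limit=500):
    """Yield every record, advancing start to next_start until
    more_items_in_collection is False."""
    start = 0
    while True:
        resp = fetch_page(start=start, limit=limit)
        yield from resp.get("data") or []
        page = resp["additional_data"]["pagination"]
        if not page.get("more_items_in_collection"):
            break
        start = page["next_start"]

# Simulated /v1/recents endpoint serving 1234 records in pages of 500.
def fake_fetch(start, limit):
    total = 1234
    data = list(range(start, min(start + limit, total)))
    more = start + limit < total
    page = {"start": start, "limit": limit,
            "more_items_in_collection": more}
    if more:
        page["next_start"] = start + limit
    return {"data": data, "additional_data": {"pagination": page}}

records = list(paginate(fake_fetch))
print(len(records))  # 1234 -- all records, not just the first page
```

A connector that stops after the first page (or paginates with the wrong cursor) would return exactly one page's worth of records, which is the shape of the symptom described here.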

  4. Now, when we start the Airbyte Pipedrive connection sync for organizations with the stream state as shown above, we get:
    
    2023-06-20 10:07:05 destination > Starting a new buffer for stream pipedrive__organizations (current state: 848 KB in 5 buffers)
    2023-06-20 10:07:05 destination > Default schema.
    2023-06-20 10:07:06 source > Read 300 records from organizations stream
    2023-06-20 10:07:06 source > Marking stream organizations as STOPPED
    2023-06-20 10:07:06 source > Finished syncing organizations

...

{ "streamName" : "pipedrive__organizations", "stats" : { "bytesCommitted" : 1576429, "bytesEmitted" : 1576429, "recordsEmitted" : 300, "recordsCommitted" : 300 } }


Thus fewer than the 500+ we found when calling the endpoint ourselves. It is also suspicious that it is exactly 300...
Furthermore, looking at some other streams, the numbers are also suspicious:

{ "streamName" : "pipedrive__deals", "stats" : { "bytesCommitted" : 88918, "bytesEmitted" : 88918, "recordsEmitted" : 50, "recordsCommitted" : 50 } }

{ "streamName" : "pipedrive__persons", "stats" : { "bytesCommitted" : 234142, "bytesEmitted" : 234142, "recordsEmitted" : 100, "recordsCommitted" : 100 } }



P.S. Note that if you call the recents endpoint with `limit=x` where `x > 500`, Pipedrive will ignore that value and just use `limit=500`.
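Two gotchas when reproducing this call by hand: the cursor timestamp contains a space and must be URL-encoded, and the server silently caps `limit` at 500, so pagination math (`start += limit`) should use the clamped value. A minimal sketch (the helper name is ours, not part of the connector):

```python
from urllib.parse import urlencode

MAX_PAGE_LIMIT = 500  # Pipedrive silently caps `limit` at 500 (see P.S. above)

def recents_url(since_timestamp, items, start=0, limit=500):
    # Clamp client-side so pagination math matches the page size
    # the API will actually use.
    limit = min(limit, MAX_PAGE_LIMIT)
    params = {
        "since_timestamp": since_timestamp,  # contains a space -> encoded as '+'
        "items": items,
        "start": start,
        "limit": limit,
    }
    return "https://api.pipedrive.com/v1/recents?" + urlencode(params)

print(recents_url("2023-06-20 03:14:34", "organization", limit=1000))
# -> ...since_timestamp=2023-06-20+03%3A14%3A34&items=organization&start=0&limit=500
```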

### Relevant log output

_No response_

### Contribute

- [ ] Yes, I want to contribute
nicklubbers commented 1 year ago

Downgrading to 0.1.13 seemed to fix the problem...

  {
    "streamName" : "pipedrive__organizations",
    "stats" : {
      "bytesCommitted" : 15786735,
      "bytesEmitted" : 15786735,
      "recordsEmitted" : 3005,
      "recordsCommitted" : 3005
    }
  }, {
    "streamName" : "pipedrive__persons",
    "stats" : {
      "bytesCommitted" : 2246216,
      "bytesEmitted" : 2246216,
      "recordsEmitted" : 960,
      "recordsCommitted" : 960
    }
  },