airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source Monday.com: Incremental Sync #20950

Closed: tmvy closed this issue 1 year ago

tmvy commented 1 year ago

Tell us about the problem you're trying to solve

The Monday source (repo, docs) currently implements only the Full Refresh sync mode: Airbyte users must retrieve all records in a Monday account upon every sync.

Adding an Incremental Sync mode would give Airbyte users a more efficient and more satisfactory solution for Monday accounts that hold a large number of items and/or require a frequent sync interval, for which a full refresh may be unacceptable.

This may also help resolve #15569 and related issues.

At first glance, implementing an incremental stream for the Monday GraphQL API appears impossible, or at least less straightforward than with some other APIs, because most queries lack an updated_after or other datetime argument: users of the API are seemingly expected to request all data without a time-based cursor. However, a workable solution is available and described below.

Describe the solution you’ve considered

Of the available objects/streams, board items (previously named pulses in Monday) is the one most likely to grow to a large number of records, so incremental sync of items in particular would be the most useful (the boards, users and teams streams are less likely to grow significantly as usage increases).

The Activity Logs query can return items updated after a given time

An indirect strategy for retrieving items incrementally is available via the Activity Logs query, which accepts a from (ISO8601DateTime) argument and returns log events identifying the boards and items that have changed since that time.

So, upon each incremental sync, an Airbyte source could first query activity logs using the timestamp of the last sync, then pass the resulting item IDs to the ids argument of the Items query via a substream slicer (a rough sketch of that second step follows below). Incremental sync of items would then be possible, while full refresh for the other streams would likely remain acceptable as currently implemented.
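For illustration only, here is a minimal sketch of that second step. The field selection and the literal item IDs are placeholders, and the exact argument types (e.g. [Int]) should be checked against the Monday API reference for the API version in use.

import requests

# Hypothetical follow-up request: fetch only the items identified by the
# activity logs. Field selection and item IDs below are placeholders.
items_query = """
query items_by_ids($ids: [Int]) {
  items(ids: $ids, limit: 100) {
    id
    name
    updated_at
  }
}
"""
response = requests.post(
    url="https://api.monday.com/v2",
    json={"query": items_query, "variables": {"ids": [12345678, 23456789]}},
    headers={"Authorization": "Bearer ***"},
)
updated_items = response.json()["data"]["items"]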

Are you willing to submit a PR?

Without significant support and a deep understanding of Airbyte development, I would not feel confident in correctly implementing the logic described above.

I've attempted to modify the existing low-code and Python CDK versions, but have not been able to find enough examples and docs to make clear which approach I should use and where Airbyte's abstractions accommodate my goal versus where they should be overridden.

I was hoping to implement this myself within a week, but I currently feel more confident that I could achieve that by building a Python ETL script from scratch than by learning enough about which parts of the CDK I should work with. I feel that even after investing significant time in getting to know the CDK, I could still be a long way from implementing what I've described above. So, opening this request and making a start on my own solution in the meantime seems the preferable approach, but I'd be open to reconsidering!

While Airbyte is actively evolving and adapting, the recent and ongoing state of transition from CDK to config/YAML (#19586, #20533, which I recognise is necessary for continual improvement) makes it difficult for me as an outsider to know which framework would be suitable at this point in time. Perhaps a developer who is involved and experienced in this work would be better placed to investigate the strategy proposed above. I would be happy to help with testing or less technical contributions if useful. Many thanks for your help!

tmvy commented 1 year ago

Also, here's a proof-of-concept example of how to implement the strategy above, in Python.

Note that two paginators are required: without paginating through both boards and activity logs in turn, Monday will only return activity logs for the first 25 boards (the default limit). This is not obvious at first. Two paginators are likewise required when requesting items via boards, as the connector currently appears to do. A rough sketch of the nested pagination follows the proof of concept below.

import json
import requests

def flatten_list(input_list):
    return [item for sublist in input_list for item in sublist]

# Prepare GraphQL query to list activity logs for all boards
query = """
query activity_logs($from: ISO8601DateTime, $board_page: Int, $log_page: Int) {
  boards(page: $board_page, limit: 100) {
    activity_logs(from: $from, page: $log_page, limit: 100) {
      id
      data
      entity
    }
  }
}
"""
variables = {"from": "2023-01-01T00:00:00", "board_page": 1, "log_page": 1}
response = requests.post(  # the Monday GraphQL endpoint expects POST requests
    url="https://api.monday.com/v2",
    json={"query": query, "variables": variables},
    headers={"Authorization": "Bearer ***"},
)

# Prepare response data
response_data = response.json()
activity_logs = flatten_list(
    [board["activity_logs"] for board in response_data["data"]["boards"]]
)

if activity_logs:  # if list is not empty

    # Filter for board and pulse (AKA item) objects
    boards = [
        json.loads(al["data"])
        for al in activity_logs
        if al["entity"] == "board"
    ]
    pulses = [
        json.loads(al["data"])
        for al in activity_logs
        if al["entity"] == "pulse"
    ]

    # Extract and deduplicate IDs for boards and items
    board_ids: set[int] = {b["board_id"] for b in boards}
    # Note there seem to be two possible schemas for the pulse object, but both contain the item ID
    pulse_ids: set[int] = set(
        [
            *[p["pulse_id"] for p in pulses if "pulse_id" in p],
            *[p["pulse"]["id"] for p in pulses if "pulse" in p],
        ]
    )

    # Further requests can now be made using those IDs...
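For completeness, here is a rough sketch of the nested pagination mentioned above, reusing the query string and headers from the proof of concept. It rests on two assumptions that a real implementation should verify: that an empty activity-log page means the current board page is exhausted, and that receiving fewer boards than the limit means the last board page has been reached; a more careful per-board cursor may be needed.

import requests

API_URL = "https://api.monday.com/v2"
HEADERS = {"Authorization": "Bearer ***"}

def fetch_activity_logs(query: str, since: str) -> list[dict]:
    """Collect activity logs by paginating boards and, within each board page, logs."""
    logs: list[dict] = []
    board_page = 1
    while True:
        boards_on_page = 0
        log_page = 1
        while True:
            variables = {"from": since, "board_page": board_page, "log_page": log_page}
            response = requests.post(
                API_URL, json={"query": query, "variables": variables}, headers=HEADERS
            )
            boards = response.json()["data"]["boards"]
            boards_on_page = len(boards)
            # `or []` guards against boards whose activity_logs come back as null
            page_logs = [log for board in boards for log in (board["activity_logs"] or [])]
            if not page_logs:
                break  # assumption: an empty log page means this board page is exhausted
            logs.extend(page_logs)
            log_page += 1
        if boards_on_page < 100:
            break  # assumption: fewer boards than the limit means this was the last board page
        board_page += 1
    return logs

# Example usage (reusing the `query` string defined in the proof of concept):
# all_logs = fetch_activity_logs(query, since="2023-01-01T00:00:00")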