airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[source-posthog] Connector historical sync #33915

Open shmf opened 6 months ago

shmf commented 6 months ago

Connector Name

source-posthog

Connector Version

1.0.0

What step the error happened?

During sync

Relevant information

Hi,

we have a Posthog server running where there are 4 projects defined.

I set up a connector to start syncing the events and let it run. After a couple of days, it failed. I tried again, experimenting with the "Start Date", and noticed that even for a few hours of history the connector was taking extremely long. While checking the raw table I noticed that there were thousands of duplicates.

The log doesn't show anything - I even started checking the actual connector code but I really don't understand where these duplicates are coming from.

Any suggestion? Or any additional information you may need?

Thanks in advance, Marco

Relevant log output

No response


shmf commented 6 months ago

Uploading the log in case someone can see something I am missing: posthog_logs_16115_txt.txt

shmf commented 6 months ago

I managed to create my own pagination strategy, which is nothing more than a clone of the cursor pagination strategy originally used in the connector, and I found out that the token returned is always the same.
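A minimal sketch of why a token that never changes produces endless duplicates, and one way to detect it. This is not connector code: `fetch_page` is a hypothetical callable that takes the current token and returns a list of records plus the next token, and `paginate` is an illustrative name.

```python
from typing import Callable, Iterator, List, Optional, Tuple

# Hypothetical page shape: (records, next_token); next_token is None on the last page.
Page = Tuple[List[dict], Optional[str]]


def paginate(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[dict]:
    """Yield records page by page, failing fast if a token repeats."""
    seen_tokens = set()
    token: Optional[str] = None
    while True:
        records, next_token = fetch_page(token)
        yield from records
        if next_token is None:
            return
        if next_token in seen_tokens:
            # Without this guard the same page would be fetched forever,
            # which is how the duplicates pile up.
            raise RuntimeError(f"pagination token repeated: {next_token!r}")
        seen_tokens.add(next_token)
        token = next_token
```

With a source that always hands back the same token, the loop raises on the second page instead of re-reading the same records indefinitely.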

shmf commented 5 months ago

If anybody is interested, I managed to find the problem. In the default requester, nothing handles the next-page token when the request is prepared.

There is a comment that even says

E.g: you might want to define query parameters for paging if next_page_token is not None.

I then created my own requester and replaced

options = self._get_request_options(
    stream_state, stream_slice, next_page_token, self.get_request_params, self.get_authenticator().get_request_params, extra_params
)

with

# Requires: from urllib.parse import parse_qs, urlparse
if next_page_token is not None:
    # The token holds the full URL of the next page; reuse its query string as the request params.
    url = next_page_token['next_page_token']
    parsed_url = urlparse(url)
    # parse_qs returns a list per key; keep only the first value.
    options = {k: v[0] for k, v in parse_qs(parsed_url.query).items()}
else:
    options = self._get_request_options(
        stream_state, stream_slice, next_page_token, self.get_request_params, self.get_authenticator().get_request_params, extra_params
    )

I then had to reference my requester in the manifest.yaml before compiling my version of PostHog.
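For anyone who wants to try the parsing step in isolation, here is a minimal, standalone sketch using only the standard library. The function name `params_from_next_url` and the example URL are made up for illustration; the only assumption carried over from the fix is that the next-page token is a full URL whose query string holds the parameters for the next request.

```python
from urllib.parse import parse_qs, urlparse


def params_from_next_url(next_url: str) -> dict:
    """Turn the query string of a next-page URL into flat request params."""
    query = urlparse(next_url).query
    # parse_qs maps each key to a list of values; keep the first one.
    return {key: values[0] for key, values in parse_qs(query).items()}


# Hypothetical URL, only to show the shape of the result.
example = "https://posthog.example.com/api/projects/1/events/?after=2023-01-01&limit=100"
print(params_from_next_url(example))
```

Note that `parse_qs` drops parameters with blank values by default, so a key like `cursor=` would not be carried over unless `keep_blank_values=True` is passed.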

I would be happy to submit a PR if someone from the team is able to validate my solution.

domzae commented 4 months ago

Hi @shmf, (I'm not from the Airbyte team, but) I was able to implement your solution in the same way to resolve the same problem. Thanks! 🙌

shmf commented 4 months ago

Hey @domzae, I am glad it worked for you, and thank you for confirming the solution works. I guess I can raise a PR then and have this bug fixed :)

alexchouraki commented 3 months ago

Hi! Encountering this issue as well, thanks a lot for the workaround! Hope the Airbyte team validates your solution soon to be able to just upgrade the source :)

marcosmarxm commented 2 months ago

@MaxwellJK let me know if you need to submit the PR

MaxwellJK commented 2 months ago

@marcosmarxm working on it - I'll push as soon as possible and let you know

alexchouraki commented 4 weeks ago

Hi @MaxwellJK! Any update on this? That connector would be pretty useful for us, and is completely unusable as it is :(