airbytehq / airbyte


PoC concurrent Hubspot #31199

girarda closed this issue 11 months ago

girarda commented 1 year ago

What

Implement a PoC of source-hubspot running with the concurrent cdk.

The goal is to confirm that:

  1. We can see performance improvements on the Hubspot connector by using the concurrent CDK.
  2. The abstractions introduced through AbstractStream are expressive enough for the Hubspot connector.

This can be done through 2 PoCs:

  1. Use StreamFacade.create_from_stream to wrap the Hubspot streams and use the concurrent CDK with a concurrency level > 1
  2. Re-implement a Hubspot stream using the AbstractStream interface, implementing the necessary Partition and PartitionGenerator objects
    • It is OK to disable state_checkpoint_interval, as that is not supported in the concurrent CDK (ccdk)

The first will confirm that we can easily leverage the concurrent CDK; the second will confirm that source-hubspot can use the new abstractions without needing the adapters. A rough sketch of the first approach follows.
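This is a minimal, hypothetical sketch only; `StreamFacade.create_from_stream` is named above, but its exact import path and argument list vary across CDK versions, so treat the `(state, cursor)` parameters below as assumptions:

```python
import logging

# Assumed import path for the concurrent-CDK adapter named in this issue.
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade

logger = logging.getLogger("airbyte")

def to_concurrent_streams(source, streams, state=None):
    # Wrap each synchronous Hubspot stream so the concurrent CDK can schedule
    # it; the concurrency level (> 1) is configured on the source side.
    # The (state, cursor) arguments are assumptions about the adapter API.
    return [
        StreamFacade.create_from_stream(stream, source, logger, state, cursor=None)
        for stream in streams
    ]
```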

Since the goal is to validate the direction of the concurrent CDK, there is no need to merge the code.

Acceptance Criteria

Either:

girarda commented 1 year ago

grooming notes:

girarda commented 1 year ago

Additional acceptance criteria that came up during grooming today:

maxi297 commented 12 months ago

Benchmark on full refresh

| max_workers | sync duration |
| --- | --- |
| witness (baseline) | 43s |
| 2 | 29s |
| 10 | 24s |
| 12 | generated 429 errors |
| 15 | generated 429 errors |
| 20 | generated 429 errors |

Note that I observed 429s when running an incremental sync with 5 max_workers. All of this was done on a Free product tier, which allows 100 requests/10 seconds. Hubspot also has other product tiers that allow 150 requests/10 seconds, and an API add-on for 200 requests/10 seconds.
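Back-of-envelope (an assumption, not a measurement): if each worker sustains roughly one request per second, 10 workers amount to about 100 requests per 10 seconds, right at the Free-tier budget, which would be consistent with 429s appearing at 12 or more workers.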

As to why we don't see more of an improvement: it's probably because the "search" streams do not generate slices. Out of 17 incremental syncs, 11 are "search" streams. Could they be sliced? With my limited understanding, it seems they could, if we don't mind emitting duplicates as we slice the requests in _read_stream_records. However, I don't know how state management could be done while partitioning like that (state management is pretty unconventional anyway; see the concern below). There is also the question of duplicated records: records are accumulated in a post_processor that seems to group and flatten them, and based on the code, this can occur across multiple requests.

Concerns

Caches

All cache usage relies on properties that are set once at the class level and are not modified

All of these should be fine, because the current implementation is immutable within the scope of a source read.

CATs empty tests

`bypass_reason: Unable to populate`

Incremental

Usage of stream_state in Engagements.read_records

This logic could be moved to stream_slices instead, which would let us avoid relying on stream state while processing a partition. This is probably not an issue here, since there is only one stream slice (so there is nothing to parallelize), but the new AbstractStream interface should not rely on state once the partitions are created.
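A minimal sketch of that idea, with illustrative names (this is not the actual Engagements implementation): the state-derived bound is resolved once in stream_slices and carried inside the slice, so read_records never consults stream_state.

```python
from typing import Any, Iterable, Mapping, Optional

class EngagementsLike:
    """Illustrative stream: state is read only when slices are created."""

    cursor_field = "lastUpdated"

    def stream_slices(
        self, stream_state: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        # Resolve the lower bound from state once, at slice-creation time.
        yield {"since": (stream_state or {}).get(self.cursor_field, 0)}

    def read_records(
        self, stream_slice: Mapping[str, Any], **kwargs
    ) -> Iterable[Mapping[str, Any]]:
        # The partition is self-contained: no stream_state lookups here.
        yield from self._fetch(since=stream_slice["since"])

    def _fetch(self, since: int) -> Iterable[Mapping[str, Any]]:
        return []  # placeholder for the actual API call
```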

Client-side filtering / semi-incremental

Classes inheriting from ClientSideIncrementalStream:

Possible solution: add a _lowest_cursor_value to these streams, populated from the state value when the sync starts.
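A minimal sketch of that solution, assuming a hypothetical `_lowest_cursor_value` snapshotted once from state (names are illustrative, not the connector's):

```python
class ClientSideIncrementalLike:
    """Illustrative semi-incremental stream with a frozen filter threshold."""

    cursor_field = "updatedAt"

    def __init__(self, stream_state=None):
        # Snapshot the threshold once; it is never mutated during the sync,
        # which keeps the filter safe under concurrent partition reads.
        self._lowest_cursor_value = (stream_state or {}).get(self.cursor_field)

    def filter_records(self, records):
        for record in records:
            # Emit records at or above the threshold; with no prior state,
            # emit everything.
            if (
                self._lowest_cursor_value is None
                or record[self.cursor_field] >= self._lowest_cursor_value
            ):
                yield record
```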

State datetime format multiplicity

Using companies as an example, I can see three different state value formats:

We can see other streams with other formats: on this connection, owners has the format %Y-%m-%dT%H:%M:%S.%f%z, while our expected records have %Y-%m-%dT%H:%M:%S.%fZ without microseconds. Based on the sample file, the value can even be an empty string.

subscription_changes had to be excluded from the tests because its cursor value is an ISO8601 date (at least in sample_files/sample_state.json), but it generates slices as an epoch.
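One way to tame this multiplicity would be to funnel every state value through a single normalizer. A sketch, with the format list taken from the observations above; the epoch heuristic and function name are assumptions:

```python
from datetime import datetime, timezone

# Formats observed in this thread; the list is illustrative, not exhaustive.
_FORMATS = ("%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ")

def parse_cursor(value):
    """Parse a cursor that may be an ISO8601 string, an epoch, or empty."""
    if value in (None, ""):
        return None
    if isinstance(value, (int, float)):
        # Heuristic (assumption): values above ~1e11 are epoch milliseconds.
        seconds = value / 1000 if value > 1e11 else value
        return datetime.fromtimestamp(seconds, tz=timezone.utc)
    for fmt in _FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unrecognized cursor format: {value!r}")
```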

State management is weird (or at least different)

At the end of an IncrementalStream sync, the state will be set to the timestamp taken at the beginning of the sync. The comments in the code mention:

The code does this for all the incremental streams, not just the search ones.

maxi297 commented 12 months ago

(Very ugly) code for incremental syncs: https://github.com/airbytehq/airbyte/compare/maxi297/ccdk-hubspot-poc. The only discrepancy is the state messages; this is because the ConcurrentCursor does not update the state based on _init_sync.

Other notes:

girarda commented 12 months ago

This is great @maxi297 !

My general feeling from reading this is that we could introduce concurrency to the Hubspot connector, but investing in connector-specific improvements would help alleviate the concerns and increase the benefits the connector would gain.

It would probably make more sense for a Hubspot connector expert to drive the addition of concurrency than for the extensibility team to do so.

clnoll commented 12 months ago

> As to why we don't see more of an improvement: it's probably because the "search" streams do not generate slices. Out of 17 incremental syncs, 11 are "search" streams. Could they be sliced? With my limited understanding, it seems they could, if we don't mind emitting duplicates as we slice the requests in _read_stream_records.

Could you elaborate on this? I think I'm missing some background about search streams. Why aren't they sliced and how would emitting duplicates solve the problem?

> Note that I observed 429s when running an incremental sync with 5 max_workers. All of this was done on a Free product tier, which allows 100 requests/10 seconds. Hubspot also has other product tiers that allow 150 requests/10 seconds, and an API add-on for 200 requests/10 seconds.

I don't suppose there's a way to get the rate limit programmatically so we can set the number of workers based on that?

maxi297 commented 11 months ago

> Could you elaborate on this? I think I'm missing some background about search streams. Why aren't they sliced and how would emitting duplicates solve the problem?

I'm also missing some background, to be honest. Here's what I have, along with questions that I think describe what information I'm missing. To the best of my understanding:

Each CRM endpoint has two ways of retrieving records, depending on whether there is state (it uses search to filter on lastmodifieddate or hs_lastmodifieddate) or not (see code here). Both code paths can generate multiple requests, for different reasons: either because they slice on the cursor field, or because they need to explicitly request additional fields, which might break some urllib limitation on URL length (see the sketch below). These properties are retrieved using /crm/v3/schemas and might be custom per user.
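To illustrate the "additional fields" part: a hedged sketch of chunking property names so each request's query string stays under a URL-length limit. The 2000-character limit and the helper name are assumptions for illustration, not the connector's actual values.

```python
def chunk_properties(properties, max_query_length=2000):
    """Split property names into groups that fit in one query string."""
    chunks, current, length = [], [], 0
    for prop in properties:
        # +1 accounts for the comma separator in the `properties=` parameter.
        if current and length + len(prop) + 1 > max_query_length:
            chunks.append(current)
            current, length = [], 0
        current.append(prop)
        length += len(prop) + 1
    if current:
        chunks.append(current)
    return chunks  # each chunk becomes one request
```

Slicing by property group is also why duplicates show up: the same record id comes back once per chunk, and the post_processor has to group and flatten the pieces.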

Questions:

> I don't suppose there's a way to get the rate limit programmatically so we can set the number of workers based on that?

Sadly, no. I couldn't find an endpoint in the API doc that returns this information.

sowla commented 3 months ago

Hi everyone, we're having problems with Hubspot streams seemingly not resuming from previous checkpoints, and wanted to ask whether checkpointing works with the Hubspot source. I ask because of this line: "OK to disable state_checkpoint_interval as that is not supported in the ccdk".

Thank you in advance!