Currently PyAirbyte pulls all the new information for a given stream, much like Airbyte works by default. However, in order for PyAirbyte to be truly part of an orchestrated pipeline, it should abide by the same timings and scope as the timeline in other words, we should be able to dictate which data we want to pull.
Context
Consider the case of a Shopify source. Lets say we have an hourly ingestion pipeline. Under a proper orchestration, we will pull only the incremental hourly data, and we'll let our transformation pipelines know that they should only focus on processing the data created within that hour.
Currently, we are contingent on when the PyAirbyte script was ran to determine what gets consumed. If we trigger it at 13:05 it will consume records created until 13:05, ideally however, we should be able to state: "only fetch records created until 13:00" so that in the next run, we are able to increase that window by one hour, and let the transformation layer know to know process the additional hour's records.
Example
import airbyte as ab
source: ab.Source = ab.get_source(
name="source-shopify",
config={
"start_date": "2024-10-01T00:00:00Z",
"end_date": "2024-10-01T01:00:00Z",
}
)
Summary
Currently PyAirbyte pulls all the new information for a given stream, much like Airbyte works by default. However, in order for PyAirbyte to be truly part of an orchestrated pipeline, it should abide by the same timings and scope as the timeline in other words, we should be able to dictate which data we want to pull.
Context Consider the case of a Shopify source. Lets say we have an hourly ingestion pipeline. Under a proper orchestration, we will pull only the incremental hourly data, and we'll let our transformation pipelines know that they should only focus on processing the data created within that hour. Currently, we are contingent on when the PyAirbyte script was ran to determine what gets consumed. If we trigger it at 13:05 it will consume records created until 13:05, ideally however, we should be able to state: "only fetch records created until 13:00" so that in the next run, we are able to increase that window by one hour, and let the transformation layer know to know process the additional hour's records.
Example
Have I overlooked and this is actually possible?