astronomer / astro-sdk

Astro SDK allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
https://astro-sdk-python.rtfd.io/
Apache License 2.0

Load files in parallel #773

Open utkarsharma2 opened 2 years ago

utkarsharma2 commented 2 years ago

Please describe the feature you'd like to see
Currently, we load files into the table sequentially. Alternatively, we can feed get_file_list into load_file to load files in parallel:

LoadFile.partial(
    task_id="load_gcs_to_bq",
    output_table=Table(
        metadata=Metadata(
            schema=ASTRO_BIGQUERY_DATASET,
        ),
        conn_id=ASTRO_GCP_CONN_ID,
    ),
    use_native_support=True,
).expand(input_file=get_file_list(path=GCS_BUCKET, conn_id=ASTRO_GCP_CONN_ID))

But in the above code, get_file_list can return hundreds of files, which will generate hundreds of mapped tasks.

Describe the solution you'd like
For example, GCS_BUCKET contains the following files:

gs://tmp/test1.csv
gs://tmp/test2.csv
gs://tmp/test3.csv
gs://tmp/test4.csv
gs://tmp/test5.csv
gs://tmp/test6.csv

Currently, get_file_list will produce a flat list:

[
  gs://tmp/test1.csv,
  gs://tmp/test2.csv,
  gs://tmp/test3.csv,
  gs://tmp/test4.csv,
  gs://tmp/test5.csv,
  gs://tmp/test6.csv
]

We could instead take a bucketing approach, where get_file_list groups the files into buckets and the user specifies the maximum bucket size, e.g. get_file_list(path=GCS_BUCKET, conn_id=ASTRO_GCP_CONN_ID, max_bucket_size=3):

[
   [gs://tmp/test1.csv, gs://tmp/test2.csv, gs://tmp/test3.csv],
   [gs://tmp/test4.csv, gs://tmp/test5.csv, gs://tmp/test6.csv]
]
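A minimal, Airflow-independent sketch of that bucketing logic (assuming a hypothetical max_bucket_size parameter, which is not in the current API):

def bucket_files(files: list[str], max_bucket_size: int) -> list[list[str]]:
    # Split the flat file list into consecutive buckets of at most max_bucket_size items.
    return [files[i:i + max_bucket_size] for i in range(0, len(files), max_bucket_size)]

files = [f"gs://tmp/test{n}.csv" for n in range(1, 7)]
bucket_files(files, max_bucket_size=3)
# [['gs://tmp/test1.csv', 'gs://tmp/test2.csv', 'gs://tmp/test3.csv'],
#  ['gs://tmp/test4.csv', 'gs://tmp/test5.csv', 'gs://tmp/test6.csv']]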

Then load_file should be able to process a list of files, so that we can load files in parallel while still controlling the number of tasks generated. Since only two buckets are produced in this example, we get two mapped tasks instead of six.

LoadFile.partial(
    task_id="load_gcs_to_bq",
    output_table=Table(
        metadata=Metadata(
            schema=ASTRO_BIGQUERY_DATASET,
        ),
        conn_id=ASTRO_GCP_CONN_ID,
    ),
    use_native_support=True,
).expand(input_file=get_file_list(path=GCS_BUCKET, conn_id=ASTRO_GCP_CONN_ID, max_bucket_size=3))
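As a rough workaround sketch with today's Airflow primitives (untested), the bucketing could also be done in a plain @task upstream of the mapped LoadFile. Note that this only shapes the mapped input; load_file would still need to accept a list of files per task, which is the feature being requested here:

from airflow.decorators import task

@task
def chunk_file_list(files, max_bucket_size=3):
    # Group the flat list returned by get_file_list into buckets of at most max_bucket_size.
    return [files[i:i + max_bucket_size] for i in range(0, len(files), max_bucket_size)]

file_buckets = chunk_file_list(get_file_list(path=GCS_BUCKET, conn_id=ASTRO_GCP_CONN_ID))

LoadFile.partial(
    task_id="load_gcs_to_bq",
    output_table=Table(
        metadata=Metadata(schema=ASTRO_BIGQUERY_DATASET),
        conn_id=ASTRO_GCP_CONN_ID,
    ),
    use_native_support=True,
).expand(input_file=file_buckets)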

Are there any alternatives to this feature?
Open to suggestions.

Acceptance Criteria

phanikumv commented 2 years ago

Consider letting the user specify this max_bucket_size attribute.
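One possible shape for this, sketched only as an illustration (not the current astro-sdk signature; _list_files_in_storage stands in for the existing listing logic), with max_bucket_size defaulting to 1 so existing DAGs keep producing one mapped task per file:

def get_file_list(path: str, conn_id: str, max_bucket_size: int = 1):
    files = _list_files_in_storage(path, conn_id)  # placeholder for the existing listing logic
    if max_bucket_size <= 1:
        # Default: today's behaviour, a flat list -> one mapped task per file.
        return files
    # Opt-in bucketing: a nested list -> one mapped task per bucket.
    return [files[i:i + max_bucket_size] for i in range(0, len(files), max_bucket_size)]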