kestra-io / plugin-serdes

https://kestra.io/plugins/plugin-serdes/
Apache License 2.0

Investigate whether there is a way to speed up CSV to ION conversion (CsvReader) #102

Closed anna-geller closed 2 months ago

anna-geller commented 6 months ago

Feature description

Converting CSV to ION with CsvReader takes 43 seconds for 150k rows (a 28.1 MB file). Ludo thinks this is unexpectedly slow: he would expect that duration for a file closer to 1 GB. I'm opening this issue to investigate whether there is a way to speed it up.
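To put the reported numbers in perspective, here is a minimal back-of-the-envelope sketch in Python. It uses only the figures quoted above (150k rows, 28.1 MB, 43 seconds). The function names are made up for illustration, and treating 1 GB as 1000 MB is an assumption. At the observed rate the conversion runs at roughly 3,500 rows/s and 0.65 MB/s, so a 1 GB file would take around 25 minutes rather than 43 seconds.

```python
# Figures reported in the issue (the row count is approximate: "150k rows").
ROWS = 150_000
SIZE_MB = 28.1
ELAPSED_S = 43.0


def throughput(rows: int, size_mb: float, elapsed_s: float) -> tuple[float, float]:
    """Return (rows per second, MB per second) for one conversion run."""
    return rows / elapsed_s, size_mb / elapsed_s


def projected_seconds(target_mb: float, mb_per_s: float) -> float:
    """Estimate how long a file of target_mb would take at a constant rate."""
    return target_mb / mb_per_s


rows_per_s, mb_per_s = throughput(ROWS, SIZE_MB, ELAPSED_S)

# Assumption: 1 GB is taken as 1000 MB, and throughput scales linearly with size.
one_gb_s = projected_seconds(1000.0, mb_per_s)

print(f"{rows_per_s:.0f} rows/s, {mb_per_s:.2f} MB/s")
print(f"1 GB at this rate: about {one_gb_s / 60:.1f} minutes")
```

The linear projection is only a rough guide; fixed startup costs or I/O effects could shift the real figure in either direction.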

Reproducer:

id: file_processing
namespace: tutorial

variables:
  file_id: "{{ execution.startDate | dateAdd(-3, 'MONTHS') | date('yyyyMM') }}"

tasks:
  - id: get_zipfile
    type: io.kestra.plugin.fs.http.Download
    uri: "https://divvy-tripdata.s3.amazonaws.com/{{ render(vars.file_id) }}-divvy-tripdata.zip"

  - id: unzip
    type: io.kestra.plugin.compress.ArchiveDecompress
    algorithm: ZIP
    from: "{{ outputs.get_zipfile.uri }}"

  - id: csv_to_ion
    type: io.kestra.plugin.serdes.csv.CsvReader
    from: "{{ outputs.unzip.files[render(vars.file_id) ~ '-divvy-tripdata.csv'] }}"

  - id: to_parquet
    type: io.kestra.plugin.serdes.avro.AvroWriter # render(vars.file_id)
    from: "{{ outputs.csv_to_ion.uri }}"
    datetimeFormat: "yyyy-MM-dd' 'HH:mm:ss"
    schema: |
      {
        "type": "record",
        "name": "Ride",
        "namespace": "com.example.bikeshare",
        "fields": [
          {"name": "ride_id", "type": "string"},
          {"name": "rideable_type", "type": "string"},
          {"name": "started_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "ended_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
          {"name": "start_station_name", "type": "string"},
          {"name": "start_station_id", "type": "string"},
          {"name": "end_station_name", "type": "string"},
          {"name": "end_station_id", "type": "string"},
          {"name": "start_lat", "type": "double"},
          {"name": "start_lng", "type": "double"},
          {
            "name": "end_lat",
            "type": ["null", "double"],
            "default": null
          },
          {
            "name": "end_lng",
            "type": ["null", "double"],
            "default": null
          },
          {"name": "member_casual", "type": "string"}
        ]
      }

loicmathieu commented 6 months ago

A new facility for handling file processing was merged in 0.16 (an external contribution that is not used yet). We plan to try it on some tasks during this sprint to see its benefits.

This example is a good starting point, so I'll move the issue to 0.17.

anna-geller commented 5 months ago

Related: https://github.com/kestra-io/kestra/issues/3475