kestra-io / plugin-jdbc

https://kestra.io/plugins/
Apache License 2.0

Find a way to not require {{ workingDir }} within a DuckDB query when input files are provided #267

Open · anna-geller opened this issue 5 months ago

anna-geller commented 5 months ago

Feature description

See an example:

```yaml
id: api_python_sql
namespace: blueprint

tasks:
  - id: extract
    type: io.kestra.plugin.fs.http.Download
    uri: https://dummyjson.com/products

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    docker:
      image: python:3.11-alpine # 57 MB
    inputFiles:
      data.json: "{{ outputs.extract.uri }}"
    outputFiles:
      - "*.json"
    script: |
      import json

      with open("data.json", 'r') as file:
        data = json.load(file)

      filtered_data = [{"brand": product["brand"], "price": product["price"]} for product in data["products"]]
      with open("products.json", 'w') as file:
        json.dump(filtered_data, file, indent=4)

  - id: sqlQuery
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_json_auto('{{ workingDir }}/products.json') -- ❌ this is error-prone and not intuitive
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true
```
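To see why the expression is needed today, here is a minimal Python model of the behavior, assuming that `inputFiles` copies each entry into a freshly created task working directory and that `{{ workingDir }}` renders to that directory's absolute path. The helpers `stage_input_files` and `render_working_dir` are hypothetical illustrations, not Kestra's implementation (the real task fetches files from internal-storage URIs). The last line shows the core problem: a bare `products.json` resolves against the process's current directory, not the staged one.

```python
import shutil
import tempfile
from pathlib import Path


def stage_input_files(input_files: dict[str, Path]) -> Path:
    """Copy each source file into a new working directory under its key name.

    Hypothetical model of `inputFiles`: the real task downloads from
    Kestra internal-storage URIs rather than copying local paths.
    """
    working_dir = Path(tempfile.mkdtemp(prefix="wd-"))
    for name, source in input_files.items():
        target = working_dir / name
        # Keys may contain sub-paths, so create parent folders as needed.
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, target)
    return working_dir


def render_working_dir(sql: str, working_dir: Path) -> str:
    """Replace the `{{ workingDir }}` expression with the absolute directory path."""
    return sql.replace("{{ workingDir }}", str(working_dir))


if __name__ == "__main__":
    source = Path(tempfile.mkdtemp()) / "transform-output.json"
    source.write_text('[{"brand": "A", "price": 1.0}]')
    wd = stage_input_files({"products.json": source})
    # The staged file lives under a generated directory...
    print(render_working_dir("FROM read_json_auto('{{ workingDir }}/products.json')", wd))
    # ...while a bare relative path resolves against the process CWD instead.
    print(Path("products.json").resolve())
```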

Ideally, the same query should work with a plain relative path:

```yaml
...
  - id: sqlQuery
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_json_auto('products.json')
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true
```
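One possible way to get there, sketched under the assumption that the plugin can run an extra statement on the same DuckDB connection before the user's SQL: DuckDB has a `file_search_path` setting (a comma-separated list of directories searched for input files), so prefixing the script with `SET file_search_path = '<working dir>';` should let `read_json_auto('products.json')` find the staged file. `with_search_path` is a hypothetical helper, and this has not been checked against the DuckDB version the plugin ships; it is one option, not a decided design.

```python
from __future__ import annotations

import os


def with_search_path(sql: str, working_dir: str | os.PathLike[str]) -> str:
    """Prefix a DuckDB script so relative input paths are looked up in working_dir.

    Assumes DuckDB's `file_search_path` setting; single quotes in the path are
    doubled so it stays a valid SQL string literal.
    """
    escaped = os.fspath(working_dir).replace("'", "''")
    return f"SET file_search_path = '{escaped}';\n{sql}"


if __name__ == "__main__":
    user_sql = "SELECT brand FROM read_json_auto('products.json');"
    print(with_search_path(user_sql, "/tmp/wd-123"))
```

Because the user's SQL runs after the prefix, a script that sets `file_search_path` itself would still take precedence, which keeps the change backward compatible with flows that already use `{{ workingDir }}` (absolute paths are unaffected by the search path).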
anna-geller commented 2 months ago

Adding another example raised by @Ben8t: this should work out of the box, without any extra tasks or the {{ workingDir }} expression:

```yaml
id: train_model
namespace: kestra.weather.dev.ml

tasks:
  - id: get_data
    type: io.kestra.plugin.gcp.gcs.Downloads
    from: "gs://weather-kestra/data/"
    action: NONE
    listingType: RECURSIVE
    serviceAccount: "{{ secret('GCP_SA') }}"

  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles: "{{ outputs.get_data.outputFiles }}"
    sql: |
      SELECT * FROM read_csv_auto('*.csv')
    store: true
```
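For the glob case, the expected semantics are that a relative pattern such as `*.csv` expands inside the task's working directory rather than the worker's current directory. The sketch below models that with `pathlib`; the helper `expand_relative_glob` and the sample file names are hypothetical, and DuckDB's own glob rules may differ in details. It also illustrates a caveat worth checking: if a `RECURSIVE` download stages files under subdirectories, `*.csv` matches only top-level files, and a recursive pattern would be needed.

```python
import tempfile
from pathlib import Path


def expand_relative_glob(pattern: str, working_dir: Path) -> list[str]:
    """Expand a relative glob inside working_dir (never the process CWD).

    Returns matching file paths relative to working_dir, sorted for stable output.
    """
    return sorted(
        p.relative_to(working_dir).as_posix()
        for p in working_dir.glob(pattern)
        if p.is_file()
    )


if __name__ == "__main__":
    wd = Path(tempfile.mkdtemp(prefix="wd-"))
    # Hypothetical staged files, one of them in a subdirectory.
    for name in ["2023.csv", "2024.csv", "notes.txt", "archive/2022.csv"]:
        path = wd / name
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text("day,temp\n1,20\n")
    print(expand_relative_glob("*.csv", wd))     # ['2023.csv', '2024.csv']
    print(expand_relative_glob("**/*.csv", wd))  # ['2023.csv', '2024.csv', 'archive/2022.csv']
```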