kestra-io / plugin-jdbc

https://kestra.io/plugins/
Apache License 2.0

Add a new Queries task to run multiple SQL statements with multiple outputs, parameter binding and transactions #368

Open anna-geller opened 2 weeks ago


Context

The current Query task is limited to executing a single SQL statement and handling its output. Some automation tasks require executing multiple SQL statements, potentially wrapped in a transaction, while handling outputs of multiple SELECT statements.

To avoid breaking changes to the existing Query task, this issue proposes a new Queries task.

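This task will enable:

  1. executing multiple SQL statements in a single task
  2. binding parameters to those statements
  3. wrapping the statements in a transaction
  4. returning the outputs of multiple SELECT statements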

Use case for testing

Start Postgres in a container:

docker run --name queries -e POSTGRES_PASSWORD=mysecretpassword -e POSTGRES_USER=postgres -e POSTGRES_DB=postgres -p 5432:5432 -d postgres

Create the tables (alternatively, create them directly from the new Queries task, as the setup task in the example flow does):

CREATE TABLE myusers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    last_login TIMESTAMP
);

CREATE TABLE mylogs (
    log_id SERIAL PRIMARY KEY,
    user_email VARCHAR(255) NOT NULL,
    action VARCHAR(255) NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    FOREIGN KEY (user_email) REFERENCES myusers(email)
);

Implementation

Task Type: io.kestra.plugin.jdbc.postgresql.Queries (equivalent tasks will be needed for all JDBC plugin subgroups, not just Postgres)

Description: The Queries task executes multiple SQL statements within a single task, with support for parameter binding and transaction management. It can handle multiple SELECT statements and their outputs, either fetching the results directly or storing them as ION files in internal storage.
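Transaction management could rely on the standard JDBC pattern: turn off auto-commit, run the statements, then commit, or roll back if any statement fails. The sketch below illustrates that pattern under stated assumptions; `Transactions` and its `SqlWork` callback are hypothetical names, not part of the plugin, while `getAutoCommit`, `setAutoCommit`, `commit` and `rollback` are standard `java.sql.Connection` methods.

```java
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper (not part of the plugin): runs a unit of work in one JDBC transaction.
final class Transactions {
    private Transactions() {}

    // Hypothetical callback type for the statements to execute inside the transaction.
    @FunctionalInterface
    interface SqlWork<T> {
        T run(Connection connection) throws SQLException;
    }

    static <T> T inTransaction(Connection connection, SqlWork<T> work) throws SQLException {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false); // group every statement into a single transaction
        try {
            T result = work.run(connection);
            connection.commit(); // all statements succeeded
            return result;
        } catch (SQLException | RuntimeException e) {
            try {
                connection.rollback(); // undo everything executed so far
            } catch (SQLException rollbackError) {
                e.addSuppressed(rollbackError); // keep the original failure as the primary error
            }
            throw e;
        } finally {
            connection.setAutoCommit(previousAutoCommit); // restore the caller's setting
        }
    }
}
```

Inside the callback, the task would execute the statements of the sql string one by one, so a failure in any of them discards the effects of the earlier ones. One caveat: some databases, such as MySQL, implicitly commit DDL statements like CREATE TABLE, so a rollback cannot undo those.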

Note on parsing SQL statements from a single string

Important note: we want the sql property to accept a single string containing multiple SQL statements separated by semicolons (;). We want to support both:

  1. parameters as a map
  2. sql as a single string with multiple SQL statements separated by semicolons (;)

If supporting both at the same time is not feasible (e.g., because of the performance cost of parsing the SQL string), we can consider supporting only sql as a single string, without parameters. Supporting sql as a single string with multiple semicolon-separated statements has a higher priority than supporting parameters.

We should investigate how other tools in the Java ecosystem, e.g. Flyway, parse queries for execution (see https://www.baeldung.com/liquibase-vs-flyway).

TL;DR: sql as a single string without parameters > sql as an array with parameters
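A naive split on ";" would break statements such as INSERT ... VALUES ('a;b'). One possible approach is a small scanner that only cuts on semicolons found outside quoted text and comments. This is a minimal sketch with a hypothetical `SqlSplitter` class, not how Flyway or the plugin does it; it ignores dialect-specific syntax such as PostgreSQL dollar-quoted bodies ($$ ... $$), backslash escapes in E'...' strings, and nested block comments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a multi-statement SQL string on top-level semicolons.
// Not handled (assumption, kept simple): dollar quoting, E'...' backslash escapes, nested comments.
final class SqlSplitter {
    private SqlSplitter() {}

    static List<String> split(String sql) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean hasCode = false; // does `current` contain more than whitespace and comments?
        int n = sql.length();
        int i = 0;
        while (i < n) {
            char c = sql.charAt(i);
            int end;
            if (c == '\'' || c == '"') {
                // Quoted literal or identifier; a doubled quote ('' or "") is an escaped quote.
                end = i + 1;
                while (end < n) {
                    if (sql.charAt(end) != c) {
                        end++;
                    } else if (end + 1 < n && sql.charAt(end + 1) == c) {
                        end += 2;
                    } else {
                        break;
                    }
                }
                end = Math.min(end + 1, n); // include the closing quote, if any
                hasCode = true;
            } else if (sql.startsWith("--", i)) {
                // Line comment: runs until the end of the line.
                end = sql.indexOf('\n', i);
                if (end < 0) end = n;
            } else if (sql.startsWith("/*", i)) {
                // Block comment.
                end = sql.indexOf("*/", i + 2);
                end = end < 0 ? n : end + 2;
            } else if (c == ';') {
                // Top-level semicolon: close the current statement (skip empty or comment-only ones).
                if (hasCode) statements.add(current.toString().strip());
                current.setLength(0);
                hasCode = false;
                i++;
                continue;
            } else {
                end = i + 1;
                if (!Character.isWhitespace(c)) hasCode = true;
            }
            current.append(sql, i, end);
            i = end;
        }
        if (hasCode) statements.add(current.toString().strip()); // last statement may lack a ';'
        return statements;
    }
}
```

Applied to the sql of the setup task in the example flow, this sketch would return the four DROP and CREATE statements as separate strings.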

Properties

Outputs

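The outputs will be an array with one element per SELECT statement in the sql string, in the order in which the statements run. Depending on the fetchType, each element contains rows and size (FETCH), row (FETCH_ONE), or uri and size (STORE), where uri points to an ION file in internal storage.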
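To illustrate how each output element could be shaped from the rows of one SELECT statement, here is a sketch. `FetchType` mirrors the values used in this proposal; `QueryOutputs` and the `storeRows` function, which stands in for writing the rows to an ION file in internal storage, are hypothetical.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Mirrors the fetchType values used in this proposal.
enum FetchType { FETCH, FETCH_ONE, STORE }

// Hypothetical helper: shapes the output element for each SELECT statement.
final class QueryOutputs {
    private QueryOutputs() {}

    // `rows` holds one SELECT statement's rows as column-name -> value maps.
    // `storeRows` is a hypothetical stand-in for writing the rows to an ION file in internal storage.
    static Map<String, Object> toOutput(FetchType fetchType,
                                        List<Map<String, Object>> rows,
                                        Function<List<Map<String, Object>>, URI> storeRows) {
        Map<String, Object> output = new LinkedHashMap<>();
        switch (fetchType) {
            case FETCH:
                output.put("rows", rows);
                output.put("size", rows.size());
                break;
            case FETCH_ONE:
                // Assumption: "row" is null when the statement returned no rows.
                output.put("row", rows.isEmpty() ? null : rows.get(0));
                break;
            case STORE:
                output.put("uri", storeRows.apply(rows));
                output.put("size", rows.size());
                break;
        }
        return output;
    }

    // One output element per SELECT statement, in statement order.
    static List<Map<String, Object>> toOutputs(FetchType fetchType,
                                               List<List<Map<String, Object>>> resultSets,
                                               Function<List<Map<String, Object>>, URI> storeRows) {
        return resultSets.stream()
                .map(rows -> toOutput(fetchType, rows, storeRows))
                .collect(Collectors.toList());
    }
}
```

Statements that return no result set (INSERT, UPDATE, DDL) would simply not contribute an element, which matches the example output having two elements for the five statements of the queries task.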

Example flow

id: run_queries
namespace: company.team

tasks:
  - id: setup
    type: io.kestra.plugin.jdbc.postgresql.Queries
    url: jdbc:postgresql://host.docker.internal:5432/postgres
    username: postgres
    password: xxx
    sql: |
      DROP TABLE IF EXISTS myusers CASCADE;
      CREATE TABLE myusers (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        email VARCHAR(255) UNIQUE NOT NULL,
        last_login TIMESTAMP
      );
      DROP TABLE IF EXISTS mylogs;
      CREATE TABLE mylogs (
        log_id SERIAL PRIMARY KEY,
        user_email VARCHAR(255) NOT NULL,
        action VARCHAR(255) NOT NULL,
        timestamp TIMESTAMP NOT NULL,
        FOREIGN KEY (user_email) REFERENCES myusers(email)
      );

  - id: queries
    type: io.kestra.plugin.jdbc.postgresql.Queries
    url: jdbc:postgresql://host.docker.internal:5432/postgres
    username: postgres
    password: xxx
    sql: |
      INSERT INTO myusers (name, email) VALUES (:name, :email) ON CONFLICT (email) DO NOTHING;
      UPDATE myusers SET last_login = :login WHERE email = :email;
      INSERT INTO mylogs (user_email, action, timestamp) VALUES (:email, 'login', :login);
      SELECT * FROM myusers WHERE email = :email;
      SELECT * FROM mylogs WHERE user_email = :email;
    parameters:
      name: Rick
      email: "rick@kestra.io"
      login: "{{ execution.startDate }}"
    fetchType: FETCH
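JDBC `PreparedStatement` only understands positional `?` placeholders, so named parameters such as `:name`, `:email` and `:login` in the flow above would have to be rewritten before binding, once the sql string has been split into individual statements. A minimal sketch of that rewrite, assuming a hypothetical `NamedParameters` helper: it replaces each `:identifier` with `?`, records the parameter names in order (a name used twice is bound twice), and leaves quoted text and PostgreSQL `::` casts untouched. Comments are not handled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: rewrites ":name" placeholders into JDBC "?" placeholders.
final class NamedParameters {
    private NamedParameters() {}

    // The rewritten statement plus the parameter name behind each "?", in order.
    static final class Parsed {
        final String sql;
        final List<String> names;

        Parsed(String sql, List<String> names) {
            this.sql = sql;
            this.names = List.copyOf(names);
        }
    }

    static Parsed parse(String statement) {
        StringBuilder sql = new StringBuilder();
        List<String> names = new ArrayList<>();
        int n = statement.length();
        int i = 0;
        while (i < n) {
            char c = statement.charAt(i);
            if (c == '\'' || c == '"') {
                // Copy quoted text verbatim; a doubled quote just closes and reopens the quote.
                int end = statement.indexOf(c, i + 1);
                end = end < 0 ? n : end + 1;
                sql.append(statement, i, end);
                i = end;
            } else if (statement.startsWith("::", i)) {
                sql.append("::"); // PostgreSQL type cast, not a parameter
                i += 2;
            } else if (c == ':' && i + 1 < n && Character.isJavaIdentifierStart(statement.charAt(i + 1))) {
                int end = i + 1;
                while (end < n && Character.isJavaIdentifierPart(statement.charAt(end))) {
                    end++;
                }
                names.add(statement.substring(i + 1, end));
                sql.append('?');
                i = end;
            } else {
                sql.append(c);
                i++;
            }
        }
        return new Parsed(sql.toString(), names);
    }

    // Returns the values from the `parameters` map in placeholder order.
    static List<Object> bindValues(Parsed parsed, Map<String, ?> parameters) {
        List<Object> values = new ArrayList<>();
        for (String name : parsed.names) {
            if (!parameters.containsKey(name)) {
                throw new IllegalArgumentException("Missing value for parameter :" + name);
            }
            values.add(parameters.get(name));
        }
        return values;
    }
}
```

For instance, INSERT INTO mylogs (user_email, action, timestamp) VALUES (:email, 'login', :login) would become INSERT INTO mylogs (user_email, action, timestamp) VALUES (?, 'login', ?) with the names [email, login]. Each value could then be bound with `PreparedStatement.setObject`, whose parameter indexes start at 1.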

Example output

For a query with fetchType: FETCH:

[
  {
    "rows": [
      {
        "id": 1,
        "name": "Rick",
        "email": "rick@kestra.io",
        "last_login": "2024-08-29T10:00:00Z"
      }
    ],
    "size": 1
  },
  {
    "rows": [
      {
        "log_id": 1,
        "user_email": "rick@kestra.io",
        "action": "login",
        "timestamp": "2024-08-29T10:00:00Z"
      }
    ],
    "size": 1
  }
]

For a query with fetchType: STORE:

[
  {
    "uri": "kestra:///internal/storage/queries-result-3.ion",
    "size": 1
  },
  {
    "uri": "kestra:///internal/storage/queries-result-4.ion",
    "size": 1
  }
]

For a query with fetchType: FETCH_ONE:

[
  {
    "row": {
      "id": 1,
      "name": "Rick",
      "email": "rick@kestra.io",
      "last_login": "2024-08-29T10:00:00Z"
    }
  },
  {
    "row": {
      "log_id": 1,
      "user_email": "rick@kestra.io",
      "action": "login",
      "timestamp": "2024-08-29T10:00:00Z"
    }
  }
]