kestra-io / plugin-elasticsearch

https://kestra.io/plugins/plugin-elasticsearch/
Apache License 2.0

Add specific `elasticsearch.ESQL` task to query Elasticsearch with ES|QL syntax #16

Open paulgrainger85 opened 1 day ago

paulgrainger85 commented 1 day ago

Feature description

Note: if not already in place, we will need to upgrade the library to 8.15 to support this.

Add a new task to the Elasticsearch plugin that supports the ES|QL query language: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html

This is a relatively new feature that gives Elastic users a simple and powerful syntax to filter, transform and analyse data.

As output, we should give users the option to return different formats, both text and binary; see e.g. https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-rest.html
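To illustrate the text/binary split, here is a minimal Python sketch of how a task might decide whether to expose the response body as a string or as raw bytes. The format lists reflect the formats documented for the ES|QL REST API at the time of writing (verify against your Elasticsearch version); `decode_esql_body` is a hypothetical helper, not part of any plugin.

```python
from typing import Union

# Text vs. binary formats for the ES|QL REST API `format` parameter
# (per the esql-rest docs; check your Elasticsearch version).
TEXT_FORMATS = {"csv", "json", "tsv", "txt", "yaml"}
BINARY_FORMATS = {"cbor", "smile", "arrow"}


def decode_esql_body(body: bytes, fmt: str) -> Union[str, bytes]:
    """Return text formats as a UTF-8 string and binary formats as raw bytes.

    Hypothetical helper illustrating how a task could expose both kinds of output.
    """
    fmt = fmt.lower()
    if fmt in TEXT_FORMATS:
        return body.decode("utf-8")
    if fmt in BINARY_FORMATS:
        return body
    raise ValueError(f"Unsupported ES|QL format: {fmt!r}")


print(decode_esql_body(b"title\nhello\n", "csv"))
print(type(decode_esql_body(b"\xa1\x00", "cbor")).__name__)  # bytes
```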

anna-geller commented 1 day ago

I have a feeling this should already work with the Request task?

POST /_query?format=txt
{
  "query": """
FROM sample_data
  """
}
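The request above uses Kibana Dev Tools console syntax, which accepts `"""` triple-quoted strings; over plain HTTP the body must be ordinary JSON. A minimal standard-library Python sketch of the same call, assuming API-key authentication through the `Authorization: ApiKey …` header (as in the Request-task flow later in this thread). `build_esql_request`, the host and the key are placeholders.

```python
import json
import urllib.request


def build_esql_request(host: str, api_key: str, query: str, fmt: str = "txt") -> urllib.request.Request:
    """Build (but do not send) a POST /_query request for an ES|QL query.

    Hypothetical helper; `host` and `api_key` are placeholders.
    """
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/_query?format={fmt}",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"ApiKey {api_key}",
        },
    )


req = build_esql_request("https://localhost:9200", "MY_API_KEY", "FROM sample_data")
print(req.get_method(), req.full_url)  # POST https://localhost:9200/_query?format=txt
# Sending it would be: urllib.request.urlopen(req).read().decode("utf-8")
```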

https://kestra.io/plugins/plugin-elasticsearch/tasks/io.kestra.plugin.elasticsearch.request

Maybe it's just a matter of adding a plugin example and a blueprint? It would be worth checking with this task first. @Ben8t, can you give it a try and evaluate whether @mgabelle would need to add a new task or whether this one is sufficient?

anna-geller commented 1 day ago

Extra note from @paulgrainger85: even if the Request task works, we may need to update the underlying ES plugin libraries to 8.15 to support this, as it's a new Elasticsearch feature.

EDIT: According to Ludo, this will require separating plugin-elasticsearch and plugin-opensearch. The existing code in this plugin-elasticsearch repo uses OpenSearch libraries, so we'll need to copy or move it to plugin-opensearch before adding ES-specific functionality.

anna-geller commented 1 day ago

Edit: ES|QL was added in 8.11.

anna-geller commented 1 day ago

We have a decision: @loicmathieu will add a dedicated https://github.com/kestra-io/plugin-opensearch repository/plugin, and plugin-elasticsearch will switch to the Elasticsearch client so that new features like this one can be used 🚀

Matt can then add the new task if needed.

syepes commented 1 day ago

Indeed, this feature is only available in Elasticsearch, not in the other forks. ES|QL is very interesting because it lets us quickly retrieve only the necessary fields and convert them to a DataFrame with little effort, and it's also more performant in some use cases.

Small example:

import logging
from io import StringIO

import pandas as pd
from elasticsearch import Elasticsearch

logger = logging.getLogger(__name__)


def es_query_esql(es, query, filter=None) -> pd.DataFrame:
    """
    Query Elasticsearch using ES|QL and convert the CSV result into a DataFrame
    """
    try:
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-query-api.html
        client = Elasticsearch(es["host"], api_key=es["api_key"])
        # `filter` is a Query DSL query restricting the documents ES|QL runs on
        resp = client.esql.query(query=query, filter=filter, format="csv")
        df = pd.read_csv(StringIO(resp.body), sep=",")
        logger.info(f"Records: {len(df)}")
        return df
    except Exception as e:
        logger.error(f"Query failed: {e}")
        # Return an empty DataFrame so the return type stays consistent
        return pd.DataFrame()


# When this script runs inline in a Kestra Python task, the secret() expressions are rendered first
cfg = {
    "host": "{{ secret('ES_URL') }}",
    "api_key": "{{ secret('ES_TOKEN') }}",
}
filters = {"range": {"@timestamp": {"gte": "now-1h"}}}
query = """
from logs-XYZ
| KEEP fieldA, fieldb
| LIMIT 3
"""
df = es_query_esql(cfg, query, filters)

Ben8t commented 1 day ago

Just tested it out; this works well with the current plugins:

id: elastic_query_python
namespace: company.team

tasks:
  - id: esql_query
    type: io.kestra.plugin.elasticsearch.Request
    connection:
      hosts:
        - "https://b08318eb65f34bfdadcca4a633ea1602.us-central1.gcp.cloud.es.io:443"
      headers: 
        - "Authorization: ApiKey {{ secret('ES_API_TOKEN') }}"
    method: "POST"
    endpoint: /_query?format=csv
    body:
      query: |
        FROM test
        | WHERE title IS NOT NULL

  - id: pandas
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      data.csv: "{{ outputs.esql_query.response }}"
    beforeCommands:
      - pip install pandas
    script: |
      import pandas as pd
      data = pd.read_csv("data.csv", sep=',')
      print(data.head())

Keeping the issue open, as we might create a proper Query task that supports ES|QL directly (going through the client rather than the REST API) and thus streamlines writing flows. cc @paulgrainger85

anna-geller commented 1 day ago

Makes sense to keep this open for now to validate whether that prospect needs more than the current solution; if not, we can close it after the PoC 👍

loicmathieu commented 1 day ago

The Request task allows sending low-level requests to Elasticsearch, but we should provide something higher level, so a Query task would be better.

I propose that @mgabelle add a Query task as soon as the client is switched to Elasticsearch, which is a work in progress in #17.

anna-geller commented 1 day ago

@Ben8t I guess you will have to design the plugin syntax then in the end 😅

Some direction for you to explore: propose the syntax you see as the best fit for this API (it would likely output an ION file with STORE?):

  - id: esql_query
    type: io.kestra.plugin.elasticsearch.ESQL
    url: "https://b08318eb65f34bfdadcca4a633ea1602.us-central1.gcp.cloud.es.io:443"
    apiKey: "{{ secret('ES_API_KEY') }}"
    fetchType: STORE
    query: |
        FROM test
        | WHERE title IS NOT NULL

Ben8t commented 23 hours ago

https://github.com/kestra-io/plugin-elasticsearch/pull/17 is merged!

@mgabelle, let me know if anything isn't clear enough. As you are working in parallel on the Query task, I believe this one should be quite similar :)

cc @anna-geller

Task Definition

type: io.kestra.plugin.elasticsearch.ESQL

🔗 General ESQL API Reference

Configuration Properties

Required Properties

Property  Type    Description
url       String  The Elasticsearch instance URL
query     String  The ES|QL query to execute
apiKey    String  API key for authentication; can reference secrets

Optional Properties

Property        Type                 Default  Description
fetchType       Enum                 FETCH    Output handling mode: STORE, FETCH, or FETCH_ONE
format          Enum                 json     Output format: csv, json, txt, tsv, or any other format supported by the ES|QL REST API
headers         Map<String, String>  -        Additional HTTP headers
requestTimeout  Duration             30s      Request timeout
connectTimeout  Duration             30s      Connection timeout
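To make the property list concrete, a hedged Python model of the proposed configuration. The dataclass `EsqlTaskConfig` and its snake_case field names are illustrative only, mirroring the camelCase properties and defaults in the tables above; `endpoint()` assumes the task would wrap the same `POST /_query?format=` REST call used in the Request-task workaround.

```python
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from typing import Dict


class FetchType(Enum):
    STORE = "STORE"
    FETCH = "FETCH"
    FETCH_ONE = "FETCH_ONE"


@dataclass
class EsqlTaskConfig:
    """Hypothetical model of the proposed ESQL task properties.

    Required properties have no default; optional ones use the table defaults.
    """

    url: str
    query: str
    api_key: str
    fetch_type: FetchType = FetchType.FETCH
    format: str = "json"
    headers: Dict[str, str] = field(default_factory=dict)
    request_timeout: timedelta = timedelta(seconds=30)
    connect_timeout: timedelta = timedelta(seconds=30)

    def endpoint(self) -> str:
        # Assumed underlying REST call: POST <url>/_query?format=<format>
        return f"{self.url.rstrip('/')}/_query?format={self.format}"


cfg = EsqlTaskConfig(url="https://elasticsearch.example.com:443", query="FROM test", api_key="KEY")
print(cfg.endpoint())  # https://elasticsearch.example.com:443/_query?format=json
```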

FetchType Behavior

STORE

FETCH

FETCH_ONE
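The behavior of each mode is not spelled out here. The Python sketch below assumes the convention of Kestra's existing query-style tasks (FETCH_ONE exposes the first row, FETCH exposes all rows, STORE writes rows to a file and exposes its URI). The output keys `row`, `rows` and `uri` are assumptions, and JSON Lines in a temp file stands in for Kestra's internal storage and ION format.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def apply_fetch_type(rows: List[Dict[str, Any]], fetch_type: str) -> Dict[str, Any]:
    """Shape task outputs according to fetchType (assumed semantics, see lead-in)."""
    if fetch_type == "FETCH_ONE":
        # Only the first row (or None if the query returned nothing)
        return {"row": rows[0] if rows else None}
    if fetch_type == "FETCH":
        # All rows kept in the task output
        return {"rows": rows}
    if fetch_type == "STORE":
        # Stand-in for internal storage: one JSON object per line in a temp file
        fd, path = tempfile.mkstemp(suffix=".jsonl")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")
        return {"uri": path}
    raise ValueError(f"Unknown fetchType: {fetch_type!r}")


print(apply_fetch_type([{"title": "a"}, {"title": "b"}], "FETCH_ONE"))  # {'row': {'title': 'a'}}
```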

Outputs

Common Outputs

Output            Type      Description
outputs.query     String    The executed ES|QL query
outputs.duration  Duration  Query execution time
outputs.rowCount  Integer   Number of rows/documents returned (probably "len(values)" from the API response)
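A JSON-format ES|QL response carries a `columns` list (each with `name` and `type`) and a row-oriented `values` list, so `rowCount` can indeed come from `len(values)`. A minimal Python sketch, assuming the default non-columnar layout (with `columnar=true`, `values` is grouped per column instead); `parse_esql_json` is a hypothetical helper and the sample body is made up.

```python
import json
from typing import Any, Dict


def parse_esql_json(body: str) -> Dict[str, Any]:
    """Turn a row-oriented ES|QL JSON response into row dicts plus a row count."""
    resp = json.loads(body)
    names = [col["name"] for col in resp["columns"]]
    # Each entry in "values" is one row, ordered like "columns"
    rows = [dict(zip(names, values)) for values in resp["values"]]
    return {"rows": rows, "rowCount": len(resp["values"])}


sample = (
    '{"columns": [{"name": "title", "type": "text"}, {"name": "year", "type": "integer"}],'
    ' "values": [["Dune", 1965], ["Emma", 1815]]}'
)
print(parse_esql_json(sample)["rowCount"])  # 2
```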

Example Usage

id: esql_query
type: io.kestra.plugin.elasticsearch.ESQL
url: "https://elasticsearch.example.com:443"
apiKey: "{{ secret('ES_API_KEY') }}"
fetchType: FETCH
format: csv
query: |
    FROM test
    | WHERE title IS NOT NULL

id: esql_query
type: io.kestra.plugin.elasticsearch.ESQL
url: "https://elasticsearch.example.com:443"
apiKey: "{{ secret('ES_API_KEY') }}"
fetchType: STORE
query: |
    FROM test
    | WHERE title IS NOT NULL

Possible Follow Up