elastic / elasticsearch

Free and Open, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

[Ingest Pipeline] Ability to split documents #56769

Open m-adams opened 4 years ago

m-adams commented 4 years ago

It is common for tools to output data in a combined format where one document may contain several entities, for example a tool that scans several hosts for compliance or vulnerabilities, or an API that returns an update for every train, bus, etc. in a single response. We really want to split all the entities out into separate docs while copying some high-level information. This is possible using Logstash and the split filter, but not possible with Ingest Pipelines.

The feature would allow this kind of document to be processed and split without having to include Logstash in the ingest chain.
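
For illustration only, a hypothetical pipeline definition could look something like the sketch below; the "split_documents" processor and its options are made up and do not exist today, they just show the shape of what we are after:

{
  "description": "Hypothetical pipeline - the 'split_documents' processor does not exist",
  "processors": [
    {
      "split_documents": {
        "field": "hosts",
        "copy_fields": ["scan_id", "scan_time"]
      }
    }
  ]
}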

MichelKoba commented 5 months ago

+1

behnam-ramouzeh commented 5 months ago

+1

birkanatici commented 5 months ago

➕1️⃣

dstor-team commented 4 months ago

+1

clement-fouque commented 4 months ago

This would be extremely helpful and would allow us to solve this issue: https://github.com/elastic/integrations/issues/9261.

jmbass commented 3 months ago

+1

tylerperk commented 3 months ago

Indeed, this is a popular request and we recognize there are valid use cases for it. However, I think we are unlikely to implement this anytime soon, given the complexity involved and the need to balance the investment against our other priorities. We have discussed it on occasion within the team, and it is viewed as extremely complex to implement and fraught with edge cases that we would want to cover for it to be a robust and reliable solution.

Some of the concerns raised are about how to provide effective failure handling behavior and response details, as well as the bookkeeping required for bulk requests.

Workaround options include using Logstash or other external tools to do the splitting. Doing it outside of ES avoids many of the concerns we have about doing it in ES. As we move more towards embracing OTel, in the future you might be able to use an OTel Collector to do the split, or it might become possible in other parts of Elastic's offerings (no promises, just thinking out loud here).

In summary, we have no plans to implement this, but please feel free to add more use case examples, which would be valuable input to requirements should we choose to do this later.

carlopuri commented 3 days ago

Hello,

We have two use cases that can only be solved with this ingest pipeline enhancement.

Use case 1 - "inference" processor with "completion" task to prompt OpenAI and then manage GPT answer Whit this implementation, during document ingestion, we prompt OpenAI asking to analyse the text and give us back the result in a JSON document, with some fields as array. To use the data in Kibana we need to flatten all the received arrays.

Below is an example of a document to index:

{
     "_index": "test-inference-ai-001",
     "_id": "test_ai_000001",
     "text_to_analyze": "Can't use the app or website to top up with my valid credit and debit cards. On the app it says declied by bank but on the website, it pulls the funds from my acct, then reverses the transaction 30 minutes later. A total waste of time. Contacted the bank and they confirmed they aren't declining it and the customer service chat was useless."
}

and the returned JSON from OpenAI through the inference processor with the completion task:

{
  "topics": ["app", "website", "customer service"],
  "sentiments": ["negative", "negative", "negative"],
  "segments": ["Can't use the app or website to top up with my valid credit and debit cards.", "On the app it says declined by bank but on the website, it pulls the funds from my acct, then reverses the transaction 30 minutes later.", "the customer service chat was useless."],
  "reasons": ["can't top up with valid cards", "pulls funds then reverses transaction", "useless"]
}

We need to flatten the arrays into a new index like below:

{
    "index": "test-inference-ai-001_ingest",
    "_id" : "test_ai_000001_1",
    "topics": "app",
    "sentiments": "negative",
    "segments": "Can't use the app or website to top up with my valid credit and debit cards."
},
{
    "index": "test-inference-ai-001_ingest",
    "_id" : "test_ai_000001_2",
    "topics": "website",
    "sentiments": "negative",
    "segments": "On the app it says declined by bank but on the website, it pulls the funds from my acct, then reverses the transaction 30 minutes later."
},
{
    "index": "test-inference-ai-001_ingest",
    "_id" : "test_ai_000001_3",
    "topics": "customer service",
    "sentiments": "negative",
    "segments": "the customer service chat was useless."
}
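
For reference, here is a rough sketch (not our exact code; field names are taken from the example above) of the script processor step that zips the parallel arrays into an intermediate "items" array inside the same document; what is missing is a way to then emit each entry of "items" as its own document:

{
  "script": {
    "description": "Sketch: zip the parallel arrays returned by the model into one 'items' array",
    "source": """
      if (ctx.topics instanceof List) {
        def items = new ArrayList();
        for (int i = 0; i < ctx.topics.size(); i++) {
          def item = new HashMap();
          item.put('topics', ctx.topics[i]);
          item.put('sentiments', ctx.sentiments[i]);
          item.put('segments', ctx.segments[i]);
          item.put('reasons', ctx.reasons[i]);
          items.add(item);
        }
        ctx.items = items;
      }
    """
  }
}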

Use case 2 - flattening arrays with "main category" and "sub category" labelling

With this implementation, during document ingestion we get documents with "main category" and "sub category" arrays. Both can have multiple values, and to use the data in Kibana we need to flatten these two arrays. We can't use Logstash in our environment.

Below is an example of a document to index:

   {
        "_index": "test-multilabel",
        "_id": "record_001",
        "_score": 1,
        "_source": {
          "doc_created_time": "01/01/2024",
          "top_category_classification": [
            "communications",
            "operator"
          ],
          "subcategory_classification": [
            "communications_general",
            "communications_clear",
            "operator_firendly",
            "operator_fast"
          ],
          "score": 4,
          "segment": "detractors",
          "system_virtual_id": "100200300400500"
          }
   }

We need to flatten the arrays into a new index like below:

{
        "_index": "test-multilabel_splitted",
        "_id": "record_001_1",
        "_score": 1,
        "_source": {
          "doc_created_time": "01/01/2024",
          "top_category_classification": "communications",
          "subcategory_classification": "general",
          "score": 4,
          "segment": "detractors",
          "system_virtual_id": "100200300400500"
        }
},
{
        "_index": "test-multilabel_splitted",
        "_id": "record_001_2",
        "_score": 1,
        "_source": {
          "doc_created_time": "01/01/2024",
          "top_category_classification": "communications",
          "subcategory_classification": "clear",
          "score": 4,
          "segment": "detractors",
          "system_virtual_id": "100200300400500"
        }
},
{
        "_index": "test-multilabel_splitted",
        "_id": "record_001_3",
        "_score": 1,
        "_source": {
          "doc_created_time": "01/01/2024",
          "top_category_classification": "operator",
          "subcategory_classification": "friendly",
          "score": 4,
          "segment": "detractors",
          "system_virtual_id": "100200300400500"
        }
},
{
        "_index": "test-multilabel_splitted",
        "_id": "record_001_4",
        "_score": 1,
        "_source": {
          "doc_created_time": "01/01/2024",
          "top_category_classification": "operator",
          "subcategory_classification": "fast",
          "score": 4,
          "segment": "detractors",
          "system_virtual_id": "100200300400500"
        }
}
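
The same approach works for use case 2. Below is a rough sketch (again not our exact code; field names are assumed from the example above) of a script processor that builds an intermediate "items" array by matching each subcategory to its main-category prefix:

{
  "script": {
    "description": "Sketch: pair each subcategory with its main category and collect the pairs in 'items'",
    "source": """
      def items = new ArrayList();
      for (def sub : ctx.subcategory_classification) {
        for (def top : ctx.top_category_classification) {
          if (sub.startsWith(top + '_')) {
            def item = new HashMap();
            item.put('top_category_classification', top);
            item.put('subcategory_classification', sub.substring(top.length() + 1));
            items.add(item);
          }
        }
      }
      ctx.items = items;
    """
  }
}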

Using a script processor (like the sketches above) we are able to build the new documents as an array (called "items"), and we then tried to use the "foreach" processor on the items of that array, but we aren't able to ingest them as separate documents. Something like:

"foreach": {
        "field": "items",
        "processor": {
          "script": {
            "source": """
              if (ctx.items instanceof List) {
                for (int i = 0; i < ctx.items.size(); i++) {
                  def item = ctx.items.get(i);
                  def new_doc = new HashMap(ctx);
                  new_doc.remove('items');
                  new_doc.put('array_item', item);
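                  // NOTE: these _ingest fields are wishful thinking; the ingest script
                  // context has no supported way to emit new_doc as an additional
                  // document, which is exactly what this issue asks for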
                  ctx['_ingest']['_index'] = ctx['_index'] + '_splitted';
                  ctx['_ingest']['_id'] = ctx['_id'] + '_' + i;
                  ctx['_ingest']['_op_type'] = 'create';
                  ctx['_ingest']['_bulk'].add(new_doc);
                }
                ctx.op = 'delete';
              }
            """
          }
        }
      }

Both of these use cases have a high impact on projects with our customers.

In the first use case we are able to do impressive work with the "inference" processor and "completion" task prompting OpenAI, but we can't process the received data the way we need to in order to show the results in Kibana.

We hope the feature in this thread will be released soon...