opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[Feature Request] Ingestion pipelines using S3 compatible storage instead of base64 encoded data #16170

Open ksanderer opened 1 month ago

ksanderer commented 1 month ago

Is your feature request related to a problem? Please describe

It's frustrating that we can't use S3 directly in ingestion pipelines. We must first load a file from S3-compatible storage, base64-encode it, and then push it to the OpenSearch API.

It should be possible to use direct S3 links (e.g., s3://{bucket}/path_to_file.pdf) or provide an S3 key to ingest the file directly.

Describe the solution you'd like

Instead of fetching files from S3 and pushing them to OpenSearch with a separate tool (e.g., a Python service):

import boto3
import base64
import requests

def fetch_file_from_s3(bucket_name, file_key):
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=bucket_name, Key=file_key)
    return response['Body'].read()

def push_to_opensearch(index, doc_id, filename, title, data, pipeline):
    url = f"https://localhost:9200/{index}/_doc/{doc_id}?pipeline={pipeline}"
    headers = {'Content-Type': 'application/json'}
    payload = {
        "filename": filename,
        "title": title,
        "data": data
    }
    response = requests.put(url, json=payload, headers=headers)
    return response.status_code, response.text

def main(bucket_name, file_key, index, doc_id, pipeline):
    file_data = fetch_file_from_s3(bucket_name, file_key)
    encoded_data = base64.b64encode(file_data).decode('utf-8')

    status_code, response_text = push_to_opensearch(index, doc_id, file_key, 'Dummy PDF', encoded_data, pipeline)
    print(f"Status Code: {status_code}")
    print(f"Response: {response_text}")

if __name__ == "__main__":
    bucket_name = 'my_bucket'
    file_key = 'dummy.pdf'
    index = 'my_index'
    doc_id = '1'
    pipeline = 'file_attachment'

    main(bucket_name, file_key, index, doc_id, pipeline)

With the proposed feature, we could send only an S3 reference to OpenSearch and let the pipeline fetch the file:

// PUT https://localhost:9200/my_index/_doc/1?pipeline=s3_ingestion_pipeline
{
  "filename": "dummy.pdf",
  "title": "Dummy PDF",
  "s3_key": "s3://my_bucket/dummy.pdf"
}

The idea is to configure a predefined S3 bucket for the pipeline, much as snapshot repositories can be configured to use S3 storage.
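
To illustrate the predefined-bucket idea, here is a minimal sketch of how an `s3://{bucket}/path` reference like the one above might be validated before any fetch happens. The function name `parse_s3_uri` and the `allowed_bucket` parameter are hypothetical and not part of any OpenSearch API; this only shows the parsing step, under the assumption that the pipeline would accept references to one configured bucket only.

```python
from urllib.parse import urlparse


def parse_s3_uri(uri: str, allowed_bucket: str) -> tuple[str, str]:
    """Split an s3:// URI into (bucket, key) and check the bucket.

    Hypothetical helper: `allowed_bucket` stands in for whatever bucket
    the pipeline would be configured with.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an s3:// URI: {uri!r}")

    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    if not bucket or not key:
        raise ValueError(f"URI must contain both a bucket and a key: {uri!r}")
    if bucket != allowed_bucket:
        raise ValueError(f"bucket {bucket!r} is not the configured bucket")

    return bucket, key


if __name__ == "__main__":
    print(parse_s3_uri("s3://my_bucket/dummy.pdf", "my_bucket"))
```

Restricting references to a configured bucket would keep a document from pointing the cluster at arbitrary storage, which is presumably why the snapshot-repository model is a useful comparison.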

Related component

Other

Describe alternatives you've considered

Amazon offers an SQS-powered solution, but it's not available on other platforms like DigitalOcean OpenSearch.

We currently use a small Python service for this purpose. It receives an S3 key, fetches the file, and pushes the content to the OpenSearch cluster.
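
For context, the script in the description posts to a pipeline named `file_attachment` without showing its definition. A minimal sketch of what such a pipeline body might look like, assuming the ingest-attachment plugin is installed and that the pipeline reads the base64-encoded `data` field; the helper name `build_attachment_pipeline` and the description text are illustrative only.

```python
import json


def build_attachment_pipeline(field: str = "data") -> dict:
    """Return a pipeline body that runs the attachment processor on `field`.

    Assumes the ingest-attachment plugin provides the `attachment` processor.
    """
    return {
        "description": "Extract text from base64-encoded attachments",
        "processors": [
            {"attachment": {"field": field}},
        ],
    }


if __name__ == "__main__":
    # Body for: PUT /_ingest/pipeline/file_attachment
    print(json.dumps(build_attachment_pipeline(), indent=2))
```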

Additional context

No response

varunpareek690 commented 1 month ago

I would like to give it a try!

ksanderer commented 1 month ago

Since we’re already using S3 and OpenSearch, adding a new technology for file ingestion could make things more complicated than necessary. OpenSearch has a file ingestion API, and since S3 is widely used as a modern filesystem, it makes sense to take advantage of this.

By ingesting files directly from S3 URLs, we simplify the process, reduce the need for extra services, and make better use of what we already have in place. This approach is both efficient and scalable, without adding unnecessary complexity to our stack.

varunpareek690 commented 1 month ago

Hi @ksanderer,

Thank you for the insights! I agree that minimizing complexity is crucial. While I understand the advantages of using the OpenSearch file ingestion API and ingesting files directly from S3 URLs, I still believe that exploring automation options, such as S3 Event triggers in AWS or integrating third-party services in non-AWS environments, could enhance our workflow. These approaches might streamline the process even further and help reduce manual intervention.

I’m particularly interested in how we can implement streaming directly to OpenSearch, as it could optimize our memory usage and overall efficiency.
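
One way the memory concern could be approached on the client side is to base64-encode the object incrementally instead of reading it whole. The sketch below is an assumption about how that might look, not an existing OpenSearch feature: `iter_base64_chunks` is a hypothetical helper that works on any binary file-like object (for example, the `Body` returned by `get_object`). Because base64 maps every 3 input bytes to 4 output characters, encoding in multiples of 3 bytes lets the pieces be concatenated into a valid encoding of the whole file.

```python
import base64
import io
from typing import BinaryIO, Iterator


def iter_base64_chunks(stream: BinaryIO, chunk_size: int = 3 * 64 * 1024) -> Iterator[bytes]:
    """Yield base64 pieces whose concatenation encodes the whole stream."""
    if chunk_size <= 0 or chunk_size % 3 != 0:
        raise ValueError("chunk_size must be a positive multiple of 3")

    pending = b""  # bytes carried over until a multiple of 3 is available
    while True:
        block = stream.read(chunk_size)
        if not block:
            break
        pending += block
        usable = len(pending) - (len(pending) % 3)
        if usable:
            # Only whole 3-byte groups are encoded here, so no padding appears
            # in the middle of the output.
            yield base64.b64encode(pending[:usable])
            pending = pending[usable:]

    if pending:
        # The final 1 or 2 leftover bytes produce the padded tail.
        yield base64.b64encode(pending)


if __name__ == "__main__":
    data = b"example file content" * 1000
    encoded = b"".join(iter_base64_chunks(io.BytesIO(data), chunk_size=3 * 1024))
    print(encoded == base64.b64encode(data))
```

Note that the indexing request still has to carry the full base64 string inside the JSON document, so this only avoids holding the raw bytes and an extra full copy at the same time on the client. Server-side streaming would need the feature discussed in this issue.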

dblock commented 4 weeks ago

[Catch All Triage - 1, 2]

varunpareek690 commented 3 weeks ago

Hi @dblock! What does this comment signify? Could you please explain?

dblock commented 3 weeks ago

Sorry for the cryptic comment :) Check out https://github.com/opensearch-project/.github/pull/233, does this help?