opensearch-project / opensearch-py

Python Client for OpenSearch
https://opensearch.org/docs/latest/clients/python/
Apache License 2.0

OpenSearch indexing script disconnects from server: ConnectionRefusedError #770

Open lijie123bes opened 4 months ago

lijie123bes commented 4 months ago

Describe the bug

I am encountering a ConnectionRefusedError while running an OpenSearch indexing script. Initially, the script worked without issues, indexing approximately 200 files, each containing around 30,000 documents.

However, now it fails with the following error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 198, in _new_conn
    sock = connection.create_connection(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/connection.py", line 85, in create_connection
    raise err
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/connection.py", line 73, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/connection/http_urllib3.py", line 264, in perform_request
    response = self.pool.urlopen(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 847, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/retry.py", line 445, in increment
    raise reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/util.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 793, in urlopen
    response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 491, in _make_request
    raise new_e
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 467, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 1099, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 616, in connect
    self.sock = sock = self._new_conn()
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 213, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "indexing.py", line 162, in <module>
    create_index(index_name, path, model, batch_size=100)
  File "indexing.py", line 139, in create_index
    client.bulk(body=batch, refresh=True)
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/client/utils.py", line 181, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/client/__init__.py", line 462, in bulk
    return self.transport.perform_request(
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/transport.py", line 446, in perform_request
    raise e
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/transport.py", line 409, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/usr/local/lib/python3.8/dist-packages/opensearchpy/connection/http_urllib3.py", line 279, in perform_request
    raise ConnectionError("N/A", str(e), e)
opensearchpy.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fb798541430>: Failed to establish a new connection: [Errno 111] Connection refused)

When encountering this error, I restart OpenSearch and resume indexing from the exact file where the process stopped. However, upon resuming, the same error occurs again.
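The "resume from the file where it stopped" step can be made explicit with a small checkpoint file. The following is a minimal sketch, not part of the original script: the helper names `load_done`, `mark_done`, and `pending_files`, and the checkpoint path, are hypothetical.

```python
import json
import os


def load_done(checkpoint_path):
    """Return the set of file names already indexed (empty if no checkpoint yet)."""
    if not os.path.exists(checkpoint_path):
        return set()
    with open(checkpoint_path, "r", encoding="utf-8") as fh:
        return set(json.load(fh))


def mark_done(checkpoint_path, done, filename):
    """Record one more finished file, writing through a temp file and a rename."""
    done.add(filename)
    tmp_path = checkpoint_path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(sorted(done), fh)
    # os.replace swaps the file in one step, so a crash cannot leave it half-written.
    os.replace(tmp_path, checkpoint_path)


def pending_files(directory_path, done):
    """List the .json files that still need indexing, in sorted order."""
    return [
        name
        for name in sorted(os.listdir(directory_path))
        if name.endswith(".json") and name not in done
    ]
```

In the script, `mark_done` would be called only after the last bulk request for a file succeeds, and the main loop would iterate over `pending_files(path, load_done(...))`. Because the script uses `pmid` as the document `_id`, re-sending a partially indexed file overwrites the existing documents rather than duplicating them.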

Related component

Clients

To Reproduce

The script reads JSON files from a directory and indexes them into an OpenSearch cluster. It uses the opensearchpy library to interact with OpenSearch and the sentence-transformers library for document embeddings. This is the code:

import pandas as pd
import os
import json
import torch
import time
from opensearchpy import OpenSearch
from sentence_transformers import SentenceTransformer

path = "../outputNJSONextracted"  # Directory containing your JSON files
model_card = 'sentence-transformers/msmarco-distilbert-base-tas-b'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device {device}")

host = '127.0.0.1' #host = '54.93.99.186' 
port = 9200
auth = ('admin','IVIngi2024!') #('admin', 'admin')  

client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = False,
    ssl_assert_hostname = False,
    ssl_show_warn = False,
    timeout=30, 
    max_retries=10
)
print("Connection opened...")

index_name = 'medline-faiss-hnsw-3' # change the index name

index_body = {
 "settings": {
    "index": {
     "knn": "true",
     "refresh_interval" : -1,
     # "default_pipeline": "medline-ingest-pipeline",  # disabled: embeddings are computed in this script
     "number_of_shards": 5,
     "number_of_replicas": 0
    }
  },
  "mappings": {
    "properties": {
         "embedding_abstract": {
                        "type": "knn_vector",
                        "dimension": 768,
                        "method":{
                            "engine":"faiss",
                            "name": "hnsw",
                            "space_type": "innerproduct"
                        }
                       },
          "title":{"type":"text"},
          "abstract":{"type":"text"},
          "pmid":{"type":"keyword"},
          "journal":{"type":"text"},
          "pubdate":{"type":"date"},
          "authors":{"type": "text"}
    }
  }
}

response = client.indices.create(index_name, body=index_body)
print(response)

def create_index(index_name, directory_path, model, batch_size=100):
    j = 0
    documents = set()

    files_number = 0
    for filename in sorted(os.listdir(directory_path)):
        start_time = time.time()
        if filename.endswith(".json"):
            print(f"Starting indexing {filename} ...")
            # Construct the full file path
            file_path = os.path.join(directory_path, filename)
            # Read the JSON file
            with open(file_path, 'r') as file:
                # Initialize an empty list to store dictionaries
                dictionaries = []

                # Read the file line by line
                for line in file:
                    # Parse each line as JSON and append it to the list
                    dictionaries.append(json.loads(line))
            # Create a DataFrame
            df = pd.DataFrame(dictionaries)
            # Select only the required columns
            df = df[['pmid', 'title', 'abstract', 'journal', 'authors', 'pubdate']]
            # Bulk actions accumulated for the current file
            batch = []
            for i, row in df.iterrows(): 
                pmid = row["pmid"]
                if pmid in documents:
                    continue
                else:
                    documents.add(pmid)
                    embedding = model.encode(row["abstract"])
                    doc = {
                        "pmid": pmid,
                        "abstract": row["abstract"],
                        "title": row["title"],
                        "authors": row['authors'],
                        "journal": row['journal'],
                        "pubdate": row['pubdate'],
                        "embedding_abstract": embedding
                        }
                    batch.append({"index": {"_index": index_name, "_id": pmid}})
                    batch.append(doc)
                    j += 1
                if len(batch) >= batch_size*2:
                    client.bulk(body=batch, refresh=True)
                    batch = []

            if batch:
                client.bulk(body=batch, refresh=True)
                print(f"Indexed remaining documents")

            files_number += 1
            print(f"Processed file: {filename} in {time.time()-start_time}")
            print("Number of documents indexed so far:", j)
            if files_number % 100 == 0:
                print("-"*50)
                print(f"Files indexed = {files_number}")
                print()

    print("Total documents inserted = ", j)

model = SentenceTransformer(model_card)
model.to(device)
print("Creating indexing...")
start = time.time()
create_index(index_name, path, model, batch_size=100)
print(f"Time needed {time.time() - start}")

Troubleshooting Steps Taken:

I increased the heap size in the jvm.options file to 8 GB (the maximum available to me) and reduced the batch size to rule out HTTP request size issues. Despite these changes, the script still hits the same connection errors.

I suspect that the problem may be related to sending large HTTP requests or some configuration issue with the OpenSearch server. However, I'm unsure how to proceed in diagnosing and resolving the issue.
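One way to test the request-size suspicion is to measure the bulk body before sending it. The sketch below is an illustration under stated assumptions: `bulk_payload_bytes` and `fake_batch` are hypothetical helpers, and the batch is synthetic (random 768-dimensional vectors and a 1,500-character placeholder abstract) rather than real MEDLINE data. As far as the OpenSearch documentation states, the default `http.max_content_length` is 100mb, so the measured size can be compared against that limit.

```python
import json
import random


def bulk_payload_bytes(actions):
    """Size in bytes of the NDJSON body built from a list of bulk lines."""
    # The bulk API takes one JSON object per line, each ending in a newline.
    return sum(len(json.dumps(line).encode("utf-8")) + 1 for line in actions)


def fake_batch(num_docs, dim=768, abstract_chars=1500):
    """Build a synthetic batch shaped like the one in the script (made-up data)."""
    rng = random.Random(0)
    batch = []
    for i in range(num_docs):
        batch.append({"index": {"_index": "medline-faiss-hnsw-3", "_id": str(i)}})
        batch.append({
            "pmid": str(i),
            "abstract": "x" * abstract_chars,
            "embedding_abstract": [rng.uniform(-1.0, 1.0) for _ in range(dim)],
        })
    return batch


if __name__ == "__main__":
    size = bulk_payload_bytes(fake_batch(100))
    print(f"100 documents -> about {size / 1e6:.1f} MB")
```

If a 100-document batch measures at a few megabytes, request size alone is an unlikely cause, and the server-side logs become the more useful place to look.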

Any insights or suggestions would be greatly appreciated. Thank you!

Expected behavior

A normal connection: the indexing script should run to completion without the connection being refused.


peternied commented 4 months ago

[Triage] @lijie123bes Thanks for creating this issue with full reproduction steps. Moving this to the Python client repo for further investigation.

@opensearch-project/admin could you please transfer this issue to the https://github.com/opensearch-project/opensearch-py repository?

dblock commented 4 months ago

@lijie123bes Let's narrow this down to a client vs. server problem.

Do you see errors in the server-side log? In other words, is the server actually refusing connections, for example because a circuit breaker tripped?
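A quick client-side check can help with this split. `[Errno 111] Connection refused` is raised at the TCP level, which usually means nothing is listening on the port at that moment, whereas a tripped circuit breaker would normally come back as an HTTP error response. The sketch below uses only the standard library; `port_accepts_connections` is a hypothetical helper name, and the host and port are the ones from the script above.

```python
import socket


def port_accepts_connections(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError (errno 111 on Linux) and timeouts.
        return False


if __name__ == "__main__":
    # Host and port copied from the script in the issue.
    print(port_accepts_connections("127.0.0.1", 9200))
```

If this returns `False` right after the bulk call fails, the OpenSearch process has probably stopped (for example, after running out of memory), and the server log or the system log should show why.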