opensearch-project / opensearch-hadoop


[FEATURE] Add support for handling of vector fields #424

Open dgoldenberg-ias opened 4 months ago

dgoldenberg-ias commented 4 months ago

Is your feature request related to a problem?

I've attempted to load index data that includes a vector field into a Spark dataframe; the vector field(s) do not get loaded while all other fields do.

What solution would you like?

Please add support for loading/handling of OpenSearch vector fields so they can be read into a dataframe.

What alternatives have you considered?

My workaround is to use the scan/scroll API to fetch the data and then build the dataframe from that data in my own code, but it would be great to have vector fields supported in opensearch-hadoop out of the box.
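Roughly, that workaround looks like the sketch below (assuming opensearch-py's helpers.scan and a plain spark.createDataFrame; get_client(), index_name, and the doc_id/vector fields are the ones used in the scenarios further down):

from opensearchpy import helpers
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType

# Scan/scroll over the whole index and pull out the fields of interest.
client = get_client()
rows = [
    (hit['_source']['doc_id'], hit['_source']['vector'])
    for hit in helpers.scan(client, index=index_name, query={'query': {'match_all': {}}})
]
client.close()

# Build the dataframe by hand, since the connector drops the vector field.
schema = StructType([
    StructField('doc_id', StringType()),
    StructField('vector', ArrayType(DoubleType())),
])
df = spark.createDataFrame(rows, schema)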

Do you have any additional context?

Please take a look at the code samples below.

Scenario 1: Create and Populate an Index Without a Vector Field

from opensearchpy import OpenSearch, RequestsHttpConnection

index_name = 'sample_index'
user = 'my_user_123'
pwd = 'the-pwd-value-here'

def get_client():
    client = OpenSearch(
        hosts=[{
            "host": "hostnamexxx.us-east-1.es.amazonaws.com",
            "port": 443,
        }],
        http_auth=(user, pwd),
        connection_class=RequestsHttpConnection,
        use_ssl=True,
        verify_certs=True,
        timeout=30,
    )
    if not client.ping():
        raise Exception("Unable to connect to the Domain")
    return client

# Index settings and mappings
mapping = {
    'settings': {
        'number_of_shards': 2,
        'number_of_replicas': 0
    },
    'mappings': {
        'properties': {
            'doc_id': {'type': 'keyword'},
            'value': {'type': 'text'}
        }
    }
}

client = get_client()
if client.indices.exists(index=index_name):
    print(f"The index '{index_name}' already exists; not creating.")
else:
    print(f"The index '{index_name}' does not yet exist; creating...")
    response = client.indices.create(index_name, body=mapping)
    print(f"Created the index '{index_name}'.")

# Sample documents to index
sample_documents = [
    {'doc_id': '1', 'value': 'Sample document 1'},
    {'doc_id': '2', 'value': 'Sample document 2'},
    {'doc_id': '3', 'value': 'Sample document 3'},
    {'doc_id': '4', 'value': 'Sample document 4'},
    {'doc_id': '5', 'value': 'Sample document 5'},
    {'doc_id': '6', 'value': 'Sample document 6'},
    {'doc_id': '7', 'value': 'Sample document 7'},
    {'doc_id': '8', 'value': 'Sample document 8'},
    {'doc_id': '9', 'value': 'Sample document 9'},
    {'doc_id': '10', 'value': 'Sample document 10'}
]

# Index the sample documents
for doc in sample_documents:
    client.index(index=index_name, id=doc['doc_id'], body=doc)

client.indices.refresh(index=index_name)

# Verify that 10 documents got indexed
result = client.search(index=index_name)
num_docs_indexed = result['hits']['total']['value']
print(f"Number of documents indexed: {num_docs_indexed}")

client.close()

Scenario 1: A Successful Load of All Fields into a Dataframe

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("opensearch-hadoop") \
    .getOrCreate()

df = spark.read.format("opensearch") \
    .option("opensearch.nodes", "https://hostnamexxx.us-east-1.es.amazonaws.com") \
    .option("opensearch.port", "443") \
    .option("opensearch.resource", index_name) \
    .option("opensearch.nodes.wan.only", "true") \
    .option("opensearch.net.ssl", "true") \
    .option("opensearch.net.http.auth.user", user) \
    .option("opensearch.net.http.auth.pass", pwd) \
    .load() \
    .select("doc_id", "value")

df.show(n=20, truncate=False)

Scenario 1: Successful Result

+------+------------------+
|doc_id|value             |
+------+------------------+
|1     |Sample document 1 |
|2     |Sample document 2 |
|3     |Sample document 3 |
|5     |Sample document 5 |
|6     |Sample document 6 |
|7     |Sample document 7 |
|9     |Sample document 9 |
|10    |Sample document 10|
|4     |Sample document 4 |
|8     |Sample document 8 |
+------+------------------+

Scenario 2: Create an Index with a Vector Field

index_name = "sample-index-with-vec-field"

mapping = {
    'settings': {
        'index': {
            'knn': True,  # Enable k-NN search for this index
            "refresh_interval": "30s",
            "knn.algo_param.ef_search": 100
        }
    },
    'mappings': {
        'properties': {
            'vector': {  # k-NN vector field
                'type': 'knn_vector',
                'dimension': 100, # Dimension of the vector
                'method': {
                    'name': 'hnsw',
                    'space_type': 'l2',
                    'engine': "nmslib",
                    "parameters": {
                        "ef_construction": 256,
                        "m": 16,
                    },
                },
            },
            'doc_id': {
                'type': 'keyword'
            }
        }
    }
}

client = get_client()
if client.indices.exists(index=index_name):
    print(f"The index '{index_name}' already exists; not creating.")
else:
    print(f"The index '{index_name}' does not yet exist; creating...")
    response = client.indices.create(index_name, body=mapping)
    print(f"Created the index '{index_name}'.")
client.close()

Scenario 2: Populate the Index with Vectors

import random
import uuid
import json

NUM_DOCS = 100
BATCH_SIZE = 10
NUM_DIMS = 100

def create_bulk_index_docs_request(documents):
    bulk_request_body = ""
    for doc in documents:
        # The format is a repeated sequence of these:
        # action_and_meta_data\n
        # optional_source\n
        bulk_request_body += f'{{"index": {{"_index": "{index_name}", "_id": "{doc["doc_id"]}"}}}}\n'
        bulk_request_body += f'{json.dumps(doc)}\n'
    return bulk_request_body

def index_batch(client, index_name, num_dims, batch_size):
    documents = []
    for _ in range(batch_size):
        random_vector = [random.uniform(-1.0, 1.0) for _ in range(num_dims)]
        doc_id = str(uuid.uuid4())
        document = {
            "vector": random_vector,
            "doc_id": doc_id
        }
        documents.append(document)

    bulk_request_body = create_bulk_index_docs_request(documents)
    client.bulk(index=index_name, body=bulk_request_body)

num_batches = NUM_DOCS // BATCH_SIZE
remaining_docs = NUM_DOCS % BATCH_SIZE

client = get_client()

# Index full batches
for _ in range(num_batches):
    index_batch(client, index_name, NUM_DIMS, BATCH_SIZE)

# Index remaining documents
if remaining_docs > 0:
    index_batch(client, index_name, NUM_DIMS, remaining_docs)

client.indices.refresh(index=index_name)

# Verify that all 100 documents got indexed
result = client.search(index=index_name)
num_docs_indexed = result['hits']['total']['value']
print(f"Number of documents indexed: {num_docs_indexed}")

client.close()

Result: Number of documents indexed: 100
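As an aside, the manual NDJSON assembly above could presumably also be done with opensearch-py's bulk helper; a minimal sketch of an alternative batch function (hypothetical index_batch_with_helper name, same document shape and imports as above):

from opensearchpy import helpers

def index_batch_with_helper(client, index_name, num_dims, batch_size):
    # Same documents as above, expressed as actions for the bulk helper;
    # fields other than '_index'/'_id' become the document source.
    actions = []
    for _ in range(batch_size):
        doc_id = str(uuid.uuid4())
        actions.append({
            '_index': index_name,
            '_id': doc_id,
            'doc_id': doc_id,
            'vector': [random.uniform(-1.0, 1.0) for _ in range(num_dims)],
        })
    helpers.bulk(client, actions)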

Scenario 2: Failing to Load the Vector Field

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("opensearch-hadoop") \
    .getOrCreate()

# .option("opensearch.nodes.wan.only", "true") \
df = spark.read.format("opensearch") \
    .option("opensearch.nodes", "https://hostnamexxx.us-east-1.es.amazonaws.com") \
    .option("opensearch.port", "443") \
    .option("opensearch.resource", index_name) \
    .option("opensearch.nodes.wan.only", "true") \
    .option("opensearch.net.ssl", "true") \
    .option("opensearch.net.http.auth.user", user) \
    .option("opensearch.net.http.auth.pass", pwd) \
    .load() \
    .select("doc_id", "vector")

df.show(n=20, truncate=False)

Scenario 2: Resulting in an Error

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `vector` cannot be resolved. Did you mean one of the following? [`doc_id`].;
'Project [doc_id#19, 'vector]
+- Relation [doc_id#19] OpenSearchRelation(Map(opensearch.net.ssl -> true, opensearch.nodes -> https://hostnamexxx.us-east-1.es.amazonaws.com, opensearch.nodes.wan.only -> true, opensearch.net.http.auth.pass -> ..., opensearch.net.http.auth.user -> ..., opensearch.resource -> sample-index-with-vec-field, opensearch.port -> 443),org.apache.spark.sql.SQLContext@68f08922,None)

Stack trace:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-4260897524703214>, line 17
      3 spark = SparkSession.builder \
      4     .appName("opensearch-hadoop") \
      5     .getOrCreate()
      7 # .option("opensearch.nodes.wan.only", "true") \
      8 df = spark.read.format("opensearch") \
      9     .option("opensearch.nodes", "https://hostnamexxx.us-east-1.es.amazonaws.com") \
     10     .option("opensearch.port", "443") \
     11     .option("opensearch.resource", index_name) \
     12     .option("opensearch.nodes.wan.only", "true") \
     13     .option("opensearch.net.ssl", "true") \
     14     .option("opensearch.net.http.auth.user", user) \
     15     .option("opensearch.net.http.auth.pass", pwd) \
     16     .load() \
---> 17     .select("doc_id", "vector")
     19 df.show(n=20, truncate=False)

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/dataframe.py:3168, in DataFrame.select(self, *cols)
   3123 def select(self, *cols: "ColumnOrName") -> "DataFrame":  # type: ignore[misc]
   3124     """Projects a set of expressions and returns a new :class:`DataFrame`.
   3125 
   3126     .. versionadded:: 1.3.0
   (...)
   3166     +-----+---+
   3167     """
-> 3168     jdf = self._jdf.select(self._jcols(*cols))
   3169     return DataFrame(jdf, self.sparkSession)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:194, in capture_sql_exception.<locals>.deco(*a, **kw)
    190 converted = convert_exception(e.java_exception)
    191 if not isinstance(converted, UnknownException):
    192     # Hide where the exception came from that shows a non-Pythonic
    193     # JVM exception message.
--> 194     raise converted from None
    195 else:
    196     raise

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `vector` cannot be resolved. Did you mean one of the following? [`doc_id`].;
'Project [doc_id#19, 'vector]
+- Relation [doc_id#19] OpenSearchRelation(Map(opensearch.net.ssl -> true, opensearch.nodes -> https://hostnamexxx.us-east-1.es.amazonaws.com, opensearch.nodes.wan.only -> true, opensearch.net.http.auth.pass -> ..., opensearch.net.http.auth.user -> ..., opensearch.resource -> sample-index-with-vec-field, opensearch.port -> 443),org.apache.spark.sql.SQLContext@68f08922,None)
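For what it's worth, printing the schema of the loaded dataframe (same read options, no select) should confirm what the Relation [doc_id#19] line above suggests: the schema the connector derives from the index mapping simply has no vector column, presumably because knn_vector is not a recognized field type.

# Hypothetical diagnostic: inspect the schema opensearch-hadoop derives from the mapping.
df_all = spark.read.format("opensearch") \
    .option("opensearch.nodes", "https://hostnamexxx.us-east-1.es.amazonaws.com") \
    .option("opensearch.port", "443") \
    .option("opensearch.resource", index_name) \
    .option("opensearch.nodes.wan.only", "true") \
    .option("opensearch.net.ssl", "true") \
    .option("opensearch.net.http.auth.user", user) \
    .option("opensearch.net.http.auth.pass", pwd) \
    .load()

df_all.printSchema()
# Expected, based on the Relation shown in the error:
# root
#  |-- doc_id: string (nullable = true)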
dgoldenberg-ias commented 4 months ago

Version info:

dgoldenberg-ias commented 3 months ago

Hi, any word on this one?

wbeckler commented 3 months ago

It's a gap that nobody has picked up. If you're willing to take a stab at it, I don't think there would be any objection.

dgoldenberg-ias commented 2 months ago

Hi @wbeckler, could you provide any pointers as to where in opensearch-hadoop this could be plugged in? Might we need to add some code in opensearch itself for this?

Xtansia commented 2 months ago

I don't personally have any experience adding support for a new field type; however, https://github.com/opensearch-project/opensearch-hadoop/commit/c651e79c620f5d55fb91e908a7ba54b978632502 looks to be the most recent commit that added a new field type, so it may be a good jumping-off point for figuring out which files are likely to need modifying. Do note, however, that that commit predates the fork from Elasticsearch, so the paths will be a bit different, but the file names should generally be the same, other than elasticsearch -> opensearch and es -> os.

dgoldenberg-ias commented 2 months ago

Thanks @Xtansia. The issue is that it's not clear what the type should be. Presumably knn_vector? But I don't see it being handled in the stock OpenSearch or Elasticsearch code; I wonder if that's done in a custom module or some such. But then, would following c651e79 just work? I don't grok the architecture of this, so I can't tell how to approach it.

Xtansia commented 2 months ago

It would be knn_vector, yes; the mapping is defined in the k-NN plugin: https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/index/mapper/KNNVectorFieldMapper.java
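For reference, the vector does come back in _source as a plain JSON array of numbers, so on the read side the natural handling would presumably be to surface knn_vector as an array of floats/doubles in the Spark schema. A quick check with the Python client against the Scenario 2 index (a sketch, reusing get_client() from above):

# Fetch one document and confirm the knn_vector field is just a list of floats in _source.
client = get_client()
res = client.search(
    index='sample-index-with-vec-field',
    body={'size': 1, 'query': {'match_all': {}}, '_source': ['doc_id', 'vector']},
)
hit = res['hits']['hits'][0]
print(type(hit['_source']['vector']))   # <class 'list'>
print(hit['_source']['vector'][:5])     # first few components as plain floats
client.close()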