jina-ai / annlite

⚡ A fast embedded library for approximate nearest neighbor search
Apache License 2.0
216 stars 22 forks source link

Delete in executor does not work #209

Closed tommykoctur closed 1 year ago

tommykoctur commented 1 year ago

Hello,

I have found that delete in executor.py does not work (or I am doing something wrong). This is a minimal reproducible example:

## THIS IS AN EXACT COPY OF THE ANNLITE EXECUTOR; ONLY THE DEFAULT N_DIM WAS CHANGED AND DOCSTRINGS WERE REMOVED
import threading
import time
import traceback
import warnings
from threading import Thread
from typing import Dict, List, Optional, Tuple, Union

from docarray import Document, DocumentArray
from jina import Executor, requests
from jina.logging.logger import JinaLogger

# Maximum number of buffered documents the indexer writes to the index in one
# batch; the indexer also stops accepting new documents once its pending
# buffer reaches twice this value.
INDEX_BATCH_SIZE = 1024

class AnnLiteIndexer(Executor):
    """Executor that stores Documents in an AnnLite-backed ``DocumentArray``.

    ``/index`` requests only append to an in-memory buffer; a background
    thread drains that buffer into the index in batches of at most
    ``INDEX_BATCH_SIZE`` documents. ``/update``, ``/delete`` and ``/backup``
    refuse to run while the buffer still holds pending documents.
    """

    def __init__(
        self,
        n_dim: int = 5,
        metric: str = 'cosine',
        limit: int = 10,
        n_components: Optional[int] = None,
        match_args: Optional[Dict] = None,
        data_path: Optional[str] = None,
        ef_construction: Optional[int] = None,
        ef_search: Optional[int] = None,
        max_connection: Optional[int] = None,
        include_metadata: bool = True,
        index_access_paths: str = '@r',
        search_access_paths: str = '@r',
        columns: Optional[Union[List[Tuple[str, str]], Dict[str, str]]] = None,
        dim: Optional[int] = None,
        *args,
        **kwargs,
    ):
        """Create the indexer and start its background indexing thread.

        :param n_dim: dimension of the vectors to index
        :param metric: distance metric forwarded to the storage config
        :param limit: if truthy, stored as the default ``limit`` match argument
        :param n_components: forwarded to the storage config
        :param match_args: default keyword arguments used by ``/search``
        :param data_path: storage location; falls back to the executor
            workspace and then to ``'./workspace'``. Rejected when the
            executor runs with more than one shard.
        :param ef_construction: forwarded to the storage config
        :param ef_search: forwarded to the storage config
        :param max_connection: forwarded to the storage config
        :param include_metadata: stored on the instance
        :param index_access_paths: default traversal paths for ``/index`` and
            ``/update``
        :param search_access_paths: default traversal paths for ``/search``
        :param columns: forwarded to the storage config
        :param dim: fallback for ``n_dim`` when ``n_dim`` is falsy
        :raises ValueError: if no vector dimension is given, or if
            ``data_path`` is combined with more than one shard
        """
        super().__init__(*args, **kwargs)
        # Prefer the configured executor name; fall back to the class name.
        self.logger = JinaLogger(getattr(self.metas, 'name', self.__class__.__name__))

        n_dim = n_dim or dim
        if not n_dim:
            raise ValueError('Please specify the dimension of the vectors to index!')

        self.n_components = n_components
        self.metric = metric
        self.match_args = match_args or {}
        self.include_metadata = include_metadata
        if limit:
            self.match_args.update({'limit': limit})

        # The legacy `*_traversal_paths` keyword arguments still win over the
        # new `*_access_paths` ones, but trigger a deprecation warning.
        self.index_access_paths = index_access_paths
        if 'index_traversal_paths' in kwargs:
            warnings.warn(
                '`index_traversal_paths` is deprecated. Use `index_access_paths` instead.'
            )
            self.index_access_paths = kwargs['index_traversal_paths']

        self.search_access_paths = search_access_paths
        if 'search_traversal_paths' in kwargs:
            warnings.warn(
                '`search_traversal_paths` is deprecated. Use `search_access_paths` instead.'
            )
            self.search_access_paths = kwargs['search_traversal_paths']

        # Pending documents wait here until the background thread indexes them.
        self._data_buffer = DocumentArray()
        self._index_batch_size = INDEX_BATCH_SIZE
        self._max_length_queue = 2 * self._index_batch_size
        # Serialises buffer mutation and every operation touching the index.
        self._index_lock = threading.Lock()

        if getattr(self.runtime_args, 'shards', 1) > 1 and data_path:
            raise ValueError(
                '`data_path` is not supported when shards > 1, please use `workspace` instead'
            )

        config = {
            'n_dim': n_dim,
            'n_components': n_components,
            'metric': metric,
            'ef_construction': ef_construction,
            'ef_search': ef_search,
            'max_connection': max_connection,
            'data_path': data_path or self.workspace or './workspace',
            'columns': columns,
        }
        self._index = DocumentArray(storage='annlite', config=config)

        # start indexing thread in background to group indexing requests
        # together and perform batch indexing at once
        self._start_index_loop()

    @requests(on='/index')
    def index(
        self,
        docs: Optional[DocumentArray] = None,
        parameters: Optional[dict] = None,
        **kwargs,
    ):
        """Queue new documents for indexing.

        The documents are appended to the pending buffer; the background
        thread writes them to the index later.

        :param docs: the Documents to index
        :param parameters: dictionary with options for indexing
        Keys accepted:
            - 'access_paths': traversal paths on docs, e.g. '@r', '@c', '@r,c'
        """
        if not docs:
            return

        parameters = parameters or {}
        access_paths = parameters.get('access_paths', self.index_access_paths)
        flat_docs = docs[access_paths]
        if len(flat_docs) == 0:
            return

        # Back-pressure: block the request while the buffer is full.
        while len(self._data_buffer) >= self._max_length_queue:
            time.sleep(0.001)

        with self._index_lock:
            self._data_buffer.extend(flat_docs)

    def _start_index_loop(self):
        """Start the indexing loop in background.

        This loop is responsible for batch indexing the documents in the buffer.
        It exits once ``self._data_buffer`` has been set to ``None``.
        """

        def _index_loop():
            try:
                while True:
                    # Work on a snapshot: clear()/close() may replace the
                    # attribute with None between two statements here.
                    buffer = self._data_buffer

                    # a None buffer is the signal to stop the loop
                    if buffer is None:
                        break

                    # if the buffer is empty, wait for new documents to be added
                    if len(buffer) == 0:
                        time.sleep(0.1)  # sleep for 100ms
                        continue

                    # acquire the lock to prevent threading issues
                    with self._index_lock:
                        # Re-check under the lock: a stop may have been
                        # requested while this thread was waiting for it.
                        buffer = self._data_buffer
                        if buffer is None:
                            break

                        batch_size = min(self._index_batch_size, len(buffer))
                        batch_docs = buffer.pop(range(batch_size))
                        self._index.extend(batch_docs)
                        self.logger.debug(f'indexing {len(batch_docs)} docs done...')
            except Exception:
                self.logger.error(traceback.format_exc())
                raise

        self._index_thread = Thread(target=_index_loop, daemon=False)
        self._index_thread.start()

    def _stop_index_loop(self):
        """Signal the background indexing thread to stop and wait for it.

        The stop signal is set under the lock, but the join happens after the
        lock is released; joining while holding the lock would deadlock if the
        thread were blocked waiting to acquire it.
        """
        with self._index_lock:
            self._data_buffer = None
        self._index_thread.join()

    @requests(on='/update')
    def update(
        self,
        docs: Optional[DocumentArray] = None,
        parameters: Optional[dict] = None,
        **kwargs,
    ):
        """Update existing documents

        :param docs: the Documents to update
        :param parameters: dictionary with options for updating
        Keys accepted:
            - 'access_paths': traversal paths on docs, e.g. '@r', '@c', '@r,c'
            - 'raise_errors_on_not_found': if True, raise an error if a document is not found. Default is False.
        :raises RuntimeError: if documents are still pending in the buffer
        :raises Exception: if a document is missing and
            'raise_errors_on_not_found' is set
        """
        if not docs:
            return

        parameters = parameters or {}
        access_paths = parameters.get('access_paths', self.index_access_paths)
        raise_errors_on_not_found = parameters.get('raise_errors_on_not_found', False)
        flat_docs = docs[access_paths]
        if len(flat_docs) == 0:
            return

        with self._index_lock:
            if len(self._data_buffer) > 0:
                raise RuntimeError(
                    'Cannot update documents while the pending documents in the buffer are not indexed yet. '
                    'Please wait for the pending documents to be indexed.'
                )
            for doc in flat_docs:
                try:
                    self._index[doc.id] = doc
                except IndexError:
                    if raise_errors_on_not_found:
                        raise Exception(
                            f'The document (id={doc.id}) cannot be updated as '
                            f'it is not found in the index'
                        )
                    self.logger.warning(
                        f'cannot update doc {doc.id} as it does not exist in storage'
                    )

    @requests(on='/delete')
    def delete(self, parameters: Optional[dict] = None, **kwargs):
        """Delete existing documents

        Delete entries from the index by id
        :param parameters: parameters to the request
        Keys accepted:
            - 'ids': the ids of the documents to delete
        :raises RuntimeError: if documents are still pending in the buffer
        """
        delete_ids = (parameters or {}).get('ids', [])
        if not delete_ids:
            return

        with self._index_lock:
            if len(self._data_buffer) > 0:
                raise RuntimeError(
                    'Cannot delete documents while the pending documents in the buffer are not indexed yet. '
                    'Please wait for the pending documents to be indexed.'
                )

            del self._index[delete_ids]

    @requests(on='/search')
    def search(
        self,
        docs: Optional[DocumentArray] = None,
        parameters: Optional[dict] = None,
        **kwargs,
    ):
        """Perform a vector similarity search and retrieve Document matches

        Search can be performed with candidate filtering. Filters are a triplet (column,operator,value).
        More than a filter can be applied during search. Therefore, conditions for a filter are specified as a list triplets.
        Each triplet contains:

        - column: Column used to filter.
        - operator: Binary operation between two values. Some supported operators include `['>','<','=','<=','>=']`.
        - value: value used to compare a candidate.

        :param docs: the Documents to search with
        :param parameters: dictionary for parameters for the search operation
        Keys accepted:
            - 'access_paths' (str): traversal paths on docs, e.g. '@r', '@c', '@r,c'
            - 'filter' (dict): the filtering conditions on document tags
            - 'limit' (int): nr of matches to get per Document
        """
        if not docs:
            return

        parameters = parameters or {}
        access_paths = parameters.get('access_paths', self.search_access_paths)
        flat_docs = docs[access_paths]
        # Request parameters override the defaults configured at construction.
        match_args = {**self.match_args, **parameters}

        with self._index_lock:
            # Unlike update/delete/backup, search does not reject requests
            # while documents are pending in the buffer; it matches against
            # whatever has already been written to the index.
            flat_docs.match(self._index, **match_args)

    @requests(on='/backup')
    def backup(self, parameters: Optional[Dict] = None, **kwargs):
        """
        Backup data to local or remote.
        Use api of <class 'annlite.index.AnnLite'>

        Keys accepted:
            - 'target_name' (str): the name of indexer you want to backup as;
              the shard id is appended to it
            - 'token' (str): passed through to the AnnLite backup call

        :raises RuntimeError: if documents are still pending in the buffer
        """
        parameters = parameters or {}
        target_name = parameters.get('target_name', None)
        token = parameters.get('token', None)
        if target_name:
            target_name = f'{target_name}_{self.runtime_args.shard_id}'
        with self._index_lock:
            if len(self._data_buffer) > 0:
                raise RuntimeError(
                    'Cannot backup documents while the pending documents in the buffer are not indexed yet. '
                    'Please wait for the pending documents to be indexed.'
                )
            self._index._annlite.backup(target_name, token)

    @requests(on='/restore')
    def restore(self, parameters: Optional[Dict] = None, **kwargs):
        """
        Restore data from local or remote.
        Use api of <class 'annlite.index.AnnLite'>

        Keys accepted:
            - 'source_name' (str): the name to restore from; the shard id is
              appended to it
            - 'token' (str): passed through to the AnnLite restore call
        """
        parameters = parameters or {}
        source_name = parameters.get('source_name', None)
        token = parameters.get('token', None)
        if source_name:
            source_name = f'{source_name}_{self.runtime_args.shard_id}'
        self._index._annlite.restore(source_name, token)

    @requests(on='/filter')
    def filter(self, parameters: Dict, **kwargs):
        """
        Query documents from the indexer by the filter `query` object in parameters. The `query` object must follow the
        specifications in the `find` method of `DocumentArray` using annlite: https://docarray.jina.ai/fundamentals/documentarray/find/#filter-with-query-operators
        :param parameters: Dictionary to define the `filter` that you want to use.
        :return: the result of `find` on the index
        """
        return self._index.find(parameters.get('filter', None))

    @requests(on='/fill_embedding')
    def fill_embedding(self, docs: DocumentArray, **kwargs):
        """
        retrieve embedding of Documents by id
        :param docs: DocumentArray to search with
        """
        for doc in docs:
            doc.embedding = self._index[doc.id].embedding

    @requests(on='/status')
    def status(self, **kwargs) -> DocumentArray:
        """Return the document containing status information about the indexer.

        The status tags hold the number of documents still pending in the
        buffer ('appending_size') and the current length of the index, which
        is reported under both 'total_docs' and 'index_size'.
        """
        status = Document(
            tags={
                'appending_size': len(self._data_buffer),
                'total_docs': len(self._index),
                'index_size': len(self._index),
            }
        )
        return DocumentArray([status])

    def flush(self):
        """Flush all the data in the buffer to the index

        Blocks until the background thread has drained the buffer. Returns
        early with a warning if that thread is no longer running, because
        nothing would ever empty the buffer in that case.
        """
        while len(self._data_buffer) > 0:
            if not self._index_thread.is_alive():
                self.logger.warning(
                    'the indexing thread is not running, '
                    f'{len(self._data_buffer)} pending docs cannot be flushed'
                )
                return
            time.sleep(0.1)

    @requests(on='/clear')
    def clear(self, **kwargs):
        """Clear the index of all entries."""
        self.flush()
        self._stop_index_loop()

        # Start over with an empty buffer, an empty index and a fresh thread.
        self._data_buffer = DocumentArray()
        self._index.clear()

        self._start_index_loop()

    def close(self, **kwargs):
        """Close the index."""
        super().close()

        self.flush()

        # wait for the index thread to finish
        self._stop_index_loop()

        # NOTE(review): an earlier warning on this statement reported that it
        # hangs the close when run via `pytest tests/test_*.py` (cause
        # unknown), while working in `pytest tests/test_executor.py` and in
        # normal python execution - confirm whether that still applies.
        del self._index

from jina import Flow

def index():
    """Clear the indexer, index three 5-dimensional documents, then post to '/dump'."""
    # Document d<k> carries the embedding [k, k+1, k+2, k+3, k+4].
    sample_docs = DocumentArray(
        [
            Document(id=f"d{k}", embedding=list(range(k, k + 5)))
            for k in (1, 2, 3)
        ]
    )

    with Flow().add(uses=AnnLiteIndexer) as flow:
        flow.post(on='/clear')
        flow.post(on='/index', inputs=sample_docs)
        flow.post(on='/dump')

def delete():
    """Start a fresh Flow and ask the indexer to delete document 'd1'."""
    ids_to_delete = ["d1"]

    with Flow().add(uses=AnnLiteIndexer) as flow:
        flow.post(on='/delete', parameters={'ids': ids_to_delete})

if __name__ == "__main__":
    # Index the sample documents first, then try to delete one of them by id
    # from a second, freshly started Flow.
    index()
    delete()

Thank you very much in advance!

JoanFM commented 1 year ago

Can you please show why you think this does not work? There is no assertion or check here.

tommykoctur commented 1 year ago

Hi @JoanFM ,

sure.

python delete_bug_minimal_example.py
⠸ Waiting executor0 summary... ━━━━━━━━━━━━━╺━━━━━━━━━━━━━━━━━━━━━━━━━━ 1/3 0:00:002023-02-07 11:08:54.592 | INFO     | annlite.index:restore:664 - restore A
nnlite from local
2023-02-07 11:08:54.593 | INFO     | annlite.index:_rebuild_index_from_local:771 - Rebuild the indexer from scratch
2023-02-07 11:08:54.596 | INFO     | annlite.index:_rebuild_index_from_local:788 - Load the model from /home/username/.cache/jina/AnnLiteIndexer/0/parameters
-2b445f0495bd404037d10b26cf101add
──────────────────────────────────────────────────────────────── 🎉 Flow is ready to serve! ─────────────────────────────────────────────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│  ⛓     Protocol                    GRPC  │
│  🏠       Local           0.0.0.0:63669  │
│  🔒     Private       X.X.X.X:63669  │
│  🌍      Public   X.X.X.X:63669  │
╰──────────────────────────────────────────╯
⠸ Waiting executor0 summary... ━━━━━━━━━━━━━╺━━━━━━━━━━━━━━━━━━━━━━━━━━ 1/3 0:00:002023-02-07 11:08:56.598 | INFO     | annlite.index:restore:664 - restore A
nnlite from local
2023-02-07 11:08:56.599 | INFO     | annlite.index:_rebuild_index_from_local:771 - Rebuild the indexer from scratch
2023-02-07 11:08:56.601 | INFO     | annlite.index:_rebuild_index_from_local:788 - Load the model from /home/username/.cache/jina/AnnLiteIndexer/0/parameters
-2b445f0495bd404037d10b26cf101add
──────────────────────────────────────────────────────────────── 🎉 Flow is ready to serve! ─────────────────────────────────────────────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│  ⛓     Protocol                    GRPC  │
│  🏠       Local           0.0.0.0:64306  │
│  🔒     Private       X.X.X.X:64306  │
│  🌍      Public   X.X.X.X:64306  │
╰──────────────────────────────────────────╯
ERROR  executor0/rep-0@3915231 ValueError("'d1' is not in list")                                                                          [02/07/23 11:08:57]
        add "--quiet-error" to suppress the exception details                                                                                                
       Traceback (most recent call last):                                                                                                                    
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/runtimes/worker/__init__.py", line 222,                      
       in process_data                                                                                                                                       
           result = await self._request_handler.handle(
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/runtimes/worker/request_handling.py",
       line 291, in handle
           return_data = await self._executor.__acall__(
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/executors/__init__.py", line 352, in
       __acall__
           return await self.__acall_endpoint__(req_endpoint, **kwargs)
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/executors/__init__.py", line 408, in
       __acall_endpoint__
           return await exec_func(
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/executors/__init__.py", line 369, in
       exec_func
           return await get_or_reuse_loop().run_in_executor(None, functools.partial(func, self,
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/concurrent/futures/thread.py", line 58, in run
           result = self.fn(*self.args, **self.kwargs)
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/executors/decorators.py", line 182, in
       arg_wrapper
           return fn(executor_instance, *args, **kwargs)
         File "/home/username/jina-multi-sentence-sse/delete_bug_minimal_example.py", line 217, in delete
           del self._index[delete_ids]
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/docarray/array/mixins/delitem.py", line 70, in
       __delitem__
           del self[t]
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/docarray/array/mixins/delitem.py", line 31, in
       __delitem__
           self._del_doc(index)
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/docarray/array/storage/base/getsetdel.py", line 106,
       in _del_doc
           self._offset2ids.delete_by_id(_id)
         File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/docarray/array/storage/base/helper.py", line 31, in
       delete_by_id
           del self.ids[self.ids.index(_id)]
       ValueError: 'd1' is not in list
Traceback (most recent call last):
  File "/home/username/jina-multi-sentence-sse/delete_bug_minimal_example.py", line 396, in <module>
    delete()
  File "/home/username/jina-multi-sentence-sse/delete_bug_minimal_example.py", line 391, in delete
    f.post(on='/delete', parameters={'ids': delete_list})
  File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/clients/mixin.py", line 273, in post
    return run_async(
  File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/helper.py", line 1342, in run_async
    return asyncio.run(func(*args, **kwargs))
  File "/home/username/miniconda3/envs/jina-test/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/username/miniconda3/envs/jina-test/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/clients/mixin.py", line 264, in _get_results
    async for resp in c._get_results(*args, **kwargs):
  File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/clients/base/grpc.py", line 140, in _get_results
    callback_exec(
  File "/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/clients/helper.py", line 81, in callback_exec
    raise BadServer(response.header)
jina.excepts.BadServer: request_id: "54b0c70b96ad4ea684a5e1d84721bb95"
status {
  code: ERROR
  description: "ValueError(\"\'d1\' is not in list\")"
  exception {
    name: "ValueError"
    args: "\'d1\' is not in list"
    stacks: "Traceback (most recent call last):\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/runtimes/worker/__init__.py\", line 222, in process_data\n    result = await self._request_handler.handle(\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/runtimes/worker/request_handling.py\", line 291, in handle\n    return_data = await self._executor.__acall__(\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/executors/__init__.py\", line 352, in __acall__\n    r
eturn await self.__acall_endpoint__(req_endpoint, **kwargs)\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/executors/__init__.py\", line 408, in __acall_endpoint__\n    return await exec_func(\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/executors/__init__.py\", line 369, in exec_func\n    return await get_or_reuse_loop().run_in_executor(None, functools.partial(func, self,\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/concurrent/futures/thread.py\", line 58, in run\n    result = self.fn(*self.args, **self.kwargs)\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/jina/serve/executors/decorators.py\", line 182, in arg_wrapper\n    return fn(executor_instance, *args, **kwargs)\n"
    stacks: "  File \"/home/username/jina-multi-sentence-sse/delete_bug_minimal_example.py\", line 217, in delete\n    del self._index[delete_ids]\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/docarray/array/mixins/delitem.py\", line 70, in __delitem__\n    del self[t]\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/docarray/array/mixins/delitem.py\", line 31, in __delitem__\n    self._del_doc(index)\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/docarray/array/storage/base/getsetdel.py\", line 106, in _del_doc\n    self._offset2ids.delete_by_id(_id)\n"
    stacks: "  File \"/home/username/miniconda3/envs/jina-test/lib/python3.10/site-packages/docarray/array/storage/base/helper.py\", line 31, in delete_by_id\n    del self.ids[self.ids.index(_id)]\n"
    stacks: "ValueError: \'d1\' is not in list\n"
    executor: "AnnLiteIndexer"
  }
}
exec_endpoint: "/delete"
target_executor: ""
jemmyshin commented 1 year ago

Thanks for reporting, we will look into it.

JoanFM commented 1 year ago

Hey @tommykoctur, I believe the issue is that the index had not been rebuilt yet. I think you can ignore the error and resend the delete request later.

tommykoctur commented 1 year ago

Thanks @JoanFM for the suggestion. I tried the code below, which I hope simulates rebuilding the index, but I get the same error at the end.

from jina import Flow
import numpy as np

# Three sample documents d1..d3; document d<k> carries the 5-dimensional
# embedding [k, k+1, k+2, k+3, k+4].
da = DocumentArray(
    [
        Document(id=f"d{k}", embedding=np.array(range(k, k + 5)))
        for k in (1, 2, 3)
    ]
)

def index():
    """Clear the indexer, index the module-level sample documents, then post to '/dump'."""
    with Flow().add(uses=AnnLiteIndexer) as flow:
        flow.post(on='/clear')
        flow.post(on='/index', inputs=da)
        flow.post(on='/dump')

def search():
    """Start a fresh Flow and search with the first sample document as the query."""
    with Flow().add(uses=AnnLiteIndexer) as flow:
        query_docs = DocumentArray(da[0])
        flow.post(on='/search', inputs=query_docs)

def delete():
    """Start a fresh Flow and ask the indexer to delete document 'd1'."""
    ids_to_delete = ["d1"]

    with Flow().add(uses=AnnLiteIndexer) as flow:
        flow.post(on='/delete', parameters={'ids': ids_to_delete})

if __name__ == "__main__":
    # Each step runs in its own Flow: index the sample documents, search
    # with one of them, then try to delete a document by id.
    index()
    search()
    delete()
tommykoctur commented 1 year ago

@jemmyshin @JoanFM I forgot to add one more hint: the offset2ids SQLite table is empty. Shouldn't it have some entries, or is it obsolete for annlite?

JoanFM commented 1 year ago

We found the problem. There is a bug on our side, but offset2ID is not important to ANNLite; it should work if you pass list_like=False in the config of the DocumentArrayAnnLite.

jemmyshin commented 1 year ago

Also, please use /backup instead of /dump for AnnliteIndexer @tommykoctur

tommykoctur commented 1 year ago

@jemmyshin @JoanFM thank you for the suggestions. I want to let you know that I asked my colleague to test it (I didn't have much time), and he said that the workaround with list_like=False and using /backup didn't solve the issue.

JoanFM commented 1 year ago

We released a new version of ANNLite, you should be able to use the new version

tommykoctur commented 1 year ago

Hi @JoanFM, @jemmyshin ,

we can confirm that annlite 0.5.6 and 0.5.7 work, but list_like=False still needs to be set in the annlite config. Thanks for your support.

BTW, why does it take so long for a new version (0.5.7) to get from your git repository to PyPI? Can this process be sped up somehow?