developmentseed / bioacoustics-api

Google Bioacoustics API that runs the backend for A2O Search
https://devseed.com/api-docs/?url=https://api.search.acousticobservatory.org/api/v1/openapi
MIT License

Ingest embedding to Milvus #4

Open · geohacker opened 1 year ago

geohacker commented 1 year ago

We should figure out an approach to ingest embeddings to Milvus.

cc @leothomas @willemarcel @sunu

sdenton4 commented 1 year ago

I've been doing a bit of work on our embed.py script (and adding better documentation), which creates the TFRecords of embeddings. It'll be interesting to see if we can add indexing as 'just one more stage' to the Beam pipeline.

https://github.com/google-research/chirp/tree/main/chirp/inference#inference-pipeline
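
For a sense of what that extra stage might look like, here's a rough sketch (not the actual chirp pipeline code; the element format, collection name, and connection details are all assumptions):

```python
# Rough sketch of a Beam stage that pushes embeddings to Milvus.
# Assumes elements arrive as batches of (metadata, vector) tuples,
# e.g. via beam.BatchElements(); names and ports are placeholders.
import apache_beam as beam
from pymilvus import Collection, connections

class IngestToMilvus(beam.DoFn):
    def __init__(self, host="127.0.0.1", port="19530", collection_name="a2o"):
        self.host = host
        self.port = port
        self.collection_name = collection_name

    def setup(self):
        # Runs once per worker: connect and grab a handle to the collection.
        connections.connect(host=self.host, port=self.port)
        self.collection = Collection(self.collection_name)

    def process(self, batch):
        # One sublist per (non-auto-id) schema field, in schema order.
        vectors = [vec for _, vec in batch]
        filenames = [meta["filename"] for meta, _ in batch]
        self.collection.insert([vectors, filenames])
        self.collection.flush()
```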

leothomas commented 1 year ago

Just adding a quick overview of the required steps to ingest embeddings into a Milvus instance:

  1. Instantiate a connection to a Milvus instance:

    from milvus import default_server  # Milvus Lite's embedded server
    from pymilvus import connections

    HOST = "127.0.0.1"
    PORT = default_server.listen_port
    connections.connect(host=HOST, port=PORT)

    In the case of a GCP-hosted Milvus instance, the HOST address would be the IP of that machine.

  2. Define a collection + schema: a collection is all data that can be queried together (so in our case, all of our data would go into a single collection). The schema defines the field names and types for the embedding field + metadata fields:

    from pymilvus import Collection, CollectionSchema, DataType, FieldSchema

    # define collection fields
    id_field = FieldSchema(
        name="id",
        dtype=DataType.INT64,
        description="primary field",
        is_primary=True,
        auto_id=True,
    )

    embedding_field = FieldSchema(
        name="embedding",
        dtype=DataType.FLOAT_VECTOR,
        description="Float32 vector with dim 1280",
        dim=1280,
        is_primary=False,
    )
    ...

    schema = CollectionSchema(
        fields=[id_field, embedding_field, ...],
        description="Collection for searching A2O bird embeddings",
    )

    collection = Collection(
        name="a2o",
        data=None,
        schema=schema,
        # Set TTL to 0 to disable
        properties={"collection.ttl.seconds": 0},
    )

    (data is set to `None` since we haven't started ingesting data yet)

  3. Prep data: data should be a 2D list of Python types (`str`, `int`, `numpy.float32`, etc.), where the order of each sublist corresponds to the order of the fields in the schema definition (OMITTING the `id` field if `auto_id` is set to `True`), as sketched below.
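    For example (assuming a hypothetical two-field schema of embedding + filename):

    import numpy as np

    # one sublist per non-auto-id field, in schema order
    embeddings = [np.random.rand(1280).astype(np.float32) for _ in range(3)]
    filenames = [
        "site_a/rec_001.wav",
        "site_a/rec_002.wav",
        "site_a/rec_003.wav",
    ]
    entities = [embeddings, filenames]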

  4. Ingest data: preferably in batches, calling `collection.flush()` after each insert operation:

    collection.insert(
        [
            embeddings,  # list of numpy arrays
            filenames,   # list of str
            ...
        ]
    )
    collection.flush()
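    For a larger volume of data, a simple batching loop along these lines should work (the batch size is an arbitrary example, not a tuned value):

    BATCH_SIZE = 10_000  # arbitrary example

    for start in range(0, len(embeddings), BATCH_SIZE):
        collection.insert([
            embeddings[start : start + BATCH_SIZE],
            filenames[start : start + BATCH_SIZE],
        ])
        collection.flush()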
  5. Train index: the params for training the index vary from index to index, but the most common index type, IVF, requires an `nlist` param:

    index_params = {
        "index_type": "IVF_FLAT",
        "params": {"nlist": 1024},
        "metric_type": "L2",
    }
    collection.create_index("embedding", index_params)

    This takes up to ~20 minutes for the 0.1% embeddings sample subset when the Milvus instance is running locally on my machine; hopefully it will take much less time once we can throw more compute power at it. The `nlist` parameter is the number of centroids to use when clustering the dataset.

  6. Search the collection:

    # load the collection (or index) into memory
    collection.load()

    search_param = {
        "data": search_vectors,
        "anns_field": "embedding",
        "param": {"metric_type": "L2", "params": {"nprobe": 16}},
        "limit": 10,
        "expr": 'subsite_name == "Wet-A"',
        "output_fields": ["site_name", "subsite_name", "file_timestamp"],
    }
    results = collection.search(**search_param)

    • `expr` allows for in-place metadata filtering (hybrid searching)
    • `output_fields` defines the metadata fields which should be included in the search output
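
    Reading the results looks roughly like this (standard pymilvus accessors; `site_name` is one of the output fields requested above):

    # `results` has one entry per query vector in `search_vectors`
    for hits in results:
        for hit in hits:
            print(hit.id, hit.distance, hit.entity.get("site_name"))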

You can use Milvus Lite to run a barebones Milvus instance on your local machine in order to test out the ingestion logic (Milvus Lite is guaranteed to have the same API as the hosted Milvus instance, so any code that works in Milvus Lite will also work with hosted Milvus).
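
Starting it looks roughly like this (using the `milvus` Python package that ships Milvus Lite, matching the `default_server` used in step 1):

```python
from milvus import default_server
from pymilvus import connections

default_server.start()  # spins up an embedded Milvus instance
connections.connect(host="127.0.0.1", port=default_server.listen_port)
```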

We are working on getting a cloud based Milvus instance up and running to start testing scaling configurations, memory consumption, etc.

Last (but not least), fun fact: Milvus is the genus of birds of prey known as kites (ref)

geohacker commented 1 year ago

@sdenton4 I like the idea of making this ingestion a step of the embed.py script. In an ideal scenario, I think the TFRecord generation and ingestion could all be part of the cluster we are building and run as jobs, so it could make sense to put these things there.

I must admit that I have no prior experience with Beam, but I'd imagine it should be relatively straightforward to containerise and run as part of the cluster later on.

@leothomas thanks so much for that outline. We ran through this quickly in the call today.

@sdenton4 for next steps, do you mind taking a stab at integrating the ingestion into embed.py? Thank you