towhee-io / examples

Analyze unstructured data with Towhee: reverse image search, reverse video search, audio classification, question answering systems, molecular search, and more.
Apache License 2.0

Collection returns empty in question answering engine. #231

Open lydacious opened 1 year ago

lydacious commented 1 year ago

Hello, I am following the guide on creating a question-answering engine. I have done everything the same as the guide, but the collection stays empty for some reason. Before pasting the code here, this is the list of what I've done so far:

Code:

```python
import pandas as pd
import numpy as np
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from towhee import pipe, ops
from towhee.datacollection import DataCollection

# Read data from CSV
df = pd.read_csv('question_answer.csv')
id_answer = df.set_index('id')['answer'].to_dict()

# Milvus connection
connections.connect(host='127.0.0.1', port='19530')

# Milvus collection creation function
def create_milvus_collection(collection_name, dim):
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)

    fields = [
        FieldSchema(name='id', dtype=DataType.VARCHAR, description='ids', max_length=500, is_primary=True, auto_id=False),
        FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='embedding vectors', dim=dim)
    ]
    schema = CollectionSchema(fields=fields, description='reverse image search')
    collection = Collection(name=collection_name, schema=schema)

    # Create IVF_FLAT index for collection
    index_params = {
        'metric_type': 'L2',
        'index_type': 'IVF_FLAT',
        'params': {'nlist': 2048}
    }
    collection.create_index(field_name='embedding', index_params=index_params)
    return collection

# Create Milvus collection
collection = create_milvus_collection('question_answer', 768)

insert_pipe = (
    pipe.input('id', 'question', 'answer')
        .map('question', 'vec', ops.text_embedding.dpr(model_name='facebook/dpr-ctx_encoder-single-nq-base'))
        .map('vec', 'vec', lambda x: x / np.linalg.norm(x, axis=0))
        .map(('id', 'vec'), 'insert_status', ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='question_answer'))
        .output()
)

import csv
with open('question_answer.csv', encoding='utf-8') as f:
    reader = csv.reader(f)
    next(reader)
    for row in reader:
        insert_pipe(*row)

print(collection.is_empty)

print('Total number of inserted data is {}.'.format(collection.num_entities))
```

Vokinloksar commented 1 year ago

Same here. I suspect it is related to a version issue with one of the packages.

junjiejiangjjj commented 1 year ago

You need to call the collection.load() interface after inserting the data. See https://milvus.io/docs/load_collection.md
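
A minimal sketch of that suggestion, assuming the pymilvus 2.x `Collection` API (`flush()`, `load()`, `num_entities`). The helper name `flush_and_count` is hypothetical and not part of pymilvus or Towhee. The explicit `flush()` is an addition beyond the advice above, made on the assumption that the count is being read before buffered rows have been sealed.

```python
def flush_and_count(collection):
    """Flush pending inserts, load the collection, and return its entity count.

    `collection` is expected to be a pymilvus `Collection`. This helper is a
    hypothetical illustration, not an API of pymilvus or Towhee.
    """
    collection.flush()  # assumption: seal buffered inserts so they are counted
    collection.load()   # load the collection into memory so it can be searched
    return collection.num_entities


# Usage with the collection from the question (needs a running Milvus server):
# from pymilvus import connections, Collection
# connections.connect(host='127.0.0.1', port='19530')
# print(flush_and_count(Collection('question_answer')))
```

If the count is still zero after this, the inserts themselves may be failing, and the `insert_status` output of the pipeline would be the next thing to inspect.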

AneetaNiazi commented 6 months ago

Instead of inserting each row separately, I inserted all rows as a single batch, and that resolved the problem:

```python
%%time
from towhee import pipe, ops
import numpy as np
from towhee.datacollection import DataCollection

insert_pipe = (
    pipe.input('id', 'question', 'answer')
        .map('question', 'vec', ops.text_embedding.dpr(model_name='facebook/dpr-ctx_encoder-single-nq-base'))
        .map('vec', 'vec', lambda x: x / np.linalg.norm(x, axis=0))
        .map(('id', 'vec'), 'insert_status', ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='question_answer'))
        .output()
)

import csv
with open('question_answer.csv', encoding='utf-8') as f:
    reader = csv.reader(f)
    next(reader)
    allRows = list(reader)
    res = insert_pipe.batch(allRows)
```
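
For a CSV too large to hold in memory at once, the same batching idea could be applied in chunks. This is only a sketch: `read_in_chunks` and its `chunk_size` default are hypothetical, and it assumes `insert_pipe.batch` accepts any list of rows, as in the snippet above.

```python
import csv
from itertools import islice


def read_in_chunks(path, chunk_size=1000):
    """Yield lists of up to `chunk_size` CSV rows, skipping the header row."""
    with open(path, encoding='utf-8', newline='') as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header; tolerate an empty file
        while True:
            chunk = list(islice(reader, chunk_size))
            if not chunk:
                break
            yield chunk


# Usage with the pipeline defined above (assumed to be in scope):
# for chunk in read_in_chunks('question_answer.csv'):
#     insert_pipe.batch(chunk)
```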