SuperDuperDB / superduperdb

🔮 SuperDuperDB: Bring AI to your database! Build, deploy and manage any AI application directly with your existing data infrastructure, without moving your data. Including streaming inference, scalable model training and vector search.
https://superduperdb.com
Apache License 2.0
4.55k stars 445 forks source link

[BUG]: Activation of Downstream Listener Missed in Ibis #2106

Closed jieguangzhou closed 1 month ago

jieguangzhou commented 1 month ago

Contact Details [Optional]

No response

System Information

main

What happened?

When using the Ibis backend, the upstream model's results were saved correctly, but the downstream listener was never triggered on those results.

Steps to reproduce

from superduperdb import superduper

import json
import os

# Start from a clean slate: remove any leftover SQLite file from a prior run.
# (Replaces the original bare `try: ... os.system('rm my_db.db') ... except: pass`,
# which swallowed every exception and shelled out for a simple unlink.)
if os.path.exists('my_db.db'):
    os.remove('my_db.db')

db = superduper('sqlite://my_db.db')

# Download the demo corpus (notebook-style shell escape; requires IPython).
get_ipython().system(
    'curl -O https://superduperdb-public-demo.s3.amazonaws.com/text.json'
)

with open('text.json', 'r') as f:
    data = json.load(f)
sample_datapoint = "What is mongodb?"

# One row per text snippet, stored under column 'x'.
datas = [{'x': d} for d in data]

# Plain-string column type.  The dead `datatype = None` assignment that was
# immediately overwritten in the original has been removed.
datatype = 'str'

from superduperdb import DataType

# datatype is the string 'str' (set above), not a DataType instance, so this
# branch is skipped in this script.
if datatype and isinstance(datatype, DataType):
    db.apply(datatype)

from superduperdb import Schema
from superduperdb.components.table import Table

# Declare a one-column table: 'x' holds the text payload.
schema = Schema(identifier="schema", fields={"x": datatype})
table_or_collection = Table('documents', schema=schema)
db.apply(table_or_collection)

from superduperdb import Document

# Re-fetch the table handle through the db so queries route through superduper.
table_or_collection = db['documents']

# Insert only the first datapoint here; another row is inserted at the very
# end of the script to exercise listener activation on fresh data.
ids = db.execute(table_or_collection.insert(datas[:1]))
select = table_or_collection.select()

from superduperdb import objectmodel

CHUNK_SIZE = 200

@objectmodel(flatten=True, model_update_kwargs={'document_embedded': False})
def chunker(text):
    """Split *text* on whitespace and return CHUNK_SIZE-word chunks."""
    words = text.split()
    chunks = []
    for start in range(0, len(words), CHUNK_SIZE):
        chunks.append(' '.join(words[start:start + CHUNK_SIZE]))
    return chunks

from superduperdb import Listener

# Upstream listener: runs `chunker` over column 'x' of the documents table.
# The fixed uuid ("chunk") determines the name of its output table.
upstream_listener = Listener(
    model=chunker,
    select=select,
    key='x',
    uuid="chunk",
)

db.apply(upstream_listener)

# No separate query-side model in this repro; the same embedder serves both.
compatible_model = None

from superduperdb.ext.sentence_transformers import SentenceTransformer

model = SentenceTransformer(
    identifier='all-MiniLM-L6-v2',
    postprocess=lambda x: x.tolist(),
)

# Point the indexing listener at the *outputs* of the upstream chunker —
# this makes the embedding listener a downstream consumer of `chunker`.
indexing_key = upstream_listener.outputs_key
select = upstream_listener.outputs_select

compatible_key = None
if compatible_model:
    compatible_key = 'y'

vector_index_name = 'my-vector-index'

from superduperdb import Listener, VectorIndex

# NOTE(review): this nested Listener is the downstream one whose activation
# the issue reports as missed under Ibis — confirm against the fix.
jobs, _ = db.add(
    VectorIndex(
        vector_index_name,
        indexing_listener=Listener(
            key=indexing_key,  # the `Document` key `model` should ingest to create embedding
            select=select,  # a `Select` query telling which data to search over
            model=model,  # a `_Predictor` how to convert data to embeddings
            uuid='embedding',
        ),
    )
)

# Table behind the chunker-outputs select; vector search is issued against it.
query_table_or_collection = select.table_or_collection

from superduperdb import DataType, Document

def get_sample_item(key, sample_datapoint, datatype=None):
    """Build a Document holding *sample_datapoint* under *key*.

    When *datatype* is a DataType instance, the value is encoded through it
    first; any other *datatype* (including the plain string 'str' used in
    this script) leaves the value untouched.
    """
    payload = (
        datatype(sample_datapoint)
        if isinstance(datatype, DataType)
        else sample_datapoint
    )
    return Document({key: payload})

# compatible_key is None here (compatible_model is None above), so the
# else branch runs.  datatype is the string 'str', not a DataType, so
# get_sample_item wraps the raw string.
if compatible_key:
    item = get_sample_item(compatible_key, sample_datapoint, None)
else:
    item = get_sample_item(indexing_key, sample_datapoint, datatype=datatype)

# Vector search: the 10 rows nearest to `item` under the index built above.
select = query_table_or_collection.like(
    item, vector_index=vector_index_name, n=10
).select()

results = db.execute(select)

from IPython.display import Markdown, display

def visualize(item, source):
    """Render *item* as Markdown in the notebook; *source* is accepted but unused."""
    rendered = Markdown(item)
    display(rendered)

def show(results, output_key, get_original_callable=None):
    """Render each result's *output_key* field via ``visualize``.

    For results carrying a ``_source`` reference, the original row is looked
    up through *get_original_callable* (when one is provided) and passed to
    ``visualize`` alongside the rendered field.
    """
    for result in results:
        source = None
        # Guard against the default: the original called get_original_callable
        # unconditionally when '_source' was present, raising TypeError when
        # the parameter was left as None.
        if '_source' in result and get_original_callable is not None:
            source = get_original_callable(result['_source'])
        visualize(result[output_key], source)

def get_original(_source):
    """Fetch the original row for *_source* via an ids lookup on the table.

    NOTE(review): the notebook originally defined a MongoDB-style variant
    using ``find_one`` and an ``if upstream_listener`` branch assigning
    ``visualization_key``; both were immediately overwritten by the
    definitions kept here (and ``find_one`` does not apply to the SQLite/Ibis
    backend this repro targets), so the dead code has been removed.
    """
    return next(db.execute(table_or_collection.select_using_ids([_source])))

# Visualize the indexed (embedded) key's text.
visualization_key = indexing_key

show(results, visualization_key, get_original)

# Insert a fresh row: this should trigger the upstream chunker AND the
# downstream embedding listener.  NOTE(review): per the issue title, the
# downstream activation is what goes missing under Ibis — confirm.
ids = db.execute(table_or_collection.insert([datas[2]]))

Relevant log output

No response