chdb-io / chdb

chDB is an in-process OLAP SQL Engine 🚀 powered by ClickHouse
https://clickhouse.com/chdb
Apache License 2.0

How to define a Python UDF with (return_type="Array(Float32)") #236

Closed: onefanwu closed this issue 4 months ago

onefanwu commented 4 months ago
from chdb.session import Session
from chdb.udf import chdb_udf
import pandas as pd
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer
import numpy as np
import tritonclient.grpc as grpcclient
import time
import pyarrow as pa

@chdb_udf(return_type="Array(Float32)")
def text_embedding(texts):
    with open('/workdir/AiQ-dev/AiQ-benchmark/log/20240627/vector_size.log', 'a') as log_file:
        log_file.write(f"chdb vector size: {len(texts)}\n")
    embeddings_list = []
    batch_size = 512
    # Create gRPC client
    triton_client = grpcclient.InferenceServerClient(url="localhost:8001")

    for i in range(0, len(texts), batch_size):
        batch_texts = texts[i:i + batch_size]

        # Tokenize sentences
        encoded_input = tokenizer(batch_texts, padding=True, truncation=True, return_tensors='pt')

        input_ids = encoded_input['input_ids']
        attention_mask = encoded_input['attention_mask']

        # Convert to numpy arrays
        input0_data = input_ids.numpy().astype(np.int64)
        input1_data = attention_mask.numpy().astype(np.int64)

        # Initialize inputs and outputs
        inputs = [
            grpcclient.InferInput("INPUT__0", input0_data.shape, "INT64"),
            grpcclient.InferInput("INPUT__1", input1_data.shape, "INT64")
        ]
        outputs = [grpcclient.InferRequestedOutput("OUTPUT__0")]

        # Set data for inputs
        inputs[0].set_data_from_numpy(input0_data)
        inputs[1].set_data_from_numpy(input1_data)

        # Perform inference
        results = triton_client.infer(model_name="all-MiniLM-L6-v2", inputs=inputs, outputs=outputs)

        # Get the output data
        output0_data = results.as_numpy("OUTPUT__0")

        # Convert the output to a torch tensor
        model_output = torch.tensor(output0_data)

        # Perform pooling
        embeddings = mean_pooling(model_output, attention_mask)

        # Normalize embeddings
        embeddings = F.normalize(embeddings, p=2, dim=1)

        # Convert embeddings to list and append to the result list
        embeddings_list.extend(embeddings.tolist())

    return pa.array(embeddings_list, type=pa.list_(pa.float32()))

I wrote this UDF to transform a batch of texts into embedding vectors.

But I'm getting an error, and it seems there is a problem with how the return type is handled:

root@node04:/workdir/AiQ-dev/AiQ-benchmark# python3 /workdir/AiQ-dev/AiQ-benchmark/baseline/chdb/semantic_search.py
Traceback (most recent call last):
  File "/workdir/AiQ-dev/AiQ-benchmark/baseline/chdb/semantic_search.py", line 145, in <module>
    result = db.query(query)
  File "/usr/local/lib/python3.10/dist-packages/chdb/session/state.py", line 47, in query
    return query(sql, fmt, path=self._path)
  File "/usr/local/lib/python3.10/dist-packages/chdb/__init__.py", line 78, in query
    raise ChdbError(res.error_message())
chdb.ChdbError: Code: 302. DB::Exception: Child process was exited with return code 1: While processing text_embedding('OLAP Database') AS embedding. (CHILD_WAS_NOT_EXITED_NORMALLY)
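One thing worth checking in the code above: as I read the `chdb_udf` docstring, the UDF is called once per input row and every argument arrives as a string, so `texts` is a single `str`, not a list of texts. `len(texts)` in the log then counts characters, and `texts[i:i + batch_size]` slices characters. With `'OLAP Database'` and `batch_size = 512` the slice happens to be the whole string, but longer inputs would be cut mid-word. The sketch below makes this visible with a hypothetical `batches` helper and a batch size of 4.

```python
# Hypothetical helper mirroring the slicing loop in text_embedding.
def batches(texts, batch_size):
    """Split `texts` into consecutive chunks of at most `batch_size` items."""
    return [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]

# If the UDF receives one SQL string, the "items" being batched are characters.
single = batches("OLAP Database", 4)

# Wrapping the single string in a list gives the intended one-text batch.
wrapped = batches(["OLAP Database"], 4)

print(single)   # ['OLAP', ' Dat', 'abas', 'e']
print(wrapped)  # [['OLAP Database']]
```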
onefanwu commented 4 months ago

My SQL:

SELECT text_embedding('OLAP Database') AS embedding
auxten commented 4 months ago

You should import the packages that the UDF uses inside the function itself.
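The reason, as far as I understand it: chDB runs a Python UDF in a separate child process (the `CHILD_WAS_NOT_EXITED_NORMALLY` error above comes from that process), so names set up at module level in your script, such as `torch`, `np`, `grpcclient`, `pa`, or the `tokenizer` and `mean_pooling` that the snippet uses but does not define, are not visible inside the UDF. The sketch below imitates that situation with `exec` in an empty namespace; it is a rough simulation, not chDB's actual mechanism, and `run_isolated` and `upper_json` are hypothetical names.

```python
import json  # imported in the *parent* script only; the fresh namespace never sees it

# UDF source that relies on the parent's module-level import.
BROKEN_SRC = """
def upper_json(arg):
    return json.dumps(arg.upper())
"""

# Same UDF, but the import lives inside the function body.
FIXED_SRC = """
def upper_json(arg):
    import json
    return json.dumps(arg.upper())
"""

def run_isolated(src, func_name, *args):
    """Execute `src` in an empty namespace and call `func_name(*args)`."""
    namespace = {}
    exec(src, namespace)
    return namespace[func_name](*args)

try:
    run_isolated(BROKEN_SRC, "upper_json", "olap")
except NameError as exc:
    print("broken:", exc)  # broken: name 'json' is not defined

print("fixed:", run_isolated(FIXED_SRC, "upper_json", "olap"))  # fixed: "OLAP"
```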

onefanwu commented 4 months ago

@auxten Can you be more specific about which packages I may have forgotten to import? I'm using similar code that works fine in DuckDB and ClickHouse, but it's not clear how a UDF that returns Array(Float32) should be defined in chDB.

onefanwu commented 4 months ago

I am unsure how to handle the input and output of a chDB UDF. @auxten, could you provide an example where the UDF takes a string as input and returns an Array(Float32) as output?
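A minimal sketch of a UDF body with that shape: one string in, a plain Python list of floats out, with its import inside the function. `letter_embedding` is a hypothetical toy (an L2-normalized letter-frequency vector), not a real embedding model. In chDB you would place `@chdb_udf(return_type="Array(Float32)")` on the line directly above the `def`, the same way `atois` is declared with `Array(int)` in this thread, and call it as `SELECT letter_embedding('OLAP Database')`. Returning a plain list rather than a pyarrow, NumPy, or torch object is the assumption here, based on how this thread was resolved.

```python
def letter_embedding(text):
    import math  # chDB UDFs should import what they need inside the function

    # Count ASCII letters a..z, ignoring case and everything else.
    counts = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            counts[ord(ch) - ord("a")] += 1.0

    norm = math.sqrt(sum(c * c for c in counts))
    if norm == 0.0:
        return counts  # no letters: return the zero vector instead of dividing by 0
    # Plain Python list of floats, not a pyarrow/numpy/torch object.
    return [c / norm for c in counts]

vec = letter_embedding("OLAP Database")
print(len(vec))  # 26
```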

nmreadelf commented 4 months ago

I think you should just return embeddings_list directly in your code, instead of wrapping it in pa.array(...).

Here is an example:

from chdb.udf import chdb_udf
from chdb.session import Session

@chdb_udf(return_type="Array(int)")
def atois(arg):
    return [int(arg)]

with Session() as session:
    ret = session.query("select atois('12')")
    print(ret)
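Why a plain list works, as far as I can tell: the `chdb_udf` docstring describes each input row arriving as one tab-separated line of strings, with the UDF's return value written back as text. Printing a Python list gives one line such as `[12]` or `[0.5, 0.25]`, which ClickHouse can read as an array (this thread confirms it works), while a pyarrow array's string form spans several lines. The sketch below is a rough simulation of that per-row contract, not chDB's actual driver code; `run_rows` is a hypothetical name.

```python
import io

def atois(arg):
    return [int(arg)]  # same body as the example above; `arg` is a string

def run_rows(func, stdin_text):
    """Call `func` once per line of `stdin_text` and collect the printed output."""
    out = io.StringIO()
    for line in io.StringIO(stdin_text):
        args = line.rstrip("\n").split("\t")  # tab-separated string arguments
        print(func(*args), file=out)          # return value becomes one text line
    return out.getvalue()

print(run_rows(atois, "12\n7\n"), end="")
# [12]
# [7]
```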
onefanwu commented 4 months ago

@nmreadelf Thank you. It worked for me.