iterative / datachain

DataChain 🔗 AI-dataframe to enrich, transform and analyze data from cloud storages for ML training and LLM apps
https://datachain.dvc.ai
Apache License 2.0

No "batched" Inference #170

Open michaelfeil opened 1 month ago

michaelfeil commented 1 month ago

I noticed that a number of things here are implemented incorrectly.

from transformers import pipeline

classifier = pipeline("sentiment-analysis", device="cpu",
                model="distilbert/distilbert-base-uncased-finetuned-sst-2-english")

def is_positive_dialogue_ending(file) -> bool:
    dialogue_ending = file.read()[-512:]  # 512 chars != 512 tokens
    return classifier(dialogue_ending)[0]["label"] == "POSITIVE"  # NOT batched inference: threaded, blocking, single-item inference -> really bad once you are on a GPU.

To perform batched inference, you would need to pass multiple inputs at once, with a batch size > 1, to the sentence-classification pipeline.

classifier = pipeline("sentiment-analysis", device="cpu",
                # would need the tokenizer to truncate from the left
                truncation="right/left",  # i.e. set the truncation side; I think this is not possible via this argument
                model="distilbert/distilbert-base-uncased-finetuned-sst-2-english")

def is_positive_dialogue_ending(files: list) -> list[bool]:
    dialogue_endings = [file.read() for file in files]  # something like this
    return [result["label"] == "POSITIVE" for result in classifier(dialogue_endings)]  # something like this
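
For reference, here is a minimal sketch of how the batched version could look with the plain transformers pipeline; batch_size=32, max_length=512 and the function name are illustrative assumptions. Left-side truncation can be set on the tokenizer via truncation_side, and batch_size is passed at call time:

from transformers import pipeline

classifier = pipeline("sentiment-analysis", device="cpu",
                model="distilbert/distilbert-base-uncased-finetuned-sst-2-english")
# truncate from the left so the *ending* of the dialogue is kept
classifier.tokenizer.truncation_side = "left"

def are_positive_dialogue_endings(files: list) -> list[bool]:
    texts = [file.read() for file in files]
    # truncation/max_length are forwarded to the tokenizer; batch_size controls batching
    results = classifier(texts, batch_size=32, truncation=True, max_length=512)
    return [r["label"] == "POSITIVE" for r in results]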

As a result, I launched https://github.com/michaelfeil/embed and https://github.com/michaelfeil/infinity for sentence classification. The backend queues and batches the requests, allowing the hardware to be utilized more efficiently. This is useful on CPU, but crucial for e.g. GPU usage!

from embed import BatchedInference
from concurrent.futures import Future

# Run any model
register = BatchedInference(
    model_id=[
        # classification models
        "distilbert/distilbert-base-uncased-finetuned-sst-2-english")
    ],
    # engine to `torch` or `optimum`
    engine="torch",
    # device `cuda` (Nvidia/AMD) or `cpu`
    device="cpu",
)

def is_positive_dialogue_ending(file) -> bool:
    """not multiprocessing recommended, but neither is transformers. Threading is encouraged."
    dialogue_ending = file.read()[-512:] # 512 chars != 512 tokens
    future: "Future" = register.classify(model_id="philschmid/tiny-bert-sst2-distilled", sentences=[dialogue_ending])
    # best: defer to later stage
    return future.result()[0]["label"] == "POSITIVE"
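
The "defer to later stage" comment is the key to batching here: if you submit everything first and only call .result() in a second pass, the backend can group the queued requests into full batches. A sketch of that pattern, reusing the register from above (the result()[0]["label"] access mirrors the snippet above and is an assumption about embed's output format):

def are_positive_dialogue_endings(files: list) -> list[bool]:
    # first pass: submit all requests so the backend can batch them
    futures = [
        register.classify(
            model_id="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
            sentences=[file.read()[-512:]],
        )
        for file in files
    ]
    # second pass: resolve the futures
    return [f.result()[0]["label"] == "POSITIVE" for f in futures]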

volkfox commented 1 month ago

Need to fix batching ASAP for many similar HuggingFace models to work.

dmpetrov commented 1 month ago

This is related to batch_map (#84). I prioritized this one.

shcheklein commented 1 month ago

@dberenbaum can this be closed now?

dberenbaum commented 1 month ago

See the note from #191:

I think we should keep open https://github.com/iterative/datachain/issues/170. That request seems to be specifically about using futures to batch individual results using the existing .map without needing a separate .batch_map(). I think .batch_map() may be both simpler to implement and explain for now, but I think we could come back to the ideas in https://github.com/iterative/datachain/issues/170 in the future.
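
For context, a heavily hedged sketch of the two directions contrasted in that note; the function names and the shape of a future .batch_map() UDF are illustrative assumptions, not current DataChain API (the register and classifier objects are the ones from the snippets above):

# (a) Futures with the existing per-row .map(): submit in one step, resolve in a later
#     step, so an inference backend (e.g. embed's BatchedInference) can batch the queued
#     requests in between.
def submit_ending(file) -> "Future":
    return register.classify(
        model_id="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
        sentences=[file.read()[-512:]],
    )

def resolve_ending(future: "Future") -> bool:
    return future.result()[0]["label"] == "POSITIVE"

# (b) A dedicated .batch_map(): the UDF receives a list of rows and returns a list of
#     results, so batching happens inside a single call.
def are_positive_dialogue_endings(files: list) -> list[bool]:
    texts = [file.read()[-512:] for file in files]
    return [r["label"] == "POSITIVE" for r in classifier(texts, batch_size=32)]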