turboderp / exllamav2

A fast inference library for running LLMs locally on modern consumer-class GPUs
MIT License

Flask based paged attention with streaming and generator queue to dynamically add and remove jobs. #520

Closed rjmehta1993 closed 5 months ago

rjmehta1993 commented 5 months ago

This is how I use the dynamic generator as a class object, with the server wrapped in Flask.

But the responses get mixed up when I send 2 requests simultaneously (mimicking a client).

Note: The prompts are not predefined in a list. Jobs go into a queue, where each job is added on arrival and removed when it finishes.

LOAD MODEL

AI_MODEL = "./qwen2-7b-chat"
total_context = 32768 * 4
max_batch_size = 2
max_chunk_size = 10240
max_new_tokens = 1024
json_mode = False
paged = True
healing = True
use_ngram = False
use_draft_model = False

from exllamav2 import ExLlamaV2, ExLlamaV2Config, ExLlamaV2Cache, ExLlamaV2Tokenizer
from exllamav2.generator import ExLlamaV2DynamicGenerator, ExLlamaV2DynamicJob

def load_model():
    # The request handlers use tokenizer and generator, so both must be module-level globals
    global tokenizer, generator, cache, config, model
    config = ExLlamaV2Config(AI_MODEL)
    config.max_input_len = max_chunk_size
    config.max_attention_size = max_chunk_size ** 2
    model = ExLlamaV2(config)
    cache = ExLlamaV2Cache(
        model,
        max_seq_len = total_context,
        lazy = True)

    # Load the weights, splitting across available GPUs and allocating the lazy cache
    model.load_autosplit(cache, progress = True)
    tokenizer = ExLlamaV2Tokenizer(config)
    generator = ExLlamaV2DynamicGenerator(
        model = model,
        cache = cache,
        draft_model = None,
        draft_cache = None,
        tokenizer = tokenizer,
        max_batch_size = max_batch_size,
        use_ngram_draft = use_ngram,
        max_chunk_size = max_chunk_size,
        paged = paged)
load_model()

LLM CLASS OBJ & FLASK WRAPPER

class LlmContextProcessor:
    def __init__(self, prompt,unique_id):
        self.unique_id = unique_id
        self.prompt = prompt

    def stream_response(self):
        job = ExLlamaV2DynamicJob(
                input_ids=tokenizer.encode(self.prompt, add_bos = True),
                max_new_tokens = max_new_tokens,  # module-level setting; the class defines no max_tokens attribute
                identifier = str(self.unique_id),
                token_healing = healing)
        generator.enqueue(job)

        while generator.num_remaining_jobs() > 0:
            results = generator.iterate()

            for result in results:
                idx = result["identifier"]    
                text_chunk = result.get("text", "")
                yield text_chunk

from flask import Flask, request, Response, stream_with_context
import json, uuid, sys

app = Flask(__name__)
@app.route('/conversation', methods=['POST'])
def conversation():
    data = request.json
    content = data.get('text', '')
    question = data.get('question', 'Summarize text.')
    prompt= f'''<|im_start|>system 
Provide some context and/or instructions to the model.<|im_end|> 
<|im_start|>user 
TEXT: {content}

QUESTION: {question}<|im_end|> 
<|im_start|>assistant
'''
    processor = LlmContextProcessor(prompt=prompt, unique_id = str(uuid.uuid4()))
    def iter_chunks():
        for chunk in processor.stream_response():
            yield chunk
    # Return from the view itself, not from inside the nested generator
    return Response(stream_with_context(iter_chunks()), mimetype="text/event-stream", status=200)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

CLIENT REQUEST

#CLIENT API MIMIC BELOW
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def make_request(input_data):
    with requests.post("http://127.0.0.1:5000/conversation", json=input_data, stream=True) as r:
        if r.status_code == 200:
            for line in r.iter_lines():
                if line:
                    decoded_line = line.decode('utf-8')
                    print(decoded_line)
                elif line == b'':
                    print('\n')
input_data_list = []
input_data = {}
text = '''Artificial Intelligence (AI) refers to the development of computer systems that can perform tasks that typically require human intelligence. AI systems can learn, reason, and interact with humans in a way that simulates human thought processes. The field of AI has been rapidly evolving over the past few decades, with significant advancements in areas such as machine learning, natural language processing, and computer vision.\n\nHistory of AI:\nThe concept of AI dates back to the 1950s, when computer scientists like Alan Turing, Marvin Minsky, and John McCarthy began exploring ways to create machines that could think and learn like humans. The term "Artificial Intelligence" was coined by John McCarthy in 1956. The early years of AI research focused on developing rule-based systems that could perform specific tasks, such as playing chess or recognizing patterns.\n\nHowever, the field faced significant challenges, including the difficulty of creating machines that could learn from experience and adapt to new situations. The AI winter of the 1980s and 1990s, which was characterized by a lack of funding and interest in AI research, led to a slowdown in advancements.\n\nRecent Advances:\nIn the 21st century, AI has experienced a resurgence, driven by advances in computing power, data storage, and machine learning algorithms. 
The development of deep learning techniques, which are inspired by the structure and function of the human brain, has enabled AI systems to learn from vast amounts of data and improve their performance over time.\n\nSome of the recent advancements in AI include:\n\nMachine Learning: Machine learning algorithms, such as neural networks and decision trees, allow AI systems to learn from data and improve their performance over time.\nNatural Language Processing: AI systems can now understand and generate human language, enabling applications such as chatbots, virtual assistants, and language translation.\nComputer Vision: AI systems can now process and analyze visual data, enabling applications such as facial recognition, object detection, and autonomous vehicles.\nRobotics: AI systems can now control and interact with physical robots, enabling applications such as robotic process automation and autonomous assembly.\nExpert Systems: AI systems can now mimic the decision-making abilities of human experts, enabling applications such as medical diagnosis and financial analysis.\nApplications of AI:\nAI has a wide range of applications across various industries, including:\n\nHealthcare: AI can be used to analyze medical images, diagnose diseases, and develop personalized treatment plans.\nFinance: AI can be used to analyze financial data, predict market trends, and automate trading decisions.\nRetail: AI can be used to personalize customer experiences, optimize supply chains, and improve product recommendations.\nTransportation: AI can be used to develop autonomous vehicles, optimize traffic flow, and improve public transportation systems.'''
input_data['question'] = "Explain the AI in details."
input_data['text'] = text
input_data_list.append(input_data)

input_data = {}
input_data['question'] = "Explain the difference between AI and ML"
input_data['text'] = text
input_data_list.append(input_data)

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(make_request, data) for data in input_data_list]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Request generated an exception: {e}")

OUTPUT

127.0.0.1 - - [25/Jun/2024 02:09:20] "POST /conversation HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [25/Jun/2024 02:09:20] "POST /conversation HTTP/1.1" 200 -
127.0.0.1 - - [25/Jun/2024 02:09:20] "POST /conversation HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [25/Jun/2024 02:09:20] "POST /conversation HTTP/1.1" 200 -
{   'cached_pages': 0,
    'cached_tokens': 0,
    'eos': True,
    'eos_reason': 'stop_token',
    'full_completion': ':Art Artificialificial Intelligence Intelligence ( '
                       'refersAI refers refers to refers the to development '
                       'the development development development of of of of '
                       'computer computer computer computer computer systems '
                       'systems systems systems systems that that that that '
                       'that can can can can can can perform perform perform '
                       'perform perform tasks tasks tasks tasks tasks that '
                       'that that that that typically typically typically '
                       'typically typically require require require require '
                       'require human human human human human intelligence '
                       'intelligence.',
    'identifier': 'dinsodknfo434',
    'job': ExLlamaV2DynamicJob #1,
    'new_tokens': 80,
    'prompt_tokens': 614,
    'serial': 1,
    'stage': 'streaming',
    'time_enqueued': 0.0003533363342285156,
    'time_generate': 1.3093879222869873,
    'time_prefill': 0.4452393054962158}
{   'cached_pages': 2,
    'cached_tokens': 682,
    'eos': True,
    'eos_reason': 'stop_token',
    'full_completion': ':Machine Artificial Learning Intelligence Intelligence '
                       '( refersAI to) the the development development of of '
                       'of computer computer computer systems systems systems '
                       'that that that that can can can can learn learn learn '
                       'learn from from from from from data data data data '
                       'data and and and and and improve improve improve '
                       'improve improve their their their their their '
                       'performance performance performance performance '
                       'performance performance over over over over over time '
                       'time time time time time through,. through',
    'identifier': 'sdihfjidsj0cfds',
    'job': ExLlamaV2DynamicJob #1,
    'new_tokens': 83,
    'prompt_tokens': 618,
    'serial': 1,
    'stage': 'streaming',
    'time_enqueued': 0.1411433219909668,
    'time_generate': 1.355302095413208,
    'time_prefill': 0.30434155464172363}

If you look at the terminal logs after pprint-ing the results, tokens leak into the other job while both requests are executing. Can the "identifier" (idx) keep the jobs isolated?

rjmehta1993 commented 5 months ago

Note: The model is not in async mode. Does it have to be async for jobs to be added, executed, and removed automatically when cache is available to run them? Or can I keep the model in a class object and use a synchronous Flask wrapper for the API?

@turboderp Thanks for your help and suggestions on this one. I have tried to wrap my head around sync/async with LLMs and paged attention, and I only created this issue after I couldn't find any resources on LLMs with synchronous Flask. Please let me know if this is not the right direction.

turboderp commented 5 months ago

I would guess the problem here is that you end up with two threads calling iterate() concurrently. The generator isn't thread-safe, and honestly it's a little surprising that it doesn't just crash when used like that.
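To illustrate the diagnosis above, here is a minimal sketch of one way to stay with threads while guaranteeing that only a single thread ever touches the generator. It is an assumption-laden illustration, not the library's or the author's method: FakeGenerator, GeneratorWorker and stream are hypothetical names, and FakeGenerator is a stand-in that only mimics the enqueue(), num_remaining_jobs() and iterate() calls and the "identifier", "text" and "eos" result keys seen in the issue, so the example runs without a GPU.

```python
import queue
import threading

class FakeGenerator:
    """Hypothetical stand-in for the real generator (assumption, not the exllamav2 API).
    enqueue() here takes an identifier and a list of text chunks instead of a job object."""
    def __init__(self):
        self.jobs = {}  # identifier -> remaining text chunks

    def enqueue(self, identifier, chunks):
        self.jobs[identifier] = list(chunks)

    def num_remaining_jobs(self):
        return len(self.jobs)

    def iterate(self):
        # One step for every active job, returned together as a single batch
        results = []
        for identifier in list(self.jobs):
            chunks = self.jobs[identifier]
            text = chunks.pop(0)
            eos = not chunks
            if eos:
                del self.jobs[identifier]
            results.append({"identifier": identifier, "text": text, "eos": eos})
        return results

class GeneratorWorker:
    """Owns the generator: only the worker thread calls enqueue() or iterate()."""
    def __init__(self, generator):
        self.generator = generator
        self.pending = queue.Queue()  # jobs submitted by request threads
        self.outputs = {}             # identifier -> per-job output queue
        threading.Thread(target=self._run, daemon=True).start()

    def submit(self, identifier, chunks):
        out = queue.Queue()
        self.pending.put((identifier, chunks, out))
        return out

    def _run(self):
        while True:
            # Block while idle; otherwise only drain jobs that have already arrived
            block = self.generator.num_remaining_jobs() == 0
            try:
                while True:
                    identifier, chunks, out = self.pending.get(block=block)
                    self.outputs[identifier] = out
                    self.generator.enqueue(identifier, chunks)
                    block = False
            except queue.Empty:
                pass
            # Route each result to the queue of the job it belongs to
            for result in self.generator.iterate():
                out = self.outputs[result["identifier"]]
                out.put(result.get("text", ""))
                if result["eos"]:
                    out.put(None)  # sentinel: this job is finished
                    del self.outputs[result["identifier"]]

def stream(worker, identifier, chunks):
    """What a request handler would iterate over: chunks of its own job only."""
    out = worker.submit(identifier, chunks)
    while (chunk := out.get()) is not None:
        yield chunk
```

In a Flask view, each request thread would iterate over stream(...) with its own identifier, so the request threads never call iterate() themselves and every chunk is routed by its "identifier" key.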

Regardless, the trick would be to use tasks rather than threads. The async wrapper facilitates this nicely, by letting each job work as an independent generator, routing the batched streams automatically. I'm not sure if Flask has a single-threaded/async mode, though. Perhaps it would be easier to use something like Quart? Though I'm not an expert on that either.
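The "tasks rather than threads" idea can be sketched with plain asyncio. This is only an illustration of the pattern under stated assumptions, not the library's async wrapper: FakeGenerator, AsyncRouter and collect are hypothetical names, and FakeGenerator is a stand-in that imitates enqueue(), num_remaining_jobs() and iterate() so the example runs without a model. A single pump task is the only caller of iterate(), and each job is exposed as its own async generator.

```python
import asyncio

class FakeGenerator:
    """Hypothetical stand-in for the real generator (assumption, not the exllamav2 API)."""
    def __init__(self):
        self.jobs = {}  # identifier -> remaining text chunks

    def enqueue(self, identifier, chunks):
        self.jobs[identifier] = list(chunks)

    def num_remaining_jobs(self):
        return len(self.jobs)

    def iterate(self):
        # One step for every active job, returned together as a single batch
        results = []
        for identifier in list(self.jobs):
            chunks = self.jobs[identifier]
            text = chunks.pop(0)
            eos = not chunks
            if eos:
                del self.jobs[identifier]
            results.append({"identifier": identifier, "text": text, "eos": eos})
        return results

class AsyncRouter:
    """Runs one iterate() loop and routes the batched results to per-job queues."""
    def __init__(self, generator):
        # Must be constructed inside a running event loop (create_task needs one)
        self.generator = generator
        self.outputs = {}              # identifier -> asyncio.Queue
        self.wakeup = asyncio.Event()  # set whenever a new job is enqueued
        self.task = asyncio.create_task(self._pump())

    async def _pump(self):
        while True:
            if self.generator.num_remaining_jobs() == 0:
                self.wakeup.clear()
                await self.wakeup.wait()  # idle until a job arrives
            for result in self.generator.iterate():
                out = self.outputs[result["identifier"]]
                out.put_nowait(result.get("text", ""))
                if result["eos"]:
                    out.put_nowait(None)  # sentinel: this job is finished
                    del self.outputs[result["identifier"]]
            await asyncio.sleep(0)  # let request tasks consume chunks and submit jobs

    async def stream(self, identifier, chunks):
        out = asyncio.Queue()
        self.outputs[identifier] = out
        self.generator.enqueue(identifier, chunks)
        self.wakeup.set()
        while (chunk := await out.get()) is not None:
            yield chunk

async def collect(router, identifier, chunks):
    return "".join([c async for c in router.stream(identifier, chunks)])

async def main():
    router = AsyncRouter(FakeGenerator())
    # Two concurrent "requests", each seeing only its own stream
    a, b = await asyncio.gather(
        collect(router, "a", ["A1 ", "A2 ", "A3"]),
        collect(router, "b", ["B1 ", "B2"]),
    )
    router.task.cancel()
    return a, b
```

Because everything runs on one event loop, there is never more than one caller inside iterate(). An async framework such as Quart could presumably return router.stream(...) from a request handler, though that part is untested here.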