turboderp / exllamav2

A fast inference library for running LLMs locally on modern consumer-class GPUs
MIT License

Question on Async generator #519

Closed waterangel91 closed 6 days ago

waterangel91 commented 6 days ago

Hi, I have a question about the async generator. I am new to async in Python and to LLMs, and would appreciate it if someone could provide some guidance.

I have looked through the example here and am trying to adapt it to my existing code: https://github.com/turboderp/exllamav2/blob/master/examples/inference_async.py

I currently have a FastAPI server running an exllamav2 backend.

If I change from the normal generator to the async generator, does it mean that if a new API call hits my server while the model is generating a request, this new request will be put into a queue and generated immediately after the generator finishes the current request?

Or is there more benefit to it than just queuing up the job, e.g. higher throughput when there are concurrent requests?

Also, I learned that there is the "workers" setting for uvicorn, which seems to let the server handle more concurrent requests. Does it make any difference for exllamav2, or for LLMs in general?

Thank you.

turboderp commented 6 days ago

If you add a new job while other jobs are processing, the new job will first be enqueued and become "pending". As soon as there's room for it in the cache it will become active and part of the running batch. So if there's enough room, an incoming job will start right away. Here's a complete example:


import sys, os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from exllamav2 import ExLlamaV2, ExLlamaV2Config, ExLlamaV2Cache, ExLlamaV2Tokenizer
from exllamav2.generator import ExLlamaV2DynamicGeneratorAsync, ExLlamaV2DynamicJobAsync
import asyncio, random

async def main():
    model_dir = "/mnt/str/models/llama3-8b-exl2/4.0bpw"
    config = ExLlamaV2Config(model_dir)
    model = ExLlamaV2(config)
    cache = ExLlamaV2Cache(model, lazy = True)
    model.load_autosplit(cache, progress = True)

    print("Loading tokenizer...")
    tokenizer = ExLlamaV2Tokenizer(config)

    # Initialize the async generator with all default parameters

    generator = ExLlamaV2DynamicGeneratorAsync(
        model = model,
        cache = cache,
        tokenizer = tokenizer,
    )

    # Run some async job tasks

    prompts = [
        "Once upon a time, there was",
        "asyncio in Python is a great feature because",
        "asyncio in Python is a pain to work with because",
    ]

    async def run_job(prompt: str, max_new_tokens: int, identifier: int):

        print(f" -- Starting job #{identifier}: '{prompt}' + {max_new_tokens} tokens")

        # Create job
        job = ExLlamaV2DynamicJobAsync(
            generator,
            input_ids = tokenizer.encode(prompt, add_bos = False),
            max_new_tokens = max_new_tokens
        )

        # String to receive streamed result
        full_completion = prompt

        # Iterate through the job.
        async for result in job:
            full_completion += result.get("text", "")

        print(f" -- Finished job #{identifier}: {repr(full_completion)}")
        return full_completion

    # Initially create three tasks
    tasks = []
    idx = 0
    for _ in range(3):
        print(f" -- Appending task #{idx}")
        task = asyncio.create_task(run_job(prompts[idx % len(prompts)], random.randint(20, 150), idx))
        tasks.append(task)
        idx += 1

    # Wait for the first task to finish
    await asyncio.wait(tasks, return_when = asyncio.FIRST_COMPLETED)
    print(" -- First task completed")

    # Add another two tasks
    for _ in range(2):
        print(f" -- Appending task #{idx}")
        task = asyncio.create_task(run_job(prompts[idx % len(prompts)], random.randint(20, 150), idx))
        tasks.append(task)
        idx += 1

    # Wait for the rest of the tasks to finish
    await asyncio.wait(tasks, return_when = asyncio.ALL_COMPLETED)
    print(" -- All tasks completed")

    # Cleanup
    await generator.close()

if __name__ == "__main__":
    asyncio.run(main())
Loading: /mnt/str/models/llama3-8b-exl2/4.0bpw ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:01 0:00:00
Loading tokenizer...
 -- Appending task #0
 -- Appending task #1
 -- Appending task #2
 -- Starting job #0: 'Once upon a time, there was' + 45 tokens
 -- Starting job #1: 'asyncio in Python is a great feature because' + 136 tokens
 -- Starting job #2: 'asyncio in Python is a pain to work with because' + 74 tokens
 -- Finished job #0: 'Once upon a time, there was a king who had a son. The son had a great desire to go hunting, but the king forbade him to do so, because he was afraid that he might be hurt. The son, however, would not listen'
 -- First task completed
 -- Appending task #3
 -- Appending task #4
 -- Starting job #3: 'Once upon a time, there was' + 53 tokens
 -- Starting job #4: 'asyncio in Python is a great feature because' + 37 tokens
 -- Finished job #2: "asyncio in Python is a pain to work with because the documentation is poor, the examples are few and far between, and the examples that do exist are often very confusing. That said, it's still the most promising way to do concurrent I/O in Python at the moment, so I'm going to try to figure out how to use it. (Or, rather, I'm going to try to figure out how to"
 -- Finished job #4: 'asyncio in Python is a great feature because it means that you can have the best of both worlds: asynchronous execution and synchronous debugging. In the following example, we use a simple loop to iterate over the elements of a list and'
 -- Finished job #3: 'Once upon a time, there was a small town. It sat on the edge of a lake. There were many trees and flowers and grass and people. The people loved the town and the town loved the people. And it was a very happy town.\nOne day, the people were playing and laughing'
 -- Finished job #1: 'asyncio in Python is a great feature because it allows you to make asynchronous calls to functions. But, the downside\nto this is that the execution of your asynchronous code will be split into multiple threads, which may cause some performance issues.\n\nIf you use asyncio and you want to get the best performance, you should use the multiprocessing module in Python. This module\nallows you to run multiple processes in parallel, which can speed up your code significantly.\n\nIn this article, I will show you how to use the multiprocessing module in Python to get the best performance when using\nasyncio.\n\n## What is asyncio?\n\nasyncio is a module in Python that allows you to make asynchronous calls to functions. This means that your code will'
 -- All tasks completed

This starts three jobs of random lengths in separate async tasks. These will be batched and run in parallel. When the first one finishes, another two tasks are added, after which there are four active tasks all running in parallel. Then they finish one by one.

TabbyAPI fully implements a FastAPI inference server using the asyncio wrapper.

The async generator is just a wrapper, though, and it's not necessary for continuous, dynamic batching. You can also call generator.enqueue() and generator.iterate() directly if you want more direct control of how jobs are added and how the output is routed.
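
As a minimal sketch of that direct route (assuming the same model, cache and tokenizer objects as in the example above, and following the non-async dynamic generator examples in the repo):

from exllamav2.generator import ExLlamaV2DynamicGenerator, ExLlamaV2DynamicJob

generator = ExLlamaV2DynamicGenerator(
    model = model,
    cache = cache,
    tokenizer = tokenizer,
)

# Enqueue a single job; more jobs can be enqueued at any time
generator.enqueue(ExLlamaV2DynamicJob(
    input_ids = tokenizer.encode("Once upon a time,", add_bos = False),
    max_new_tokens = 100,
))

# iterate() advances all active jobs by one step and returns a list of result dicts
completion = ""
while generator.num_remaining_jobs():
    for result in generator.iterate():
        completion += result.get("text", "")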

waterangel91 commented 6 days ago

Thank you very much for the clarification. As a follow-up question: you mentioned "So if there's enough room, an incoming job will start right away."

Does that mean the output streamed from the generator will have a mix of generations for different prompts? Will I need to use the identifier argument of the job to keep track of which generated token belongs to which prompt/API request? Or will the generator always generate continuous output for each prompt, one by one?

Now that I look at the code again, the job here is locally defined.

        # Create job
        job = ExLlamaV2DynamicJobAsync(
            generator,
            input_ids = tokenizer.encode(prompt, add_bos = False),
            max_new_tokens = max_new_tokens
        )

        # String to receive streamed result
        full_completion = prompt

        # Iterate through the job.
        async for result in job:
            full_completion += result.get("text", "")

So the for loop only generates the result for the current job, right?

        # Iterate through the job.
        async for result in job:
            full_completion += result.get("text", "")

turboderp commented 6 days ago

With the async wrapper, each job works like an independent asyncio-compatible generator, but there's still a single batching generator under the hood. So the async for loop in each task is actually a view on the same batched operation that generates tokens for all the jobs, with routing taken care of by the wrapper.

It's equivalent to:

  1. Create three ExLlamaV2DynamicJobs with an output collector for each
  2. enqueue all three jobs
  3. Call iterate() to get a batch of results
  4. For each result in the batch, route the output to the appropriate collector
  5. As long as there are still three active jobs, goto 3
  6. Create two more ExLlamaV2DynamicJobs
  7. enqueue the two new jobs
  8. Call iterate() to get a batch of results
  9. For each result in the batch, route the output to the appropriate collector
  10. As long as there are any active jobs, goto 8
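
A rough sketch of those steps using the non-async classes might look like this (it reuses the model, cache, tokenizer and prompts from the earlier example, and assumes the identifier set on each job is echoed back in its result dicts, as in the repo's dynamic generator examples):

from exllamav2.generator import ExLlamaV2DynamicGenerator, ExLlamaV2DynamicJob

generator = ExLlamaV2DynamicGenerator(model = model, cache = cache, tokenizer = tokenizer)

# One output collector per job, keyed by identifier
collectors = {}

def add_job(prompt: str, identifier: int, max_new_tokens: int = 100):
    collectors[identifier] = prompt
    generator.enqueue(ExLlamaV2DynamicJob(
        input_ids = tokenizer.encode(prompt, add_bos = False),
        max_new_tokens = max_new_tokens,
        identifier = identifier,
    ))

# Steps 1-2: create and enqueue three jobs
for i in range(3):
    add_job(prompts[i % len(prompts)], i)

# Steps 3-5: iterate while all three jobs are still active, routing each
# result to its collector by identifier
while generator.num_remaining_jobs() == 3:
    for result in generator.iterate():
        collectors[result["identifier"]] += result.get("text", "")

# Steps 6-7: create and enqueue two more jobs
add_job(prompts[3 % len(prompts)], 3)
add_job(prompts[4 % len(prompts)], 4)

# Steps 8-10: keep iterating until no jobs remain
while generator.num_remaining_jobs():
    for result in generator.iterate():
        collectors[result["identifier"]] += result.get("text", "")

The async wrapper does essentially this bookkeeping for you: each ExLlamaV2DynamicJobAsync acts as the collector for its own job.
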
waterangel91 commented 6 days ago

Thank you very much for the detailed explanation. I have managed to implement it in my server with your advice, and it is really amazing: the generation stream for the 2nd prompt indeed starts while the first one is still ongoing, like you mentioned.

It is really awesome!!!!

Can't wait to load 2 different models in VRAM and try to hit them at the same time :)

rjmehta1993 commented 4 days ago

@waterangel91 Are you using async with streaming? If yes, can you help describe what the server looks like?

waterangel91 commented 4 days ago

Yes, I use async with streaming but not batching. You can read my comments above in this thread for more detail. The way the script works, there won't be a mix-up of generated responses, but you will need to change to the async version of the code.
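
For a rough idea, a minimal FastAPI streaming endpoint built on the async wrapper could look something like the sketch below. This is not my exact server, just the general shape; it assumes a generator (ExLlamaV2DynamicGeneratorAsync) and tokenizer created at startup, as in the earlier example.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from exllamav2.generator import ExLlamaV2DynamicJobAsync

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    max_new_tokens: int = 200

@app.post("/v1/complete")
async def complete(req: CompletionRequest):
    # `generator` and `tokenizer` are assumed to be created at startup,
    # as in the example earlier in this thread.
    # One async job per request; the wrapper batches concurrent jobs and
    # routes each generated token back to the job it belongs to.
    job = ExLlamaV2DynamicJobAsync(
        generator,
        input_ids = tokenizer.encode(req.prompt, add_bos = False),
        max_new_tokens = req.max_new_tokens,
    )

    async def stream():
        async for result in job:
            chunk = result.get("text", "")
            if chunk:
                yield chunk

    return StreamingResponse(stream(), media_type = "text/plain")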