Closed royallavanya140 closed 2 months ago
@turboderp can u please help me with this....
Can you elaborate a bit on how you're doing this inference, what model, what chat template you're using, what hardware setup and so on... there's not a lot to go on there.
sure here is the code how i am doing inference,
job = AsyncJob(
    self.generator_store[model_name],
    filters=_filter,
    filter_prefer_eos=True,
    input_ids=input_ids,
    stop_conditions=prompt_format.stop_conditions(tokenizer),
    max_new_tokens=max_new_tokens
)
uid = uuid.uuid4().hex
async for result in job:
    token = result.get("text", "")
    if result.get("stage") == "streaming" and token:
        full_completion += token
        completion_tokens += 1
        token = self.get_completion_format(
            uid, token, model_name, None, [],
            self.generator_store[model_name], True
        )
        yield json.dumps(token)
generator_store is a dict {str: AsyncGenerator}
model: mistral-v0.3-exl2-8.0 (I converted using convert.py in exllamav2 git repo)
chat_template: chatml
hardware: cuda-12.1, gcc (Ubuntu 9.4.0-1ubuntu1~20.04.2) 9.4.0
Mistral isn't an instruct model, so it's not going to have very predictable behavior with a ChatML template. You're more or less counting on the base model to guess what you want it to do and it won't always get that right.
And if you use Mistral-instruct instead, the prompt format is Llama(2), not ChatML.
That doesn't explain why you're getting different responses on subsequent prompts. First thing I'd do is check that the input_ids are correct. They should contain the entire tokenized context for each prompt.
I'm not sure what the logic is around the store of generators. Could you elaborate on that? Batching is done with multiple (concurrent) jobs on the same generator, and while you could in principle have multiple generators for one model, each generator needs its own cache.
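To make the point about input_ids concrete, here is a minimal sketch of rebuilding the whole context for every request. It assumes a ChatML-style template only because that is what the code above uses; `build_chatml_prompt` and the sample conversation are hypothetical, and the template would have to match whatever the model was actually trained on.

```python
from typing import Dict, List


def build_chatml_prompt(messages: List[Dict[str, str]]) -> str:
    """Render the full conversation as one ChatML-style prompt string.

    Hypothetical helper for illustration; not part of exllamav2.
    """
    parts = []
    for message in messages:
        parts.append(
            f"<|im_start|>{message['role']}\n{message['content']}<|im_end|>\n"
        )
    # Leave the assistant turn open so the model writes the reply.
    parts.append("<|im_start|>assistant\n")
    return "".join(parts)


# Illustrative conversation (made-up content).
history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is the capital of France?"},
]
first_prompt = build_chatml_prompt(history)

# For the next turn, append the reply and the new question, then rebuild
# the prompt from the whole history instead of sending only the new question.
history.append({"role": "assistant", "content": "Paris."})
history.append({"role": "user", "content": "And of Italy?"})
second_prompt = build_chatml_prompt(history)
```

Each rendered string would then be tokenized in full and passed as input_ids for that request. Because the second prompt begins with the first one, a generator that caches prompts has an identical prefix to reuse.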
def __init__(self):
    self.generator_store = {}

def create_generator():
    if model_name not in self.generator_store:
        self.generator_store[model_name] = AsyncGenerator(
            model=model, cache=cache, tokenizer=tokenizer
        )

def inference(model_name, kwargs):
    if model_name not in self.generator_store:
        self.create_generator(
            model, tokenizer, cache, settings, model_name
        )
    prompt_format = prompt_formats["chatml"]()
    if model_name not in self.generator_store:
        self.create_generator(
            model, tokenizer, cache, settings, model_name
        )
    job = AsyncJob(
        self.generator_store[model_name],
        filters=_filter,
        filter_prefer_eos=True,
        input_ids=input_ids,
        stop_conditions=prompt_format.stop_conditions(tokenizer),
        max_new_tokens=max_new_tokens
    )
    uid = uuid.uuid4().hex
    async for result in job:
        token = result.get("text", "")
        if result.get("stage") == "streaming" and token:
            full_completion += token
            completion_tokens += 1
            token = self.get_completion_format(
                uid, token, model_name, None, [],
                self.generator_store[model_name], True
            )
            yield json.dumps(token)
I tried with mistralv3-instruct as well, but it is still giving random tokens. I checked the input ids as well; they are the same for the prompts I mentioned in the first comment.
What I was trying to do is parallel inference with one model. The problem: if one call is doing inference and we receive another call, the two seem to share the same cache somehow.
It won't work and u will need to use multiprocessing
@remichu-ai bro can u elaborate this
Just to confirm I understand what you are trying to do: you are trying to load different models and run concurrent inference on those models at the same time?
@remichu-ai Yes correct, but even with single model it is not working properly 😔
Can u provide enough of the full code that I can reproduce the issue? We can try to get it working for one model first.
For multiple models there will be complications, and u won't be able to run it optimally using your method. (Imagine speed reduced by half.)
Yeah I think we need more code here. If you're trying to do multiple concurrent inferences on one model this isn't the right approach at least:
def create_generator():
    if model_name not in self.generator_store:
        self.generator_store[model_name] = AsyncGenerator(
            model=model, cache=cache, tokenizer=tokenizer
        )
As there is only one global model, cache pair, calling this with different model_name arguments would create two generators using the same cache. They would overwrite each other's data and cause all kinds of corruption. It could easily be the kind you're seeing, where generator 1 tries to reuse part of the cache (prompt caching) that has been modified by generator 2.
You don't need multiprocessing for concurrent requests to one model. You can add an async job to an async generator at any point, and multiple overlapping jobs will be batched together whether or not they start/finish at the same time. As long as all the jobs are created on the same generator.
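As a rough sketch of the "one generator per model" rule, the store could be wrapped so that a generator (and the cache it owns) is created at most once per model name, with every request reusing it. `GeneratorRegistry`, `fake_factory` and the model name "mistral" are hypothetical stand-ins; in real code the factory would load the model, allocate its own cache and build the async generator on top of them.

```python
import asyncio
from typing import Any, Callable, Dict, List


class GeneratorRegistry:
    """Holds exactly one generator per model name (hypothetical helper)."""

    def __init__(self, factory: Callable[[str], Any]) -> None:
        # factory(model_name) is assumed to build the model, its own cache
        # and the generator that uses them, and to return the generator.
        self._factory = factory
        self._generators: Dict[str, Any] = {}

    def get(self, model_name: str) -> Any:
        # There is no await between the check and the assignment, so two
        # coroutines on the same event loop cannot both create a generator.
        if model_name not in self._generators:
            self._generators[model_name] = self._factory(model_name)
        return self._generators[model_name]


created: List[str] = []


def fake_factory(model_name: str) -> object:
    # Stand-in for the real loading code; only records that it was called.
    created.append(model_name)
    return object()


async def handle_request(registry: GeneratorRegistry, model_name: str) -> Any:
    generator = registry.get(model_name)
    await asyncio.sleep(0)  # stand-in for streaming results from a job
    return generator


async def demo() -> List[Any]:
    registry = GeneratorRegistry(fake_factory)
    # Three overlapping requests for the same model.
    return await asyncio.gather(
        *(handle_request(registry, "mistral") for _ in range(3))
    )


generators = asyncio.run(demo())
```

All three overlapping requests end up with the same generator object, which is the precondition for their jobs being batched together rather than fighting over a shared cache.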
Python threads don't run Python code in parallel (in standard CPython the GIL lets only one thread execute at a time), so if you do want multiple models loaded at the same time for concurrent inference, the best way to do that is using multiple processes. It's not strictly needed, but otherwise you'll end up with the models working in turn rather than in parallel. Multiprocessing is pretty simple though, here's an example:
import sys, os
import torch.multiprocessing as mp

def process_worker(process_index, num_processes):

    # Imports cannot be shared between processes
    from exllamav2 import ExLlamaV2, ExLlamaV2Config, ExLlamaV2Cache, ExLlamaV2Tokenizer, Timer
    from exllamav2.generator import ExLlamaV2DynamicGenerator

    # Create GPU split, giving each process its own GPU
    gs = [0] * num_processes
    gs[process_index] = 24

    # Load a model
    print("Start loading, process:", process_index)
    model_dir = "/mnt/str/models/llama3-8b-exl2/4.0bpw"
    config = ExLlamaV2Config(model_dir)
    model = ExLlamaV2(config)
    model.load(gpu_split = gs)
    cache = ExLlamaV2Cache(model, max_seq_len = 32768)
    tokenizer = ExLlamaV2Tokenizer(config)
    print("Finished loading, process:", process_index)

    # Create generator
    generator = ExLlamaV2DynamicGenerator(
        model = model,
        cache = cache,
        tokenizer = tokenizer,
    )

    # Do a different batched completion in each process
    prompts = ["Once upon a time,"] * (process_index + 1)
    outputs = generator.generate(prompt = prompts, max_new_tokens = 150, add_bos = True)

    # Print the output
    for i, output in enumerate(outputs):
        print(f"\n-----------> Process {process_index}, result {i + 1}/{len(outputs)}:\n{output}")

if __name__ == "__main__":
    mp.set_start_method('spawn')
    processes = []
    num_processes = 4
    for i in range(0, num_processes):
        p = mp.Process(target = process_worker, args = (i, num_processes))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
As a result, we cannot create multiple generators using the same model, cache. Could you please suggest a way to separate the cache for different generators (without increasing cuda memory). I mean single model multiple caches.
You can create multiple generators each with their own cache. But if they're for the same model I'm not sure why you'd do that? It would only limit concurrency, and I'm not seeing a case where it would be beneficial to use multiple generator/cache pairs for the same model instead of just a single generator with a larger cache.
The cache for a dynamic generator doesn't maintain a history of one conversation. Every request starts with the entire past context, and the generator automatically reuses past tokens that are identical to any of the previous requests that are still remembered in the cache. Here's an example for the dynamic generator that runs in a loop and lets you add jobs by pressing "A". Press it multiple times in a row to pile on more jobs before the previous jobs have finished. (Single-character keyboard input is a little hacky in Python):
from exllamav2 import ExLlamaV2, ExLlamaV2Config, ExLlamaV2Cache, ExLlamaV2Tokenizer
from exllamav2.generator import ExLlamaV2DynamicGeneratorAsync, ExLlamaV2DynamicJobAsync
import asyncio, sys, termios, tty, select  # sys is needed for sys.stdin below

col_default = "\u001b[0m"
col_yellow = "\u001b[33;1m"
col_blue = "\u001b[34;1m"

prompts = [
    "You should adopt a cat because",
    "Cats are the best animal because",
    "Cats are better than dogs because",
    "Cats make the best pet because",
]

async def run_job(generator, tokenizer, job_index):
    global prompts
    print(col_yellow + f"Starting job {job_index}" + col_default)
    prompt = prompts[job_index % len(prompts)]
    job = ExLlamaV2DynamicJobAsync(
        generator,
        input_ids = tokenizer.encode(prompt),
        max_new_tokens = 200,
        stop_conditions = [tokenizer.eos_token_id]
    )
    full_completion = prompt
    async for result in job:
        full_completion += result.get("text", "")
    print(col_blue + f"Finished job {job_index}:" + col_default)
    print(full_completion)
    print()

def get_single_keypress():
    if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
        return sys.stdin.read(1)
    else:
        return None

async def task_manager(generator, tokenizer):
    tasks = []
    job_index = 0
    print()
    print("Press 'A' to add a job, or 'X' to exit")
    print()
    while True:
        ch = get_single_keypress()
        if ch == "a":
            task = asyncio.create_task(run_job(generator, tokenizer, job_index))
            tasks.append(task)
            job_index += 1
        elif ch == "x":
            print("Exiting")
            break
        await asyncio.sleep(0.1)
    if tasks: await asyncio.gather(*tasks)

async def main():

    # Initialize model etc.
    model_dir = "/mnt/str/models/mistral-nemo-12b-exl2/4.5bpw/"
    config = ExLlamaV2Config(model_dir)
    config.max_seq_len = 16384
    config.arch_compat_overrides()
    model = ExLlamaV2(config)
    cache = ExLlamaV2Cache(model, lazy = True)
    model.load_autosplit(cache, progress = True)
    tokenizer = ExLlamaV2Tokenizer(config)
    generator = ExLlamaV2DynamicGeneratorAsync(
        model = model,
        cache = cache,
        tokenizer = tokenizer,
    )

    # Run interactively
    await task_manager(generator, tokenizer)
    await generator.close()

if __name__ == "__main__":

    # Keyboard input hackery
    old_settings = termios.tcgetattr(sys.stdin)
    tty.setcbreak(sys.stdin.fileno())
    try:
        asyncio.run(main())
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
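The prefix reuse described before the example (every request carries its full context, and identical leading tokens are not recomputed) can be pictured with a toy calculation. This is only an illustration of the idea, not how exllamav2 implements it: the real generator manages its cache internally and may match at a coarser granularity. `shared_prefix_len`, `reusable_tokens` and the token ids are made up.

```python
from typing import List, Sequence


def shared_prefix_len(a: Sequence[int], b: Sequence[int]) -> int:
    """Count how many leading token ids two sequences have in common."""
    count = 0
    for x, y in zip(a, b):
        if x != y:
            break
        count += 1
    return count


def reusable_tokens(new_ids: Sequence[int],
                    remembered: List[Sequence[int]]) -> int:
    """Longest prefix of new_ids that matches any remembered sequence."""
    return max(
        (shared_prefix_len(new_ids, old) for old in remembered),
        default=0,
    )


# Made-up token ids: two earlier requests still "remembered" by the cache.
remembered = [[1, 10, 11, 12, 13], [1, 20, 21]]

# A follow-up request that repeats the first context and adds two new tokens.
request = [1, 10, 11, 12, 13, 14, 15]

reused = reusable_tokens(request, remembered)
to_process = len(request) - reused
```

In this toy case five of the seven tokens match an earlier request, so only the last two would need a fresh forward pass. Sending just the new tokens, without the earlier context, would leave nothing to match and would also drop the conversation history from the prompt.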
Ya bro, thanks, it is solved now. Instead of creating multiple generators, I have created only one per model_name.
At the beginning of the model.
After asking multiple random questions, I asked the same question as above but got a random response.
I also set current_seq_len=0, but it still didn't work. And sometimes, if I repeat the question (prompt) again and again, the model does not generate anything (it gives an empty string).
How can we fix these issues?