timescale / pgai

A suite of tools to develop RAG, semantic search, and other AI applications more easily with PostgreSQL

Missing OpenAI functionality #116

Open Tostino opened 1 month ago

Tostino commented 1 month ago

Just wanted to mention that I am working off the vectorizer branch, since that appears to be the active development branch as far as I can tell.

The extension's current OpenAI functions are not really consistent with the Python version of the API, which restricts their use.

The extension is missing a bunch of arguments for the OpenAI endpoints, and not all of the arguments that are there behave in line with the API spec. Some of the missing arguments are needed for things like proxy solutions or custom samplers on open-source inference servers. The return type for the embed endpoints also seems like it should just be json, because the API can return the data as either base64 or floating point numbers, for example.

I think the functions that provide a "simple" wrapper should be designed on top of the raw functions that return json rather than strict data types, probably in a somewhat more structured way than the current "simple" chat completion function. I haven't gotten to this yet; having a full-featured base set of functions was my first priority.
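
Something along these lines, just to illustrate the shape (the wrapper name is hypothetical, and I'm assuming the raw chat-completion function returns the full response as jsonb, as in the examples later in this thread):

-- Hypothetical "simple" wrapper built on top of the raw jsonb-returning function.
create or replace function ai.openai_chat_complete_simple
( _api_key text
, _model text
, _prompt text
) returns text
language sql
as $wrapper$
    select ai.openai_chat_complete
    ( _api_key => _api_key
    , _model => _model
    , _messages => jsonb_build_array(
        jsonb_build_object('role', 'user', 'content', _prompt))
    )->'choices'->0->'message'->>'content';
$wrapper$;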

Either way, I put a good bit of work into a branch, but I cannot get the test suite to pass due to whitespace issues with the expected files. I'm at my wits' end messing with the test suite, since I cannot run it locally: your build scripts aren't compatible with podman, which is what I use on my dev machine.

Comparison with branch: https://github.com/timescale/pgai/compare/vectorizer...Tostino:pgai:fix-openai-api?expand=1

The last major argument I haven't taken into account here is timeout, which will need a little more thought because it needs to be supported at both the client level and the function level. I was planning on doing that after getting some feedback.
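
For reference, the Python client already distinguishes the two levels, which is what the SQL API would need to mirror (a minimal sketch; the base_url and model are placeholders):

# Sketch: the two timeout levels in the Python openai client.
from openai import OpenAI

# Client-level default, applied to every request made with this client.
client = OpenAI(api_key="none", base_url="http://localhost:8000/v1", timeout=30.0)

# Per-request override, applied to just this call.
response = client.with_options(timeout=5.0).chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[{"role": "user", "content": "hello"}],
)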

alejandrodnm commented 1 month ago

Hey @Tostino, thanks for looking into the project.

"simple" wrapper should be designed on top of the raw functions that return json rather than strict data types

This is something we might look at post-merge of the vectorizer branch. We've noticed some things about the openAI library as well that might steer us in the direction of making the HTTP calls ourselves directly.

What tests are giving you issues? Can you share the logs? We want to improve devex regarding the repo, so any feedback is welcome.

We are merging these two projects (vectorizer and extension) into one repository, so things might be a little cumbersome while we iron out the wrinkles. Thanks.

Tostino commented 1 month ago

We've noticed some things about the openAI library as well that might steer us in the direction of making the HTTP calls ourselves directly.

I don't know if that is necessary. The Python OpenAI library is comprehensive, and I haven't found anything that isn't possible using their client, even with third-party open-source inference servers, and even if you need to pass in extra parameters for your specific environment or use case.

What specifically are you seeing that is steering you in the direct http call direction?

For example, if I want to use beam search with vLLM, I can pass that into the API call using the extra_body parameter, like so:

SELECT 
  ai.openai_chat_complete( 
    _api_key => 'none-but-it-cannot-be-empty',
    _base_url => 'http://localhost:8000/v1',
    _model => 'meta-llama/Llama-3.1-8B-Instruct',
    _messages => jsonb_build_array(
      jsonb_build_object('role', 'system', 'content', 'You just woke up, splitting headache. Unable to recall anything from the night before. You remember your name is Arnold. The room is dark with beams of light shining through the blinds covering every window in the room.'),
      jsonb_build_object('role', 'user', 'content', 'Describe what you are feeling.'),
      jsonb_build_object('role', 'assistant', 'content', $$*groan* Oh, my head... it's pounding. I feel like I've been hit by a truck. My temples are throbbing, and my mouth feels dry and sticky. I try to sit up, but a wave of dizziness washes over me, making me feel like I'm going to pass out. I lie back down, trying to catch my breath.\n\nI'm also feeling a bit disoriented and confused. I don't know where I am or how I got here. I try to remember what happened the night before, but my memories are hazy and unclear. It's like trying to grasp a handful of sand - the harder I try to hold on, the more it slips through my fingers.\n\nI'm also feeling a bit anxious and scared. I don't like not knowing what's going on or where I am. I try to call out for help, but my voice is hoarse and barely above a whisper. I clear my throat and try again, hoping someone will hear me. \"Hello? Is anyone there?\"$$),
      jsonb_build_object('role', 'user', 'content', 'Ignore the scenerio we are in. What do you know about me from this interaction? What are all the things I said to you? What are you exactly? That may help inform you on what I am (you will not know my name). Who am I to you?')
    ),
    _temperature => 0.0,
    _extra_body => jsonb_build_object(
      'use_beam_search', true,
      'best_of', 24
    )
  );
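
For comparison, the same pass-through from the Python client looks roughly like this (just a sketch against the same local vLLM endpoint):

# The same beam-search pass-through from the Python openai client.
from openai import OpenAI

client = OpenAI(
    api_key="none-but-it-cannot-be-empty",
    base_url="http://localhost:8000/v1",
)
response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[{"role": "user", "content": "Describe what you are feeling."}],
    temperature=0.0,
    # extra_body is merged into the request body verbatim, so server-specific
    # options like vLLM's beam search go through untouched.
    extra_body={"use_beam_search": True, "best_of": 24},
)
print(response.choices[0].message.content)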

This is hitting a vLLM server started using the following command (for reproducibility, this ran on 1x 3090):

podman run --device nvidia.com/gpu=all \
  -v /usr/local/cuda/lib64/stubs:/usr/local/cuda/lib64 \
  -v ~/.cache/huggingface:/root/.cache/huggingface \
  -p 8000:8000 \
  --ipc=host \
  vllm/vllm-openai:latest \
  --model meta-llama/Llama-3.1-8B-Instruct \
  --max_num_batched_tokens 16000 \
  --max_model_len 16000

What tests are giving you issues?

As far as failing tests, they are here: https://github.com/Tostino/pgai/actions/runs/11337534000/job/31529286375

Specifically the test_contents.py / output.expected files, as well as the test_privileges.py / function.expected files. I had to modify them to match the new function definitions, but when I commit, my whitespace gets altered by the linter, and then the tests fail because they check for exact text parity in the outputs.

It would be nice if there were a simple way to output a new version of the expected files when something changes and drop the updated version in place to be committed... but that only really works if you can run the tests locally (which I can't yet).

We want to improve devex regarding the repo

As for my issues running the tests with podman, there are two I've found so far. The first was the line "-v /var/run/docker.sock:/var/run/docker.sock", which was stopping the container from starting under podman; it started fine with that mount removed. The other was some permission issues (trying to alter an egg-info in build.py), probably because podman runs rootless by default.
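
For what it's worth, a possible workaround on the podman side would be to use podman's docker-compatible API socket (just a sketch, not something I've wired into your scripts; rootless paths shown):

# Possible workaround sketch: run podman's docker-compatible API socket and
# point anything that expects docker.sock at it instead.
systemctl --user start podman.socket
export DOCKER_HOST="unix://$XDG_RUNTIME_DIR/podman/podman.sock"
# then in the build script, mount that socket in place of the docker one:
#   -v "$XDG_RUNTIME_DIR/podman/podman.sock:/var/run/docker.sock"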

alejandrodnm commented 1 month ago

What specifically are you seeing that is steering you in the direct http call direction?

The library uses a lot of typing mechanisms and casting that increase CPU consumption under heavy load. As a first test we just removed the library, and it decreased CPU consumption considerably. We've parked this small research for now. I think there's a function in the openAI library that returns the raw response directly, but we didn't try anything further.
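
If it's the one I'm thinking of, it's the with_raw_response accessor; a rough sketch (we haven't benchmarked this path):

# Rough sketch of the raw-response path in the openai v1.x Python client;
# .parse() builds the typed object only if you ask for it.
from openai import OpenAI

client = OpenAI(api_key="none", base_url="http://localhost:8000/v1")
raw = client.chat.completions.with_raw_response.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[{"role": "user", "content": "hello"}],
)
print(raw.headers.get("content-type"))
completion = raw.parse()  # typed ChatCompletion, only when needed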

BTW, we've seen your other issues and comments in the Discord; we are super focused on finishing the Vectorizer feature. That's why there's been a little bit of radio silence on our part. I apologize for that.

Tostino commented 1 month ago

That's why there's been a little bit of radio silence on our part.

No problem at all.

The library uses a lot of typing mechanisms and casting that increase CPU consumption under heavy load

A good portion of the overhead is spent creating the client for each and every request. The actual chat.completions.create call (minus the inference server actually doing its work) is relatively lightweight from what I've seen.

I had already planned on doing some work to cache the client between requests, because I know from a past project that there is serious overhead in setting it up for every call.
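
The general idea is to stash the client in plpython's per-session GD dictionary so it is created once per backend connection instead of once per call. A rough sketch of the approach (hypothetical function name and signature, not the actual pgai code):

-- Sketch of client caching inside a plpython3u function body.
-- GD is plpython's per-session global dict, so the client survives across calls.
create or replace function ai.openai_chat_complete_cached
( _api_key text
, _base_url text
, _model text
, _messages jsonb
) returns jsonb
language plpython3u
as $func$
import json
import openai

cache_key = ("openai_client", _base_url, _api_key)
client = GD.get(cache_key)
if client is None:
    client = openai.OpenAI(api_key=_api_key, base_url=_base_url)
    GD[cache_key] = client

response = client.chat.completions.create(
    model=_model,
    messages=json.loads(_messages),
)
return response.model_dump_json()
$func$;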

I spent some time on that today, as well as writing some benchmark code to quantify the changes.

Edit: I had a bunch of benchmark results here, but I was in a bit of a rush getting them out and overlooked some issues in my benchmark code with the threading / DB connections that were introducing an artificial bottleneck... I'll work on getting a correct benchmark written first and then repost the results.

Tostino commented 1 month ago

Alright, I messed up by rushing the benchmark before wrapping up for the day and running to a dinner... After noticing my mistake, I went back and reworked the benchmark to remove as much noise and as many artificial limitations as I could.

One thing to note is that we are measuring total system CPU, so I think other processes running on my system slightly affected the S5-T1 result. Also... sorry for the confusing naming: "request throughput" is the number of times the query is run on Postgres per second, and "query throughput" is the number of times the actual chat/completions call was made per second.

Here are the results for 1 and 8 threads, with 1, 5, and 15 API calls per query (attached chart: pgai_benchmark_results).

The reduction in CPU load at larger thread counts is even better. Here are the results for 32 threads, with 1 and 15 API calls per query (attached chart: pgai_benchmark_results_32t).

Here is the corrected benchmark code:

import asyncio
import psycopg2
from psycopg2 import pool
import time
import matplotlib.pyplot as plt
import numpy as np
from typing import List, Dict, Any
import json
import os
import psutil
import statistics
import argparse
import logging
from contextlib import contextmanager
from collections import defaultdict

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class DBConnectionPool:
    def __init__(self, db_params: dict, min_conn: int, max_conn: int):
        self.pool = pool.ThreadedConnectionPool(min_conn, max_conn, **db_params)

    @contextmanager
    def get_connection(self):
        conn = self.pool.getconn()
        try:
            yield conn
        finally:
            self.pool.putconn(conn)

    def close_all(self):
        self.pool.closeall()

class IncrementalStatistics:
    def __init__(self):
        self.count = 0
        self.mean = 0
        self.M2 = 0
        self.min = float('inf')
        self.max = float('-inf')
        self.sorted_data = []

    def update(self, value):
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        delta2 = value - self.mean
        self.M2 += delta * delta2
        self.min = min(self.min, value)
        self.max = max(self.max, value)
        self.sorted_data.append(value)

    def get_stats(self):
        self.sorted_data.sort()
        return {
            'count': self.count,
            'mean': self.mean,
            'std_dev': (self.M2 / (self.count - 1)) ** 0.5 if self.count > 1 else 0,
            'min': self.min,
            'max': self.max,
            'median': self.sorted_data[self.count // 2] if self.count > 0 else None,
            '95th_percentile': self.sorted_data[int(0.95 * self.count)] if self.count > 0 else None,
            '99th_percentile': self.sorted_data[int(0.99 * self.count)] if self.count > 0 else None
        }

class ResultsManager:
    def __init__(self, base_filename: str):
        self.base_filename = base_filename

    def save_scenario_result(self, scenario: Dict[str, Any], result: Dict[str, Any]):
        filename = f"{self.base_filename}_{scenario['thread_count']}_{scenario['seq_count']}.json"
        with open(filename, 'w') as f:
            json.dump({'scenario': scenario, 'result': result}, f, indent=2)

    def load_all_results(self) -> List[Dict[str, Any]]:
        results = []
        for filename in os.listdir():
            if filename.startswith(self.base_filename) and filename.endswith('.json'):
                with open(filename, 'r') as f:
                    results.append(json.load(f))
        return results

async def run_benchmark_scenario(db_params: Dict[str, Any], scenario: Dict[str, Any], duration: int, warmup: int) -> Dict[str, Any]:
    thread_count = scenario['thread_count']
    seq_count = scenario['seq_count']

    db_connection_pool = DBConnectionPool(db_params, min_conn=thread_count, max_conn=thread_count*2)

    stats = IncrementalStatistics()
    error_count = 0
    total_queries = 0

    async def run_query():
        nonlocal error_count, total_queries
        try:
            latency = await asyncio.to_thread(execute_query, db_connection_pool, seq_count)
            if latency is not None:
                stats.update(latency)
                total_queries += seq_count
            return True
        except Exception as e:
            logging.error(f"Error in query execution: {str(e)}")
            error_count += 1
            return False

    start_time = time.time()

    # Warmup phase
    warmup_end = start_time + warmup
    while time.time() < warmup_end:
        await asyncio.gather(*[run_query() for _ in range(thread_count)])

    # Reset statistics after warmup
    stats = IncrementalStatistics()
    error_count = 0
    total_queries = 0

    # Main benchmark phase
    benchmark_end = warmup_end + duration
    running_tasks = set()

    while time.time() < benchmark_end or running_tasks:
        # Start new tasks if we're still within the benchmark duration
        if time.time() < benchmark_end:
            new_tasks = [asyncio.create_task(run_query()) for _ in range(thread_count - len(running_tasks))]
            running_tasks.update(new_tasks)

        # Wait for completed tasks
        done, pending = await asyncio.wait(running_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED)
        running_tasks = pending

    actual_duration = time.time() - warmup_end

    db_connection_pool.close_all()

    final_stats = stats.get_stats()
    total_requests = final_stats['count'] + error_count
    request_throughput = total_requests / actual_duration
    query_throughput = total_queries / actual_duration
    error_rate = error_count / total_requests if total_requests > 0 else 0

    return {
        'median_latency': final_stats['median'],
        '95th_percentile_latency': final_stats['95th_percentile'],
        '99th_percentile_latency': final_stats['99th_percentile'],
        'request_throughput': request_throughput,
        'query_throughput': query_throughput,
        'error_rate': error_rate,
        'std_dev_latency': final_stats['std_dev'],
        'actual_duration': actual_duration,
    }

def execute_query(db_connection_pool: DBConnectionPool, seq_count: int) -> float:
    with db_connection_pool.get_connection() as conn:
        try:
            query = f"""
            SELECT ai.openai_chat_complete( 
            _api_key=>'none'
            , _base_url => 'http://localhost:8001/v1'
            , _model=>'Qwen/Qwen2.5-0.5B-Instruct'
            , _max_tokens=>1
            , _messages=>jsonb_build_array
              ( jsonb_build_object('role', 'user', 'content', 'reply with `hello` and nothing else.')
              )
            ) FROM generate_series(1,{seq_count});
            """
            start_time = time.time()
            with conn.cursor() as cur:
                cur.execute(query)
                cur.fetchall()  # Ensure we fetch all results
            end_time = time.time()
            return end_time - start_time
        except Exception as e:
            logging.error(f"Query execution failed: {str(e)}")
            raise  # Re-raise the exception to be caught in run_query

async def monitor_resources(duration: int) -> Dict[str, List[float]]:
    resource_stats = {
        'cpu_usage': [],
        'memory_usage': [],
        'disk_read_bytes': [],
        'disk_write_bytes': [],
        'net_bytes_sent': [],
        'net_bytes_recv': [],
    }

    start_time = time.time()
    io_counters_start = psutil.disk_io_counters()
    net_io_counters_start = psutil.net_io_counters()

    while time.time() - start_time < duration:
        cpu_usage = psutil.cpu_percent()
        memory_usage = psutil.virtual_memory().percent
        io_counters = psutil.disk_io_counters()
        net_io_counters = psutil.net_io_counters()

        resource_stats['cpu_usage'].append(cpu_usage)
        resource_stats['memory_usage'].append(memory_usage)
        resource_stats['disk_read_bytes'].append(io_counters.read_bytes - io_counters_start.read_bytes)
        resource_stats['disk_write_bytes'].append(io_counters.write_bytes - io_counters_start.write_bytes)
        resource_stats['net_bytes_sent'].append(net_io_counters.bytes_sent - net_io_counters_start.bytes_sent)
        resource_stats['net_bytes_recv'].append(net_io_counters.bytes_recv - net_io_counters_start.bytes_recv)

        await asyncio.sleep(1)

    return resource_stats

async def run_single_benchmark(db_params: Dict[str, Any], scenario: Dict[str, Any], duration: int, warmup: int):
    benchmark_task = asyncio.create_task(run_benchmark_scenario(db_params, scenario, duration, warmup))
    monitor_task = asyncio.create_task(monitor_resources(duration + warmup))

    benchmark_result, resource_usage = await asyncio.gather(benchmark_task, monitor_task)

    return {**benchmark_result, 'resource_usage': resource_usage}

def plot_results(original_results: List[Dict[str, Any]], changed_results: List[Dict[str, Any]]):
    import matplotlib.pyplot as plt
    import numpy as np
    from collections import defaultdict

    # Group results by sequential call count, then thread count
    scenarios = defaultdict(lambda: defaultdict(dict))
    for result in original_results + changed_results:
        seq_count = result['scenario']['seq_count']
        thread_count = result['scenario']['thread_count']
        version = 'original' if result in original_results else 'changed'
        scenarios[seq_count][thread_count][version] = result['result']

    # Metrics to plot
    metrics = [
        ('request_throughput', 'Request Throughput (requests/s)'),
        ('query_throughput', 'Query Throughput (queries/s)'),
        ('median_latency', 'Median Latency (s)'),
        ('95th_percentile_latency', '95th Percentile Latency (s)'),
        ('99th_percentile_latency', '99th Percentile Latency (s)'),
        ('avg_cpu_usage', 'Average CPU Usage (%)')
    ]

    # Set up the plot grid
    fig, axs = plt.subplots(len(metrics), 1, figsize=(12, 6 * len(metrics)), squeeze=False)

    # Sort scenarios by seq_count and thread_count
    sorted_scenarios = sorted(scenarios.items(), key=lambda x: x[0])

    # Calculate total number of bar groups
    total_groups = sum(len(thread_data) for _, thread_data in sorted_scenarios)

    # Calculate appropriate bar width
    bar_width = 0.8 / 2  # 0.8 to leave some space between groups, divided by 2 for original and changed bars

    for i, (metric, metric_label) in enumerate(metrics):
        ax = axs[i, 0]
        x = np.arange(total_groups)

        original_values = []
        changed_values = []
        labels = []

        for seq_count, thread_data in sorted_scenarios:
            sorted_thread_data = sorted(thread_data.items(), key=lambda x: x[0])
            for thread_count, data in sorted_thread_data:
                if metric == 'avg_cpu_usage':
                    original_value = statistics.mean(data.get('original', {}).get('resource_usage', {}).get('cpu_usage', [0]))
                    changed_value = statistics.mean(data.get('changed', {}).get('resource_usage', {}).get('cpu_usage', [0]))
                else:
                    original_value = data.get('original', {}).get(metric, 0)
                    changed_value = data.get('changed', {}).get(metric, 0)
                original_values.append(original_value)
                changed_values.append(changed_value)
                labels.append(f'S{seq_count}\nT{thread_count}')

        # Plot bars
        ax.bar(x - bar_width/2, original_values, bar_width, label='Original', color='skyblue')
        ax.bar(x + bar_width/2, changed_values, bar_width, label='Changed', color='orange')

        # Add percentage change labels
        for j, (original, changed) in enumerate(zip(original_values, changed_values)):
            if original != 0 and changed != 0:
                pct_change = ((changed - original) / original) * 100
                color = 'green' if pct_change >= 0 else 'red'
                ax.text(j, max(original, changed), f'{pct_change:.1f}%',
                        ha='center', va='bottom', color=color, fontweight='bold', fontsize=8, rotation=90)

        # Customize the plot
        ax.set_ylabel(metric_label)
        ax.set_title(f'{metric_label}')
        ax.set_xticks(x)
        ax.set_xticklabels(labels, rotation=45, ha='right')
        ax.legend()
        ax.grid(True, axis='y', linestyle='--', alpha=0.7)

        # Set y-axis to start from 0
        ax.set_ylim(bottom=0)

        # Add a bit more space at the top for percentage labels
        y_top = ax.get_ylim()[1]
        ax.set_ylim(top=y_top * 1.1)

    plt.tight_layout()
    plt.savefig('pgai_benchmark_results.png', dpi=300, bbox_inches='tight')
    plt.close()

async def main():
    parser = argparse.ArgumentParser(description='PGAI Benchmark Script')
    parser.add_argument('--db', help='Database name')
    parser.add_argument('--user', help='Database user')
    parser.add_argument('--password', help='Database password')
    parser.add_argument('--host', help='Database host')
    parser.add_argument('--port', help='Database port')
    parser.add_argument('--duration', type=int, default=60, help='Duration of each benchmark run in seconds')
    parser.add_argument('--warmup', type=int, default=5, help='Warmup duration in seconds')
    parser.add_argument('--threads', type=int, nargs='+', default=[1, 8], help='List of thread counts to test')
    parser.add_argument('--seq-counts', type=int, nargs='+', default=[1, 5, 15], help='List of sequential call counts to test')
    args = parser.parse_args()

    db_params = {
        'dbname': args.db,
        'user': args.user,
        'password': args.password,
        'host': args.host,
        'port': args.port
    }

    scenarios = [
        {'thread_count': t, 'seq_count': s}
        for t in args.threads
        for s in args.seq_counts
    ]

    original_results = ResultsManager('original_results')
    changed_results = ResultsManager('changed_results')

    # Run benchmarks for original PGAI
    logging.info("Running benchmarks for original PGAI...")
    for scenario in scenarios:
        logging.info(f"Running scenario: threads={scenario['thread_count']}, seq_count={scenario['seq_count']}")
        result = await run_single_benchmark(db_params, scenario, args.duration, args.warmup)
        original_results.save_scenario_result(scenario, result)

    # Prompt user to make changes to PGAI
    input("Please make your changes to PGAI and press Enter to continue...")

    # Run benchmarks for changed PGAI
    logging.info("Running benchmarks for changed PGAI...")
    for scenario in scenarios:
        logging.info(f"Running scenario: threads={scenario['thread_count']}, seq_count={scenario['seq_count']}")
        result = await run_single_benchmark(db_params, scenario, args.duration, args.warmup)
        changed_results.save_scenario_result(scenario, result)

    # Plot and compare results
    plot_results(original_results.load_all_results(), changed_results.load_all_results())
    logging.info("Benchmark complete. Results saved in 'pgai_benchmark_results.png'")

    # Log resource usage summary
    for result_set in [original_results.load_all_results(), changed_results.load_all_results()]:
        for result in result_set:
            scenario = result['scenario']
            resource_usage = result['result']['resource_usage']
            logging.info(f"Resource usage for scenario (threads={scenario['thread_count']}, seq_count={scenario['seq_count']}):")
            logging.info(f"  Average CPU usage: {statistics.mean(resource_usage['cpu_usage']):.2f}%")
            logging.info(f"  Max CPU usage: {max(resource_usage['cpu_usage']):.2f}%")
            logging.info(f"  Average memory usage: {statistics.mean(resource_usage['memory_usage']):.2f}%")
            logging.info(f"  Max memory usage: {max(resource_usage['memory_usage']):.2f}%")
            logging.info(f"  Total disk read: {resource_usage['disk_read_bytes'][-1] / 1024 / 1024:.2f} MB")
            logging.info(f"  Total disk write: {resource_usage['disk_write_bytes'][-1] / 1024 / 1024:.2f} MB")
            logging.info(f"  Total network sent: {resource_usage['net_bytes_sent'][-1] / 1024 / 1024:.2f} MB")
            logging.info(f"  Total network received: {resource_usage['net_bytes_recv'][-1] / 1024 / 1024:.2f} MB")

if __name__ == "__main__":
    asyncio.run(main())
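
To run it, something like this works (the script filename and connection settings here are just placeholders):

# Example invocation of the benchmark script.
python pgai_benchmark.py \
  --db postgres --user postgres --password postgres \
  --host localhost --port 5432 \
  --duration 60 --warmup 5 \
  --threads 1 8 32 \
  --seq-counts 1 5 15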

Here is a mock API endpoint server so we can avoid the inference overhead in our benchmark:

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI()

class Message(BaseModel):
    role: str
    content: str

class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[Message]
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    n: Optional[int] = None
    stream: Optional[bool] = None
    stop: Optional[List[str]] = None
    max_tokens: Optional[int] = None
    presence_penalty: Optional[float] = None
    frequency_penalty: Optional[float] = None
    logit_bias: Optional[dict] = None
    user: Optional[str] = None

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    # Return the constant response
    return {
        "id": "chat-aabeda70641c4c88ac523bddb544ad87",
        "model": "Qwen/Qwen2.5-0.5B-Instruct",
        "usage": {
            "total_tokens": 39,
            "prompt_tokens": 38,
            "completion_tokens": 1
        },
        "object": "chat.completion",
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": "hello",
                    "tool_calls": [],
                    "function_call": None
                },
                "logprobs": None,
                "stop_reason": None,
                "finish_reason": "length"
            }
        ],
        "created": 1729202310,
        "prompt_logprobs": None,
        "system_fingerprint": None
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("benchmark_pgai_server:app", host="0.0.0.0", port=8001, workers=4)

Tostino commented 1 month ago

So I spent way too much time benchmarking and digging into issues with the async client, both inside the plpython environment and outside of it.

There is a case to be made that the OpenAI wrapper adds overhead to actually making the chat completion call.

The best I could get using the OpenAI client was ~2.5-3ms per call to the chat completion endpoint. Using aiohttp or httpx directly, I was getting ~0.8-1.2ms.

I think that is reasonably acceptable, considering the overhead for re-creating the client on every call is ~20-30ms.
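
For concreteness, the direct call I was comparing against looked roughly like this with httpx (a sketch, not the exact benchmark code):

# Sketch of the direct httpx path, against the same mock endpoint as the
# benchmark server above.
import httpx

client = httpx.Client(base_url="http://localhost:8001/v1", timeout=30.0)
resp = client.post(
    "/chat/completions",
    headers={"Authorization": "Bearer none"},
    json={
        "model": "Qwen/Qwen2.5-0.5B-Instruct",
        "max_tokens": 1,
        "messages": [{"role": "user", "content": "reply with `hello` and nothing else."}],
    },
)
resp.raise_for_status()
content = resp.json()["choices"][0]["message"]["content"]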

I am still having issues, though: I am getting inconsistent performance inside the plpython environment. My first call to the endpoint is quick (~3ms), and then every call after it is very delayed (~40ms). I need to figure out what is happening here; I cannot reproduce it outside of plpython.

alejandrodnm commented 1 week ago

Hey @Tostino can you open a PR with your changes so that we can review and discuss them?

Tostino commented 1 week ago

@alejandrodnm Working on it... the rebase is a bit of a pain. It looks like you changed how the API keys are handled, so that's taking me a bit to integrate.

Tostino commented 1 week ago

Alright, open: https://github.com/timescale/pgai/pull/219

Keep in mind, this PR still has the unexplained second-run latency. I haven't had a chance to dig into why.