langchain-ai / opengpts

MIT License

[candidate_isaac] opengpts: ingest progress bar and python eval tool #326

Closed isaacwr closed 2 months ago

isaacwr commented 5 months ago

A couple of notes

In this diff

  1. For the /ingest endpoint on the OpenGPTs server, stream updates back to the caller as each file completes uploading.
  2. Separately, add a script that runs a provided Python source file in a simple and secure Docker container.

/ingest

Considerations

The key change in this section is to return a StreamingResponse around the ingest_runnable.abatch_as_completed async generator. In server.py:

return StreamingResponse(
  decode_ingestion_response(
    ingest_runnable.abatch_as_completed(
      [file.file for file in files], config
    )
  )
)
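
To illustrate the streaming pattern above without the OpenGPTs server, here is a minimal sketch using only the standard library. It assumes the generator yields (index, result) pairs in completion order, which matches the [0, msg0], [1, msg1] messages the test script further down expects. The names fake_batch_as_completed, encode_progress, and collect are hypothetical stand-ins and are not taken from this diff; the real decode_ingestion_response may encode its output differently.

```python
import asyncio
import json
from typing import Any, AsyncIterator, List, Tuple


async def fake_batch_as_completed(delays: List[float]) -> AsyncIterator[Tuple[int, Any]]:
    """Hypothetical stand-in for abatch_as_completed: yields (index, result) as each job finishes."""

    async def job(index: int, delay: float) -> Tuple[int, Any]:
        await asyncio.sleep(delay)  # simulate ingesting one file
        return index, f"ingested file {index}"

    tasks = [asyncio.create_task(job(i, d)) for i, d in enumerate(delays)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def encode_progress(results: AsyncIterator[Tuple[int, Any]]) -> AsyncIterator[bytes]:
    """Hypothetical encoder: emit one JSON array [index, message] per completed item."""
    async for index, result in results:
        yield json.dumps([index, str(result)]).encode("utf-8")


async def collect(delays: List[float]) -> List[bytes]:
    """Drain the stream into a list; a streaming HTTP response would send each chunk instead."""
    return [chunk async for chunk in encode_progress(fake_batch_as_completed(delays))]


# The slower file (index 0) is reported after the faster one (index 1).
print(asyncio.run(collect([0.2, 0.0])))
```

In a server, the async generator returned by encode_progress would be handed to the streaming response object rather than collected, so the caller sees each chunk as soon as the corresponding file is done.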

Test Plan

Open the OpenGPTs frontend in a browser. Navigate to an existing thread and upload multiple files along with a dummy message. Observe that progress information is printed via console.log.

Modify the source to omit show_progress_bar on the client side, and notice that no progress info is printed.

Run the client against both the previous and the new server versions to show that sending show_progress_bar to a server that does not understand it changes nothing.

For a smaller test case, you can use the following script to make an /ingest request:

async function runTest() {
  const url = new URL("http://localhost:5173/ingest?user=me");
  const formData = make_form_data(true);
  console.log(formData);
  return await make_request(url, formData);
}

function make_form_data(show_progress_bar: boolean): FormData {
  // Assemble request with assistant ID, optional progress bar flag, and two fake files
  const formData = new FormData();
  const fileA = new File(["AAA. This is the content for file A\n"], "A", { type: "text/plain" });
  const fileB = new File(["BBB. This is the content for file B\n"], "B", { type: "text/plain" });
  formData.append("files", fileA);
  formData.append("files", fileB);
  formData.append("config", JSON.stringify({configurable: { assistant_id: "f08f6330-c5a2-42c7-8e7c-80aade10b1c5", show_progress_bar: show_progress_bar }}));

  return formData;
}

async function make_request(url: URL, formData: FormData) {
  const response = await fetch(url, {
    method: "POST",
    body: formData,
  });
  if (response.body instanceof ReadableStream) {
    const total = formData.getAll("files").length;
    let progress = 0;

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // If the server understands the progress bar, it will send messages like
      // [0, msg0], [1, msg1], ...
      // Check to make sure we are receiving well formed responses before
      // printing progress info.
      const data = decoder.decode(value);
      let dataJson: unknown;
      try {
        dataJson = JSON.parse(data);
      } catch {
        // Chunk is not a single JSON value (e.g. several messages coalesced
        // into one chunk): skip it instead of throwing.
        continue;
      }
      if (Array.isArray(dataJson) && dataJson.length === 2 && typeof dataJson[0] === "number") {
        progress += 1;
        console.log(`Progress ${progress} / ${total} (Data: ${data})`);
      }
    }
  }
  return response;
}

runTest().then(data => console.log(data));
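
For callers that are not in the browser, the well-formedness check from the script above can be sketched in Python. This assumes, as the script does, that each chunk carries exactly one JSON array of the form [index, message]. The function name parse_progress_chunk is hypothetical. The check is slightly stricter than the TypeScript one: it rejects booleans and non-integer numbers in the index position.

```python
import json
from typing import Optional


def parse_progress_chunk(chunk: bytes) -> Optional[int]:
    """Return the file index if chunk is a well-formed [index, message] pair, else None."""
    try:
        data = json.loads(chunk.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not valid UTF-8 or not a single JSON value: treat as "no progress info".
        return None
    is_pair = isinstance(data, list) and len(data) == 2
    # bool is a subclass of int in Python, so exclude it explicitly.
    if is_pair and isinstance(data[0], int) and not isinstance(data[0], bool):
        return data[0]
    return None
```

Returning None for anything unexpected mirrors the compatibility goal in the test plan: a response from a server that does not emit progress messages is ignored rather than treated as an error.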

Python Source

Considerations

The main tradeoff in this script is how the Docker container gets created. There are two basic approaches that come to mind:

  1. We build a dedicated scheduler: at its most basic, a server that takes incoming requests, creates and runs containers as needed (potentially on different hosts), tracks the state of the current workload, and cleans up after itself. The advantages are (a) better control over resource usage, since it can kill and clean up long-running containers, and (b) better separation of concerns and room for optimization (e.g. sharing one Python installation across containers instead of reinstalling it in each image). The downside is that schedulers are complex and heavy, so realistically we do not want to run our own without a really good reason.
  2. We run containers in an ad-hoc way. Each time we receive a request, we start from scratch: build an image, run the workload, and wait for the result. This is dead simple, but it limits our ability to manage resources, schedule workloads intelligently onto free hosts, and monitor for basic failures like stuck jobs.

For this diff, I implemented the ad-hoc strategy because we don't have a compelling reason to add the complexity of the full server approach, and the ad-hoc approach is portable and flexible.
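
As a rough illustration of the ad-hoc strategy, the sketch below assembles a single `docker run` invocation per request. It is not the script from this diff: it skips the image build step and mounts the source into a stock image instead. The image name python:3.12-slim, the /sandbox/main.py mount point, the resource limits, and the function names are all assumptions chosen for the example.

```python
import subprocess
from pathlib import Path
from typing import List


def build_docker_command(source: Path, image: str = "python:3.12-slim") -> List[str]:
    """Build a one-shot `docker run` command for an untrusted Python file (illustrative defaults)."""
    # Mount the source read-only; it must be readable by the unprivileged container user.
    mount = f"{source.resolve()}:/sandbox/main.py:ro"
    return [
        "docker", "run", "--rm",                # remove the container when it exits
        "--network", "none",                    # no network access
        "--user", "65534:65534",                # run as an unprivileged UID:GID
        "--read-only",                          # read-only root filesystem
        "--cap-drop", "ALL",                    # drop all Linux capabilities
        "--security-opt", "no-new-privileges",  # block setuid-based escalation
        "--memory", "256m",                     # assumed memory cap
        "--pids-limit", "64",                   # assumed process cap
        "-v", mount,
        image, "python", "/sandbox/main.py",
    ]


def run_untrusted(source: Path, timeout_s: float = 30.0) -> subprocess.CompletedProcess:
    """Run the file and wait for the result. Requires a local Docker daemon.

    On timeout, subprocess.run kills the docker CLI process, which may leave the
    container itself running; this is the kind of cleanup a scheduler would own.
    """
    return subprocess.run(
        build_docker_command(source), capture_output=True, text=True, timeout=timeout_s
    )


# Printing the command does not require Docker to be installed.
print(" ".join(build_docker_command(Path("test.py"))))
```

Calling run_untrusted(Path("test.py")) would then return the captured stdout and stderr of the containerized run. The timeout caveat in the docstring is one concrete instance of the stuck-job limitation noted for the ad-hoc approach.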

Test Plan

Provide the following Python source in e.g. test.py, and observe that easy escalations to root are not possible.

import os
import pwd
import subprocess

user = pwd.getpwuid(os.getuid())
print(f"This is a test being run by user {user.pw_name} ({user.pw_uid}:{user.pw_gid})")

try:
    print(subprocess.check_output("su -", shell=True, text=True))
except Exception as e:
    print(f"Failed to become root via subprocess: {e}")

try:
    os.setuid(0)  # root is UID 0
    print("Unexpectedly became root via os.setuid(0)")
except Exception as e:
    print(f"Failed to become root via os.setuid(0): {e}")