web-arena-x / webarena

Code repo for "WebArena: A Realistic Web Environment for Building Autonomous Agents"
https://webarena.dev
Apache License 2.0

Using webarena in async environment? #151

Open zhudotexe opened 1 week ago

zhudotexe commented 1 week ago

Is there a way to use the webarena ScriptBrowserEnv in an async context? For some context on my use case, I'm not using the bundled run.py, but rather building up my own eval loop (following minimal_example.py as a guideline).

The system I'm working on requires running as async, and it seems like WebArena is tightly coupled with the Sync API of Playwright (which in turn does not play nice with another asyncio loop running in parallel). I've done some digging and see that there's an AsyncScriptBrowserEnv, but this does not seem to match the API of ScriptBrowserEnv. Do you have any additional context as to how we might accomplish this?

shuyanzhou commented 1 week ago

Hi, great question! AsyncScriptBrowserEnv was very half-baked. At this point, we do not have plans to support async environments, but contributions are welcome! Curious about what scenarios you are working on. Would a script like this, which launches parallel runs, be helpful to you?

zhudotexe commented 1 week ago

Got it. I think a workaround would be to run the core WebArena loop in a separate process and communicate with the system under test using HTTP -- I'll try this out and update my results here soon.

In particular, I'm the lead developer of the Kani library (GH: https://github.com/zhudotexe/kani; paper: https://aclanthology.org/2023.nlposs-1.8/), which provides an interface for function calling with LLMs. Right now I'm working on a multi-agent system project, which relies heavily on async for running agents in parallel without the need for multiprocessing/threading code (which trips up a lot of Python programmers - I could rant about this for quite some time).

It seems like the parallel run script is primarily a wrapper around tmux, so not quite what I'm looking for, but thanks for the pointer.

zhudotexe commented 1 week ago

Seems like running it as an HTTP server won't work either, as frameworks like Flask process incoming requests in greenlets (which interfere with synchronous Playwright when the Playwright instance needs to be shared across multiple requests).

I've had some success in running all the webarena code in a separate process launched by my main system, which then communicates using a multiprocessing Pipe:

main.py

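import multiprocessing

# wa_entrypoint is defined in subprocess.py (shown below); import it from
# wherever that module lives in your project.
# On platforms that use the "spawn" start method, run this setup under
# `if __name__ == "__main__":`.

# Pipe() is duplex by default, so each end can both send and receive.
(wa_send, wa_recv) = multiprocessing.Pipe()
wa_process = multiprocessing.Process(target=wa_entrypoint, args=(wa_recv,))
wa_process.start()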

def send_command(cmd: str, **data):
    """Send a command and retrieve its response."""
    msg = {"cmd": cmd, "data": data}
    wa_send.send(msg)
    retval = wa_send.recv()
    if isinstance(retval, Exception):
        raise retval
    return retval

for task_config in task_configs:
    # ... code to run system here
    # pseudocode:
    # the key must be "config_file" to match the subprocess's "reset" handler
    send_command("reset", config_file=task_config)
    while not done:
        prompt = send_command("get_prompt", task=intent)
        action = system.do_work(prompt)
        send_command("action", action=action)
        # ...
    # send_command only accepts keyword data, and the "end" handler expects "answer"
    send_command("end", answer=answer)
    score = send_command("score")
    # ...
send_command("stop")
wa_process.join()
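
Since send_command above blocks on the pipe, calling it directly from a coroutine would stall the event loop. Below is a minimal sketch of one way to wrap it for async callers, not something tested against WebArena itself. AsyncCommandClient and echo_worker are hypothetical names; echo_worker is a stand-in for wa_entrypoint and runs in a thread here only to keep the example self-contained (the real worker would be the separate process). The lock keeps commands strictly one at a time, which matches the "no commands in parallel" restriction of the subprocess.

```python
import asyncio
import multiprocessing
import threading


def echo_worker(conn):
    """Stand-in for wa_entrypoint: sends exactly one reply per message."""
    while True:
        msg = conn.recv()
        if msg["cmd"] == "stop":
            conn.send(None)
            break
        conn.send({"echo": msg["cmd"], "data": msg["data"]})


class AsyncCommandClient:
    """Hypothetical async facade over a blocking request/response pipe."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = asyncio.Lock()  # allow one in-flight command at a time

    def _round_trip(self, msg):
        # Blocking send + recv; always executed in a worker thread.
        self._conn.send(msg)
        return self._conn.recv()

    async def send_command(self, cmd, **data):
        async with self._lock:
            retval = await asyncio.to_thread(
                self._round_trip, {"cmd": cmd, "data": data}
            )
        if isinstance(retval, Exception):
            raise retval
        return retval


async def demo():
    parent_conn, child_conn = multiprocessing.Pipe()
    worker = threading.Thread(target=echo_worker, args=(child_conn,))
    worker.start()
    client = AsyncCommandClient(parent_conn)
    # Two concurrent callers; the lock serializes their round trips.
    replies = await asyncio.gather(
        client.send_command("get_prompt", task="a"),
        client.send_command("get_prompt", task="b"),
    )
    await client.send_command("stop")
    worker.join()
    return replies


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because each send/recv pair happens while the lock is held, every caller gets the reply to its own command even when several agents await the client at once.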

subprocess.py

from multiprocessing.connection import Connection
from typing import Any, TypedDict

from browser_env import Action, ScriptBrowserEnv, StateInfo, Trajectory, create_stop_action
from evaluation_harness import evaluator_router

from .utils import map_url_to_real

# ===== wa config ====
WA_HEADLESS = False
WA_TRACE = True

# ===== commands =====
class WASubprocessCommand(TypedDict, total=False):
    cmd: str
    data: Any

# ===== state =====
class State:
    """Store the state of a trial in this class."""

    def __init__(self, env: ScriptBrowserEnv):
        self.env = env
        self.config_file = None
        self.trajectory: Trajectory = []
        self.obs = None
        self.info = None
        self.last_action_success = None

    def save_state_to_trajectory(self):
        """Save the browser state to the trajectory. Called after setup and each action."""
        state_info: StateInfo = {"observation": self.obs, "info": self.info}
        self.trajectory.append(state_info)

    def reset(self, config_file: str):
        """Reset the WA env to the initial state for this trial."""
        self.config_file = config_file
        self.trajectory = []
        self.last_action_success = True
        self.obs, self.info = self.env.reset(options={"config_file": config_file})
        self.save_state_to_trajectory()

    def action(self, action: Action):
        """Save the action to the trajectory, take it, save the resulting state, and return the result."""
        self.trajectory.append(action)
        self.obs, self.last_action_success, _, _, self.info = self.env.step(action)
        self.save_state_to_trajectory()
        # return self.obs, self.last_action_success, self.info

    def get_prompt(self, task: str) -> str:
        """Get the prompt at the current state:
        {observation}
        URL: {url}
        OBJECTIVE: {objective}
        [ERROR: {extra}] (if not last_action_success)
        """
        obs = self.obs["text"]
        page = self.info["page"]
        url = map_url_to_real(page.url)

        raw = f"BROWSER STATE:\n{obs}\nURL: {url}\nOBJECTIVE: {task}"
        if self.last_action_success:
            return raw
        error = self.info["fail_error"]
        return f"{raw}\nERROR: {error}"

    def end(self, answer: str):
        """Called once when the system finishes its task."""
        self.trajectory.append(create_stop_action(answer))

    def get_score(self):
        """Run the task's evaluator over the recorded trajectory and return its score."""
        evaluator = evaluator_router(self.config_file)
        score = evaluator(
            trajectory=self.trajectory,
            config_file=self.config_file,
            page=self.env.page,
            client=self.env.get_page_client(self.env.page),
        )
        return score

    def save_trace(self, path: str):
        """Save the browser trace to `path` if tracing is enabled."""
        if WA_TRACE:
            self.env.save_trace(path)

def wa_entrypoint(pipe: Connection):
    """Main entrypoint for the subprocess.

    Creates the environment and listens for commands from the pipe until a STOP command is received.

    POSTCONDITION: Every received command will send back exactly one response.
    RESTRICTION: No commands will be sent in parallel.
    """
    wa_env = ScriptBrowserEnv(
        headless=WA_HEADLESS,
        observation_type="accessibility_tree",
        current_viewport_only=False,
        viewport_size={
            "width": 1280,
            "height": 720,
        },
        save_trace_enabled=WA_TRACE,
        sleep_after_execution=0.0,
    )
    state = State(wa_env)
    running = True

    while running:
        # recv
        retval = None
        data: WASubprocessCommand = pipe.recv()

        # process
        try:
            match data:
                case {"cmd": "stop"}:
                    running = False
                case {"cmd": "reset", "data": {"config_file": config_file}}:
                    retval = state.reset(config_file)
                case {"cmd": "action", "data": {"action": action}}:
                    retval = state.action(action)
                case {"cmd": "get_prompt", "data": {"task": task}}:
                    retval = state.get_prompt(task)
                case {"cmd": "end", "data": {"answer": answer}}:
                    retval = state.end(answer)
                case {"cmd": "score"}:
                    retval = state.get_score()
                case {"cmd": "save_trace", "data": {"path": path}}:
                    retval = state.save_trace(path)
                case other:
                    print(f"!!! UNKNOWN COMMAND IN WA IPC !!!\n{other}")
                    raise ValueError("Unknown command")
        except Exception as e:
            retval = e

        # return
        pipe.send(retval)

    # clean up
    wa_env.close()