Open abrichr opened 3 months ago
There are two pieces:
Whenever a replay happens, automatically record as much as possible about it (e.g. date/time, strategy, strategy arguments).
As a developer, I would like to add this to replay code:
self.replay_log.capture({"segmentation": segmentation})
And have this appear in the dashboard along with e.g. line number, date/time, commit hash.
We need to be able to store plain text (e.g. JSON) and binary (e.g. pickle).
Example binary data: https://github.com/OpenAdaptAI/OpenAdapt/blob/main/openadapt/strategies/visual.py#L66
e.g.
class ReplayLog:
    key: str
    data: ...
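A minimal sketch of what such a capture helper might look like, assuming an SQLite table as the backing store. The table name, column names, `get_commit_hash`, and `load` are hypothetical and not part of OpenAdapt. Values that are JSON-serializable are stored as text; anything else falls back to pickle, and each row records the caller's file and line number, a UTC timestamp, and the current commit hash if one can be determined.

```python
import inspect
import json
import pickle
import sqlite3
import subprocess
from datetime import datetime, timezone
from typing import Any, Optional


def get_commit_hash() -> Optional[str]:
    """Return the current git commit hash, or None if it cannot be determined."""
    try:
        out = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True, text=True, check=True,
        )
        return out.stdout.strip()
    except (OSError, subprocess.CalledProcessError):
        return None


class ReplayLog:
    """Hypothetical capture helper; the schema below is illustrative only."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS replay_log ("
            "key TEXT, encoding TEXT, data BLOB, "
            "timestamp TEXT, filename TEXT, lineno INTEGER, commit_hash TEXT)"
        )

    def capture(self, values: dict) -> None:
        # Frame of the caller, so a dashboard can show where capture() was called.
        caller = inspect.stack()[1]
        now = datetime.now(timezone.utc).isoformat()
        commit = get_commit_hash()
        for key, value in values.items():
            try:
                # Prefer plain text when the value is JSON-serializable.
                encoding, data = "json", json.dumps(value).encode("utf-8")
            except (TypeError, ValueError):
                # Fall back to pickle for arbitrary Python objects.
                encoding, data = "pickle", pickle.dumps(value)
            self.conn.execute(
                "INSERT INTO replay_log VALUES (?, ?, ?, ?, ?, ?, ?)",
                (key, encoding, data, now, caller.filename, caller.lineno, commit),
            )
        self.conn.commit()

    def load(self, key: str) -> Any:
        """Return the most recently captured value for `key`."""
        row = self.conn.execute(
            "SELECT encoding, data FROM replay_log WHERE key = ? "
            "ORDER BY rowid DESC LIMIT 1",
            (key,),
        ).fetchone()
        if row is None:
            raise KeyError(key)
        encoding, data = row
        # Only unpickle data that this process (or a trusted one) wrote.
        return json.loads(data) if encoding == "json" else pickle.loads(data)


replay_log = ReplayLog(sqlite3.connect(":memory:"))
replay_log.capture({"segmentation": {"masks": [1, 2]}, "raw": {1, 2, 3}})
```

Recording the encoding alongside the data lets a dashboard render JSON rows directly and treat pickled rows as opaque binary.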
Related: https://github.com/OpenAdaptAI/OpenAdapt/tree/main/openadapt/capture
We probably want to capture automatically during replay.
Eventually we may also want to run record, but that's not necessary for a first attempt.
To automatically instrument the execution of replay.py and ensure that all relevant inferences are stored to the database efficiently and systematically, you can use Python's decorators and context managers to capture data at specific functions or methods. This keeps the logging logic separate from the business logic, making the code cleaner and less error-prone.
A decorator can be applied to functions where you need to capture the execution details. This approach is useful for functions or methods where important data transformations or decisions are made, such as segmentation results or action decisions.
Here's a simple decorator that could be used to log the input and output of a method, which can then be pushed to your database:
from functools import wraps

from openadapt.db import crud  # Assuming CRUD operations are defined here for database interactions


def log_inference(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Log input details
        input_details = {
            "function": func.__name__,
            "input_args": args,
            "input_kwargs": kwargs,
        }
        crud.create_log_entry(input_details)  # Assuming this function logs to the database
        result = func(*args, **kwargs)
        # Log output details
        output_details = {
            "function": func.__name__,
            "output": result,
        }
        crud.create_log_entry(output_details)  # Assuming this function logs to the database
        return result
    return wrapper


# Usage
@log_inference
def get_window_segmentation(action_event):
    # Implementation details...
    pass
A context manager can be used for capturing a block of operations, especially where multiple operations need to be logged together, or you need to ensure resources are correctly managed, such as database sessions.
Here's an example of a context manager that could be used to encapsulate a database session or transaction for logging:
from contextlib import contextmanager

from sqlalchemy.orm import Session

from openadapt.db import engine  # Assuming an SQLAlchemy engine is configured


@contextmanager
def logging_context(description: str):
    # `description` is not used yet; it could label the block in the logs
    session = Session(bind=engine)
    try:
        yield session  # Expose the session so the block can write log entries
        session.commit()  # Commit at the end of the block
    except Exception:
        session.rollback()  # Ensure rollback if there's an error
        raise
    finally:
        session.close()


# Usage in your replay.py
with logging_context("Replaying strategy execution") as session:
    strategy.run()
    # Additional operations can be performed here, and all will be wrapped in a single transaction
For displaying these logs in a web dashboard:
This approach ensures your application's core functionality is decoupled from the logging logic, provides a systematic way to capture all necessary data for debugging or analysis, and prepares your infrastructure for future scalability.
To handle the replay ID automatically throughout the execution of python -m openadapt.replay, without the developer needing to set it manually, you can use a context manager or a global state management approach. The goal is to manage the lifecycle of a replay session: create a new entry in the database at the start and capture the replay_id for use during the session. Here's how you can achieve this:
Create a context manager that initializes a replay session and stores the replay_id. This replay_id is then accessible during the execution of the replay.
import json
from contextlib import contextmanager
from datetime import datetime, timezone

from openadapt.db import crud, Session  # Assuming CRUD operations and session handling
from openadapt.models import Replay  # Assuming a Replay model is defined here


@contextmanager
def replay_session(strategy_name, **kwargs):
    session = Session()  # Create a new database session
    try:
        # Create and commit a new replay record
        new_replay = Replay(
            # Timezone-aware "now"; a naive utcnow() would be read as local time by timestamp()
            timestamp=datetime.now(timezone.utc).timestamp(),
            strategy_name=strategy_name,
            strategy_args=json.dumps(kwargs),
        )
        session.add(new_replay)
        session.commit()
        yield new_replay.id  # This is the replay_id
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Usage in replay module
def execute_replay(strategy_name, **kwargs):
    with replay_session(strategy_name, **kwargs) as replay_id:
        # Here replay_id is automatically managed and can be passed to other functions
        run_strategy(strategy_name, replay_id=replay_id, **kwargs)


def run_strategy(strategy_name, replay_id, **kwargs):
    # get_strategy is assumed to be defined elsewhere in the replay module
    strategy = get_strategy(strategy_name, replay_id=replay_id, **kwargs)
    strategy.run()
Another approach is to manage the replay_id using a global state that is accessible throughout the execution context. This can be done using a global configuration or state object.
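# global_state.py
class GlobalState:
    replay_id = None


# replay.py
import json
from datetime import datetime, timezone

from openadapt.global_state import GlobalState
from openadapt.db import crud
from openadapt.models import Replay  # Assuming a Replay model is defined here


def start_replay(strategy_name, **kwargs):
    # Create a new replay record
    new_replay = Replay(
        # Timezone-aware "now"; a naive utcnow() would be read as local time by timestamp()
        timestamp=datetime.now(timezone.utc).timestamp(),
        strategy_name=strategy_name,
        strategy_args=json.dumps(kwargs),
    )
    crud.session.add(new_replay)  # Assuming crud exposes a shared session
    crud.session.commit()
    # Store the replay_id globally
    GlobalState.replay_id = new_replay.id
    # Run the strategy
    run_strategy(strategy_name)


def run_strategy(strategy_name):
    # get_strategy is assumed to be defined elsewhere in the replay module
    strategy = get_strategy(strategy_name)
    strategy.run()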
For the decorators (logging and caching), you can modify them to fetch the replay_id from the global state or the context manager, depending on the approach used:
import time
from functools import wraps

from openadapt.db import crud  # Assuming log_function_call is defined here
from openadapt.global_state import GlobalState


def log_execution(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = fn(*args, **kwargs)
        duration = time.time() - start_time
        log_entry = {
            "replay_id": GlobalState.replay_id,
            "function_name": fn.__name__,
            "input_args": args,
            "input_kwargs": kwargs,
            "output": result,
            "execution_time": duration,
        }
        crud.log_function_call(log_entry)
        return result
    return wrapper
By using these approaches, the replay ID is handled automatically, eliminating the need for manual setting by the developer, and ensuring that all associated actions are properly linked to the replay session. This approach keeps the execution and logging processes clean and streamlined.
Let's also implement basic functionality for displaying in the dashboard.
Feature request
We would like to display replays in the dashboard, and all associated data.
This involves instrumenting replays to record all data, e.g. segmentations, prompts.
Probably the easiest way to do this is at the adapter level. This may also involve calling record from replay.
Motivation