This PR adds support for asynchronous connection pooling in the PostgresChatMessageHistory class, addressing issues #122 and #129
Changes Made:
Modified PostgresChatMessageHistory:
Added a conn_pool parameter to accept an AsyncConnectionPool instance.
Adjusted the __init__ method to include conn_pool and reordered parameters for improved usability.
Ensured session_id and table_name can be passed as keyword arguments.
Updated Asynchronous Methods:
Modified aget_messages and aadd_messages to utilize the connection pool when provided.
Maintained existing functionality for async_connection to ensure backward compatibility.
Added Unit Tests:
Introduced test_async_chat_history_with_pool in tests/unit_tests/test_chat_histories.py to verify the new functionality.
Updated Documentation:
Revised the README to include examples of using PostgresChatMessageHistory with an asynchronous connection pool.
Adjusted usage examples to reflect the updated parameter order and new conn_pool parameter.
Example Usage:
import uuid
import asyncio
from langchain_core.messages import SystemMessage, AIMessage, HumanMessage
from langchain_postgres import PostgresChatMessageHistory
from psycopg_pool import AsyncConnectionPool
async def main():
# Database connection string
conn_info = "postgresql://user:password@host:port/dbname" # Replace with your connection info
# Initialize the connection pool
pool = AsyncConnectionPool(conninfo=conn_info)
try:
# Create the table schema (only needs to be done once)
async with pool.connection() as async_connection:
table_name = "chat_history"
await PostgresChatMessageHistory.adrop_table(async_connection, table_name)
await PostgresChatMessageHistory.acreate_tables(async_connection, table_name)
session_id = str(uuid.uuid4())
# Initialize the chat history manager with the connection pool
chat_history = PostgresChatMessageHistory(
session_id=session_id,
table_name=table_name,
conn_pool=pool
)
# Add messages to the chat history asynchronously
await chat_history.aadd_messages([
SystemMessage(content="System message"),
AIMessage(content="AI response"),
HumanMessage(content="Human message"),
])
# Retrieve messages from the chat history
messages = await chat_history.aget_messages()
print(messages)
finally:
# Close the connection pool
await pool.close()
# Run the async main function
asyncio.run(main())
Testing:
Added a new test test_async_chat_history_with_pool in tests/unit_tests/test_chat_histories.py:
async def test_async_chat_history_with_pool() -> None:
"""Test the async chat history using a connection pool."""
from psycopg_pool import AsyncConnectionPool
from tests.utils import DSN
# Initialize the connection pool
pool = AsyncConnectionPool(conninfo=DSN)
try:
table_name = "chat_history"
session_id = str(uuid.uuid4())
# Create tables using a connection from the pool
async with pool.connection() as async_connection:
await PostgresChatMessageHistory.adrop_table(async_connection, table_name)
await PostgresChatMessageHistory.acreate_tables(async_connection, table_name)
# Create PostgresChatMessageHistory with conn_pool
chat_history = PostgresChatMessageHistory(
session_id=session_id,
table_name=table_name,
conn_pool=pool,
)
# Ensure the chat history is empty
messages = await chat_history.aget_messages()
assert messages == []
# Add messages to the chat history
await chat_history.aadd_messages(
[
SystemMessage(content="System message"),
AIMessage(content="AI response"),
HumanMessage(content="Human message"),
]
)
# Retrieve messages from the chat history
messages = await chat_history.aget_messages()
assert len(messages) == 3
assert messages == [
SystemMessage(content="System message"),
AIMessage(content="AI response"),
HumanMessage(content="Human message"),
]
# Clear the chat history
await chat_history.aclear()
messages = await chat_history.aget_messages()
assert messages == []
finally:
# Close the connection pool
await pool.close()
Ensured all existing tests pass, maintaining backward compatibility.
Documentation:
README Updates:
Adjusted parameter usage in examples to match the updated __init__ method.
Added a new section demonstrating asynchronous usage with connection pooling.
Notes:
Backward Compatibility:
Existing code using sync_connection or async_connection continues to work without modifications.
Benefits:
Improves efficiency by reusing database connections through a connection pool.
Enhances resource management in asynchronous applications.
This PR adds support for asynchronous connection pooling in the
PostgresChatMessageHistory
class, addressing issues #122 and #129Changes Made:
PostgresChatMessageHistory
:conn_pool
parameter to accept anAsyncConnectionPool
instance.__init__
method to includeconn_pool
and reordered parameters for improved usability.session_id
andtable_name
can be passed as keyword arguments.aget_messages
andaadd_messages
to utilize the connection pool when provided.async_connection
to ensure backward compatibility.test_async_chat_history_with_pool
intests/unit_tests/test_chat_histories.py
to verify the new functionality.PostgresChatMessageHistory
with an asynchronous connection pool.conn_pool
parameter.Example Usage:
Testing:
Added a new test
test_async_chat_history_with_pool
intests/unit_tests/test_chat_histories.py
:Ensured all existing tests pass, maintaining backward compatibility.
Documentation:
__init__
method.Notes:
sync_connection
orasync_connection
continues to work without modifications.Related Issues:
122
129