lgabs / dialog

Dialog is a production-ready Q&A Application with LLMs, using Langchain and LangServe.
MIT License
3 stars 0 forks source link

use langchain_postgres for memory as well #7

Open lgabs opened 5 months ago

lgabs commented 5 months ago

langchain_postgres seems to be the new recommended integration for using Postgres with LangChain. We already use it for the retriever, with the pgvector integration, but not for memory (indeed, LangChain's docs about it still use the implementation from langchain_community). It's probably possible to use it for memory as well, as shown in their README here.

https://github.com/lgabs/dialog/blob/ad000d3e9cf2565a5a8daccb34d7f272ce71924e/src/dialog/memory.py#L16-L20

lgabs commented 5 months ago

I ran some tests in the try-langchain-postgres-lib-memory branch, but I think it's not fully integrated for use with RunnableWithMessageHistory. With the sync connection implemented in the branch, it throws the error below (trying to use an async connection throws a different error):

api-1  | Traceback (most recent call last):
api-1  |   File "/usr/local/lib/python3.11/site-packages/sse_starlette/sse.py", line 281, in __call__
api-1  |     await wrap(partial(self.listen_for_disconnect, receive))
api-1  |   File "/usr/local/lib/python3.11/site-packages/sse_starlette/sse.py", line 270, in wrap
api-1  |     await func()
api-1  |   File "/usr/local/lib/python3.11/site-packages/sse_starlette/sse.py", line 221, in listen_for_disconnect
api-1  |     message = await receive()
api-1  |               ^^^^^^^^^^^^^^^
api-1  |   File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 568, in receive
api-1  |     await self.message_event.wait()
api-1  |   File "/usr/local/lib/python3.11/asyncio/locks.py", line 213, in wait
api-1  |     await fut
api-1  | asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f92131b8750
api-1  |
api-1  | During handling of the above exception, another exception occurred:
api-1  |
api-1  |   + Exception Group Traceback (most recent call last):
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 411, in run_asgi
api-1  |   |     result = await app(  # type: ignore[func-returns-value]
api-1  |   |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 69, in __call__
api-1  |   |     return await self.app(scope, receive, send)
api-1  |   |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
api-1  |   |     await super().__call__(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
api-1  |   |     await self.middleware_stack(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
api-1  |   |     raise exc
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
api-1  |   |     await self.app(scope, receive, _send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 91, in __call__
api-1  |   |     await self.simple_response(scope, receive, send, request_headers=headers)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 146, in simple_response
api-1  |   |     await self.app(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
api-1  |   |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
api-1  |   |     raise exc
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
api-1  |   |     await app(scope, receive, sender)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 758, in __call__
api-1  |   |     await self.middleware_stack(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 778, in app
api-1  |   |     await route.handle(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 299, in handle
api-1  |   |     await self.app(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 79, in app
api-1  |   |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
api-1  |   |     raise exc
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
api-1  |   |     await app(scope, receive, sender)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
api-1  |   |     await response(scope, receive, send)
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/sse_starlette/sse.py", line 267, in __call__
api-1  |   |     async with anyio.create_task_group() as task_group:
api-1  |   |   File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
api-1  |   |     raise BaseExceptionGroup(
api-1  |   | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
api-1  |   +-+---------------- 1 ----------------
api-1  |     | Traceback (most recent call last):
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/sse_starlette/sse.py", line 270, in wrap
api-1  |     |     await func()
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/sse_starlette/sse.py", line 251, in stream_response
api-1  |     |     async for data in self.body_iterator:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langserve/api_handler.py", line 1077, in _stream_log
api-1  |     |     async for chunk in self._runnable.astream_log(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 914, in astream_log
api-1  |     |     async for item in _astream_log_implementation(  # type: ignore
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 616, in _astream_log_implementation
api-1  |     |     await task
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 570, in consume_astream
api-1  |     |     async for chunk in runnable.astream(input, config, **kwargs):
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 4698, in astream
api-1  |     |     async for item in self.bound.astream(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 4698, in astream
api-1  |     |     async for item in self.bound.astream(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 4698, in astream
api-1  |     |     async for item in self.bound.astream(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 2900, in astream
api-1  |     |     async for chunk in self.atransform(input_aiter(), config, **kwargs):
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 2883, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1979, in _atransform_stream_with_config
api-1  |     |     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
api-1  |     |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
api-1  |     |     async for chunk in output:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 2853, in _atransform
api-1  |     |     async for output in final_pipeline:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 4734, in atransform
api-1  |     |     async for item in self.bound.atransform(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 2883, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1979, in _atransform_stream_with_config
api-1  |     |     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
api-1  |     |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
api-1  |     |     async for chunk in output:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 2853, in _atransform
api-1  |     |     async for output in final_pipeline:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/output_parsers/transform.py", line 60, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1943, in _atransform_stream_with_config
api-1  |     |     final_input: Optional[Input] = await py_anext(input_for_tracing, None)
api-1  |     |                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 62, in anext_impl
api-1  |     |     return await __anext__(iterator)
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 97, in tee_peer
api-1  |     |     item = await iterator.__anext__()
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1316, in atransform
api-1  |     |     async for chunk in input:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1316, in atransform
api-1  |     |     async for chunk in input:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 3317, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1943, in _atransform_stream_with_config
api-1  |     |     final_input: Optional[Input] = await py_anext(input_for_tracing, None)
api-1  |     |                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 62, in anext_impl
api-1  |     |     return await __anext__(iterator)
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 97, in tee_peer
api-1  |     |     item = await iterator.__anext__()
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 3317, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1979, in _atransform_stream_with_config
api-1  |     |     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
api-1  |     |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
api-1  |     |     async for chunk in output:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 3304, in _atransform
api-1  |     |     chunk = AddableDict({step_name: task.result()})
api-1  |     |                                     ^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 3287, in get_next_chunk
api-1  |     |     return await py_anext(generator)
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 2883, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1979, in _atransform_stream_with_config
api-1  |     |     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
api-1  |     |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
api-1  |     |     async for chunk in output:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 2853, in _atransform
api-1  |     |     async for output in final_pipeline:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/output_parsers/transform.py", line 60, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1943, in _atransform_stream_with_config
api-1  |     |     final_input: Optional[Input] = await py_anext(input_for_tracing, None)
api-1  |     |                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 62, in anext_impl
api-1  |     |     return await __anext__(iterator)
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 97, in tee_peer
api-1  |     |     item = await iterator.__anext__()
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1316, in atransform
api-1  |     |     async for chunk in input:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1316, in atransform
api-1  |     |     async for chunk in input:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/passthrough.py", line 586, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1979, in _atransform_stream_with_config
api-1  |     |     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
api-1  |     |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
api-1  |     |     async for chunk in output:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/passthrough.py", line 565, in _atransform
api-1  |     |     async for chunk in for_passthrough:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 97, in tee_peer
api-1  |     |     item = await iterator.__anext__()
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 97, in tee_peer
api-1  |     |     item = await iterator.__anext__()
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 97, in tee_peer
api-1  |     |     item = await iterator.__anext__()
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   [Previous line repeated 3 more times]
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 4734, in atransform
api-1  |     |     async for item in self.bound.atransform(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/passthrough.py", line 586, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1979, in _atransform_stream_with_config
api-1  |     |     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
api-1  |     |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
api-1  |     |     async for chunk in output:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/passthrough.py", line 576, in _atransform
api-1  |     |     yield await first_map_chunk_task
api-1  |     |           ^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/utils/aiter.py", line 62, in anext_impl
api-1  |     |     return await __anext__(iterator)
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 3317, in atransform
api-1  |     |     async for chunk in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1979, in _atransform_stream_with_config
api-1  |     |     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
api-1  |     |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
api-1  |     |     async for chunk in output:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 3304, in _atransform
api-1  |     |     chunk = AddableDict({step_name: task.result()})
api-1  |     |                                     ^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 3287, in get_next_chunk
api-1  |     |     return await py_anext(generator)
api-1  |     |            ^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 4734, in atransform
api-1  |     |     async for item in self.bound.atransform(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 4166, in atransform
api-1  |     |     async for output in self._atransform_stream_with_config(
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 1979, in _atransform_stream_with_config
api-1  |     |     chunk: Output = await asyncio.create_task(  # type: ignore[call-arg]
api-1  |     |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/tracers/log_stream.py", line 237, in tap_output_aiter
api-1  |     |     async for chunk in output:
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/base.py", line 4135, in _atransform
api-1  |     |     output = await acall_func_with_variable_args(
api-1  |     |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_core/runnables/history.py", line 420, in _aenter_history
api-1  |     |     messages = (await hist.aget_messages()).copy()
api-1  |     |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^
api-1  |     |   File "/usr/local/lib/python3.11/site-packages/langchain_postgres/chat_message_histories.py", line 330, in aget_messages
api-1  |     |     raise ValueError(
api-1  |     | ValueError: Please initialize the PostgresChatMessageHistory with an async connection or use the sync get_messages method instead.