NVIDIA / NeMo-Guardrails

NeMo Guardrails is an open-source toolkit for easily adding programmable guardrails to LLM-based conversational systems.

LLMCallException due to event loop issue #788

Open · ishaan-mehta opened this issue 1 month ago

ishaan-mehta commented 1 month ago

I am consistently seeing this error when I try to invoke my LangChain chain, which includes RunnableRails, inside an MLflow run:

LLMCallException: LLM Call Exception: Connection error.

It happens whenever I invoke the model several times in quick succession.

I am running inside a Databricks notebook; the error occurs on DBR 13.3 ML, 14.3 ML, and 15.2 ML (I haven't tried other runtimes).

This does not occur when I remove the guardrails from the chain, so it is definitely related to this package. The guardrails are also hitting the same endpoint as the other LLM calls in the chain, so it is not an error on the LLM endpoint side. Also note that the root cause here is an event loop issue (see the RuntimeError at the bottom of the inner traceback below).
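
For context, here is a minimal sketch of the failing pattern (the config path, prompt, and loop below are illustrative placeholders, not my exact chain):

import mlflow
from langchain_openai import ChatOpenAI
from nemoguardrails import RailsConfig
from nemoguardrails.integrations.langchain.runnable_rails import RunnableRails

# Wrap an LLM with guardrails; both hit the same endpoint.
config = RailsConfig.from_path("./guardrails_config")  # placeholder path
guardrails = RunnableRails(config)
llm = ChatOpenAI()
chain = guardrails | llm

with mlflow.start_run():
    # Repeated synchronous invocations in quick succession trigger the error.
    for question in ["How many stores are there?", "How many employees?"]:
        chain.invoke({"input": question})  # intermittently raises LLMCallException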

Here is my Python environment:

absl-py==1.0.0
accelerate==0.20.3
aiohappyeyeballs==2.4.3
aiohttp==3.10.8
aiosignal==1.3.1
alembic==1.13.3
aniso8601==9.0.1
annoy==1.17.3
anyio==3.5.0
appdirs==1.4.4
argon2-cffi==21.3.0
argon2-cffi-bindings==21.2.0
astor==0.8.1
asttokens==2.2.1
astunparse==1.6.3
async-timeout==4.0.3
attrs==21.4.0
audioread==3.0.0
azure-core==1.29.1
azure-cosmos==4.6.0
azure-storage-blob==12.17.0
azure-storage-file-datalake==12.12.0
backcall==0.2.0
bcrypt==3.2.0
beautifulsoup4==4.11.1
black==22.6.0
bleach==4.1.0
blinker==1.4
blis==0.7.10
boto3==1.24.28
botocore==1.27.28
cachetools==5.5.0
catalogue==2.0.9
category-encoders==2.6.1
certifi==2022.9.14
cffi==1.15.1
chardet==4.0.0
charset-normalizer==2.0.4
click==8.0.4
cloudpathlib==0.19.0
cloudpickle==2.2.1
cmdstanpy==1.1.0
coloredlogs==15.0.1
confection==0.1.1
configparser==5.2.0
convertdate==2.4.0
cryptography==37.0.1
cycler==0.11.0
cymem==2.0.7
Cython==0.29.32
dacite==1.8.1
databricks-automl-runtime==0.2.17
databricks-cli==0.17.7
databricks-feature-engineering==0.1.2
databricks-feature-store==0.14.3
databricks-sdk==0.20.0
databricks-sql-connector==3.4.0
dataclasses-json==0.5.14
datasets==2.13.1
dbl-tempo==0.1.26
dbus-python==1.2.18
debugpy==1.6.0
decorator==5.1.1
defusedxml==0.7.1
Deprecated==1.2.14
dill==0.3.4
diskcache==5.6.1
distlib==0.3.7
distro==1.7.0
distro-info==1.1+ubuntu0.2
docker==7.1.0
docstring-to-markdown==0.12
en-core-web-sm @ file:///local_disk0/tmp/addedFile8b6fd3631f1b472a867ede184c38a5687532097936132996654/en_core_web_sm-3.7.1-py3-none-any.whl
entrypoints==0.4
ephem==4.1.4
et-xmlfile==1.1.0
evaluate==0.4.0
executing==1.2.0
facets-overview==1.0.3
faiss-cpu==1.8.0.post1
fastapi==0.110.3
fastembed==0.3.6
fastjsonschema==2.18.0
fasttext==0.9.2
filelock==3.6.0
Flask @ https://databricks-build-artifacts-manual-staging.s3.amazonaws.com/flask/Flask-1.1.2%2Bdb1-py2.py3-none-any.whl?AWSAccessKeyId=AKIAX7HWM34HCSVHYQ7M&Expires=2001354391&Signature=bztIumr2jXFbisF0QicZvqbvT9s%3D
flatbuffers==23.5.26
fonttools==4.25.0
frozenlist==1.4.0
fsspec==2024.9.0
future==0.18.2
gast==0.4.0
gitdb==4.0.10
GitPython==3.1.27
google-api-core==2.8.2
google-auth==2.35.0
google-auth-oauthlib==0.4.6
google-cloud-core==2.3.3
google-cloud-storage==2.10.0
google-crc32c==1.5.0
google-pasta==0.2.0
google-resumable-media==2.5.0
googleapis-common-protos==1.56.4
graphene==3.3
graphql-core==3.2.4
graphql-relay==3.2.0
greenlet==1.1.1
grpcio==1.48.1
grpcio-status==1.48.1
gunicorn==20.1.0
gviz-api==1.10.0
h11==0.14.0
h5py==3.7.0
holidays==0.27.1
horovod==0.28.1
htmlmin==0.1.12
httpcore==1.0.6
httplib2==0.20.2
httptools==0.6.0
httpx==0.27.2
huggingface-hub==0.25.1
humanfriendly==10.0
idna==3.3
ImageHash==4.3.1
imbalanced-learn==0.10.1
importlib-resources==6.0.1
importlib_metadata==8.4.0
ipykernel==6.17.1
ipython==8.10.0
ipython-genutils==0.2.0
ipywidgets==7.7.2
isodate==0.6.1
itsdangerous==2.0.1
jedi==0.18.1
jeepney==0.7.1
Jinja2==3.1.4
jiter==0.5.0
jmespath==0.10.0
joblib==1.2.0
joblibspark==0.5.1
jsonpatch==1.33
jsonpointer==3.0.0
jsonschema==4.16.0
jupyter-client==7.3.4
jupyter_core==4.11.2
jupyterlab-pygments==0.1.2
jupyterlab-widgets==1.0.0
keras==2.11.0
keyring==23.5.0
kiwisolver==1.4.2
langchain==0.2.16
langchain-community==0.2.17
langchain-core==0.2.41
langchain-openai==0.1.25
langchain-text-splitters==0.2.4
langchainplus-sdk==0.0.20
langcodes==3.3.0
langsmith==0.1.129
lark==1.1.9
launchpadlib==1.10.16
lazr.restfulclient==0.14.4
lazr.uri==1.0.6
lazy_loader==0.3
libclang==15.0.6.1
librosa==0.10.0
lightgbm==3.3.5
llvmlite==0.38.0
loguru==0.7.2
LunarCalendar==0.0.9
lz4==4.3.3
Mako==1.2.0
Markdown==3.3.4
markdown-it-py==3.0.0
MarkupSafe==2.0.1
marshmallow==3.20.1
matplotlib==3.5.2
matplotlib-inline==0.1.6
mccabe==0.7.0
mdurl==0.1.2
mistune==0.8.4
mleap==0.20.0
mlflow==2.16.2
mlflow-skinny==2.16.2
mmh3==4.1.0
more-itertools==8.10.0
mpmath==1.3.0
msgpack==1.0.5
multidict==6.0.4
multimethod==1.9.1
multiprocess==0.70.12.2
murmurhash==1.0.9
mypy-extensions==0.4.3
nbclient==0.5.13
nbconvert==6.4.4
nbformat==5.5.0
nemoguardrails==0.10.0
nest-asyncio==1.6.0
networkx==2.8.4
ninja==1.11.1
nltk==3.9.1
nodeenv==1.8.0
notebook==6.4.12
numba==0.55.1
numexpr==2.8.4
numpy==1.24.4
oauthlib==3.2.0
onnx==1.16.2
onnxruntime==1.19.2
openai==1.51.0
openapi-schema-pydantic==1.2.4
openpyxl==3.1.5
opentelemetry-api==1.27.0
opentelemetry-sdk==1.27.0
opentelemetry-semantic-conventions==0.48b0
opt-einsum==3.3.0
orjson==3.10.7
packaging==24.1
pandas==1.4.4
pandocfilters==1.5.0
paramiko==2.9.2
parso==0.8.3
pathspec==0.9.0
pathy==0.10.2
patsy==0.5.2
petastorm==0.12.1
pexpect==4.8.0
phik==0.12.3
pickleshare==0.7.5
pillow==10.4.0
platformdirs==2.5.2
plotly==5.9.0
pluggy==1.0.0
pmdarima==2.0.3
pooch==1.7.0
preshed==3.0.8
prometheus-client==0.14.1
prompt-toolkit==3.0.36
prophet==1.1.4
protobuf==4.25.5
psutil==5.9.0
psycopg2==2.9.3
ptyprocess==0.7.0
pure-eval==0.2.2
pyarrow==16.1.0
pyarrow-hotfix==0.5
pyasn1==0.4.8
pyasn1-modules==0.2.8
pybind11==2.11.1
pycparser==2.21
pydantic==1.10.6
pyflakes==3.0.1
Pygments==2.18.0
PyGObject==3.42.1
PyJWT==2.3.0
PyMeeus==0.5.12
PyNaCl==1.5.0
pyodbc==4.0.32
pyparsing==3.0.9
pyright==1.1.294
pyrsistent==0.18.0
PyStemmer==2.2.0.1
pytesseract==0.3.10
python-apt==2.4.0+ubuntu3
python-dateutil==2.8.2
python-dotenv==1.0.0
python-editor==1.0.4
python-lsp-jsonrpc==1.0.0
python-lsp-server==1.7.1
pytoolconfig==1.2.2
pytz==2022.1
PyWavelets==1.3.0
PyYAML==6.0
pyzmq==23.2.0
rank-bm25==0.2.2
regex==2022.7.9
requests==2.32.3
requests-oauthlib==1.3.1
responses==0.18.0
rich==13.9.1
rope==1.7.0
rsa==4.9
s3transfer==0.6.0
safetensors==0.3.2
scikit-learn==1.1.1
scipy==1.9.1
seaborn==0.11.2
SecretStorage==3.3.1
Send2Trash==1.8.0
sentence-transformers==2.2.2
sentencepiece==0.1.99
shap==0.41.0
simpleeval==0.9.13
simplejson==3.17.6
six==1.16.0
slicer==0.0.7
smart-open==5.2.1
smmap==5.0.0
sniffio==1.2.0
snowballstemmer==2.2.0
soundfile==0.12.1
soupsieve==2.3.1
soxr==0.3.6
spacy==3.7.5
spacy-legacy==3.0.12
spacy-loggers==1.0.4
spark-tensorflow-distributor==1.0.0
SQLAlchemy==2.0.0
sqlglot==25.24.1
sqlparse==0.4.2
srsly==2.4.7
ssh-import-id==5.11
stack-data==0.6.2
starlette==0.37.2
statsmodels==0.13.2
sympy==1.13.3
tabulate==0.8.10
tangled-up-in-unicode==0.2.0
tenacity==8.1.0
tensorboard==2.11.0
tensorboard-data-server==0.6.1
tensorboard-plugin-profile==2.11.2
tensorboard-plugin-wit==1.8.1
tensorflow-cpu==2.11.1
tensorflow-estimator==2.11.0
tensorflow-io-gcs-filesystem==0.33.0
termcolor==2.3.0
terminado==0.13.1
testpath==0.6.0
thinc==8.2.5
threadpoolctl==2.2.0
thrift==0.20.0
tiktoken==0.7.0
tokenize-rt==4.2.1
tokenizers==0.20.0
tomli==2.0.1
torch==1.13.1+cpu
torchvision==0.14.1+cpu
tornado==6.1
tqdm==4.66.5
traitlets==5.1.1
transformers==4.30.2
typeguard==2.13.3
typer==0.7.0
typing-inspect==0.9.0
typing_extensions==4.12.2
ujson==5.4.0
unattended-upgrades==0.1
urllib3==1.26.11
uvicorn==0.23.2
uvloop==0.17.0
virtualenv==20.16.3
visions==0.7.5
wadllib==1.3.6
wasabi==1.1.2
watchdog==5.0.3
watchfiles==0.19.0
wcwidth==0.2.5
weasel==0.4.1
webencodings==0.5.1
websocket-client==0.58.0
websockets==11.0.3
Werkzeug==2.0.3
whatthepatch==1.0.2
widgetsnbextension==3.6.1
wordcloud==1.9.2
wrapt==1.14.1
xgboost==1.7.6
xxhash==3.3.0
yapf==0.31.0
yarl==1.13.1
ydata-profiling==4.2.0
zipp==3.8.0

Full stack trace:

2024/09/30 22:09:02 INFO mlflow.tracking._tracking_service.client: 🏃 View run persistent-hound-147 at: https://eastus-c3.azuredatabricks.net/ml/experiments/936693136200058/runs/9989926bc6f84aefbbe824249cbd90cc.
2024/09/30 22:09:02 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: https://eastus-c3.azuredatabricks.net/ml/experiments/936693136200058.
LLMCallException: LLM Call Exception: Connection error.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/openai/_base_client.py:1564, in AsyncAPIClient._request(self, cast_to, options, stream, stream_cls, retries_taken)
   1563 try:
-> 1564     response = await self._client.send(
   1565         request,
   1566         stream=stream or self._should_stream_response_body(request=request),
   1567         **kwargs,
   1568     )
   1569 except httpx.TimeoutException as err:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpx/_client.py:1674, in AsyncClient.send(self, request, stream, auth, follow_redirects)
   1672 auth = self._build_request_auth(request, auth)
-> 1674 response = await self._send_handling_auth(
   1675     request,
   1676     auth=auth,
   1677     follow_redirects=follow_redirects,
   1678     history=[],
   1679 )
   1680 try:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpx/_client.py:1702, in AsyncClient._send_handling_auth(self, request, auth, follow_redirects, history)
   1701 while True:
-> 1702     response = await self._send_handling_redirects(
   1703         request,
   1704         follow_redirects=follow_redirects,
   1705         history=history,
   1706     )
   1707     try:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpx/_client.py:1739, in AsyncClient._send_handling_redirects(self, request, follow_redirects, history)
   1737     await hook(request)
-> 1739 response = await self._send_single_request(request)
   1740 try:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpx/_client.py:1776, in AsyncClient._send_single_request(self, request)
   1775 with request_context(request=request):
-> 1776     response = await transport.handle_async_request(request)
   1778 assert isinstance(response.stream, AsyncByteStream)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpx/_transports/default.py:377, in AsyncHTTPTransport.handle_async_request(self, request)
    376 with map_httpcore_exceptions():
--> 377     resp = await self._pool.handle_async_request(req)
    379 assert isinstance(resp.stream, typing.AsyncIterable)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpcore/_async/connection_pool.py:216, in AsyncConnectionPool.handle_async_request(self, request)
    215     await self._close_connections(closing)
--> 216     raise exc from None
    218 # Return the response. Note that in this case we still have to manage
    219 # the point at which the response is closed.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpcore/_async/connection_pool.py:196, in AsyncConnectionPool.handle_async_request(self, request)
    194 try:
    195     # Send the request on the assigned connection.
--> 196     response = await connection.handle_async_request(
    197         pool_request.request
    198     )
    199 except ConnectionNotAvailable:
    200     # In some cases a connection may initially be available to
    201     # handle a request, but then become unavailable.
    202     #
    203     # In this case we clear the connection and try again.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpcore/_async/connection.py:101, in AsyncHTTPConnection.handle_async_request(self, request)
     99     raise exc
--> 101 return await self._connection.handle_async_request(request)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpcore/_async/http11.py:143, in AsyncHTTP11Connection.handle_async_request(self, request)
    142         await self._response_closed()
--> 143 raise exc
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpcore/_async/http11.py:113, in AsyncHTTP11Connection.handle_async_request(self, request)
    104 async with Trace(
    105     "receive_response_headers", logger, request, kwargs
    106 ) as trace:
    107     (
    108         http_version,
    109         status,
    110         reason_phrase,
    111         headers,
    112         trailing_data,
--> 113     ) = await self._receive_response_headers(**kwargs)
    114     trace.return_value = (
    115         http_version,
    116         status,
    117         reason_phrase,
    118         headers,
    119     )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpcore/_async/http11.py:186, in AsyncHTTP11Connection._receive_response_headers(self, request)
    185 while True:
--> 186     event = await self._receive_event(timeout=timeout)
    187     if isinstance(event, h11.Response):
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpcore/_async/http11.py:224, in AsyncHTTP11Connection._receive_event(self, timeout)
    223 if event is h11.NEED_DATA:
--> 224     data = await self._network_stream.read(
    225         self.READ_NUM_BYTES, timeout=timeout
    226     )
    228     # If we feed this case through h11 we'll raise an exception like:
    229     #
    230     #     httpcore.RemoteProtocolError: can't handle event type
   (...)
    234     # perspective. Instead we handle this case distinctly and treat
    235     # it as a ConnectError.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/httpcore/_backends/anyio.py:35, in AnyIOStream.read(self, max_bytes, timeout)
     34 try:
---> 35     return await self._stream.receive(max_bytes=max_bytes)
     36 except anyio.EndOfStream:  # pragma: nocover
File /databricks/python/lib/python3.10/site-packages/anyio/streams/tls.py:171, in TLSStream.receive(self, max_bytes)
    170 async def receive(self, max_bytes: int = 65536) -> bytes:
--> 171     data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
    172     if not data:
File /databricks/python/lib/python3.10/site-packages/anyio/streams/tls.py:115, in TLSStream._call_sslobject_method(self, func, *args)
    113         await self.transport_stream.send(self._write_bio.read())
--> 115     data = await self.transport_stream.receive()
    116 except EndOfStream:
File /databricks/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:1105, in SocketStream.receive(self, max_bytes)
   1104 self._transport.resume_reading()
-> 1105 await self._protocol.read_event.wait()
   1106 self._transport.pause_reading()
File /usr/lib/python3.10/asyncio/locks.py:211, in Event.wait(self)
    209     return True
--> 211 fut = self._get_loop().create_future()
    212 self._waiters.append(fut)
File /usr/lib/python3.10/asyncio/mixins.py:30, in _LoopBoundMixin._get_loop(self)
     29 if loop is not self._loop:
---> 30     raise RuntimeError(f'{self!r} is bound to a different event loop')
     31 return loop
RuntimeError: <asyncio.locks.Event object at 0x7ff0a08634c0 [unset]> is bound to a different event loop

The above exception was the direct cause of the following exception:
APIConnectionError                        Traceback (most recent call last)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/actions/llm/utils.py:92, in llm_call(llm, prompt, stop, custom_callback_handlers)
     91 try:
---> 92     result = await llm.agenerate_prompt(
     93         [StringPromptValue(text=prompt)], callbacks=all_callbacks, stop=stop
     94     )
     95 except Exception as e:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:787, in BaseChatModel.agenerate_prompt(self, prompts, stop, callbacks, **kwargs)
    786 prompt_messages = [p.to_messages() for p in prompts]
--> 787 return await self.agenerate(
    788     prompt_messages, stop=stop, callbacks=callbacks, **kwargs
    789 )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:747, in BaseChatModel.agenerate(self, messages, stop, callbacks, tags, metadata, run_name, run_id, **kwargs)
    735         await asyncio.gather(
    736             *[
    737                 run_manager.on_llm_end(
   (...)
    745             ]
    746         )
--> 747     raise exceptions[0]
    748 flattened_outputs = [
    749     LLMResult(generations=[res.generations], llm_output=res.llm_output)  # type: ignore[list-item, union-attr]
    750     for res in results
    751 ]
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/language_models/chat_models.py:923, in BaseChatModel._agenerate_with_cache(self, messages, stop, run_manager, **kwargs)
    922 if inspect.signature(self._agenerate).parameters.get("run_manager"):
--> 923     result = await self._agenerate(
    924         messages, stop=stop, run_manager=run_manager, **kwargs
    925     )
    926 else:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_openai/chat_models/base.py:843, in BaseChatOpenAI._agenerate(self, messages, stop, run_manager, **kwargs)
    842 else:
--> 843     response = await self.async_client.create(**payload)
    844 return await run_in_executor(
    845     None, self._create_chat_result, response, generation_info
    846 )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/openai/resources/chat/completions.py:1412, in AsyncCompletions.create(self, messages, model, frequency_penalty, function_call, functions, logit_bias, logprobs, max_completion_tokens, max_tokens, n, parallel_tool_calls, presence_penalty, response_format, seed, service_tier, stop, stream, stream_options, temperature, tool_choice, tools, top_logprobs, top_p, user, extra_headers, extra_query, extra_body, timeout)
   1411 validate_response_format(response_format)
-> 1412 return await self._post(
   1413     "/chat/completions",
   1414     body=await async_maybe_transform(
   1415         {
   1416             "messages": messages,
   1417             "model": model,
   1418             "frequency_penalty": frequency_penalty,
   1419             "function_call": function_call,
   1420             "functions": functions,
   1421             "logit_bias": logit_bias,
   1422             "logprobs": logprobs,
   1423             "max_completion_tokens": max_completion_tokens,
   1424             "max_tokens": max_tokens,
   1425             "n": n,
   1426             "parallel_tool_calls": parallel_tool_calls,
   1427             "presence_penalty": presence_penalty,
   1428             "response_format": response_format,
   1429             "seed": seed,
   1430             "service_tier": service_tier,
   1431             "stop": stop,
   1432             "stream": stream,
   1433             "stream_options": stream_options,
   1434             "temperature": temperature,
   1435             "tool_choice": tool_choice,
   1436             "tools": tools,
   1437             "top_logprobs": top_logprobs,
   1438             "top_p": top_p,
   1439             "user": user,
   1440         },
   1441         completion_create_params.CompletionCreateParams,
   1442     ),
   1443     options=make_request_options(
   1444         extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
   1445     ),
   1446     cast_to=ChatCompletion,
   1447     stream=stream or False,
   1448     stream_cls=AsyncStream[ChatCompletionChunk],
   1449 )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/openai/_base_client.py:1831, in AsyncAPIClient.post(self, path, cast_to, body, files, options, stream, stream_cls)
   1828 opts = FinalRequestOptions.construct(
   1829     method="post", url=path, json_data=body, files=await async_to_httpx_files(files), **options
   1830 )
-> 1831 return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/openai/_base_client.py:1525, in AsyncAPIClient.request(self, cast_to, options, stream, stream_cls, remaining_retries)
   1523     retries_taken = 0
-> 1525 return await self._request(
   1526     cast_to=cast_to,
   1527     options=options,
   1528     stream=stream,
   1529     stream_cls=stream_cls,
   1530     retries_taken=retries_taken,
   1531 )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/openai/_base_client.py:1598, in AsyncAPIClient._request(self, cast_to, options, stream, stream_cls, retries_taken)
   1597     log.debug("Raising connection error")
-> 1598     raise APIConnectionError(request=request) from err
   1600 log.debug(
   1601     'HTTP Request: %s %s "%i %s"', request.method, request.url, response.status_code, response.reason_phrase
   1602 )
APIConnectionError: Connection error.

During handling of the above exception, another exception occurred:
LLMCallException                          Traceback (most recent call last)
File <command-482391140636505>, line 4
      2 input_example = {"input": "How many stores are there?"}
      3 params_example = {"user_id": "000000", "session_id": "000000"}
----> 4 output_example = managed_conversational_text_to_sql.invoke(
      5     input_example, config={"configurable": params_example}
      6 )
      7 signature = mlflow.models.signature.infer_signature(
      8     input_example, output_example, params_example
      9 )
     11 model_info = mlflow.langchain.log_model(
     12     lc_model=abspath("chain.py"),
     13     artifact_path="chain",
   (...)
     17     signature=signature,
     18 )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:460, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    441 if (
    442     active_session_failed
    443     or autologging_is_disabled(autologging_integration)
   (...)
    454     # warning behavior during original function execution, since autologging is being
    455     # skipped
    456     with set_non_mlflow_warnings_behavior_for_current_thread(
    457         disable_warnings=False,
    458         reroute_warnings=False,
    459     ):
--> 460         return original(*args, **kwargs)
    462 # Whether or not the original / underlying function has been called during the
    463 # execution of patched code
    464 original_has_been_called = False
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/base.py:2877, in RunnableSequence.invoke(self, input, config, **kwargs)
   2875 context.run(_set_config_context, config)
   2876 if i == 0:
-> 2877     input = context.run(step.invoke, input, config, **kwargs)
   2878 else:
   2879     input = context.run(step.invoke, input, config)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/passthrough.py:495, in RunnableAssign.invoke(self, input, config, **kwargs)
    489 def invoke(
    490     self,
    491     input: Dict[str, Any],
    492     config: Optional[RunnableConfig] = None,
    493     **kwargs: Any,
    494 ) -> Dict[str, Any]:
--> 495     return self._call_with_config(self._invoke, input, config, **kwargs)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/base.py:1786, in Runnable._call_with_config(self, func, input, config, run_type, serialized, **kwargs)
   1782     context = copy_context()
   1783     context.run(_set_config_context, child_config)
   1784     output = cast(
   1785         Output,
-> 1786         context.run(
   1787             call_func_with_variable_args,  # type: ignore[arg-type]
   1788             func,  # type: ignore[arg-type]
   1789             input,  # type: ignore[arg-type]
   1790             config,
   1791             run_manager,
   1792             **kwargs,
   1793         ),
   1794     )
   1795 except BaseException as e:
   1796     run_manager.on_chain_error(e)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/config.py:398, in call_func_with_variable_args(func, input, config, run_manager, **kwargs)
    396 if run_manager is not None and accepts_run_manager(func):
    397     kwargs["run_manager"] = run_manager
--> 398 return func(input, **kwargs)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/passthrough.py:482, in RunnableAssign._invoke(self, input, run_manager, config, **kwargs)
    469 def _invoke(
    470     self,
    471     input: Dict[str, Any],
   (...)
    474     **kwargs: Any,
    475 ) -> Dict[str, Any]:
    476     assert isinstance(
    477         input, dict
    478     ), "The input to RunnablePassthrough.assign() must be a dict."
    480     return {
    481         **input,
--> 482         **self.mapper.invoke(
    483             input,
    484             patch_config(config, callbacks=run_manager.get_child()),
    485             **kwargs,
    486         ),
    487     }
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:460, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    441 if (
    442     active_session_failed
    443     or autologging_is_disabled(autologging_integration)
   (...)
    454     # warning behavior during original function execution, since autologging is being
    455     # skipped
    456     with set_non_mlflow_warnings_behavior_for_current_thread(
    457         disable_warnings=False,
    458         reroute_warnings=False,
    459     ):
--> 460         return original(*args, **kwargs)
    462 # Whether or not the original / underlying function has been called during the
    463 # execution of patched code
    464 original_has_been_called = False
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/base.py:3580, in RunnableParallel.invoke(self, input, config)
   3575     with get_executor_for_config(config) as executor:
   3576         futures = [
   3577             executor.submit(_invoke_step, step, input, config, key)
   3578             for key, step in steps.items()
   3579         ]
-> 3580         output = {key: future.result() for key, future in zip(steps, futures)}
   3581 # finish the root run
   3582 except BaseException as e:
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/base.py:3580, in <dictcomp>(.0)
   3575     with get_executor_for_config(config) as executor:
   3576         futures = [
   3577             executor.submit(_invoke_step, step, input, config, key)
   3578             for key, step in steps.items()
   3579         ]
-> 3580         output = {key: future.result() for key, future in zip(steps, futures)}
   3581 # finish the root run
   3582 except BaseException as e:
File /usr/lib/python3.10/concurrent/futures/_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()
File /usr/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None
File /usr/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/base.py:3564, in RunnableParallel.invoke.<locals>._invoke_step(step, input, config, key)
   3562 context = copy_context()
   3563 context.run(_set_config_context, child_config)
-> 3564 return context.run(
   3565     step.invoke,
   3566     input,
   3567     child_config,
   3568 )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:460, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    441 if (
    442     active_session_failed
    443     or autologging_is_disabled(autologging_integration)
   (...)
    454     # warning behavior during original function execution, since autologging is being
    455     # skipped
    456     with set_non_mlflow_warnings_behavior_for_current_thread(
    457         disable_warnings=False,
    458         reroute_warnings=False,
    459     ):
--> 460         return original(*args, **kwargs)
    462 # Whether or not the original / underlying function has been called during the
    463 # execution of patched code
    464 original_has_been_called = False
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/base.py:4475, in RunnableLambda.invoke(self, input, config, **kwargs)
   4461 """Invoke this Runnable synchronously.
   4462 
   4463 Args:
   (...)
   4472     TypeError: If the Runnable is a coroutine function.
   4473 """
   4474 if hasattr(self, "func"):
-> 4475     return self._call_with_config(
   4476         self._invoke,
   4477         input,
   4478         self._config(config, self.func),
   4479         **kwargs,
   4480     )
   4481 else:
   4482     raise TypeError(
   4483         "Cannot invoke a coroutine function synchronously."
   4484         "Use `ainvoke` instead."
   4485     )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/base.py:1786, in Runnable._call_with_config(self, func, input, config, run_type, serialized, **kwargs)
   1782     context = copy_context()
   1783     context.run(_set_config_context, child_config)
   1784     output = cast(
   1785         Output,
-> 1786         context.run(
   1787             call_func_with_variable_args,  # type: ignore[arg-type]
   1788             func,  # type: ignore[arg-type]
   1789             input,  # type: ignore[arg-type]
   1790             config,
   1791             run_manager,
   1792             **kwargs,
   1793         ),
   1794     )
   1795 except BaseException as e:
   1796     run_manager.on_chain_error(e)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/config.py:398, in call_func_with_variable_args(func, input, config, run_manager, **kwargs)
    396 if run_manager is not None and accepts_run_manager(func):
    397     kwargs["run_manager"] = run_manager
--> 398 return func(input, **kwargs)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/base.py:4331, in RunnableLambda._invoke(self, input, run_manager, config, **kwargs)
   4329                 output = chunk
   4330 else:
-> 4331     output = call_func_with_variable_args(
   4332         self.func, input, config, run_manager, **kwargs
   4333     )
   4334 # If the output is a Runnable, invoke it
   4335 if isinstance(output, Runnable):
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/langchain_core/runnables/config.py:398, in call_func_with_variable_args(func, input, config, run_manager, **kwargs)
    396 if run_manager is not None and accepts_run_manager(func):
    397     kwargs["run_manager"] = run_manager
--> 398 return func(input, **kwargs)
File /Workspace/Repos/.internal/decd67d9d9_commits/2a7dd387d9a5ccaaaae8103e0cd0607ea51388c8/databricks/conversational_rag/chain.py:463, in check_input_rails(input)
    462 def check_input_rails(input):
--> 463     return guardrails.invoke(input)["output"]
File /Workspace/Repos/.internal/decd67d9d9_commits/2a7dd387d9a5ccaaaae8103e0cd0607ea51388c8/databricks/conversational_rag/fixtures/custom_nemoguardrails.py:19, in GapRunnableRails.invoke(self, input, config, **kwargs)
     17 """Invoke this runnable synchronously."""
     18 input_messages = self._transform_input_to_rails_format(input)
---> 19 res = self.rails.generate(
     20     messages=input_messages,
     21     options=GenerationOptions(
     22         output_vars=True,
     23         rails=["input"],
     24     ),
     25 )
     26 context = res.output_data
     27 result = res.response
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/rails/llm/llmrails.py:928, in LLMRails.generate(self, prompt, messages, return_context, options, state)
    921     raise RuntimeError(
    922         "You are using the sync `generate` inside async code. "
    923         "You should replace with `await generate_async(...)` or use `nest_asyncio.apply()`."
    924     )
    926 loop = get_or_create_event_loop()
--> 928 return loop.run_until_complete(
    929     self.generate_async(
    930         prompt=prompt,
    931         messages=messages,
    932         options=options,
    933         state=state,
    934         return_context=return_context,
    935     )
    936 )
File /usr/lib/python3.10/asyncio/base_events.py:649, in BaseEventLoop.run_until_complete(self, future)
    646 if not future.done():
    647     raise RuntimeError('Event loop stopped before Future completed.')
--> 649 return future.result()
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/rails/llm/llmrails.py:682, in LLMRails.generate_async(self, prompt, messages, options, state, streaming_handler, return_context)
    679         state_events = state["events"]
    681     # Compute the new events.
--> 682     new_events = await self.runtime.generate_events(
    683         state_events + events, processing_log=processing_log
    684     )
    685     output_state = None
    686 else:
    687     # In generation mode, by default the bot response is an instant action.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/colang/v1_0/runtime/runtime.py:167, in RuntimeV1_0.generate_events(self, events, processing_log)
    165 # If we need to execute an action, we start doing that.
    166 if last_event["type"] == "StartInternalSystemAction":
--> 167     next_events = await self._process_start_action(events)
    169 # If we need to start a flow, we parse the content and register it.
    170 elif last_event["type"] == "start_flow":
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/colang/v1_0/runtime/runtime.py:363, in RuntimeV1_0._process_start_action(self, events)
    360         kwargs["llm"] = self.registered_action_params[f"{action_name}_llm"]
    362     log.info("Executing action :: %s", action_name)
--> 363     result, status = await self.action_dispatcher.execute_action(
    364         action_name, kwargs
    365     )
    367 # If the action execution failed, we return a hardcoded message
    368 if status == "failed":
    369     # TODO: make this message configurable.
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/actions/action_dispatcher.py:253, in ActionDispatcher.execute_action(self, action_name, params)
    251 # We forward LLM Call exceptions
    252 except LLMCallException as e:
--> 253     raise e
    255 except Exception as e:
    256     filtered_params = {
    257         k: v
    258         for k, v in params.items()
    259         if k not in ["state", "events", "llm"]
    260     }
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/actions/action_dispatcher.py:214, in ActionDispatcher.execute_action(self, action_name, params)
    212 result = fn(**params)
    213 if inspect.iscoroutine(result):
--> 214     result = await result
    215 else:
    216     log.warning(
    217         f"Synchronous action `{action_name}` has been called."
    218     )
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/library/self_check/input_check/actions.py:71, in self_check_input(llm_task_manager, context, llm, config)
     66 llm_call_info_var.set(LLMCallInfo(task=task.value))
     68 with llm_params(
     69     llm, temperature=config.lowest_temperature, max_tokens=max_tokens
     70 ):
---> 71     response = await llm_call(llm, prompt, stop=stop)
     73 log.info(f"Input self-checking result is: `{response}`.")
     75 # for sake of backward compatibility
     76 # if the output_parser is not registered we will use the default one
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-4f603f53-d7e5-4da4-b917-276cfef07b06/lib/python3.10/site-packages/nemoguardrails/actions/llm/utils.py:96, in llm_call(llm, prompt, stop, custom_callback_handlers)
     92     result = await llm.agenerate_prompt(
     93         [StringPromptValue(text=prompt)], callbacks=all_callbacks, stop=stop
     94     )
     95 except Exception as e:
---> 96     raise LLMCallException(e)
     97 llm_call_info.raw_response = result.llm_output
     99 # TODO: error handling
Pouyanpi commented 3 weeks ago

@ishaan-mehta thanks for opening this issue. Have you tried nest_asyncio?

In environments that already have a running event loop (like Databricks or Jupyter notebooks), attempting to create a new event loop can lead to conflicts. The nest_asyncio package allows event loops to be nested safely.

Before invoking any asynchronous functions or methods, apply nest_asyncio. For example, modify your main execution script as follows:

import nest_asyncio

nest_asyncio.apply()  # patches asyncio so the running loop can be re-entered
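
For the patch to take effect, apply it before the chain is invoked, e.g. at the top of the notebook. Using the names from your stack trace (for illustration):

import nest_asyncio

nest_asyncio.apply()  # must run before the first guardrails invocation

output_example = managed_conversational_text_to_sql.invoke(
    input_example, config={"configurable": params_example}
)
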
Pouyanpi commented 2 weeks ago

@ishaan-mehta any update on this issue?

ishaan-mehta commented 1 week ago

Hi @Pouyanpi, thanks for the response. Unfortunately, this issue is not solved by running nest_asyncio.apply(). It seems to be a compatibility problem between nemoguardrails and openai introduced somewhere between openai 1.45.0 and 1.54.3: both of those versions work for me, but intermediate releases (1.50.0, for example) do not.
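
For anyone else hitting this: pinning openai to one of the versions that works for me avoids the error for now, e.g. in a Databricks notebook:

%pip install "openai==1.45.0"
dbutils.library.restartPython()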