ArtyomZemlyak closed this issue 6 months ago
Code for the client:

import json
from time import time

import requests


class EventStream:
    """Accepts lines of text and decodes them into a stream of SSE events.

    Refer to the following page for details:
    https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events

    This class is supposed to be iterated with a for loop like:

    >>> for event in EventStream(lines):
    ...     do_something_with(event)
    """

    def __init__(self, lines, encoding="utf-8"):
        self._lines = lines
        self._encoding = encoding

    @property
    def decoded_lines(self):
        for line in self._lines:
            yield line.decode(self._encoding)

    def __iter__(self):
        return self

    def __next__(self):
        return Event.parse_from_lines(self.decoded_lines)


class Event:
    """A single event in the event stream."""

    def __init__(self):
        self.id = None
        self.event = None
        self.data = ""

    def append_line(self, line):
        if not line:
            raise ValueError(
                "Not supposed to accept empty lines. Please handle this outside of the Event class."
            )
        if ":" not in line:
            raise ValueError("Bad format: Each line must contain `:`.")
        prefix, data = line.split(":", maxsplit=1)
        data = data.strip()
        if prefix == "id":
            if self.id is not None:
                raise ValueError(
                    "Bad event: event id cannot be specified multiple times."
                )
            self.id = data
        if prefix == "event":
            if self.event is not None:
                raise ValueError(
                    "Bad event: event type cannot be specified multiple times."
                )
            self.event = data
        if prefix == "data":
            # Multiple `data` lines are joined with newlines, per the SSE spec.
            if not self.data:
                self.data = data
            else:
                self.data = "\n".join((self.data, data))
        # TODO: Handle other prefixes here

    @staticmethod
    def parse_from_lines(lines_stream):
        """Given a lines stream, parse a single event from it.

        Only the first event is parsed; the remainder is left untouched.
        """
        result = Event()
        for line in lines_stream:
            if not line:
                # A blank line terminates the event.
                return result
            result.append_line(line)
        # If we reached the end of the input lines stream, raise
        # StopIteration to indicate that no more events will arrive.
        raise StopIteration()

    def __str__(self):
        # Defaults to "message" when the event name is not defined.
        event_name = self.event or "message"
        return f"Event ({event_name}): {self.data}"


def get_stream(question, chat_history, project, url):
    # The body is JSON (requests sets Content-Type for json=); the Accept
    # header tells the server we want an SSE stream back.
    headers = {"Accept": "text/event-stream"}
    request_data = {"question": question, "chat_history": chat_history, "project": project}
    response = requests.post(url, json=request_data, headers=headers, stream=True)
    response.raise_for_status()
    content_type = response.headers.get("Content-Type", "")
    if "text/event-stream" in content_type:
        event_stream = EventStream(response.iter_lines())
        for event in event_stream:
            data = json.loads(event.data)
            yield data["answer"]
    else:
        raise TypeError(f"{content_type=} is not text/event-stream")


if __name__ == "__main__":
    question = "tell about"
    chat_history = []
    url = "http://localhost:8080/score"
    project = "hr"

    st = time()
    for line in get_stream(question, chat_history, project, url):
        print(line, end="")
    print()
    print(time() - st)
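The parser above can be sanity-checked offline by feeding it raw SSE bytes (a minimal sketch; the payloads are made-up stand-ins for what the server sends). Note that EventStream should be given an iterator rather than a list, because each access to decoded_lines builds a fresh generator over the underlying lines:

# Hypothetical SSE payloads for an offline check of EventStream/Event.
raw_lines = [
    b'data: {"answer": "Hello"}',
    b"",  # a blank line terminates the first event
    b"event: done",
    b'data: {"answer": " world"}',
    b"",
]
# Pass an iterator: a plain list would restart from the top on every event.
for event in EventStream(iter(raw_lines)):
    print(event)
# Event (message): {"answer": "Hello"}
# Event (done): {"answer": " world"}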
@ArtyomZemlyak Could you share how you start the promptflow server?
@wangchao1230
Using a Docker server:
CMD ["bash", "./start.sh"]
# Startup script for the flow container
CONDA_ENV_PATH="$(conda info --base)/envs/promptflow-serve"
export PATH="$CONDA_ENV_PATH/bin:$PATH"
cd ./kb-search-tool
pip install -e .
cd ..
cd ./os-llm-tool
pip install -e .
cd ..
ls
ls connections
pf connection create --file /connections/kb_search_gpu2.yaml
pf connection create --file /connections/gguf_openai.yaml
pf flow serve --source flow --host 0.0.0.0
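As a quick check that the container is actually serving, the same /score endpoint can also be queried without streaming (a sketch; it assumes the payload shape from the client script above, and that promptflow falls back to a plain JSON response when the Accept header is not text/event-stream):

import requests

resp = requests.post(
    "http://localhost:8080/score",
    json={"question": "ping", "chat_history": [], "project": "hr"},
    headers={"Accept": "application/json"},  # ask for a single JSON reply
)
print(resp.status_code, resp.json())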
Hi @ArtyomZemlyak
Thank you for reporting this issue to us. We appreciate your feedback and we are sorry for any inconvenience this may have caused you.
From your description, I understand that you are using a docker container to host the promptflow server and send requests to it. The server returns a streaming output to the client. However, when you send multiple requests at the same time, one of the terminals crashes with an error message.
You have identified two conditions that trigger this bug:
To investigate your case, I attempted to reproduce the bug on my end using the following setup:
However, I did not encounter any errors when multiple requests were made to the container simultaneously. This was the case even when a lengthy passage was input for the flow to echo, which would extend the duration of the streaming process.
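For context, a streaming "echo" node of the kind used in such a test can be as small as a single promptflow tool that yields its input back in pieces (a hypothetical sketch; the actual test flow was not shared in this thread, and echo_stream is a made-up name):

from promptflow import tool


@tool
def echo_stream(text: str):
    # Yielding instead of returning makes the tool's output a generator,
    # which promptflow serves back to the client as a stream.
    for word in text.split():
        yield word + " "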
In our Docker setup, we use the following settings for the Flask app, which ensures that multiple requests are processed by different workers in case of a race condition:
gunicorn -w 8 --threads 1 -b "0.0.0.0:8080" --timeout 300 "promptflow._sdk._serving.app:create_app()"
From my perspective, each request is isolated and processed in a separate process, so I would not expect any difference from single-request processing.
This leads me to believe that there might be some discrepancies in our understanding of the problem at hand. It would be greatly beneficial if you could provide us with your flow or docker file. This will allow us to fully grasp the nature of the problem you are encountering.
@liucheng-ms
Thanks for your reply! I tried gunicorn -w 8 --threads 1 -b "0.0.0.0:8080" --timeout 300 "promptflow._sdk._serving.app:create_app()" and hit exactly the same issue :(
docker-compose.yml
version: '3'

services:
  hr_chat_bot:
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - .env
    ports:
      - 8080:8080
    volumes:
      - ./:/flow
      - ./connections:/connections
      - ../kb-search-tool:/kb-search-tool
      - ../os-llm-tool:/os-llm-tool
      - ./start.sh:/start.sh
    command: ["bash", "./start.sh"]
    image: hr_chat_bot
    restart: unless-stopped
    networks:
      - llm-test_default

networks:
  llm-test_default:
    # driver: bridge
    external: true
  # knowledge-base-search_kbs:
  #   external: true
Dockerfile
# syntax=docker/dockerfile:1
FROM docker.io/continuumio/miniconda3:latest

WORKDIR /

COPY ./requirements.txt /flow/requirements.txt

# create conda environment
RUN conda create -n promptflow-serve python=3.9.16 pip=23.0.1 -q -y && \
    conda run -n promptflow-serve \
        pip install -r /flow/requirements.txt && \
    conda run -n promptflow-serve pip install keyrings.alt && \
    conda run -n promptflow-serve pip cache purge && \
    conda clean -a -y

# COPY ./connections/* /connections/
# COPY ./start.sh /

CMD ["bash", "./start.sh"]
Also tried 10 threads. New traceback from the server:
[2023-11-09 11:59:10 +0000] [86] [ERROR] Socket error processing request.
Traceback (most recent call last):
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/urllib3/response.py", line 710, in _error_catcher
    yield
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/urllib3/response.py", line 1073, in read_chunked
    self._update_chunk_length()
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
    raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/requests/models.py", line 816, in generate
    yield from self.raw.stream(chunk_size, decode_content=True)
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/urllib3/response.py", line 933, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/urllib3/response.py", line 1102, in read_chunked
    self._original_response.close()
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/urllib3/response.py", line 727, in _error_catcher
    raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/gunicorn/workers/gthread.py", line 282, in handle
    keepalive = self.handle_request(req, conn)
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/gunicorn/workers/gthread.py", line 354, in handle_request
    util.reraise(*sys.exc_info())
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/gunicorn/util.py", line 641, in reraise
    raise value
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/gunicorn/workers/gthread.py", line 339, in handle_request
    for item in respiter:
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/werkzeug/wsgi.py", line 256, in __next__
    return self._next()
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/werkzeug/wrappers/response.py", line 32, in _iter_encoded
    for item in iterable:
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/promptflow/_sdk/_serving/response_creator.py", line 79, in generate
    for chunk in self.stream_iterator:
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/promptflow/_core/generator_proxy.py", line 27, in generate_from_proxy
    yield from proxy
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/promptflow/_core/generator_proxy.py", line 17, in __next__
    item = next(self._generator)
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/promptflow/_core/generator_proxy.py", line 27, in generate_from_proxy
    yield from proxy
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/promptflow/_core/generator_proxy.py", line 17, in __next__
    item = next(self._generator)
  File "/os-llm-tool/os_llm_tool/tools/os_llm.py", line 99, in generator
    for chunk in response:
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/promptflow/_core/generator_proxy.py", line 27, in generate_from_proxy
    yield from proxy
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/promptflow/_core/generator_proxy.py", line 17, in __next__
    item = next(self._generator)
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/openai/api_resources/abstract/engine_api_resource.py", line 166, in <genexpr>
    return (
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/openai/api_requestor.py", line 692, in <genexpr>
    return (
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/openai/api_requestor.py", line 115, in parse_stream
    for line in rbody:
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/requests/models.py", line 865, in iter_lines
    for chunk in self.iter_content(
  File "/opt/conda/envs/promptflow-serve/lib/python3.9/site-packages/requests/models.py", line 818, in generate
    raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
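The bottom frames show that the server's own OpenAI client goes through the same requests/urllib3 path as the terminal client: the backend closed the chunked HTTP response before sending the terminating zero-length chunk, so urllib3 reports the empty chunk-length line as InvalidChunkLength. On the consumer side this surfaces as requests.exceptions.ChunkedEncodingError, which can at least be caught cleanly (a minimal sketch; get_stream_safe is a hypothetical wrapper around the get_stream function above):

from requests.exceptions import ChunkedEncodingError


def get_stream_safe(question, chat_history, project, url):
    try:
        yield from get_stream(question, chat_history, project, url)
    except ChunkedEncodingError as exc:
        # The server dropped the connection mid-stream; re-raise with a
        # readable message instead of the raw urllib3 traceback.
        raise RuntimeError(f"Stream aborted by server: {exc}") from exc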
@liucheng-ms I think the issue may be with the llama-cpp-python server, because I tried the same test (two terminals) with streaming responses sent directly to the llama-cpp-python server (it is OpenAI API compatible), and the same issue occurred!
I think we can close this issue, because it does not come from promptflow.
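For anyone hitting the same behavior, the two-terminal reproduction can also be driven from a single script (a sketch reusing the get_stream client from the top of this thread; question, project, and URL are the same placeholder values used there):

import threading


def run_client(name):
    try:
        for chunk in get_stream("tell about", [], "hr", "http://localhost:8080/score"):
            print(f"[{name}] {chunk}", end="", flush=True)
    except Exception as exc:
        # With the buggy backend, one of the two clients dies here.
        print(f"\n[{name}] failed: {exc!r}")


threads = [threading.Thread(target=run_client, args=(f"client-{i}",)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()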
@ArtyomZemlyak Thank you for your investigation and feedback. I appreciate your effort and collaboration. I agree that the issue seems to be related to the llama-cpp-python server, and not promptflow. I will close this issue. Thank you for your cooperation and understanding. 😊
Describe the bug
The promptflow server can't handle multiple requests with streaming output.

How To Reproduce the bug
Run the promptflow server with stream chat output, then send streaming requests to the promptflow server from two terminals at the same time. One of the terminals crashes with:
("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

Expected behavior
At step 5 of the reproduction, both the first and the second terminal keep streaming and get results from the promptflow server.

Logs (1 terminal)
Logs (promptflow server)