Open pgjones opened 1 year ago
ha, please follow me :
import asyncio
from quart import Quart, make_response, stream_with_context
from datetime import datetime
app = Quart(__name__)
@app.route('/')
async def sub():
@stream_with_context
async def data_stream():
try:
while True:
print("Sending Data")
yield "data: {ok: true}\n\n".encode()
await asyncio.sleep(1)
finally:
print("Exited Gen")
response = await make_response(data_stream())
response.timeout = None
response.headers['Content-Type'] = 'text/event-stream'
return response
if __name__ == "__main__":
app.run()
This is still a problem.
For a generator which yields nothing because of an exception one has to add a try...finally return
statement like so:
try:
async for m in your_generator_function:
json = m.model_dump_json()
yield f"data: {json}\n\n"
finally:
print("exited")
return