marshmallow-code / flask-smorest

DB agnostic framework to build auto-documented REST APIs with Flask and marshmallow
https://flask-smorest.readthedocs.io
MIT License

Issues in Streaming response from flask-smorest #664

Open FahdCodes opened 3 months ago

FahdCodes commented 3 months ago

I'm having trouble streaming responses from flask-smorest. I'm following the guidance at https://flask.palletsprojects.com/en/2.3.x/patterns/streaming/ to stream responses from my flask-smorest application. Below is a minimal reproducible example (MRE) of my code. Say my application fetches foreign exchange rates for the past 1000 days for any currency requested by the end user.

This is the version without streaming. It works perfectly and returns a list of JSON objects:

from flask import request, Response
from flask.views import MethodView
from flask_smorest import Blueprint, abort
from marshmallow import Schema, fields
import asyncio

class CurrencySchema(Schema):
    name = fields.Str()
    rate = fields.Str()
    date = fields.Str()
    source = fields.Str()

blp = Blueprint("test",__name__, description="test")

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(currency))  # returns a list of dictionaries
        return results

When I run this, it succeeds and returns a list of JSON objects in my browser, like:

[{'name': 'USD', 'rate': '1.2333', 'date': 'Mar 21, 2024', 'source': 'currency.com'}, 
 {'name': 'USD', 'rate': '1.2121', 'date': 'Mar 22, 2024', 'source': 'currency.com'}, .................so on and so forth up to 1000 jsons]

Now comes the part where I try streaming the responses. I make the following changes to my code:

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')        
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(currency))      #returns a list of dictionaries
        def generate_rates():
             batch_size = 100
             for i in range(0, len(results), batch_size):
                  yield results[i:i+batch_size]
        return generate_rates()

This strangely returns a list of 50 empty json responses- [{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}]

I also tried the following, with the same result (a list of empty JSON objects). In addition, I got an "AssertionError: applications must write bytes", which seems to be raised from werkzeug's serving.py.

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')        
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(currency))      #returns a list of dictionaries
        def generate_rates():
             batch_size = 100
             for i in range(0, len(results), batch_size):
                  yield results[i:i+batch_size]
        return Response(generate_rates(), mimetype = 'application/json')
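
The `AssertionError` is consistent with the generator yielding Python lists: a WSGI response body must be made of bytes, and while `str` chunks get encoded, lists do not. A minimal sketch of one way around it, using only the standard library, is to serialize each batch before yielding it, one JSON array per line. The sample data and the newline-delimited layout are assumptions for illustration, not something flask-smorest prescribes.

```python
import json

# Made-up sample data standing in for the fetched rates.
results = [
    {"name": "USD", "rate": str(i), "date": "n/a", "source": "example"}
    for i in range(250)
]


def generate_rates(batch_size=100):
    for i in range(0, len(results), batch_size):
        # Yield text, not a list: each chunk is one JSON array plus a newline.
        yield json.dumps(results[i:i + batch_size]) + "\n"


chunks = list(generate_rates())
print(len(chunks), type(chunks[0]).__name__)  # 3 str
```

A generator like this can be passed to `Response(...)`. The body is then a sequence of JSON documents rather than a single JSON array, so the client has to parse it line by line.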

My entire application is ready and this is the last bit that is giving issues. I want to stream the responses, and there is something in flask-smorest that is causing the issue. Would really appreciate your support. Thanks!

lafrech commented 2 months ago

Interesting. I've never achieved streaming with Flask but this is something I'd be happy to support.

I can't tell off the top of my head why it doesn't work. I guess the response decorator is not happy about receiving a generator. That would explain the first case. Not sure about the second, though.

I don't know when I'll have time to investigate this. If someone wants to give it a go, I suggest looking in Flask code to see how views returning generators are treated and adapt the response decorator code here.

FahdCodes commented 2 months ago

I was finally able to resolve this. The root cause was the @blp.response decorator, which does not accept generator responses. I ended up removing the decorator from my code and serializing the results manually. Here's the final code:

from flask import Response, stream_with_context

@blp.route("/test")
class Test(MethodView):
    # @blp.response(200, CurrencySchema(many=True))  # decorator removed: it does not accept generators
    def get(self):
        currency = request.args.get('currency')
        schema = CurrencySchema()

        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries 

        @stream_with_context
        def generate_results():
            yield '['
            for result in results[:-1]:
                yield schema.dumps(result)
                yield ', '
            yield schema.dumps(results[-1])
            yield ']'

        return Response(generate_results(), mimetype='application/json') 

It would be interesting to see if the blp.response decorator can handle generators.
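
One caveat with the generator above: `results[-1]` raises `IndexError` when `results` is empty. A small variant, sketched here with the standard `json` module in place of `schema.dumps` (an assumption made so the snippet runs on its own), emits the separator before every item except the first, so it also handles an empty list or a lazy iterable. `stream_json_array` is a hypothetical helper name.

```python
import json


def stream_json_array(items, dumps=json.dumps):
    """Yield the pieces of a JSON array, one item at a time."""
    yield "["
    for index, item in enumerate(items):
        if index:
            # Separator goes before every item except the first.
            yield ", "
        yield dumps(item)
    yield "]"


body = "".join(stream_json_array([{"name": "USD"}, {"name": "EUR"}]))
empty = "".join(stream_json_array([]))
print(body)   # [{"name": "USD"}, {"name": "EUR"}]
print(empty)  # []
```

In the view, passing `dumps=schema.dumps` would bring back the marshmallow serialization.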