siddhantgoel / streaming-form-data

Streaming (and fast!) parser for multipart/form-data written in Cython
https://streaming-form-data.readthedocs.io/en/latest/
MIT License

Parser won't get registered and receive chunk data as a flask_appbuilder app in airflow webserver ui #56

Closed galaxie500 closed 2 years ago

galaxie500 commented 3 years ago

Hi,

I integrated the Flask example upload-test.py into the Airflow webserver UI as an upload plugin. It lets a user upload a CSV file from the Airflow webserver UI and save it to a server directory ('/usr/local/airflow/uploads/'). However, the parser does not pick up any header information for the registered target, and the chunked data is not written to the file through the parser.

  1. I have verified that upload-test.py works on a local Flask host with @app.route.
  2. I have verified that the Airflow plugin interface below works when I use request.files and .save(path_to_save).

Here is the Flask view (@expose) in the Airflow plugin:

class PipelineLauncher(AppBuilderBaseView): # from flask_appbuilder import BaseView as AppBuilderBaseView
    @expose('/', methods=('GET', 'POST'))
    def list(self):
        if request.method == 'POST':
            path_to_save = '/usr/local/airflow/uploads/temp.csv'   #path mounted with airflow

            file_ = FileTarget(path_to_save)
            parser = StreamingFormDataParser(headers=request.headers)
            parser.register('file', file_)

            while True:
                chunk = request.stream.read(8192)
                if not chunk:
                    break
                parser.data_received(chunk)

            #df = pd.read_csv(path_to_save) this will throw error 'pandas.errors.EmptyDataError: No columns to parse from file'
            #rows = df.shape[0]

            return self.render_template("debug.html", 
                                     path_to_save=path_to_save,
                                     file_object=file_,
                                     header=request.headers,
                                     filename=file_.multipart_filename,
                                     content_type=file_.multipart_content_type)
        return self.render_template("index.html")

# debug.html
# path_to_save: {{ path_to_save }}
# file_object:  {{ file_object }}
# header: {{ header }}
# filename: {{ filename }}
# content_type: {{ content_type }}

bp = Blueprint(
    "pipeline", __name__,
    template_folder='templates',
    static_folder='static',
    static_url_path='/static/pipeline_launcher')

class AirflowCustomLauncher(AirflowPlugin):
    name = "pipeline"
    pipeline_launcher = PipelineLauncher()
    pipeline_launcher_package = {
        "name": "Manual Upload Plugin",
        "category": "Launch Pipeline",
        "view": pipeline_launcher
    }
    appbuilder_views = [pipeline_launcher_package]
    admin_views = [pipeline_launcher_package]
    flask_blueprints = [bp]

index.html

{% include "airflow/master.html" %}
{% block body %} 
<title>Upload XLS/XLSX/CSV files to InfluxDB.</title>
<form method="post" class="admin-form form-horizontal" enctype="multipart/form-data" role='form'>
  <div class="col-md-12 text-center">
    <h3>Manual Upload for InfluxDB</h3>
    <br/>
    <p> This plugin currently only supports .csv, .xls, and .xlsx files. Larger files and .xlsx files will take longer than usual to process. Upon submitting a file, you will be taken to a page to preview your file as well as configure upload parameters. </p>
  </div>
  {% if csrf_token %}
  <input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
  {% endif %}
  <div class="form-group">
      <!-- You can take parameters from the user using the form elements and pass them to backend -->
    <label class="col-md-4 control-label">File: </label>
    <div class="col-md-6">
      <input class="form-control" type="file" name="file" />
    </div>
  </div>
  <div class="col-md-offset-4 col-md-10 submit-row">
    <button type="submit" class="btn btn-primary">Process File</button>
  </div>
  <div class="container">
    {% for message in get_flashed_messages() %}
      <div class="alert alert-warning">
        {{ message }}
      </div>
    {% endfor %}
  </div>
</form>
{% endblock %} 

The plugin lets me choose a file to upload. After I select a CSV file, this is the output of the debug.html page:

path_to_save: /usr/local/airflow/uploads/temp.csv

file_object: <streaming_form_data.targets.FileTarget object at 0x7f7be38fb550>

header: Host: localhost:8080 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Encoding: gzip, deflate Accept-Language: en-us Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryDSj0i1GXH4P0ITsx Origin: http://localhost:8080 User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15 Connection: keep-alive Upgrade-Insecure-Requests: 1 Referer: http://localhost:8080/pipelinelauncher/ Content-Length: 328842 Cookie: session=.eJwlj0tuQyEMRffCOAP-tt9mngzYbVSaVMAbRd17iSqPru5Hxy9z6pD5aQ7lPuVmznszhwnQFAGJrGvFh4aOxbP1PiOpT14qVScgBUKELATYgFJRpSSQhKu3pFmEi5Kr6kVjzM0iWqLkss9SiEKstlllZIGguGvIO6wCxWyQHxnf_JDHMsca10arc-i5nl_y2ISsEZLDrLW6nGy2GPYxSAQBTrYgqrftvdT445yL1zVPvfcl413vfTv9WbnLlnvyZq4p4_99Z37_AH8MU-Q.YTuY0A.t-_l07dcNPe_RN6CWI_Pg5cZ3vo

filename: None

content_type: None

Any help would be appreciated. Thank you.

siddhantgoel commented 3 years ago

One thing to double check would be if parser.data_received(chunk) is sending a valid value to the parser. I haven't worked with Airflow before so I can't tell if that plugin does something with the request body before it reaches the handler?

galaxie500 commented 3 years ago

Thanks for the reply!

First, everything works fine if I use file = request.files['file'] and file.save() in this Airflow plugin interface.

I double-checked with a simple CSV file, appending each chunk to a list. It seems that no valid chunk ever reaches parser.data_received(chunk). With upload-test.py I get a list of length 3. With the same chunk size in the Airflow plugin interface, the list comes back empty (an empty list, not None).

Could you explain what might be causing this? The output of request.headers is what we expect, so is the issue coming from request.stream.read()?

siddhantgoel commented 3 years ago

Yeah, my assumption is that by the time you call request.stream.read() some other part of the code has already read the request body.

I'd suggest putting either a breakpoint inside the while True or printing out the chunk before it's sent to parser.data_received. If you don't see any output, that would be a sign that this assumption is correct.
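The check suggested above could look roughly like the sketch below. It is not taken from the plugin or from streaming-form-data: feed_in_chunks is a hypothetical helper, and io.BytesIO stands in for request.stream so the snippet runs on its own. In the real handler one would presumably pass request.stream and parser.data_received instead.

```python
import io


def feed_in_chunks(stream, on_chunk, chunk_size=8192):
    """Read `stream` until it is exhausted, hand every chunk to `on_chunk`,
    and return the list of chunk sizes that were seen (hypothetical helper)."""
    sizes = []
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        # Print before forwarding, so an empty body is obvious: no lines appear.
        print(f"chunk {len(sizes)}: {len(chunk)} bytes, starts with {chunk[:40]!r}")
        sizes.append(len(chunk))
        on_chunk(chunk)
    return sizes


received = []

# Stand-in for a request body that nobody has read yet.
fresh = io.BytesIO(b"x" * 20000)
fresh_sizes = feed_in_chunks(fresh, received.append)

# Stand-in for a body that earlier code already consumed.
drained = io.BytesIO(b"x" * 20000)
drained.read()
drained_sizes = feed_in_chunks(drained, received.append)

print(fresh_sizes, drained_sizes)
```

If the loop prints nothing inside the plugin while the same upload prints several chunks in upload-test.py, the body was most likely consumed before the handler ran.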

galaxie500 commented 3 years ago

Then how could the request body have already been read? There is no other request handler, and this is the very first part of the plugin interface. Thanks for your patience.

siddhantgoel commented 3 years ago

I'm not sure if I can answer that question without knowing in detail what the plugin interface does. request.headers is a dictionary-like object that is probably set once and stays available. Request bodies can contain much more data than the headers, so it's possible that web server programs do not retain the body once some function has read it.

Basically if there's no more request body to be read by the time control reaches your function, then there must be some other function somewhere else in the call stack that's reading it.

You could try putting a debugger in the main entrypoint of your program/script and then trace the execution and see what code is reading request.stream.
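As an alternative to stepping through with a debugger, one way to find the reader is to wrap the stream and record who calls read(). The sketch below is only an illustration under stated assumptions: TracingStream, early_hook, and view are hypothetical names, and io.BytesIO again stands in for the request body. In a WSGI application such a wrapper might be placed around environ["wsgi.input"] in a small middleware, but that wiring is not shown here and would need to be checked against the actual setup.

```python
import io
import traceback


class TracingStream:
    """Wrap a file-like object and remember which function called read()."""

    def __init__(self, wrapped):
        self._wrapped = wrapped
        self.readers = []  # names of the functions that called read(), in order

    def read(self, size=-1):
        # With limit=2 the stack holds [caller, this read() frame].
        caller = traceback.extract_stack(limit=2)[0]
        self.readers.append(caller.name)
        return self._wrapped.read(size)


def early_hook(stream):
    # Hypothetical stand-in for framework code that runs before the view.
    return stream.read()


def view(stream):
    # Stand-in for the upload handler, which reads in 8192-byte chunks.
    return stream.read(8192)


stream = TracingStream(io.BytesIO(b"a,b\n1,2\n"))
first = early_hook(stream)
second = view(stream)

print(stream.readers)  # order in which functions touched the body
print(first, second)   # the second read comes back empty
```

The first entry in readers names the code that consumed the body. Once it has done so, every later read returns an empty bytes object, which matches the empty chunk list seen in the plugin.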