streamlit / streamlit

Streamlit ā€” A faster way to build and share data apps.
https://streamlit.io
Apache License 2.0
35.54k stars 3.08k forks source link

Code snippet to upload files to disk in chunks rather than in memory #9460

Open codeSamuraii opened 1 month ago

codeSamuraii commented 1 month ago

Checklist

Summary

Hello,

Currently, Streamlit stores the entirety of uploaded files in memory. This limits upload size to available RAM of the machine.

I managed to implement chunked multipart uploads straight to disk, allowing for arbitrary upload sizes. However my knowledge of Streamlit's inner workings is quite limited and I can't find an elegant solution worthy of a PR. Hence I wanted to share with you parts of my solution, in the hope of someone picking it up and implementing it properly.


I started by subclassing UploadFileRequestHandler (streamlit/web/server/upload_file_request_handler.py), which handles the file upload:

  1. By decorating it with @tornado.web.stream_request_body, we can can define a data_received(self, chunk) method to receive chunked data.
  2. Using the package python-multipart, we can create a parser in the prepare() method, and feed it data in the previously mentionned data_received(self, chunk).
  3. python-multipart then gives us access to a File instance through an on_file() method that we define on the class.

This is where it got very hacky and custom for me, modifying UploadedFile, UploadedFileRec etc., and where I believe an expert view would be welcomed.

Here is the full code:

@tornado.web.stream_request_body
class StreamingUploadRequestHandler(UploadFileRequestHandler):
    """Implements the POST /upload_file endpoint but streams the body in chunks directly to a file to prevent memory issues."""

    def prepare(self):
        self.session_id = self.path_kwargs['session_id']
        self.file_id = self.path_kwargs['file_id']
        self.bytes_received = 0
        self.parser = None

        if self.request.method == 'DELETE':
            return

        self.parser = multipart.create_form_parser(
            self.request.headers, self.on_field, self.on_file, trust_x_headers=True,
            config=dict(MAX_MEMORY_FILE_SIZE=0, UPLOAD_DELETE_TMP=False)
        )

    def on_field(self, field: multipart.Field):
        _LOGGER.info('Field received: %s = %s', field.field_name, repr(field.value))

    def on_file(self, file: multipart.File):
        if file.in_memory:
            file.flush_to_disk()
        self.file_name = file.file_name.decode()
        self.file_size = file.size
        self.file_path = file.actual_file_name.decode()
        file.finalize()
        _LOGGER.info('File received: %s (size: %.2f MiB)', self.file_name, self.file_size/(1024**2))

    def data_received(self, chunk: bytes):
        if self.parser is not None:
            self.parser.write(chunk)
        self.bytes_received += len(chunk)

    def on_finish(self) -> None:
        if self.parser is not None:
            self.parser.close()
        _LOGGER.info('Request finished: %.2f MiB received', self.bytes_received/(1024**2))

    def put(self, **kwargs):
        """Receive an uploaded file and add it to our UploadedFileManager."""
        try:
            if not self._is_active_session(self.session_id):
                raise Exception("Invalid session_id")
        except Exception as e:
            self.send_error(400, reason=str(e))
            return

        # UploadedFileToDiskRec is my hacky subclass of UploadFileRec that accepts a path to present the user
        uploaded_files: list[UploadedFileToDiskRec] = []
        uploaded_files.append(UploadedFileToDiskRec(
            file_id=self.file_id,
            name=self.file_name,
            type=self.request.headers["Content-Type"],
            path=self.file_path,
            size=self.file_size
        ))

        if len(uploaded_files) != 1:
            self.send_error(
                400, reason=f"Expected 1 file, but got {len(uploaded_files)}"
            )
            return

        self.parser.finalize()
        self._file_mgr.add_file(session_id=self.session_id, file=uploaded_files[0])
        self.set_status(204)
        _LOGGER.info('File added to file manager')

    def delete(self, **kwargs):
        """Delete file request handler."""
        session_id = self.path_kwargs["session_id"]
        file_id = self.path_kwargs["file_id"]

        self._file_mgr.remove_file(session_id=session_id, file_id=file_id)
        self.set_status(204)
        _LOGGER.info('File removed from file manager')

Adding a to_disk keyw. arg. to st.file_uploader to upload files to a new endpoint would be ideal, however I have very limited front-end experience and Streamlit's widget rendering is obscure to me.

I hope this can help someone to implement this feature properly.

Why?

No response

How?

No response

Additional Context

No response

github-actions[bot] commented 1 month ago

To help Streamlit prioritize this feature, react with a šŸ‘ (thumbs up emoji) to the initial post.

Your vote helps us identify which enhancements matter most to our users.

Visits

jrieke commented 1 month ago

Hey @codeSamuraii! Thanks for the awesome investigation, this is great! We thought about uploading files to disk a few times in the past but it was never on the top of our priority list. But I believe at some point we'll do it, especially if this gets more upvotes!