giampaolo / pyftpdlib

Extremely fast and scalable Python FTP server library

Streaming result from a `requests.get` response #473

Open giulioprovasi opened 6 years ago

giulioprovasi commented 6 years ago

I am implementing a fully virtual FTP server using pyftpdlib, and I am stuck on the ftp_RETR command.

What I need to do is stream the content of a remote file (retrieved with requests.get() or any other HTTP client) directly to the user, without storing the file on the FTP server. Is this possible?

Here is what I did:

# --- ftp handler

import requests

def ftp_RETR(self, file):
    r = requests.get(
        'http://mirror.filearena.net/pub/speed/SpeedTest_2048MB.dat',
        stream=True)
    try:
        producer = RemoteProducer(r, self._current_type)
        self.push_dtp_data(producer, isproducer=True, file=r, cmd="RETR")
        return file
    except Exception:
        r.close()
        raise

# --- producer

import os


class RemoteProducer(object):
    """Producer wrapper for a streamed requests response."""

    buffer_size = 65536

    def __init__(self, r, type):
        """Initialize the producer with a data_wrapper appropriate to TYPE.

         - (requests.Response) r: the streamed response object.
         - (str) type: the current TYPE, 'a' (ASCII) or 'i' (binary).
        """
        self.r = r
        self.type = type
        # build the chunk iterator once; calling iter_content() on every
        # more() call would return a brand new generator each time
        self._iterator = r.iter_content(chunk_size=self.buffer_size)
        if type == 'a' and os.linesep != '\r\n':
            self._data_wrapper = lambda x: x.replace(os.linesep.encode(), b'\r\n')
        else:
            self._data_wrapper = None

    def more(self):
        """Attempt to read a chunk of data of size self.buffer_size."""
        try:
            # an empty bytes string tells pyftpdlib the transfer is over
            data = next(self._iterator, b'')
        except OSError as err:
            raise _RemoteReadWriteError(err)
        else:
            if self._data_wrapper is not None:
                data = self._data_wrapper(data)
            return data

And, moreover, is it possible to make this non-blocking, so that the user can continue navigating while we serve the file?

Thanks for your input

giulioprovasi commented 6 years ago

I found out that doing this works (kind of):

    import urllib.request as req

    def open(self, filename, mode):
        """Open a file returning its handler."""
        assert isinstance(filename, unicode), filename
        x = req.urlopen('http://mirror.filearena.net/pub/speed/SpeedTest_2048MB.dat')
        x.name = filename
        return x

# --- main.py
handler.use_sendfile = False

but I have two issues here: 1) I'd rather use requests if possible; do you think a big code change is needed? 2) This is blocking: if I send a command to the FTP server while downloading a big file, the FTP client just quits.

and one further question: disabling use_sendfile slows down transfer rates and lowers performance, but this is needed in order to stream a remote URL, isn't it?

giampaolo commented 6 years ago

Hello. Yes, that makes the FTP server blocking. As of now the only thing you can do is change the concurrency model, that is, use pyftpdlib.servers.ThreadedFTPServer: https://pyftpdlib.readthedocs.io/en/latest/tutorial.html#changing-the-concurrency-model
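
For example, a minimal sketch adapted from the tutorial (user, password, port and home directory are just placeholders):

from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import ThreadedFTPServer  # instead of FTPServer

authorizer = DummyAuthorizer()
authorizer.add_user('user', '12345', '.', perm='elradfmwMT')

handler = FTPHandler
handler.authorizer = authorizer

# one dispatcher / IO loop per connection, so a blocking HTTP call only
# stalls that client instead of the whole server
server = ThreadedFTPServer(('127.0.0.1', 2121), handler)
server.serve_forever()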

giampaolo commented 6 years ago

Disabling use_sendfile slows down transfer rates and lowers performance, but this is needed in order to stream a remote URL, isn't it?

Correct. You can only use sendfile(2) to transmit regular files.

giulioprovasi commented 6 years ago

OK, I managed to use urllib (not requests), as its response "implements" all the BufferedStream methods:

import urllib.request as req

def open(self, filename, mode):
    """Open a file returning its handler."""
    assert isinstance(filename, unicode), filename
    return req.urlopen('http://127.0.0.1/app/favicon.ico')

This works with both use_sendfile and the base ftp_RETR function. What I'd like some clarification on is the 'blocking' download. Let's assume I am running a simple FTP server with pyftpdlib (the sample one, for instance): do you mean that any download makes the UI freeze, and that the user (and others too?) cannot navigate anymore until the download is over? If so, what is the point of using the async model instead of the threaded one? I don't really see the point.

Thanks for your time :)

giampaolo commented 6 years ago

The FTP server blocks as long as any single call blocks. E.g. instantiating an HTTP request in ftp_RETR is a blocking operation. When you read a chunk from the request instance, that is also a blocking operation. You are reading in chunks though, so you will not block for the whole file transfer: you will block only while reading a chunk.

The gist here is that when you are dealing with a regular filesystem, access to it is usually fast enough: the server blocks only for short periods of time, so that does not really represent a problem, and you want to stick with the pure async model. And you do, because it is considerably faster and more scalable.

When you have long blocking operations, the only thing you can do is change the concurrency model. The ThreadedFTPServer class basically runs a separate IO loop per connection, so you are free to block without affecting the whole server. The downside is that you lose some scalability, but that probably won't make any difference unless you handle very heavy loads.

Theoretically, if your HTTP operations are fast enough you may avoid using threads, but that is unlikely and I would not recommend it either way.

An alternative to the threaded model is to stay async and pre-fork a certain number of processes (say 10) on startup, before starting the server. That means you are free to have up to 10 simultaneous blocking operations before the server stops responding. This is the approach used by Tornado, which is what inspired the idea, and it probably represents the best compromise. I did that in a git branch but still haven't had time to merge it and publish a release, basically because the doc changes are still missing (and as you can see, explaining these concepts is not exactly straightforward :)).
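
Just to illustrate the idea (this is not the actual API of that branch, only a conceptual sketch of pre-forking): bind the listening socket once, fork the workers, and let each one run its own IO loop on the shared socket:

import os

from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer

authorizer = DummyAuthorizer()
authorizer.add_user('user', '12345', '.', perm='elradfmwMT')
handler = FTPHandler
handler.authorizer = authorizer

# the listening socket is created and bound here, before forking
server = FTPServer(('127.0.0.1', 2121), handler)

for _ in range(9):       # parent + 9 children = 10 processes
    if os.fork() == 0:
        break            # child: stop forking and go serve
server.serve_forever()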


giulioprovasi commented 6 years ago

OK, I see. I will take a look at the branch in question. The API calls are on a dedicated network with no lag/firewalls, so it shouldn't be an issue, but I'm keeping in mind that I should test that thoroughly.

Btw, another quick question, if I may, without opening a new issue: while RETR works great, can you point me in the right direction for implementing STOR/STOU?

Basically, the REST API has a route that can provide me with an upload URL, so ideally I should stream the data to a remote HTTP request. Is this feasible in your opinion?

Cheers, and keep up with this great project ;)

giampaolo commented 6 years ago

OK, I see. I will take a look at the branch in question. The API calls are on a dedicated network with no lag/firewalls, so it shouldn't be an issue, but I'm keeping in mind that I should test that thoroughly.

Yep. You should time each request and see how long it takes to return. I would consider (at least) anything >= 0.1 secs as potentially blocking. Keep in mind that it all depends on the FTP server load. E.g. if the FTP server is handling 100 transfers at the same time, 0.1 secs each means that a client may be stuck for 10 secs in the worst case, which is bad. By using the pre-fork approach with 10 processes that would be reduced to 1 sec (`100 * 0.1 / 10 = 1`), which I consider acceptable.
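
For instance, something as simple as this gives you a rough idea (the URL is just a placeholder for whatever your REST API exposes):

import time

import requests

api_url = 'http://127.0.0.1/app/favicon.ico'   # placeholder endpoint

t0 = time.monotonic()
r = requests.get(api_url, stream=True)
print('request setup took %.3f secs' % (time.monotonic() - t0))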

Btw, another quick question, if I may, without opening a new issue: while RETR works great, can you point me in the right direction for implementing STOR/STOU? Basically, the REST API has a route that can provide me with an upload URL, so ideally I should stream the data to a remote HTTP request. Is this feasible in your opinion?

Yes, I would say that sounds reasonable. You need to provide a file-like object to return from open() whose write() method writes chunks of data to the request. It is not too dissimilar from what you already did for RETR. Pseudo code:


class StorFileWrapper:

    def __init__(self, request):
        self.request = request

    def write(self, chunk):
        self.request.write(chunk)

    def close(self):
        self.request.close()

class VirtualFs:

    def open(self, path, mode):
        if 'w' in mode or 'a' in mode:
            # STOR / STOU / APPE
            req = get_write_file_request(path, mode)
            return StorFileWrapper(req) 
        else:
            # RETR
            return get_read_file_request(path)

giampaolo commented 6 years ago

Also some other things to keep in mind:

Hope this helps.

giampaolo commented 6 years ago

If you implement this it would be great if you could share your code (or an extract). It would be nice to put it in the doc or something.

giulioprovasi commented 6 years ago

Thank you for the input, that led me in the right direction. I will indeed post some code once it works ;)

giulioprovasi commented 6 years ago

@giampaolo here is how I've implemented the upload/download directly against the remote server that stores the final file. I am not completely sure about the "wait for chunks from the client" part (see the comment in the code), but upload works this way.

The only requirement is that the web server must support chunked transfer encoding in order to keep the HTTP connection open until all data has been transferred.

Please feel free to give me feedback, I'd be glad to improve this.

import urllib.request as req
from urllib.error import HTTPError

import requests
from pyftpdlib.filesystems import AbstractedFS


class VirtualFs(AbstractedFS):
    ...
    def open(self, filename, mode):
        """Open a file returning its handler."""
        assert isinstance(filename, unicode), filename
        # In our case, fs.open() can do 2 things:
        #   - upload a file to a remote URL: STOR/STOU (w)
        #   - download a remote file: RETR (r)
        # so depending on the provided file mode, start either
        # file download or file upload, both should return
        # a valid file-[like] object
        if 'w' in mode:
            # STOR / STOU
            claims = _api(self.api.stor, filename)
            fd = StorFileWrapper(filename=filename, **claims)
        else:
            # RETR
            url = _api(self.api.retr, filename)
            try:
                # urllib's urlopen returns a file-like object which has
                # all the needed methods except seek(); since we don't
                # allow APPE and REST, this does not matter
                fd = req.urlopen(url)
            except HTTPError:
                raise NotFoundError('Could not open %s: file not found.' % self.ftp2fs(filename))
        return fd

class StorFileWrapper:

    def __init__(self, filename, upload_url, progress_url=None, max_size=-1, valid_until=None):
        self.filename = filename
        self.upload_url = upload_url
        self.progress_url = progress_url
        self.max_size = max_size
        self.valid_until = valid_until
        self.upload_started = False
        self.upload_finished = False
        self.next_chunk = None

    def read_chunks(self):
        while True:
            chunk, self.next_chunk = self.next_chunk, None

            # not sure about this, but if there's a delay on the socket
            # serving the upload file, we must hold the HTTP request open,
            # so we need to distinguish between
            #   - no more data (the upload has finished)
            #   - awaiting more data from the client
            if chunk:
                yield chunk
            elif self.upload_finished:
                # once the upload finishes, halt the generator
                break
            # else: keep spinning until the client provides the next chunk

    def write(self, chunk):

        if not chunk:
            self.upload_finished = True
            return

        self.next_chunk = chunk

        if not self.upload_started:
            self.upload_started = True
            requests.put(self.upload_url, data=self.read_chunks(),
                         headers={
                             'Content-Type': 'application/octet-stream',
                             'X-Title': self.filename,
                         })

And here is the PHP server retrieving the upload:

public function getPutFile($path) {
        $fpTmp = fopen($path, 'wb');
        $fpInput = fopen('php://input', 'rb');

        while (!feof($fpInput)) {
            // @todo: chunk size should be configurable
            fwrite($fpTmp, fread($fpInput, 65535));
        }
        fclose($fpInput);
        fclose($fpTmp);

        // more logic in order to get the POST body and simulate a PHP uploaded file with $_FILES
}

_api() is just a wrapper that does custom error handling after calling the REST API.
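
To avoid the busy wait in read_chunks() I also considered decoupling the PUT from write(): a background thread owns the blocking requests.put() call and chunks are handed over through a queue. A rough, untested sketch (ThreadedStorWrapper is just a hypothetical name):

import queue
import threading

import requests


class ThreadedStorWrapper:
    """Hand chunks to a background thread that performs the blocking PUT."""

    def __init__(self, filename, upload_url):
        self.filename = filename
        self.upload_url = upload_url
        self._queue = queue.Queue(maxsize=8)   # bounded: applies backpressure
        self._thread = threading.Thread(target=self._upload, daemon=True)
        self._thread.start()

    def _chunks(self):
        while True:
            chunk = self._queue.get()
            if chunk is None:          # sentinel: no more data
                return
            yield chunk

    def _upload(self):
        # requests consumes the generator and sends the body chunked
        requests.put(self.upload_url, data=self._chunks(),
                     headers={'Content-Type': 'application/octet-stream',
                              'X-Title': self.filename})

    def write(self, chunk):
        self._queue.put(chunk)         # may block briefly if the queue is full

    def close(self):
        self._queue.put(None)          # signal end of upload
        self._thread.join()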

giampaolo commented 6 years ago

Looks good to me in principle. Note that if mode == "a" (APPE command) your code will erroneously assume RETR.

giulioprovasi commented 6 years ago

Indeed, fixed. I removed it, as we don't support APPE.