geertj / gruvi

Async IO for Python, Simplified
http://gruvi.readthedocs.org/
MIT License
94 stars 12 forks source link

TransportError "already reading" with http over ssl #20

Closed mschmitzer closed 7 years ago

mschmitzer commented 9 years ago

Using HTTP over SSL seems to run into the problems described in the docstring of SslTransport.pause_reading():

Flow control for SSL is a little bit more complicated than for a regular :class:Transport because SSL handshakes can occur at any time during a connection. These handshakes require reading to be enabled, even if the application called :meth:pause_reading before.

The effect we're seeing is the TransportError: alreay reading raised from Transport.resume_reading().

The fix we've come up with is to add self._reading = True in the _update_body_size and data_received methods of HttpProtocol. This somewhat, but not quite matches what's described in the pause_reading docstring:

The consequence is that if you are implementing your own protocol and you want to support SSL, then your protocol should be able to handle a callback even if it called :meth:pause_reading before. The recommended way to do this is to store a flag "currently reading" and set it when you call :meth:resume_reading and when :meth:Protocol.data_received is called by the transport. Based on this flag you can prevent calling :meth:resume_reading when you are already reading due to a handshake.

Is this the correct solution?

geertj commented 9 years ago

Hi!

I think that would work, although I think if you set that in the first line of data_received() that is sufficient?

I believe the fundamental issue is that PEP3156 was not written with SSL in mind. In the case of SSL, the transport may need to enable reading, without the protocol having called resume_reading(). I tried to fix this issue within the boundaries of PEP3156, but I think this proves that the result is too complicated. Even a protocol provided by Gruvi itself is not working properly.

I think the "proper" fix is to allow resume_reading() and pause_reading() to be called always. If the transport is already in the desired state, they simply become a no-op. That also simplifies protocol design, as it doesn't have to keep track of whether reading was previously enabled or not. And this API is consistent with e.g. file.close() that you are also allowed to call multiple times.

Therefore I am thinking of making the following change. Can you check to see if it solves your issue. Note that there are 3 test errors with this fix that are all expected (they check for the original behavior):

diff --git a/lib/gruvi/protocols.py b/lib/gruvi/protocols.py
index e0dab7e..38eb77f 100644
--- a/lib/gruvi/protocols.py
+++ b/lib/gruvi/protocols.py
@@ -41,12 +41,10 @@ class BaseProtocol(object):
         self._may_write = Event()
         self._may_write.set()
         self._closed = Event()
-        self._reading = False

     def connection_made(self, transport):
         """Called when a connection is made."""
         self._transport = transport
-        self._reading = True

     def connection_lost(self, exc):
         """Called when a connection is lost."""
@@ -87,12 +85,10 @@ class BaseProtocol(object):
         if self._transport is None:
             return
         bufsize = self.get_read_buffer_size()
-        if bufsize >= self._read_buffer_high and self._reading:
+        if bufsize >= self._read_buffer_high:
             self._transport.pause_reading()
-            self._reading = False
-        elif bufsize <= self._read_buffer_low and not self._reading:
+        elif bufsize <= self._read_buffer_low:
             self._transport.resume_reading()
-            self._reading = True

 class Protocol(BaseProtocol):
diff --git a/lib/gruvi/stream.py b/lib/gruvi/stream.py
index 9b499a7..c6766d2 100644
--- a/lib/gruvi/stream.py
+++ b/lib/gruvi/stream.py
@@ -350,7 +350,6 @@ class StreamProtocol(Protocol):

     def data_received(self, data):
         # Protocol callback
-        assert self._reading is True
         self._reader.feed(data)

     def eof_received(self):
diff --git a/lib/gruvi/transports.py b/lib/gruvi/transports.py
index de626fe..973512e 100644
--- a/lib/gruvi/transports.py
+++ b/lib/gruvi/transports.py
@@ -185,7 +185,13 @@ class Transport(BaseTransport):
             self._protocol.data_received(data)

     def pause_reading(self):
-        """Pause calling callbacks on the protocol."""
+        """Pause calling callbacks on the protocol.
+
+        This method may be called even if reading was already paused. This
+        simplifies protocol design, especially in combination with SSL where
+        reading may need to be enabled by the transport itself without
+        knowledge of the protocol.
+        """
         # Note: pause_reading() and resume_reading() are allowed when _closing
         # is true (unlike e.g. write()). This makes it easier for our child
         # class SslTransport to enable reading when it is closing down.
@@ -195,23 +201,25 @@ class Transport(BaseTransport):
             raise compat.saved_exc(self._error)
         elif self._protocol is None:
             raise TransportError('transport not started')
-        elif not self._reading:
-            raise TransportError('not currently reading')
-        self._handle.stop_read()
-        self._reading = False
+        elif self._reading:
+            self._handle.stop_read()
+            self._reading = False

     def resume_reading(self):
-        """Resume calling callbacks on the protocol."""
+        """Resume calling callbacks on the protocol.
+
+        This method may be called even if reading was already resumed. See the
+        note in :meth:`pause_reading`.
+        """
         if not self._readable:
             raise TransportError('transport is not readable')
         elif self._error:
             raise compat.saved_exc(self._error)
         elif self._protocol is None:
             raise TransportError('transport not started')
-        elif self._reading:
-            raise TransportError('already reading')
-        self._handle.start_read(self._read_callback)
-        self._reading = True
+        elif not self._reading:
+            self._handle.start_read(self._read_callback)
+            self._reading = True

     def _on_write_complete(self, datalen, handle, error):
         # Callback used with handle.write().
mschmitzer commented 9 years ago

Thanks for looking into this.

I've done some basic testing on the above patch, and it appears to work and fix the issue. It's hard to be sure, though, because the issue is difficult to reproduce.

geertj commented 7 years ago

In recent Gruvi versions the behavior of Transport.pause_reading() and Transport.resume_reading has changed so that it's safe to call them multiple times.

Closing this issue.