harvimt / quamash

Implementation of the PEP 3156 event-loop (asyncio) api using the Qt Event-Loop
BSD 2-Clause "Simplified" License
264 stars 46 forks source link

Could not keep auto reconnect to a closed tcp server while ConnectionRefusedError raised #95

Open perillaseed opened 6 years ago

perillaseed commented 6 years ago

While using quamash and tcp client connect to a closed tcp server and hope to try to reconnect to this tcp server automatically, it will raised a ConnectionRefusedError and stop to reconnect it again, use asyncio.get_event_loop() has no such problem, it will keep reconnect until the tcp server open. This issus also ask in https://stackoverflow.com/questions/51093428/different-behaviours-when-using-qeventloop-and-asyncio-default-loop-with-python3, and the simplized test code can be found as follows:

import sys
import time
import asyncio
import logging
from quamash import QEventLoop
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QThread, pyqtSignal, QRunnable, QDataStream, pyqtSlot
from PyQt5.QtNetwork import QTcpSocket

logger = logging.getLogger(__name__)

from PyQt5.QtWidgets import QMainWindow, QTextBrowser
class MainWindow(QMainWindow):
    sigMsgReceived = pyqtSignal(str)
    def __init__(self, vhost, vport):
        QMainWindow.__init__(self)
        self.vhost = vhost 
        self.vport = vport
        self.centralwidget = QTextBrowser(self)
        self.setCentralWidget(self.centralwidget)
        self.centralwidget.append("Starting...")
        self.sigMsgReceived.connect(self.centralwidget.append)

    async def update(self):
        vis_loop = asyncio.get_event_loop()
        while True:
            try:
                self.vreader, self.vwriter = await asyncio.open_connection(host=self.vhost, port=self.vport, loop=vis_loop)
                async for msg in self.vreader:
                    self.sigMsgReceived.emit(msg.decode())
            except Exception as e:
                logger.exception(e)
                print("Try to connect after 1 seconds...")
                await asyncio.sleep(1) 

if __name__ == '__main__':
    vhost = "127.0.0.1"
    vport = 8848

    logger.setLevel(logging.DEBUG)

    app = QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)  # NEW must set the event loop
    with loop:
        mw = MainWindow(vhost, vport)
        mw.show()
        asyncio.run_coroutine_threadsafe(mw.update(), loop)
        loop.run_forever()

and here is the exception raised:

Event callback failed
Traceback (most recent call last):
  File "D:\Zibo\Documents\SRC\workspace\QHub\venv\lib\site-packages\quamash\_windows.py", line 42, in _process_events
    value = callback(transferred, key, ov)
  File "D:\Python35\Lib\asyncio\windows_events.py", line 509, in finish_connect
    ov.getresult()
ConnectionRefusedError: [WinError 1225]
harvimt commented 6 years ago

On Windows, Quamash doesn't extend the default event loop (selector), instead it extends the Proactor event loop.

(this is because I find the Proactor more useful, since it supports subprocesses...)

Though this might be a problem with quamash itself since the error is bubbling through quamash/_windows.py, in order to really reproduce you'd need to see if this works with a non-quamash proactor event loop.

russellwinstead commented 6 years ago

This seems like the same issue as #91. It appears to be related to the implementation of the _process_events method of the _ProactorEventLoop. By modifying the OSError except clause to set the exception on f instead of logging, I believe you can achieve the desired result.

    def _process_events(self, events):
        """Process events from proactor."""
        for f, callback, transferred, key, ov in events:
                        try:
                                self._logger.debug('Invoking event callback {}'.format(callback))
                                value = callback(transferred, key, ov)
-                       except OSError:
-                               self._logger.warning('Event callback failed', exc_info=sys.exc_info())
+                       except OSError as e:
+                               f.set_exception(e)
                        else:
                                f.set_result(value)

Setting the exception on f is what the standard library asyncio implementation of the IocpProactor does.