erdewit / ib_insync

Python sync/async framework for Interactive Brokers API
BSD 2-Clause "Simplified" License

How do I auto reconnect on disconnect? #41

Closed devan0 closed 6 years ago

devan0 commented 6 years ago

I'm trying to call ib_insync.Client.connect(...) in a loop with a delay in wrapper.apiEnd, and I got an exception:

RuntimeWarning: coroutine 'Client.connectAsync' was never awaited

Is this the correct way to reconnect? Or do I need to del my client instance and create a new one?

erdewit commented 6 years ago

It looks like a harmless warning, not an exception.

Not sure why you're using wrapper.apiEnd, don't you mean apiError? In any case it would be better to schedule a call to connect instead of a direct call, i.e. use loop.call_later(...) or such.

This is the gist of what I'm doing:


    def connect(self, wait=True):
        f = asyncio.ensure_future(self.connectAsync())
        if wait:
            util.syncAwait(f)
        self.setConnected()

    async def connectAsync(self):
        """
        Connect to a running TWS/gateway application.
        """
        if self.ib.isConnected():
            return
        self.ib.client.apiError = None
        self._logger.info('Trying to connect...')
        while True:
            try:
                await self.ib.connectAsync(self.host, self.port, self.clientId)
                if self.ib.isConnected():
                    break
            except ConnectionRefusedError:
                self._logger.error('Connect failed')
                await asyncio.sleep(5)
        self.ib.client.apiError = self._onApiError

    def _onApiError(self, _errorMsg):
        """
        Reconnect after connection failure. By default the reconnect is
        postponed for half a minute, otherwise TWS can give back error 102
        ("Max number of tickers has been reached").
        """
        delaySecs = 30
        self._logger.info(f'Reconnecting in {delaySecs} seconds')
        asyncio.get_event_loop().call_later(delaySecs, self.connect)
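The retry loop in connectAsync above can be tried out without a running gateway. This is only a sketch of the pattern in plain asyncio: FlakyGateway, connect_async and connect_with_retry are made-up names for illustration, not part of ib_insync.

```python
import asyncio


class FlakyGateway:
    """Hypothetical stand-in for TWS/gateway: refuses the first `failures` attempts."""

    def __init__(self, failures):
        self.failures = failures
        self.attempts = 0
        self.connected = False

    async def connect_async(self):
        self.attempts += 1
        await asyncio.sleep(0)  # yield to the event loop, like a real connect would
        if self.attempts <= self.failures:
            raise ConnectionRefusedError("gateway not running")
        self.connected = True


async def connect_with_retry(gateway, retry_delay=5.0):
    """Keep trying until connected, sleeping between refused attempts."""
    while not gateway.connected:
        try:
            await gateway.connect_async()
        except ConnectionRefusedError:
            # Non-blocking wait: other tasks on the loop keep running.
            await asyncio.sleep(retry_delay)


gateway = FlakyGateway(failures=2)
asyncio.run(connect_with_retry(gateway, retry_delay=0.01))
print(gateway.attempts)  # prints 3
```

Because the wait is an awaited asyncio.sleep rather than a blocking sleep, the event loop stays responsive while the reconnect is pending.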

devan0 commented 6 years ago

Thanks for the example code. I tried a simpler version like this and the below assertion is failing. Why do you have that assert?

def connect(self):
    self.client.connect(host, port, clientid)

def apierror(self, errormsg):
    asyncio.get_event_loop().call_later(1, self.connect)

handle: <TimerHandle when=1635932.871899575 IBBroker._connect()>
Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/events.py", line 127, in _run
    self._callback(*self._args)
  File "myib.py", line 56, in _connect
    timeout=5)
  File "/home/dan/.local/lib/python3.6/site-packages/ib_insync/client.py", line 142, in connect
    util.syncAwait(self.connectAsync(host, port, clientId, timeout))
  File "/home/dan/.local/lib/python3.6/site-packages/ib_insync/util.py", line 242, in syncAwait
    result = _syncAwaitAsyncio(future)
  File "/home/dan/.local/lib/python3.6/site-packages/ib_insync/util.py", line 247, in _syncAwaitAsyncio
    assert asyncio.Task is asyncio.tasks._PyTask
AssertionError

devan0 commented 6 years ago

Here's an extremely stripped down version of what I'm trying to do, which isn't working. I'd like to use your Client.connect(), should that be possible?

import asyncio
from ib_insync.client import Client
from ibapi.wrapper import EWrapper

import logging
logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger("test_reconnect")

class MyWrapper(EWrapper):
    def __init__(self):
        self.client = Client(self)
        self.client.apiError = self.apierror

    def connect(self):
        try:
            self.client.connect('127.0.0.1', 4002, 99)
        except ConnectionRefusedError:
            _logger.error("Unable to connect")
            pass

    def apierror(self, msg):
        _logger.warning(f"apierror: {msg}, waiting 5 second and trying to reconnect")
        asyncio.get_event_loop().call_later(5, self.connect)

if __name__ == "__main__":
    mywrapper = MyWrapper()
    mywrapper.connect()
    mywrapper.client.run()

This works fine if I have an ib gateway running, but if I shut it down here's what happens:

INFO:ibapi.wrapper:ANSWER nextValidId {'orderId': 1}
INFO:ib_insync.client:API connection ready
DEBUG:ibapi.client:140700380804824 isConn: 2
ERROR:ib_insync.client:Peer closed connection
WARNING:test_reconnect:apierror: Peer closed connection, waiting 5 second and trying to reconnect
DEBUG:ibapi.client:140700380804824 connState: None -> 0
ERROR:asyncio:Exception in callback MyWrapper.connect()
handle: <TimerHandle when=1637274.667268756 MyWrapper.connect()>
Traceback (most recent call last):
  File "/usr/lib/python3.6/asyncio/events.py", line 127, in _run
    self._callback(*self._args)
  File "test_reconnect.py", line 17, in connect
    self.client.connect('127.0.0.1', 4002, 99)
  File "/home/dan/.local/lib/python3.6/site-packages/ib_insync/client.py", line 142, in connect
    util.syncAwait(self.connectAsync(host, port, clientId, timeout))
  File "/home/dan/.local/lib/python3.6/site-packages/ib_insync/util.py", line 242, in syncAwait
    result = _syncAwaitAsyncio(future)
  File "/home/dan/.local/lib/python3.6/site-packages/ib_insync/util.py", line 247, in _syncAwaitAsyncio
    assert asyncio.Task is asyncio.tasks._PyTask
AssertionError
^CTraceback (most recent call last):
  File "test_reconnect.py", line 30, in <module>
    mywrapper.client.run()
  File "/home/dan/.local/lib/python3.6/site-packages/ib_insync/client.py", line 99, in run
    loop.run_forever()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 1390, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
sys:1: RuntimeWarning: coroutine 'Client.connectAsync' was never awaited

erdewit commented 6 years ago

The assert is only hit in the case of nested event loops. To allow nested event loops, use util.patchAsyncio().
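For anyone wondering what "nested event loops" means here, a minimal sketch in plain asyncio (no ib_insync involved): a timer callback that tries to block on a coroutine while the loop is already running. fake_connect and blocking_reconnect are made-up names. On recent Python versions the refusal shows up as a RuntimeError rather than the assert in the traceback above, but the cause is the same kind of nesting.

```python
import asyncio


async def fake_connect():
    """Stand-in for a connect coroutine; it only yields to the loop once."""
    await asyncio.sleep(0)
    return "connected"


def blocking_reconnect(loop, outcome):
    """Timer callback that tries to run a coroutine to completion from inside the running loop."""
    coro = fake_connect()
    try:
        outcome.append(loop.run_until_complete(coro))
    except RuntimeError as err:
        coro.close()  # otherwise Python warns that the coroutine was never awaited
        outcome.append(err)
    finally:
        loop.stop()


def demo():
    loop = asyncio.new_event_loop()
    outcome = []
    # Same shape as call_later(5, self.connect) in the report above.
    loop.call_later(0.01, blocking_reconnect, loop, outcome)
    loop.run_forever()
    loop.close()
    return outcome[0]


result = demo()
print(type(result).__name__)
```

The callback runs inside the loop, so a synchronous connect called from it would need a second, nested run of the same loop. util.patchAsyncio() is what lifts that restriction; calling it once at startup, before the loop starts running, is my assumption about placement rather than something stated in this thread.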

devan0 commented 6 years ago

util.patchAsyncio() fixed everything. Thank you!

JSunRae commented 2 years ago

Should I call it before this line: 'asyncio.get_event_loop().call_later(5, self.connect)'?

ken-wizhodl commented 2 years ago

With the latest version, you may need to change erdewit's example above: comment out the following line: self.ib.client.apiError = None

and replace this line: self.ib.client.apiError = self._onApiError

with: self.ib.client.apiError += self._onApiError

Otherwise, you will get an "emit" error.

Note that apiError already handles disconnects caused by an IB Gateway restart, so you don't need an additional disconnectedEvent handler for that.
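A sketch of the likely reason for the "emit" error, assuming apiError is now an event object that the client fires by calling its emit method. The Event and Client classes below are tiny stand-ins written for this illustration, not the library's own classes.

```python
class Event:
    """Tiny stand-in for an event object that supports += and emit()."""

    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        self._handlers.append(handler)
        return self

    def emit(self, *args):
        for handler in self._handlers:
            handler(*args)


class Client:
    """Hypothetical client whose apiError attribute is an Event."""

    def __init__(self):
        self.apiError = Event()

    def connection_lost(self):
        self.apiError.emit("Peer closed connection")


seen = []


def on_api_error(msg):
    seen.append(msg)


good = Client()
good.apiError += on_api_error  # subscribes; the attribute is still an Event
good.connection_lost()

bad = Client()
bad.apiError = on_api_error  # overwrites the Event with a plain function
try:
    bad.connection_lost()
    failure = None
except AttributeError as err:
    failure = err  # a plain function has no emit attribute

print(seen, type(failure).__name__)
```

Plain assignment replaces the event object itself, so the later emit call has nothing to call; += keeps the event and only adds a subscriber.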

nurettin commented 2 years ago

Sorry for commenting on a closed issue, but I wanted to share a solution.

If the peer closes the connection (TWS being shut down), you can catch this using the disconnectedEvent. Then you will need to break the run loop. Instead of dealing with async, I use a simple SIGINT signal for this. The program also needs to know whether we are restarting due to a disconnect or the user just wants to stop the program. This is why I set a class variable called reconnecting to True if our SIGINT's intention is to reconnect. This way, we know whether to keep retrying or to exit the program.

If TWS loses its internet connection temporarily, then reconnects to market data (errorCode = 1102), we need to handle this inside errorEvent. I just set reconnecting to True, call disconnect() and raise SIGINT. If you don't disconnect, the reconnect will not succeed because clientId = 1 is still connected.

If we lose connection to TWS due to API being disabled (errorCode = 1300) we need to set reconnecting to True, call disconnect() (might not be needed but it is idempotent) and raise SIGINT.

After doing all of this, there is still a problem. If I login from another computer, then logout, we still need to manually click the "Reconnect this session" button of the modal dialog that appears in the TWS application.
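A rough sketch of the SIGINT approach described above, using only the standard library. The Session class, its scripted error list and main are invented for the demo; only the reconnecting flag and the error codes 1102 and 1300 come from the description. It needs Python 3.8+ for signal.raise_signal and must run in the main thread.

```python
import signal
import time


class Session:
    """Hypothetical stand-in for the application's IB session; not ib_insync code."""

    reconnecting = False  # class variable: True when our own SIGINT means "reconnect"

    def __init__(self, scripted_errors):
        self.scripted_errors = list(scripted_errors)  # demo input only
        self.connect_count = 0

    def connect(self):
        self.connect_count += 1

    def disconnect(self):
        pass  # stands in for an idempotent disconnect()

    def on_error(self, error_code):
        # 1102 and 1300 are the error codes named in the comment above.
        if error_code in (1102, 1300):
            Session.reconnecting = True
            self.disconnect()
            signal.raise_signal(signal.SIGINT)

    def run(self):
        # Stand-in for the blocking run loop: replay the scripted errors,
        # then simulate the user pressing Ctrl-C.
        while True:
            if self.scripted_errors:
                self.on_error(self.scripted_errors.pop(0))
            else:
                signal.raise_signal(signal.SIGINT)
            time.sleep(0.01)


def main(session):
    # Make sure SIGINT raises KeyboardInterrupt in this process.
    signal.signal(signal.SIGINT, signal.default_int_handler)
    while True:
        session.connect()
        try:
            session.run()
        except KeyboardInterrupt:
            if Session.reconnecting:
                Session.reconnecting = False
                continue  # our own SIGINT: loop around and reconnect
            break  # a real Ctrl-C: stop retrying
    return session.connect_count


connects = main(Session([1102, 1300]))
print(connects)
```

The flag is what lets one signal serve two purposes: the handler for KeyboardInterrupt reconnects when the flag is set and exits when it is not.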

renegadephysicist commented 1 year ago

Apologies for opening up this closed issue, but here is what I did that seems to work, in case anyone searching in the future finds it useful. Ideally I would like to remove the util.patchAsyncio() call somehow.

import sys

from ib_insync import IB, util

def connect_to_ibkr():
    """
    Connect to a running TWS/gateway application.
    """
    print('Trying to connect...')
    max_attempts = 10
    current_reconnect = 0
    delaySecs = 60
    ib.disconnect()
    while True:
        try:
            ib.connect("127.0.0.1", port=7496, clientId=4, timeout=5)
            if ib.isConnected():
                print('Connected')
                break
        except Exception as err:
            print("Connection exception: ", err)
            if current_reconnect < max_attempts:
                current_reconnect += 1
                print('Connect failed')
                print(f'Retrying in {delaySecs} seconds, attempt {current_reconnect} of max {max_attempts}')
                ib.sleep(delaySecs)
            else:
                sys.exit(f"Reconnect Failure after {max_attempts} tries")
    ib.disconnectedEvent += onDisconnected

    # Function to call after successful connection
    do_something_important_upon_connection()

def onDisconnected():
    print("Disconnect Event")
    print("attempting restart and reconnect...")
    connect_to_ibkr()

ib = IB()
util.patchAsyncio()
connect_to_ibkr()
ib.disconnect()

mattsta commented 1 year ago

Be careful with your line here:

ib.disconnectedEvent += onDisconnected

I had that same bug for a long time. You're adding an additional handler every time you run the reconnect, so when you reconnect the first time, you'll now have 2 reconnect events, then 3, then 4, then ...

You never want to attach event handlers in something called more than once.

Here's the version I ended up with: https://github.com/mattsta/icli/blob/58beee8b9890ed5d4dd9c5819ac437b6e8c12574/icli/cli.py#L1777-L1881
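The handler pile-up described above is easy to reproduce in isolation. The Event class below is a minimal stand-in written for this sketch, not the library's event class, and connect_naive / connect_guarded are hypothetical names. The membership check is a feature of the stand-in; I'm not claiming the real event object supports it.

```python
class Event:
    """Minimal stand-in for an event object that supports += and emit()."""

    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        self._handlers.append(handler)
        return self

    def __contains__(self, handler):
        return handler in self._handlers

    def emit(self):
        for handler in list(self._handlers):
            handler()


calls = []


def on_disconnected():
    calls.append("reconnect")


def connect_naive(event):
    # Attaches the handler on every (re)connect, so handlers accumulate.
    event += on_disconnected


def connect_guarded(event):
    # Attaches the handler only if it is not already registered.
    if on_disconnected not in event:
        event += on_disconnected


naive = Event()
for _ in range(3):
    connect_naive(naive)
naive.emit()
naive_calls = len(calls)

calls.clear()
guarded = Event()
for _ in range(3):
    connect_guarded(guarded)
guarded.emit()
guarded_calls = len(calls)

print(naive_calls, guarded_calls)  # prints 3 1
```

In real code the simplest fix is usually to attach the handler once at startup, outside the function that gets called again on every reconnect.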

renegadephysicist commented 1 year ago

Thanks,

I ran into this exact bug after posting the code above. What I've now done is reinitialise the IB() object every time the disconnect occurs. But I think your solution is probably a better, more robust one. This code is an attempt to avoid the asynchronous code blocks and just use simple Python functions, as I kept running into issues.

import sys

from ib_insync import IB, util

def connect_to_ibkr():
    """
    Connect to a running TWS/gateway application.
    """
    global ib
    max_attempts = 10
    current_reconnect = 0
    delaySecs = 60

    print('Trying to connect...')

    if ib is not None:
        ib.disconnect()
    ib = IB()
    util.patchAsyncio()

    while True:
        try:
            ib.connect("127.0.0.1", port=7496, clientId=4, timeout=5)
            if ib.isConnected():
                print('Connected')
                break
        except Exception as err:
            print("Connection exception: ", err)
            if current_reconnect < max_attempts:
                current_reconnect += 1
                print('Connect failed')
                print(f'Retrying in {delaySecs} seconds, attempt {current_reconnect} of max {max_attempts}')
                ib.sleep(delaySecs)
            else:
                sys.exit(f"Reconnect Failure after {max_attempts} tries")
    ib.disconnectedEvent += onDisconnected

    # Function to call after successful connection
    do_something_important_upon_connection()

def onDisconnected():
    print("Disconnected, attempting restart and reconnect...")
    connect_to_ibkr()

ib = None
connect_to_ibkr()
ib.disconnect()