erdewit / ib_insync

Python sync/async framework for Interactive Brokers API
BSD 2-Clause "Simplified" License

Consideration for a differently implemented synchronous api #262

Closed goodboy closed 4 years ago

goodboy commented 4 years ago

@erdewit first I want to thank you immensely for this project :smile_cat:

I've dug through the messaging machinery in client.py and I have to say, thank you so much for allowing the world to avoid having to use the monstrosity that is ibapi.client.EClient :rocket:

The point of this issue is to suggest a change and possibly start a discussion surrounding an alternative method for giving users a synchronous api around ib_insync.

I realize the library is meant to serve users unfamiliar with asynchronous systems, but at the moment the approach puts users who are familiar with async programming at a disadvantage, in particular those who know asyncio and want to leverage that ecosystem for data streaming.

My proposal for the sync API

Why to change this

My apologies if this was a bit long but I'd like to at least open discussion and get your thoughts.

Thanks :)

erdewit commented 4 years ago

I've thought about this as well and think it would indeed make it easier, mostly for the synchronous users. For async users, I wonder if they would be much happier though. It's fully async already with the IB.*Async methods, and gets along very well with other asyncio libraries.

One thing to consider is that ib_insync is not only doing requests, it's also maintaining the "current state". This state gets updated dynamically and can be queried at any time. So some form of synchronization is needed when the updating and the querying happen in different threads.

I'm looking forward to more discussion or ideas on this.

goodboy commented 4 years ago

For async users, I wonder if they would be much happier though. It's fully async already with the IB.*Async methods, and gets along very well with other asyncio libraries.

@erdewit they will be much happier once you consider the benefits of asyncio.run() and asyncio.create_task() having first-class support in new Python versions (especially if you want to spawn async generators for streaming and have them clean up correctly). Also, as an async programmer you would normally want to handle ticker stream events inside an async function (i.e. a task) instead of a callback. I realize a user can do this on their own, but having an ib_insync way would probably be a much better starting point (plus it might let us handle other unforeseen edge cases properly as they emerge) and would let us handle teardown conditions more correctly, which is often tricky to get right, especially if you want error propagation from subtasks.
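
To illustrate the teardown point (the names here are just a made up stand-in for a ticker stream, nothing from ib_insync):

import asyncio

async def tick_stream():
    # stand-in async generator for a ticker stream
    try:
        while True:
            await asyncio.sleep(1)
            yield 'tick'
    finally:
        # runs on cancellation too, so the stream can clean up after itself
        print('stream torn down')

async def consume():
    async for tick in tick_stream():
        print(tick)

async def main():
    task = asyncio.create_task(consume())
    await asyncio.sleep(3)
    task.cancel()  # cancellation propagates into the generator's finally block
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())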

Also, going back to one of my points:

it should allow for true concurrent api requests to IB's endpoints; eg. getting multiple ContractDetails in "parallel" (in the i/o sense of course) even from the synchronous API.

Currently this is confusing to "wrap" since async methods are intertwined with "pretend sync" methods (I say this because the sync api isn't really sync: it has to incrementally yield control back to the event loop), whereas a higher level set of async "batch request" routines could probably be organized and understood better as a separate layer. Though this is of course my opinion.

One thing to consider is that ib_insync is not only doing requests, it's also maintaining the "current state". This state gets updated dynamically and can be queried at any time. So some form of synchronization is needed when the updating and the querying happen in different threads.

Agreed, but this is being done using asyncio.Protocol callbacks, which can happily run in a thread, and any state that is "queued up" should be put in an asyncio.Queue (or other suitable IPC mechanism) so that synchronous code can ask for it and receive it from the async thread when ready. Normally this would be done by the sync thread submitting a task to the async thread that reads everything available in the queue and returns it. Using bounded queues also prevents memory leaks, which there is currently no way to prevent if badly behaving user code blocks erroneously. Additionally, any speed improvements made to async code (e.g. using uvloop) will always be fully utilized since it is impossible for sync code to block it.
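
As a minimal sketch of that polling pattern (the drain/poll_updates helpers are hypothetical; the queue would be a bounded asyncio.Queue filled by the protocol callbacks):

import asyncio

async def drain(queue: asyncio.Queue) -> list:
    # runs on the event loop thread: wait for at least one update, then grab the rest
    items = [await queue.get()]
    while not queue.empty():
        items.append(queue.get_nowait())
    return items

def poll_updates(queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, timeout=5):
    # called from the sync thread: submit the drain task to the bg loop and block on the result
    return asyncio.run_coroutine_threadsafe(drain(queue), loop).result(timeout)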

The main issue is that you can offer an async api that doesn't run blocking sync code, or a sync api that runs async code incrementally, but you can't support both in one thread. You have to pick which type of API you are. If you want to be sync and still run background tasks continuously, the natural fit is a background thread running the async code, which never blocks indefinitely (until told to stop), while the blocking interface submits tasks to that thread (polling) to retrieve new data and/or control the runtime in a truly sync way.

What really set off alarms for me was this note in the docs:

If a user operation takes a long time then it can be farmed out to a different process.

This effectively says to our users:

"don't worry, it's a a synchronous API around an asynchronous system but you have to call your synchronous code asynchronously to make it work" :nerd_face:

goodboy commented 4 years ago

@erdewit I should probably also note that the main reason sync and async apis must run in separate threads (when we expect background state to be tracked) is that I'm assuming you only want one TCP connection with IB's api.

I'm not sure if this is a hard restriction or not? I know there are data line restrictions, but are there any client connection count restrictions? I can't find anything here.

If that's not the case then, for the sync api, we could spawn the bg thread only when a streaming feed is started. To me that seems like unneeded complexity if we're going to support the bg thread case anyway.

goodboy commented 4 years ago

@erdewit I've got a wrapper going to integrate streaming with my existing system so I'll report back eventually with my more final version of that and see what you think.

If I were to hipshot it I'd suggest going for a structured-concurrency-like sync api. As an example, say you wanted to write a wrapper function that concurrently searches for ContractDetails from a sync api:


from ib_insync import ContractDetails, open_sync_api

with open_sync_api(host='blah', port=4003) as api:
    # while inside this block the background thread and event loop are running;
    # when this block closes, the connection, loop, and thread are all torn down and
    # any collected errors raised

    # this is a new method that searches for all partially matching contracts;
    # the detail requests are made concurrently and the results returned as a
    # dict of ContractDetails keyed by symbol and exchange (see below)
    matching_contracts = api.searchForContracts('SPY')

    # one could conceivably "stream" data into a sync code system using something like the following
    # where the event loop delivers data continuously and the sync code polls for it
    first = next(iter(matching_contracts.values()))
    stream = api.stream_from_ticker(first)
    while not stream.is_closed():
        for tick in iter(stream):
            display_in_ui(tick)

assert all(isinstance(c, ContractDetails) for c in matching_contracts.values())

Underneath that context manager would be something like:

# this is inside ib_insync's code base
import asyncio
from contextlib import asynccontextmanager, contextmanager
from threading import Thread

from .ib import IB

@asynccontextmanager
async def open_api(ib, host, port):
    await ib.connectAsync(host, port)
    try:
        yield ib
    finally:
        ib.disconnect()

async def async_main(loop, ib, host, port, user_main_task=None):
    asyncio.set_event_loop(loop)  # make the loop created in the parent current in this thread

    async with open_api(ib, host, port):  # connection is established here (see above)
        # run whatever main task, or just wait for disconnect while the loop serves requests
        await (user_main_task or ib.disconnectedEvent)

@contextmanager
def open_sync_api(host='127.0.0.1', port=4003):
    # note there should be no loop running yet because we have not started it,
    # and every start/stop should be controlled explicitly by context manager lifetimes
    loop = asyncio.new_event_loop()
    ib = IB()

    # here, really you would run a wrapper async func around the main task
    # that relays any raised errors to this "parent thread" over a data structure
    thread = Thread(
        target=loop.run_until_complete,
        args=(async_main(loop, ib, host, port),),
        name='ib_async_loop_thread',
    )

    class IBSyncAPI:
        def __init__(self, ib, loop):
            self.ib = ib
            self.loop = loop

        def _submit_to_bg_loop_and_get_result(self, method, *args, timeout=5):
            return asyncio.run_coroutine_threadsafe(
                getattr(self.ib, method)(*args),
                loop=self.loop,
            ).result(timeout)

        # add all sync method names on this class and have them call the method above;
        # you could add most of them by introspecting the IB object and setattr-ing them in a loop

        def reqMatchingSymbols(self, pattern: str):
            return self._submit_to_bg_loop_and_get_result('reqMatchingSymbolsAsync', pattern)

        # add a fancy batch requester sync method
        def _batch_request_from_bg_loop(self, methods, args_seq, timeout=5):
            async def batch():
                # runs on the bg loop: fire all requests concurrently and gather the results
                return await asyncio.gather(*(
                    getattr(self.ib, method)(*args)
                    for method, args in zip(methods, args_seq)
                ))

            return asyncio.run_coroutine_threadsafe(batch(), loop=self.loop).result(timeout)

        # define our made up batch function from the first snippet
        def searchForContracts(self, pattern):
            descriptions = self.reqMatchingSymbols(pattern)
            contracts = [d.contract for d in descriptions]

            # request the details for every matching contract concurrently on the bg loop
            results = self._batch_request_from_bg_loop(
                methods=['reqContractDetailsAsync'] * len(contracts),
                args_seq=[(c,) for c in contracts],
            )

            details = {}
            for con, deats in zip(contracts, results):
                for d in deats:
                    details[f'{con.symbol}.{con.primaryExchange}'] = d
            return details

    # start the bg thread and thus the bg loop
    # (a real version would also wait here until the connection is up before yielding)
    thread.start()
    try:
        yield IBSyncAPI(ib, loop)
    finally:
        # as mentioned above, here you would also receive and reraise errors from the bg thread/loop;
        # disconnecting unblocks async_main from inside the loop's own thread, which lets it finish
        loop.call_soon_threadsafe(ib.disconnect)
        thread.join()

My apologies if there are errors or formatting issues with the code. It's fairly difficult to format on github :smile_cat:

This should at least give a general idea. I'll link to my production code when complete.

erdewit commented 4 years ago

I'm assuming you only want one TCP connection with IB's api.

TWS/gateway accepts up to 32 connections, and ib_insync handles an arbitrary number of connections (it can connect to multiple TWS/gateway instances too). But all clients can happily live in one event loop, in one thread, like they do now in the main thread. The CPU usage of the clients is very low so they'll not be blocking each other.
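
For example (a sketch; the host, port and clientIds are just typical gateway defaults):

import asyncio
import ib_insync as ibi

async def main():
    # two independent clients sharing one event loop and one thread;
    # each connection only needs its own clientId
    ib1, ib2 = ibi.IB(), ibi.IB()
    await ib1.connectAsync('127.0.0.1', 4002, clientId=1)
    await ib2.connectAsync('127.0.0.1', 4002, clientId=2)
    print(ib1.isConnected(), ib2.isConnected())

asyncio.run(main())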

erdewit commented 4 years ago

As far as using asyncio.run, doing cleanup, doing parallel requests, using tick streams inside a coroutine, consider the following code snippet:

import asyncio

import ib_insync as ibi

class App:

    async def run(self):
        self.ib = ibi.IB()
        with await self.ib.connectAsync():
            contracts = await self.searchForContracts('SPY')
            for contract in contracts:
                self.ib.reqMktData(contract)

            async for tickers in self.ib.pendingTickersEvent:
                print(tickers)

    def stop(self):
        self.ib.disconnect()

    async def searchForContracts(self, pattern):
        descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
        tasks = [
            self.ib.reqContractDetailsAsync(descr.contract)
            for descr in descriptions]
        results = await asyncio.gather(*tasks)
        return [cd.contract for cds in results for cd in cds]

app = App()
try:
    asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
    app.stop()

Using asyncio.create_task is no problem either. Multiple concurrent coroutines can be spawned, for example to handle different tick streams.
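
For example, something along these lines (a sketch using the Ticker.updateEvent stream):

async def handle_stream(ib, contract):
    # one task per contract, each consuming its own ticker's updates
    ticker = ib.reqMktData(contract)
    async for _ in ticker.updateEvent:
        print(contract.symbol, ticker.last)

# e.g. inside App.run(), after the contracts are found:
#     tasks = [asyncio.create_task(handle_stream(self.ib, c)) for c in contracts]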

I'm looking forward to your production code and if it's possible to have a shared "current state".

goodboy commented 4 years ago

TWS/gateway accepts up to 32 connections, and ib_insync handles an arbitrary number of connections

Good to know.

consider the following code snippet:

Yeah, this code looks great. The problem is you can't do it like this with the sync api :wink:

with await self.ib.connectAsync():

Didn't notice that IB.__exit__ was supported. I think there are a couple of reasons I missed these things:

async for tickers in self.ib.pendingTickersEvent: print(tickers)

Aha! I didn't realize you could do this. Huh, I definitely missed the __aiter__ support in EventKit. This is much more along the lines of the api I want for streaming, and I see you've already got it implemented with an async gen. I'll be testing the teardown resilience of this for sure :sweat_smile:. This should probably also be documented more prominently in a ticker streaming section (which I don't think the docs have yet, right?).

Using asyncio.create_task is no problem either.

Agreed. I was not saying otherwise, just that this should be the task-launching api that gets the focus; I realize this means moving to 3.7+.

I'm looking forward to your production code and if it's possible to have a shared "current state".

My code will actually be entirely async (actually by sneaking in use of trio which my system is built on) so the "shared state" isn't an issue.

I do think my code could be used for some docs (as well as this code you have written) to cover the async api :100:. Also if I get the trio integration working well enough I'll submit that for use for users who prefer that framework.

If we went towards the background thread for the sync api, I do think it would make async and sync code look nearly identical, which would be pretty ideal if you really want to support a sync interface.

goodboy commented 4 years ago

By the way.

For anyone interested (and who might have the impression the bg thread approach is surprising or unconventional): the more modern (structured concurrency focused) framework trio has a specific api for doing this very thing.

Namely, a public api to allow inter-thread communication. Unfortunately, asyncio is not (yet) so well equipped.
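
A rough example of that pattern with trio's to_thread/from_thread helpers (nothing ib_insync specific here):

import trio

async def fetch(x):
    # stand-in for real async work running in the trio loop
    await trio.sleep(0)
    return x * 2

def sync_code():
    # runs in a worker thread; calls back into the trio loop and blocks for the result
    return trio.from_thread.run(fetch, 21)

async def main():
    result = await trio.to_thread.run_sync(sync_code)
    print(result)  # 42

trio.run(main)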

goodboy commented 4 years ago

For my further reference: if we eventually implement this using asyncio.run() in the background thread, the main task should likely await ib.disconnectedEvent, since subtasks will be spawned using asyncio.run_coroutine_threadsafe().

This is what I'm doing in my trio adapter code to get reliable teardown on connection drops or other errors.
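
Roughly, the bg thread's main coroutine would look like this (hypothetical bg_main; the running loop still has to be handed back to the parent thread so it can target run_coroutine_threadsafe() at it):

import asyncio
import ib_insync as ibi

async def bg_main(ib: ibi.IB, host: str, port: int):
    await ib.connectAsync(host, port)
    # park here until the connection drops (or disconnect() is called);
    # all other work arrives as subtasks via asyncio.run_coroutine_threadsafe()
    await ib.disconnectedEvent

# in the parent: Thread(target=asyncio.run, args=(bg_main(ib, host, port),)).start()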

romanrdgz commented 4 years ago

Would love to know if Goodboy finally achieved the integration with the Trio library.

erdewit commented 4 years ago

@goodboy You may be interested in this thread: https://bugs.python.org/issue22239

At the end there is some talk of using greenlets to single-threadedly integrate sync and async code, basically by providing an await equivalent that utilizes the greenlets. Note that ib-insync will not go in this direction, but perhaps it can be useful for integrating with trio or curio.

Perhaps ib-insync will start to use a separate socket-reading thread akin to what ibapi does. This would ensure accurate timestamps of the data arrival; a tiny benefit, but it might be worth it.

goodboy commented 3 years ago

@romanrdgz indeed we did get things working with trio but not in the way you'd expect. Also, next time you have to use @goodboy so I actually see this message ;)

Currently the solution is to run trio in guest mode inside the asyncio loop inside a new process and stream ib_insync data up to the supervising parent process. There's a PR experimenting with an API for this here.
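
For reference, the guest mode wiring itself looks roughly like this (a minimal sketch independent of ib_insync):

import asyncio
import trio

async def trio_main():
    # the trio side; in the real code this is where the streaming logic lives
    await trio.sleep(1)
    return 'trio done'

async def asyncio_main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    # run trio as a "guest" on top of the already running asyncio loop
    trio.lowlevel.start_guest_run(
        trio_main,
        run_sync_soon_threadsafe=loop.call_soon_threadsafe,
        done_callback=done.set_result,  # receives an outcome.Outcome
    )
    print((await done).unwrap())

asyncio.run(asyncio_main())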

I'm not super happy with this approach, mostly because ib_insync has been built to run as a standalone app (with a lot of emphasis on local order state tracking through an object API) instead of as an API that is usable for IPC. There are frankly quite a few things that would need to be changed in this library in order to make that possible.

Down the road we will likely put in some effort towards a sans-io approach to the IB api that can be used in any i/o framework. Unfortunately, I think that will mean a complete rewrite of the IB-API-protocol handling into a much simpler and more data transparent layer. I don't think ib_insync can be easily reworked to accomplish this without a substantial shift in its architecture 😢

However, on the bright side, the details of the TCP based messaging inside this project will serve as a great reference implementation if/when someone attempts for the sans-i/o project 😸

markmark1 commented 3 years ago

@goodboy would it be possible to share your trio integration/production code, as you had advised earlier, to help document such use for asyncio/curio focused users of ib_insync. Thanks for an excellent thread.