erdewit / ib_insync

Python sync/async framework for Interactive Brokers API
BSD 2-Clause "Simplified" License

AttributeError: 'NoneType' object has no attribute 'send' - self._write_fut = self._loop._proactor.send(self._sock, data) #630

Closed windowshopr closed 11 months ago

windowshopr commented 11 months ago

I am setting up a simple Flask backend server that is responsible for connecting to IB Gateway and accepting order parameters to submit new orders.

I have created a minimal reproducible example below of the error I'm getting. Ensure that you have Flask installed and updated, save this code into a server.py file, and run it with the command flask --app server run.

With your IB Gateway running and your global variables changed at the top, you can use Postman to create an empty POST request to the http://127.0.0.1:5000/connect-to-ib-gateway endpoint, and you'll see in your Gateway a new API client connects successfully.

Now, when I submit a JSON POST request to the http://127.0.0.1:5000/post-order endpoint, I get the below error traceback. For ease, I'm testing with this raw JSON Body:

{
    "ticker_symbol":"SPY",
    "expiry":"20230705",
    "strike":450,
    "call_or_put":"C",
    "order_size":1,
    "limit_entry_price":1.35,
    "stop_loss_price":1.00,
    "take_profit_price":3.00
}

image

(I also added this header just to cover all my bases)

image

I'm new to asynchronous programming and to handling IB across multiple methods, but I think the issue lies somewhere in methods being run in different threads, or maybe it's something else entirely. I'm just not sure how to interpret the traceback.

flask --app server run
 * Serving Flask app 'server'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.        
 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
127.0.0.1 - - [28/Aug/2023 13:24:12] "POST /connect-to-ib-gateway HTTP/1.1" 200 -
SPY 20230705 450.0 C 1 1.35 1.0 3.0
Option(symbol='SPY', lastTradeDateOrContractMonth='20230705', strike=450.0, right='C', exchange='SMART')
[2023-08-28 13:24:14,293] ERROR in app: Exception on /post-order [POST]
Traceback (most recent call last):
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\flask\app.py", line 2190, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\flask\app.py", line 1486, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\flask_cors\extension.py", line 165, in wrapped_function
    return cors_after_request(app.make_response(f(*args, **kwargs)))
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\flask\app.py", line 1484, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\flask\app.py", line 1469, in dispatch_request 
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\asgiref\sync.py", line 277, in __call__       
    return call_result.result()
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\concurrent\futures\_base.py", line 437, in result
    return self.__get_result()
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\concurrent\futures\_base.py", line 389, in __get_result     
    raise self._exception
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\asgiref\sync.py", line 353, in main_wrap      
    result = await self.awaitable(*args, **kwargs)
  File "I:\nasty\Python_Projects\Stock_Options_Trading\Interactive_Brokers_Server\backend\server.py", line 245, in post_order 
    qualifiedContracts = await ib.qualifyContracts(contract)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\ib.py", line 570, in qualifyContracts
    return self._run(self.qualifyContractsAsync(*contracts))
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\ib.py", line 318, in _run
    return util.run(*awaitables, timeout=self.RequestTimeout)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\util.py", line 341, in run
    result = loop.run_until_complete(task)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\nest_asyncio.py", line 90, in run_until_complete
    return f.result()
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\asyncio\futures.py", line 178, in result
    raise self._exception
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\asyncio\tasks.py", line 280, in __step
    result = coro.send(None)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\ib.py", line 1798, in qualifyContractsAsync
    detailsLists = await asyncio.gather(
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\ib.py", line 1799, in <genexpr>     
    *(self.reqContractDetailsAsync(c) for c in contracts))
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\ib.py", line 1931, in reqContractDetailsAsync
    self.client.reqContractDetails(reqId, contract)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\client.py", line 657, in reqContractDetails
    self.send(*fields)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\client.py", line 269, in send       
    self.sendMsg(msg.getvalue())
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\client.py", line 282, in sendMsg    
    self.conn.sendMsg(self._prefix(msg.encode()))
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\site-packages\ib_insync\connection.py", line 52, in sendMsg 
    self.transport.write(msg)
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\asyncio\proactor_events.py", line 359, in write
    self._loop_writing(data=bytes(data))
  File "C:\Users\chalu\AppData\Local\Programs\Python\Python38\lib\asyncio\proactor_events.py", line 395, in _loop_writing     
    self._write_fut = self._loop._proactor.send(self._sock, data)
AttributeError: 'NoneType' object has no attribute 'send'
127.0.0.1 - - [28/Aug/2023 13:24:14] "POST /post-order HTTP/1.1" 500 -

Here is the code:

from flask import Flask, request, jsonify
from flask_cors import CORS  # Import the CORS class
from ib_insync import *

# Start the event loop for ib_insync when program starts.
# This ensures the connect_to_ib_gateway route succeeds
util.startLoop()

# Global variables
SERVER_IP_ADDRESS = '127.0.0.1'
IB_GATEWAY_PORT = 4002
IB_GATEWAY_CLIENT_ID = 10

# Create the Flask app
app = Flask(__name__)
CORS(app, supports_credentials=True)  # Enable CORS for all routes. prevents this error in browser:
"""
Access to fetch at 'http://localhost:5000/login' from origin 'http://127.0.0.1:4173' 
has been blocked by CORS policy: Response to preflight request doesn't pass access 
control check: No 'Access-Control-Allow-Origin' header is present on the requested 
resource. If an opaque response serves your needs, set the request's mode to 'no-cors' 
to fetch the resource with CORS disabled.
"""

# Initialize a global IB object
ib = IB()

#################### Event Handlers
# Event handler for - connected
def handle_connected(*args):
    ib.connectedEvent -= handle_connected  # Unsubscribe from the event
    return jsonify({'message': 'Connected to IB Gateway'})

# Event handler for - disconnected
def handle_disconnected(*args):
    return jsonify({'message': 'Disconnected from IB Gateway'})

# Event handler for - error
# WIP
#################### Event Handlers End

#################### Routes
# Route for establishing a connection to the IB Gateway API
@app.route('/connect-to-ib-gateway', methods=['POST'])
async def connect_to_ib_gateway():
    try:
        ib.connect(SERVER_IP_ADDRESS, IB_GATEWAY_PORT, clientId=IB_GATEWAY_CLIENT_ID)
        # Subscribe to the connected and disconnected events
        ib.connectedEvent += handle_connected
        ib.disconnectedEvent += handle_disconnected
        # Use a loop to periodically check if the connection is established
        for _ in range(20):  # Try for 20 seconds
            if ib.isConnected():
                return jsonify({'message': 'Connected to IB Gateway'})
            await ib.sleep(1)  # Sleep for 1 second
        # If not connected after 20 seconds, return an error
        return jsonify({'message': 'Error - Connection not established after timeout'})
    except Exception as e:
        return jsonify({'message': 'Error connecting to IB Gateway', 'error': str(e)})

# Route for getting the current IB Gateway connection status
@app.route('/connection-status', methods=['GET'])
def connection_status():
    if ib.isConnected():
        return jsonify({'status': 'connected'})
    else:
        return jsonify({'status': 'disconnected'})

# Route for disconnecting from the IB Gateway API
@app.route('/disconnect-from-ib-gateway', methods=['POST'])
async def disconnect_from_ib_gateway():
    if ib.isConnected():
        try:
            ib.disconnect()
        except RuntimeError as e:
            if "Event loop is closed" in str(e):
                pass
        finally:
            return jsonify({'message': 'Disconnected from IB Gateway'})
    else:
        return jsonify({'message': 'Disconnected from IB Gateway'})

# Route for accepting POST requests to open a new position. Attempted both async
# and non-async methods; neither worked.
@app.route('/post-order', methods=['POST'])
async def post_order():

    # Get the option order details from the request's json. Assume the frontend is
    # handling the validation of the data
    data = request.json
    ticker_symbol = str(data.get('ticker_symbol')).upper()
    expiry = str(data.get('expiry'))
    strike = float(data.get('strike'))
    call_or_put = str(data.get('call_or_put')).upper()
    order_size = int(data.get('order_size'))
    limit_entry_price = float(data.get('limit_entry_price'))
    stop_loss_price = float(data.get('stop_loss_price'))
    take_profit_price = float(data.get('take_profit_price'))

    # Inspect the parameters to make sure endpoint is actually being hit
    print(ticker_symbol, expiry, strike, call_or_put, order_size, limit_entry_price, stop_loss_price, take_profit_price)

    # Create a contract for the option via ticker symbol
    contract = Option(ticker_symbol, expiry, strike, call_or_put, 'SMART')
    print(contract)

    # # Grab the current event loop?
    # loop = util.getLoop()

    # This is the line that causes issues
    qualifiedContracts = await ib.qualifyContracts(contract)

    # Inspect the contract object
    print(qualifiedContracts)

    return jsonify({'message': 'Order placed successfully'})
#################### Routes End

#################### Run the app
if __name__ == '__main__':
    app.run(port=5000)
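
One possible reading of the traceback, offered as an assumption rather than a confirmed diagnosis: Flask runs every `async def` view on its own short-lived event loop (hence the `asgiref/sync.py` frames), so the socket transport opened during `/connect-to-ib-gateway` may still be bound to a loop that was closed when that request finished. On Windows, closing a proactor loop sets its `_proactor` to `None`, which would explain `'NoneType' object has no attribute 'send'`. The stdlib-only sketch below uses `asyncio.run` as a stand-in for that per-request behaviour; `handler` is a hypothetical name.

```python
import asyncio

captured = []


async def handler():
    # Record which event loop is executing this "request".
    captured.append(asyncio.get_running_loop())


# Each asyncio.run() call creates a fresh loop and closes it on exit,
# standing in here for two separate requests to async views.
asyncio.run(handler())
asyncio.run(handler())

first, second = captured
print(first is second)    # False: the two "requests" ran on different loops
print(first.is_closed())  # True: the first loop is gone once its request ended
```

Anything created on the first loop (a socket transport, a future) cannot be driven from the second one, which is consistent with the failure shown above.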

windowshopr commented 11 months ago

After some digging and tweaking, and with the help of this post #271, I've been able to further reduce the code to highlight the issue I'm facing. Forget the above code, and see this updated code, with comments describing the problem I'm facing: ib.qualifyContracts(contract) "hanging" during the request:

import nest_asyncio
from flask import Flask, request, jsonify
from flask_cors import CORS
from ib_insync import *

util.patchAsyncio()

# Global variables
IB_GATEWAY_SERVER_IP_ADDRESS = '127.0.0.1'
IB_GATEWAY_PORT = 4002
IB_GATEWAY_CLIENT_ID = 10

app = Flask(__name__)
CORS(app, supports_credentials=True)

# Initialize IB object
ib = IB()

# Route for establishing a connection to the IB Gateway API
@app.route('/connect-to-ib-gateway', methods=['POST'])
def connect_to_ib_gateway():
    try:
        ib.connect(IB_GATEWAY_SERVER_IP_ADDRESS, IB_GATEWAY_PORT, clientId=IB_GATEWAY_CLIENT_ID, readonly=False)
        return jsonify({'message': 'Connected to IB Gateway'})
    except Exception as e:
        return jsonify({'message': 'Error connecting to IB Gateway', 'error': str(e)})

# Route for accepting POST requests to open a new position
@app.route('/post-order', methods=['POST'])
async def post_order():

    # Create a contract object
    contract = Forex("EURUSD", "IDEALPRO", "EUR", "USD")
    print(contract) # To ensure endpoint is being hit

    # This causes the request to hang/get stuck on "requesting"
    # and am not able to CTRL + C to stop the server terminal.
    await ib.qualifyContractsAsync(contract)

    # If we take the "await" out, the request sends the success message
    # below back, however it results in a "ib.qualifyContractsAsync(contract)
    # was never awaited" error.

    # By making the post_order() function NON-async, and using 
    # ib.qualifyContracts(contract) instead of the Async version,
    # the same "hanging" issue occurs, although you ARE able to CTRL+C
    # out of the server terminal to shut it down.

    # Inspect the contract object again after qualifying
    print(contract)

    return jsonify({'message': 'Order placed successfully'})

if __name__ == '__main__':
    app.run(port=5000)
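
The comments in the code above mix two calling conventions. As a stdlib-only sketch (`qualify_async` and `qualify_blocking` are hypothetical stand-ins, not ib_insync APIs): calling a coroutine function without `await` only creates a coroutine object, which is what the "was never awaited" message reports, while using `await` on the return value of an ordinary blocking function raises `TypeError`.

```python
import asyncio
import inspect


async def qualify_async(symbol):
    # Hypothetical stand-in for an *Async method that must be awaited.
    await asyncio.sleep(0)
    return f"{symbol} qualified"


def qualify_blocking(symbol):
    # Hypothetical stand-in for a blocking method that returns a plain value.
    return f"{symbol} qualified"


async def handler():
    # Without await, the call only builds a coroutine object; nothing runs.
    pending = qualify_async("EURUSD")
    created_only = inspect.iscoroutine(pending)
    pending.close()  # discard it explicitly so no "never awaited" warning fires

    # With await, the coroutine actually executes.
    result = await qualify_async("EURUSD")

    # Awaiting a plain return value is an error: a str is not awaitable.
    try:
        await qualify_blocking("EURUSD")
        await_error = None
    except TypeError as exc:
        await_error = exc
    return created_only, result, await_error


created_only, result, await_error = asyncio.run(handler())
print(created_only, result, type(await_error).__name__)
```

Neither convention on its own explains the hang described above; the sketch only separates the two error messages from the underlying event-loop question.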

jlixfeld commented 11 months ago

I can suggest one of three things.

  1. Have a look at these as well: https://github.com/erdewit/ib_insync/issues/585, https://github.com/erdewit/ib_insync/issues/266

  2. From my own experiences (stemming from inexperience) trying to scale my use case for ib_insync, I've decided that splitting things up into multiple scripts instantiated using containers gets me around a lot of the nuanced asyncio stuff that caused me so many headaches. It makes the code base more complex, sure, but it works and it helps keep things cleaner and more modular, which works for my brain.

  3. Ask ChatGPT. It may not provide a direct answer, but it certainly knows a million different ways to do a thing, and maybe one of those ways will give you an idea that could help.

windowshopr commented 11 months ago

Haha, it’s funny because I was asking ChatGPT for HOURS trying to get it to help me figure the issue out, but even it says things like “it seems to be related to ib_insync’s asynchronous programming, try this”. And of course you go down the rabbit hole of its “my apologies, there seemed to be an error in my previous code, try this” a million times, but still to no avail.

I will review the posts you suggested to see if I can find a solution there for now. If anyone can get the (second) code sample I’ve provided working in the meantime, I would be extremely grateful, so I can see what the fix is, as I’m new to handling ib_insync in a Flask app. Thanks!

windowshopr commented 11 months ago

After some quick reading from a few different threads in there, it looks like Flask is not the ideal framework to use, as it has its own blocking/threading functionality and isn’t built for asyncio integration, BUT something like FastAPI is and might work! When I get some time this week, I’ll rewrite using FastAPI (never used it yet, but always wanted to, so this is good) and see if I can come up with a solution 👍
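
If staying on a threaded, synchronous framework were a requirement, one commonly used stdlib pattern is to keep a single long-lived event loop in a background thread and hand coroutines to it with `asyncio.run_coroutine_threadsafe`. This is a minimal sketch that has not been tested against IB Gateway in this thread; `qualify` and `sync_view` are hypothetical names, and `qualify` merely stands in for an awaitable such as `ib.qualifyContractsAsync(...)`. For the idea to carry over, the connection itself would presumably also have to be created on that same loop.

```python
import asyncio
import threading

# One long-lived loop that runs forever in a daemon thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def qualify(symbol):
    # Hypothetical stand-in for an awaitable such as ib.qualifyContractsAsync(...).
    await asyncio.sleep(0.01)
    # Report whether this coroutine is running on the shared loop.
    return symbol, asyncio.get_running_loop() is loop


def sync_view(symbol):
    # Safe to call from any request thread: schedule the coroutine on the
    # shared loop and block this thread until the result is ready.
    future = asyncio.run_coroutine_threadsafe(qualify(symbol), loop)
    return future.result(timeout=5)


print(sync_view("EURUSD"))  # ('EURUSD', True)
```

The design choice here is that every request thread talks to the same loop, so objects created on it stay usable across requests.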

windowshopr commented 11 months ago

Ok thanks to @jlixfeld , I refactored the code to use FastAPI for the backend instead of Flask and VOILA! Thanks :)

import nest_asyncio
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from ib_insync import *

util.patchAsyncio()

# Global variables
IB_GATEWAY_SERVER_IP_ADDRESS = '127.0.0.1'
IB_GATEWAY_PORT = 4002
IB_GATEWAY_CLIENT_ID = 10

app = FastAPI()
origins = ["*"]  # Adjust this list to restrict origins if needed
app.add_middleware(CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"])

# Initialize IB object
ib = IB()

# Route for establishing a connection to the IB Gateway API
@app.post('/connect-to-ib-gateway')
async def connect_to_ib_gateway():
    try:
        ib.connect(IB_GATEWAY_SERVER_IP_ADDRESS, IB_GATEWAY_PORT, clientId=IB_GATEWAY_CLIENT_ID, readonly=False)
        return {'message': 'Connected to IB Gateway'}
    except Exception as e:
        # Chain the original exception so the root cause shows up in the server log
        raise HTTPException(status_code=500, detail='Error connecting to IB Gateway') from e

# Route for accepting POST requests to open a new position
@app.post('/post-order')
async def post_order():
    try:
        # Create a contract object
        contract = Forex("EURUSD", "IDEALPRO", "EUR", "USD")
        print(contract)  # To ensure endpoint is being hit

        # Qualify the contract asynchronously
        await ib.qualifyContractsAsync(contract)

        # Inspect the contract object again after qualifying
        print(contract)

        return {'message': 'Order placed successfully'}
    except Exception as e:
        # Chain the original exception so the root cause shows up in the server log
        raise HTTPException(status_code=500, detail='Error placing order') from e

if __name__ == '__main__':
    nest_asyncio.apply()
    uvicorn.run(app, host='127.0.0.1', port=5000)

# uvicorn main:app --port 5000