softwarespartan / IB4m

Interactive Brokers API for Matlab
GNU General Public License v2.0
62 stars 21 forks source link

Problem trying to establish multiple, independent data streams running in parallel using 'parfeval' & 'batch' #129

Open LimitlessTrader opened 3 years ago

LimitlessTrader commented 3 years ago

Abel,

Thank you for creating and maintaining such a great app (IB4M). While I'm new to OOP and almost certainly don't fully appreciate the elegance of the architecture, I really do appreciate the readability of the code.

(PS Apologies for the formatting of this issue, I haven't got to grips with this text editor yet)

Problem / Objective

I'm trying to submit/run multiple, independent data streams in parallel (e.g. 1. TWS.Events.MARKETDEPTH, 2. TWS.Events.TICKBYTICK_BIDASK, 3. later there will be others), which update an object in the main function, while allowing the main function to continue processing other activities (e.g. creating and maintaining orders in IB)

I have tried using parfeval and batch both with similar error results: (Note: these are selected error messages, the full results are attached in documents.)

Warning: conid 0 symbol GBP secType CASH strike 0.0 exchange IDEALPRO currency USD localSymbol GBP.USD is not serializable

Warning: Could not serialize object: com.proxy.ProxyBuilder$Handler@239b0f9d java.io.NotSerializableException: com.proxy.ProxyBuilder$Handler

Warning: com.tws.Handler@6bee793f is not serializable

Warning: com.ib.client.EClientSocket@726aa968 is not serializable

Warning: Could not serialize object: [com.tws.Handler$MarketDepthEvent[source=com.tws.Handler@6bee793f],

(Caution: The attached error logs are very large, I'm sure there is much duplication, but I've highlighted what I believe to be the relevant messages for IB4M triggered by the parfeval lines in Main() ).

Error Messages from Market Depth.txt Error Messages from Time & Sales.txt

Relevant code from the Main (controlling) function

(Please note that I have included the code for both parfeval & batch and just comment out those lines I'm not testing).

function Main()

%% MATLAB Environment settings

% Start the parellel worker pool (for 'parfeval' commends)
p = parpool(2);

% Start the parallel worker cluster (for 'batch' commands) 
% c = parcluster();

%% Open the connection to Interactive Brokers

% get TWS session instance
session = TWS.Session.getInstance();

% Create connection to IB API
session.eClientSocket.eConnect('127.0.0.1',7497,0);

%% Create the 'contract' for the instrument

contract = com.ib.client.Contract();
contract.symbol('GBP');
contract.secType('CASH');
contract.exchange('IDEALPRO');
contract.currency('USD');
contract.localSymbol('GBP.USD');`

`%% Create MY environment

MY = MyEnvironment();`

`%% Create the IB data streams

MD = marketDepth();        

TS = timeSales();`

` %% Start the data streams

% startMD(MD, session, contract, MY);
f(1) = parfeval(@startMD, 0, MD, session, contract, MY)
% job = batch(c,@startMD,1,{MD,session,contract,env},'Pool',1);

% startTS(TS, session, contract, MY);
f(2) = parfeval(@startTS, 0, TS, session, contract, MY)
% job = batch(c,@startTS, 0,{TS, session, contract, MY},'Pool',2);`

`%% Stop the data streams  (Used here during testing)

% stopMD(MD, session);

% stopTS(TS, session);`

end

Relevant code from class 'marketDepth'

classdef marketDepth()

%   All methods required to manage the Market Depth data stream
%   from Interactive Brokers (IB)

`properties
    numRows = 1;`    %default

    reqId
    dsHandle
end

methods
    function this = marketDepth(value)
        %marketDepth Construct an instance of this class
        switch nargin
            case 0
                % Use the default value i.e. do nothing
            case 1
                % Validate the number of rows required         
                if isnumeric(value) && isscalar(value) && ceil(value)== floor(value) && value >= 1 && value <= 10
                    this.numRows = value;
                else
                    error('Invalid numRows');
                end
            otherwise
                error('Too many input parameters');
        end
    end

    function this = startMD(this, session, contract, MY)
        % Set the reqId to only process this request data
        this.reqId = 1;       % For testing only to identify the data stream
        % this.reqId = randi(99999);

        % % Attach the listener
        this.dsHandle = event.listener(         ...
            TWS.Events.getInstance             ,...
            TWS.Events.MARKETDEPTH             ,...
            @(s,e)this.processMD(s,e,MY)        ...
        );

        % Start the Market Depth data stream
        session.eClientSocket.reqMktDepth(      ...
            this.reqId                         ,...
            contract                           ,... 
            this.numRows                       ,... 
            []                                  ...
        );
    end

    function stopMD(this, session)
        session.eClientSocket.cancelMktDepth(1);      % For testing only
        % session.eClientSocket.cancelMktDepth(this.reqId);
    end

    function processMD(this,s,e, MY)
        % Pre-process the Market Depth data from the data stream

        % Extract the current Bid & Ask values
        if e.event.data.reqId == this.reqId

            % Select whether Bid or Ask data
            switch e.event.data.side
                case  0  % Ask

                    % Select which operation to use
                    switch e.event.data.operation
                        case {0, 1}   % 0 - Add or 1 - Update a record

                            % Check for a new current Ask price
                            if e.event.data.position == 0  % New current Ask price

                                % Only update price if changed
                                if e.event.data.price == MY.Data(5)
                                    % Do nothing for now 
                                else
                                    % Put the price data into MY.Data
                                    MY.Data(5) = e.event.data.price;
                                end
                            end
                    end

                case  1  % Bid

                    % Select which operation to use
                    switch e.event.data.operation
                        case {0, 1}   % 0 - Add or 1 - Update a record

                            % Check for a new current Ask price
                            if e.event.data.position == 0  % New current Bid price

                                if e.event.data.price == MY.Data(1)
                                    % Do nothing for now 
                                else
                                    % Put the data into MY.Data
                                    MY.Data(1) = e.event.data.price;
                                end
                            end
                    end

                otherwise
                    % This is an error
                    disp('Invalid e.event.data.side from IB');
            end

        end
    end  

end`

Relevant code from class 'timeSales' classdef timeSales() %timeSales - Stream the Time & Sales data % All methods required to manage the Time & Sales data stream, (TICKBYTICK_BIDASK) from Interactive Brokers (IB)

properties
    tickType = "BidAsk";    %
    numberOfTicks = 0;      % 
    ignoreSize = 1;         % Bool 'true'

    dsHandle
    reqId
end

methods
    function this = timeSales()
        % Construct an instance of this class
    end

    function this = startTS(this, session, contract, MY)
        % Set the reqId to only process this request data
        this.reqId = 2;       % For testing only to identify the data stream
        % this.reqId = randi(99999);

        % Attach the listener
        this.dsHandle = event.listener(             ...
            TWS.Events.getInstance                 ,...
            TWS.Events.TICKBYTICK_BIDASK           ,...
            @(s,e)this.processTS(s,e,MY)            ...
        );

        % Start the Time & Sales data stream
        session.eClientSocket.reqTickByTickData(   ...
            this.reqId                            ,...
            contract                              ,...
            this.tickType                         ,...
            this.numberOfTicks                    ,...
            this.ignoreSize                        ...
        );
    end

    function stopTS(this, session)
        % Stop the data stream
        session.eClientSocket.cancelTickByTickData(2);    % For testing only
        % session.eClientSocket.cancelTickByTickData(this.reqId);
    end

    function processTS(this,s,e, MY)
        % Pre-process the transaction quantities from the Time and Sales
        % data stream

        % Make sure data is for current reqId
        if e.event.data.reqId == this.reqId

            % Calculate the cumulative Ask size per price level
            if e.event.data.askPrice == MY.Data(5)
                MY.Data(10) = MY.Data(10) + e.event.data.askSize;
            else
                MY.Data(10) = e.event.data.askSize;
            end

            % Calculate the cumulative Bid size per price level
            if e.event.data.bidPrice == MY.Data(1)
                MY.Data(9) = MY.Data(9) + e.event.data.bidSize;
            else
                MY.Data(9) = e.event.data.bidSize;
            end

        end
    end
end

end`

Relevant code from class 'MyEnvironment'

classdef MyEnvironment `%MYENVIRONMENT: custom environment in MATLAB

properties

    % Input data from (IB) data streams
    Data = zeros(12,1);

end

%% Necessary Methods
methods
    % Contructor method creates an instance of the environment
    function this = MyEnvironment()

    end
end

%% Optional Methods (set methods' attributes accordingly)
methods

    function this = set.Data(this, value)
        validateattributes(value,{'double'},{'finite','real', 'vector'},'','Data');
        this.Data = value;
    end

    function result = get.Data(this)
        result = this.Data;
    end

end     

end`

softwarespartan commented 3 years ago

Thanks for reaching out! It is possible to subscribe to many events simultaneously on single thread. Just define the associated call back for each event. It’s not clear that parfeval is needed. Let me know if that helps. I can look into it more tomorrow/this week

LimitlessTrader commented 3 years ago

Thanks for the prompt response.

Yes I have tested subscribing multiple events on a single thread, (and it works well). My goal is to move the data streams to a "background" thread to leave the primary thread for function Main() to train Deep Learning algorithms on the streamed data.

I wonder if I can push the Neural Network training to background threads as well? I'd really like to keep the primary thread clear for real time control of order management.

Thanks

softwarespartan commented 3 years ago

I see, as far as I know the events are processed on background “event dispatch thread” or EDT.

https://undocumentedmatlab.com/articles/matlab-and-the-event-dispatch-thread-edt

You might be able to use EDT but either way at some point you’ll need to block on main Matlab thread. I would suggest if you need strong multi thread support to compile out the Matlab algorithm to Java and then use it in Java API.

Certainly, TWS events will queue in IB4m. So you wouldn’t ever miss an event but could get latency/lag if have long event processing.

LimitlessTrader commented 3 years ago

I'm not expert enough in Java for that. I bought the Parallel Computing Toolbox in MATLAB to take care of that for me, hence the attempted use of parfeval and batch.

I raised the issue, because the warning messages were indicating that the parallel threads were expecting something, I believe within IB4M to be 'serializable'. I don't know technically what that means, where the problem is or how to fix it.

I'll continue testing and looking for ways to resolve the problem, but I wanted to report it so it can be investigated and if necessary rectified or improved for everyone. I do the same with MATLAB products.

LimitlessTrader commented 3 years ago

My apologies, I should have included my software environment in the issue:

IB4M ver 1.2.1.0 Installed 10 December 2020

>> ver
-----------------------------------------------------------------------------------------------------
MATLAB Version: 9.10.0.1684407 (R2021a) Update 3
Operating System: Microsoft Windows 10 Home Single Language Version 10.0 (Build 19043)
Java Version: Java 1.8.0_202-b08 with Oracle Corporation Java HotSpot(TM) 64-Bit Server VM mixed mode
-----------------------------------------------------------------------------------------------------
MATLAB                                                Version 9.10        (R2021a)
Curve Fitting Toolbox                                 Version 3.5.13      (R2021a)
DSP System Toolbox                                    Version 9.12        (R2021a)
Database Toolbox                                      Version 10.1        (R2021a)
Datafeed Toolbox                                      Version 6.0         (R2021a)
Deep Learning Toolbox                                 Version 14.2        (R2021a)
Econometrics Toolbox                                  Version 5.6         (R2021a)
Financial Instruments Toolbox                         Version 3.2         (R2021a)
Financial Toolbox                                     Version 6.1         (R2021a)
Global Optimization Toolbox                           Version 4.5         (R2021a)
MATLAB Report Generator                               Version 5.10        (R2021a)
Optimization Toolbox                                  Version 9.1         (R2021a)
Parallel Computing Toolbox                            Version 7.4         (R2021a)
Reinforcement Learning Toolbox                        Version 2.0         (R2021a)
Signal Processing Toolbox                             Version 8.6         (R2021a)
Statistics and Machine Learning Toolbox               Version 12.1        (R2021a)

IB TWS
`Trader Workstation 
Copyright © 2000-2021 LICENSOR to Interactive Brokers 
ALL RIGHTS RESERVED  

Build 985.1d, Jun 23, 2021 4:48:46 PM 
Jolt Build 1.17.11, May 7, 2021 10:25:19 AM 
Nia Build 2.22.8, May 27, 2021 04:55:16 PM 
ModelNav Build 1.11.3, Oct 20, 2020 05:52:13 PM 
Riskfeed Build 2.42.17, May 27, 2021 04:38:09 PM  

Java Version: 1.8.0_152, OS: Windows 10 (amd64, 10.0), 
Memory: heap max 768Mb, current 125Mb`