darwinex / dwx-zeromq-connector

Wrapper library for algorithmic trading in Python 3, providing DMA/STP access to Darwinex liquidity via a ZeroMQ-enabled MetaTrader Bridge EA.
https://blog.darwinex.com/zeromq-interface-python-r-metatrader4/
BSD 3-Clause "New" or "Revised" License
346 stars 229 forks source link

Cannot subscribe to two symbols using DWX_ZeroMQ_Connector_v2_0_1_RC8.py #82

Closed suchendra-h closed 4 years ago

suchendra-h commented 4 years ago

Hello,

When I try to subscribe to two symbols at the same time, it will only choose one of them and stores in Market_DB variable.

The chosen symbol is the one which appears first in Publish_Symbols array in EA. For example EURUSD has precedence over all other symbols if we consider default values for Publish_Symbols.

If you want to test it out you can copy the following code after the contents of _DWX_ZeroMQ_Connector_v2_0_1RC8.py

##############################################################################
# Contents of DWX_ZeroMQ_Connector_v2_0_1_RC8.py
##############################################################################
from time import sleep

if __name__ == '__main__':
    connector = DWX_ZeroMQ_Connector()
    connector._DWX_MTX_SUBSCRIBE_MARKETDATA_(_symbol='EURUSD')
    connector._DWX_MTX_SUBSCRIBE_MARKETDATA_(_symbol='USDCHF')
    sleep(5)
    connector._DWX_ZMQ_SHUTDOWN_()

Is this correct or am I doing something wrong?

Thank you

Dwyte commented 4 years ago

Same issue I face, I think it's because of the SUB socket, only single one is used to subscribe on all them. Like the server fires the data lets say EURUSD -> GBPUSD -> USDJPY, client polls recieves EURUSD after finishing itll poll again, but the server is already done with the cycle, so server fires EURUSD -> GBPUSD -> USDJPY again, client polls same time only recieving EURUSD again. Sometimes they may lag a bit and itll recieve GBPUSD, as the instrument gets on the latter part it gets rarer for the data to come to client. I'm figuring out a solution as well.

Dwyte commented 4 years ago

I got some help out from stackoverflow and mq5forum. The module DWX_ZeroMQ_Server_v2.0.1_RC8.mq4 need some changes. Instead of looping the symbols in an order to PUBlish bid/ask data to client SUB, we randomize the publication, this doesn't guarantee to receive all ticks but with this we can have an even distribution of ticks coming in if we have subscribed into multiple symbols. Let's say GBPUSD, EURUSD, AUDUSD we had before 110, 8, 2 as the number of ticks recieve at x time, with this fix we can have 35, 42, 43 they dont equal yes, but on average in the long run it all evens out.

First is adding this method:

double MathRandRange(int x, int x) { 
   return(NormalizeDouble(x+MathMod(MathRand(),MathAbs(x-y)), 0));
} 

Second change on line 140 - OnTick method:

void OnTick()
{
   /*
      Use this OnTick() function to send market data to subscribed client.
   */

   if(CheckServerStatus() == true)
   {
      if(Publish_MarketData == true)
      {
         string symbol = Publish_Symbols[MathRandRange(0, ArraySize(Publish_Symbols))];
         string _tick = GetBidAsk(symbol);
         Print("Sending " + symbol + " " + _tick + " to PUB Socket");
         InformPullClient(pubSocket, StringFormat("%s %s", symbol, _tick));
      }
   }
}

I haven't tested this yet on live market on how good this is, in real trading time. I tested this on the OnTimer method w/c procs more often (1 every milisecond). We shall see once the market opens.

suchendra-h commented 4 years ago

Thanks for your reply. I think there is another possibility too. to send the data for all symbols we want in one long message and then parse it in the python side. I changed the OnTick function of the EA to the following

void OnTick()
{
   /*
      Use this OnTick() function to send market data to subscribed client.
   */

   if(CheckServerStatus() == true)
   {
      if(Publish_MarketData == true)
      {
         string market_data_str = "";
         for(int s = 0; s < ArraySize(Publish_Symbols); s++)
         {
            string _tick = GetBidAsk(Publish_Symbols[s]);
            if(Verbose)
              {
                  Print("Sending " + Publish_Symbols[s] + " " + _tick + " to PUB Socket");
              }

            string this_sym_str = StringFormat("%s %s ", Publish_Symbols[s], _tick);
            StringAdd(market_data_str, this_sym_str);
         }
         InformPullClient(pubSocket, market_data_str);
      }
   }
}

this gives us a long string with similar formatting as before with just one extra space character at the end. If we parse this on python side and store it in Market_Data_DB variable I think we are done.

I will post the modified python function too when I finish

suchendra-h commented 4 years ago

So I changed _DWX_ZMQ_PollData function from the "DWX_ZeroMQ_Connector_v2_0_1_RC8.py" file to parse the data sent from EA. It works pretty much as I needed. Here is the complete function

    def _DWX_ZMQ_Poll_Data_(self,
                            string_delimiter=';',
                            poll_timeout=1000):
        while self._ACTIVE:

            sleep(self._sleep_delay)  # poll timeout is in ms, sleep() is s.

            sockets = dict(self._poller.poll(poll_timeout))

            # Process response to commands sent to MetaTrader
            if self._PULL_SOCKET in sockets and sockets[self._PULL_SOCKET] == zmq.POLLIN:

                if self._PULL_SOCKET_STATUS['state'] == True:
                    try:

                        # msg = self._PULL_SOCKET.recv_string(zmq.DONTWAIT)
                        msg = self.remote_recv(self._PULL_SOCKET)
                        self._data_received.clear()
                        logging.info('data received flag is cleared')

                        # If data is returned, store as pandas Series
                        if msg != '' and msg != None:

                            try:
                                _data = eval(msg)

                                self._thread_data_output = _data
                                # logging.info(current_thread().name)
                                logging.info(_data)
                                logging.info('data received flag is set')
                                self._data_received.set()

                                if self._verbose:
                                    print(_data)  # default logic

                            except Exception as ex:
                                _exstr = "Exception Type {0}. Args:\n{1!r}"
                                _msg = _exstr.format(type(ex).__name__, ex.args)
                                print(_msg)

                    except zmq.error.Again:
                        pass  # resource temporarily unavailable, nothing to print
                    except ValueError:
                        pass  # No data returned, passing iteration.
                    except UnboundLocalError:
                        pass  # _symbol may sometimes get referenced before being assigned.

                else:
                    print('\r[KERNEL] NO HANDSHAKE on PULL SOCKET.. Cannot READ data.', end='', flush=True)

            # Receive new market data from MetaTrader
            if self._SUB_SOCKET in sockets and sockets[self._SUB_SOCKET] == zmq.POLLIN:

                try:
                    msg = self._SUB_SOCKET.recv_string(zmq.DONTWAIT)

here is the part I changed

                    if msg != '':
                        msg = msg.split(" ")[:-1]
                        symbols = msg[0::2]     # symbols are in even indexes
                        prices = msg[1::2]      # and prices in odd ones
                        if len(prices) == len(symbols):
                            for symbol, price in zip(symbols, prices):
                                timestamp = str(Timestamp.now())[:-6]
                                bid, ask = price.split(string_delimiter)
                                if self._verbose:
                                    print("\n[" + symbol + "] " + timestamp + " (" + bid + "/" + ask + ") BID/ASK")

                                # Update Market Data DB
                                if symbol not in self.Market_Data_DB.keys():
                                    self.Market_Data_DB[symbol] = {}

                                self.Market_Data_DB[symbol][timestamp] = (float(bid), float(ask))
                except zmq.error.Again:
                    pass  # resource temporarily unavailable, nothing to print
                except ValueError:
                    pass  # No data returned, passing iteration.
                except UnboundLocalError:
                    pass  # _symbol may sometimes get referenced before being assigned.

        print("\n++ [KERNEL] _DWX_ZMQ_Poll_Data_() Signing Out ++")

I will try to fork their repo include this fix and add a few other features. I will close this issue, thanks again for your contribution @Dwyte