darwinex / dwx-zeromq-connector

Wrapper library for algorithmic trading in Python 3, providing DMA/STP access to Darwinex liquidity via a ZeroMQ-enabled MetaTrader Bridge EA.
https://blog.darwinex.com/zeromq-interface-python-r-metatrader4/
BSD 3-Clause "New" or "Revised" License
346 stars 229 forks source link

New Functions and Bug Fixes #79

Closed Wortburrito closed 4 years ago

Wortburrito commented 4 years ago

Hello, I added three new functions: _DWX_MTX_GET_ACCOUNT_INFO_() --> account information such as balance, margin, etc.; _DWX_MTX_GET_MARKET_INFO_([symbolpair]) --> precision, digits, swap, ...; and a new SUB socket that receives account info plus open orders every 100 ms as JSON and writes them to self._ACC_Data_DB (I haven't tested the new SUB socket yet).
I also resolved the 'GET OPEN TRADES string limitation problem' by building the response with string concatenation; I was able to retrieve all 100 of my test orders (100 is the maximum allowed by my broker). Last but not least, all outputs are now 100% JSON dictionaries.

Here my Connector.py:

# -*- coding: utf-8 -*-
"""
    DWX_ZeroMQ_Connector_v2_0_1_RC8.py
    --
    @author: Darwinex Labs (www.darwinex.com)

    Last Updated: August 06, 2019

    Copyright (c) 2017-2019, Darwinex. All rights reserved.

    Licensed under the BSD 3-Clause License, you may not use this file except
    in compliance with the License.

    You may obtain a copy of the License at:
    https://opensource.org/licenses/BSD-3-Clause
"""

import zmq
from time import sleep
from pandas import DataFrame, Timestamp
from threading import Thread
import ast

# 30-07-2019 10:58 CEST
from zmq.utils.monitor import recv_monitor_message

class DWX_ZeroMQ_Connector():

    """
    Setup ZeroMQ -> MetaTrader Connector
    """
    def __init__(self,
                 _ClientID='dwx-zeromq',    # Unique ID for this client
                 _host='127.0.0.1',         # Host to connect to
                 _protocol='tcp',           # Connection protocol
                 _PUSH_PORT=1000,           # Port for sending commands
                 _PULL_PORT=1001,           # Port for receiving responses
                 _SUB_PORT=1002,            # Port for subscribing to prices
                 _SUB_PORT_ACC=1003,        # Port for subscribing to account / open-order data
                 _delimiter=';',            # String delimiter
                 _verbose=True,             # Print received data to stdout
                 _poll_timeout=1000,        # ZMQ Poller Timeout (ms)
                 _sleep_delay=0.001,        # 1 ms for time.sleep()
                 _monitor=False):           # Experimental ZeroMQ Socket Monitoring
        """
        Create the PUSH / PULL / SUB sockets, connect them to MetaTrader and
        start the background polling thread.

        One daemon thread runs _DWX_ZMQ_Poll_Data_() and services every socket
        registered with the poller (PULL, market-data SUB and account SUB).
        If _monitor is truthy, two further daemon threads watch the PUSH and
        PULL sockets for connection events and toggle their 'state' flags.
        """

        ######################################################################

        # Strategy Status (if this is False, ZeroMQ will not listen for data)
        self._ACTIVE = True

        # Client ID
        self._ClientID = _ClientID

        # ZeroMQ Host
        self._host = _host

        # Connection Protocol
        self._protocol = _protocol

        # ZeroMQ Context
        self._ZMQ_CONTEXT = zmq.Context()

        # TCP Connection URL Template
        self._URL = self._protocol + "://" + self._host + ":"

        # Ports for PUSH, PULL and the two SUB sockets respectively
        self._PUSH_PORT = _PUSH_PORT
        self._PULL_PORT = _PULL_PORT
        self._SUB_PORT = _SUB_PORT
        self._SUB_PORT_ACC = _SUB_PORT_ACC

        # Create Sockets
        self._PUSH_SOCKET = self._ZMQ_CONTEXT.socket(zmq.PUSH)
        self._PUSH_SOCKET.setsockopt(zmq.SNDHWM, 1)
        self._PUSH_SOCKET_STATUS = {'state': True, 'latest_event': 'N/A'}

        self._PULL_SOCKET = self._ZMQ_CONTEXT.socket(zmq.PULL)
        self._PULL_SOCKET.setsockopt(zmq.RCVHWM, 1)
        self._PULL_SOCKET_STATUS = {'state': True, 'latest_event': 'N/A'}

        self._SUB_SOCKET = self._ZMQ_CONTEXT.socket(zmq.SUB)
        self._SUB_SOCKET_ACC = self._ZMQ_CONTEXT.socket(zmq.SUB)

        # Connect PUSH Socket to send commands to MetaTrader
        self._PUSH_SOCKET.connect(self._URL + str(self._PUSH_PORT))
        print("[INIT] Ready to send commands to METATRADER (PUSH): " + str(self._PUSH_PORT))

        # Connect PULL Socket to receive command responses from MetaTrader
        self._PULL_SOCKET.connect(self._URL + str(self._PULL_PORT))
        print("[INIT] Listening for responses from METATRADER (PULL): " + str(self._PULL_PORT))

        # Connect SUB Socket to receive market data from MetaTrader
        print("[INIT] Listening for market data from METATRADER (SUB): " + str(self._SUB_PORT))
        self._SUB_SOCKET.connect(self._URL + str(self._SUB_PORT))

        # Connect SUB Socket to receive Account Data from MetaTrader
        print("[INIT] Listening for Account Data from METATRADER (SUB): " + str(self._SUB_PORT_ACC))
        self._SUB_SOCKET_ACC.connect(self._URL + str(self._SUB_PORT_ACC))

        # Initialize POLL set and register PULL and SUB sockets
        self._poller = zmq.Poller()
        self._poller.register(self._PULL_SOCKET, zmq.POLLIN)
        self._poller.register(self._SUB_SOCKET, zmq.POLLIN)
        self._poller.register(self._SUB_SOCKET_ACC, zmq.POLLIN)

        # Delimiter used when splitting BID/ASK market data messages
        self._string_delimiter = _delimiter

        # Polling thread (serves PULL, market-data SUB and account SUB)
        self._MarketData_Thread = None

        # Kept for backward compatibility with code that inspects this
        # attribute. No separate account thread is started: the sockets are
        # shared through one poller and ZeroMQ sockets must not be used from
        # several threads at once, so a single polling thread handles them all.
        self._ACC_Thread = None

        # Socket Monitor Threads
        self._PUSH_Monitor_Thread = None
        self._PULL_Monitor_Thread = None

        # Market Data Dictionary by Symbol (holds tick data)
        self._Market_Data_DB = {}   # {SYMBOL: {TIMESTAMP: (BID, ASK)}}

        # Account and OpenTrades Dictionary (latest message from the account SUB socket)
        self._ACC_Data_DB = {}

        # Temporary Order STRUCT for convenience wrappers later.
        self.temp_order_dict = self._generate_default_order_dict()

        # Thread returns the most recently received DATA block here
        self._thread_data_output = None

        # Verbosity
        self._verbose = _verbose

        # ZMQ Poller Timeout
        self._poll_timeout = _poll_timeout

        # Global Sleep Delay
        self._sleep_delay = _sleep_delay

        # Begin polling for PULL / SUB data
        self._MarketData_Thread = Thread(target=self._DWX_ZMQ_Poll_Data_,
                                         args=(self._string_delimiter,
                                               self._poll_timeout,))
        self._MarketData_Thread.daemon = True
        self._MarketData_Thread.start()

        ###########################################
        # Enable/Disable ZeroMQ Socket Monitoring #
        ###########################################
        if _monitor:

            # ZeroMQ Monitor Event Map ({event value: event name})
            self._MONITOR_EVENT_MAP = {}

            print("\n[KERNEL] Retrieving ZeroMQ Monitor Event Names:\n")

            for name in dir(zmq):
                if name.startswith('EVENT_'):
                    value = getattr(zmq, name)
                    print(f"{value}\t\t:\t{name}")
                    self._MONITOR_EVENT_MAP[value] = name

            print("\n[KERNEL] Socket Monitoring Config -> DONE!\n")

            # Disable PUSH/PULL sockets and let MONITOR events control them.
            self._PUSH_SOCKET_STATUS['state'] = False
            self._PULL_SOCKET_STATUS['state'] = False

            # PUSH
            self._PUSH_Monitor_Thread = Thread(target=self._DWX_ZMQ_EVENT_MONITOR_,
                                               args=("PUSH",
                                                     self._PUSH_SOCKET.get_monitor_socket(),))

            self._PUSH_Monitor_Thread.daemon = True
            self._PUSH_Monitor_Thread.start()

            # PULL
            self._PULL_Monitor_Thread = Thread(target=self._DWX_ZMQ_EVENT_MONITOR_,
                                               args=("PULL",
                                                     self._PULL_SOCKET.get_monitor_socket(),))

            self._PULL_Monitor_Thread.daemon = True
            self._PULL_Monitor_Thread.start()

    ##########################################################################

    def _DWX_ZMQ_SHUTDOWN_(self):

        # Set INACTIVE
        self._ACTIVE = False

        # Get all threads to shutdown
        if self._MarketData_Thread is not None:
            self._MarketData_Thread.join()

        if self._ACC_Thread is not None:
            self._ACC_Thread.join()

        if self._PUSH_Monitor_Thread is not None:
            self._PUSH_Monitor_Thread.join()

        if self._PULL_Monitor_Thread is not None:
            self._PULL_Monitor_Thread.join()

        # Unregister sockets from Poller
        self._poller.unregister(self._PULL_SOCKET)
        self._poller.unregister(self._SUB_SOCKET)
        self._poller.unregister(self._SUB_SOCKET_ACC)
        print("\n++ [KERNEL] Sockets unregistered from ZMQ Poller()! ++")

        # Terminate context
        self._ZMQ_CONTEXT.destroy(0)
        print("\n++ [KERNEL] ZeroMQ Context Terminated.. shut down safely complete! :)")

    ##########################################################################

    """
    Set Status (to enable/disable strategy manually)
    """
    def _setStatus(self, _new_status=False):

        self._ACTIVE = _new_status
        print("\n**\n[KERNEL] Setting Status to {} - Deactivating Threads.. please wait a bit.\n**".format(_new_status))

    ##########################################################################

    """
    Function to send commands to MetaTrader (PUSH)
    """
    def remote_send(self, _socket, _data):

        if self._PUSH_SOCKET_STATUS['state'] == True:
            try:
                _socket.send_string(_data, zmq.DONTWAIT)
            except zmq.error.Again:
                print("\nResource timeout.. please try again.")
                sleep(self._sleep_delay)
        else:
            print('\n[KERNEL] NO HANDSHAKE ON PUSH SOCKET.. Cannot SEND data')

    ##########################################################################

    def _get_response_(self):
        return self._thread_data_output

    ##########################################################################

    def _set_response_(self, _resp=None):
        self._thread_data_output = _resp

    ##########################################################################

    def _valid_response_(self, _input='zmq'):

        # Valid data types
        _types = (dict,DataFrame)

        # If _input = 'zmq', assume self._zmq._thread_data_output
        if isinstance(_input, str) and _input == 'zmq':
            return isinstance(self._get_response_(), _types)
        else:
            return isinstance(_input, _types)

        # Default
        return False

    ##########################################################################

    """
    Function to retrieve data from MetaTrader (PULL)
    """
    def remote_recv(self, _socket):

        if self._PULL_SOCKET_STATUS['state'] == True:
            try:
                msg = _socket.recv_string(zmq.DONTWAIT)
                return msg
            except zmq.error.Again:
                print("\nResource timeout.. please try again.")
                sleep(self._sleep_delay)
        else:
            print('\r[KERNEL] NO HANDSHAKE ON PULL SOCKET.. Cannot READ data', end='', flush=True)

        return None

    ##########################################################################

    # Convenience functions to permit easy trading via underlying functions.

    # OPEN ORDER
    def _DWX_MTX_NEW_TRADE_(self, _order=None):

        if _order is None:
            _order = self._generate_default_order_dict()

        # Execute
        self._DWX_MTX_SEND_COMMAND_(**_order)

    # MODIFY ORDER
    def _DWX_MTX_MODIFY_TRADE_BY_TICKET_(self, _ticket, _SL, _TP): # in points

        try:
            self.temp_order_dict['_action'] = 'MODIFY'
            self.temp_order_dict['_SL'] = _SL
            self.temp_order_dict['_TP'] = _TP
            self.temp_order_dict['_ticket'] = _ticket

            # Execute
            self._DWX_MTX_SEND_COMMAND_(**self.temp_order_dict)

        except KeyError:
            print("[ERROR] Order Ticket {} not found!".format(_ticket))

    # CLOSE ORDER
    def _DWX_MTX_CLOSE_TRADE_BY_TICKET_(self, _ticket):

        try:
            self.temp_order_dict['_action'] = 'CLOSE'
            self.temp_order_dict['_ticket'] = _ticket

            # Execute
            self._DWX_MTX_SEND_COMMAND_(**self.temp_order_dict)

        except KeyError:
            print("[ERROR] Order Ticket {} not found!".format(_ticket))

    # CLOSE PARTIAL
    def _DWX_MTX_CLOSE_PARTIAL_BY_TICKET_(self, _ticket, _lots):

        try:
            self.temp_order_dict['_action'] = 'CLOSE_PARTIAL'
            self.temp_order_dict['_ticket'] = _ticket
            self.temp_order_dict['_lots'] = _lots

            # Execute
            self._DWX_MTX_SEND_COMMAND_(**self.temp_order_dict)

        except KeyError:
            print("[ERROR] Order Ticket {} not found!".format(_ticket))

    # CLOSE MAGIC
    def _DWX_MTX_CLOSE_TRADES_BY_MAGIC_(self, _magic):

        try:
            self.temp_order_dict['_action'] = 'CLOSE_MAGIC'
            self.temp_order_dict['_magic'] = _magic

            # Execute
            self._DWX_MTX_SEND_COMMAND_(**self.temp_order_dict)

        except KeyError:
            pass

    # CLOSE ALL TRADES
    def _DWX_MTX_CLOSE_ALL_TRADES_(self):

        try:
            self.temp_order_dict['_action'] = 'CLOSE_ALL'

            # Execute
            self._DWX_MTX_SEND_COMMAND_(**self.temp_order_dict)

        except KeyError:
            pass

    # GET OPEN TRADES
    def _DWX_MTX_GET_ALL_OPEN_TRADES_(self):

        try:
            self.temp_order_dict['_action'] = 'GET_OPEN_TRADES'

            # Execute
            self._DWX_MTX_SEND_COMMAND_(**self.temp_order_dict)

        except KeyError:
            pass

    # GET ACCOUNT DATA
    def _DWX_MTX_GET_ACCOUNT_INFO_(self):

        try:
            self.temp_order_dict['_action'] = 'GET_ACCOUNT_INFO'

            # Execute
            self._DWX_MTX_SEND_COMMAND_(**self.temp_order_dict)

        except KeyError:
            pass
    # GET MARKET INFO
    def _DWX_MTX_GET_MARKET_INFO_(self,which):

        try:
            self.temp_order_dict['_action'] = 'GET_MARKETINFO'
            self.temp_order_dict['_symbol'] = which

            # Execute
            self._DWX_MTX_SEND_COMMAND_(**self.temp_order_dict)

        except KeyError:
            pass

    # DEFAULT ORDER DICT
    def _generate_default_order_dict(self):
        return({'_action': 'OPEN',
                  '_type': 0,
                  '_symbol': 'EURUSD',
                  '_price': 0.0,
                  '_SL': 500, # SL/TP in POINTS, not pips.
                  '_TP': 500,
                  '_comment': self._ClientID,
                  '_lots': 0.01,
                  '_magic': 123456,
                  '_ticket': 0})

    # DEFAULT DATA REQUEST DICT
    def _generate_default_data_dict(self):
        return({'_action': 'DATA',
                  '_symbol': 'EURUSD',
                  '_timeframe': 1440, # M1 = 1, M5 = 5, and so on..
                  '_start': '2018.12.21 17:00:00', # timestamp in MT4 recognized format
                  '_end': '2018.12.21 17:05:00'})

    ##########################################################################
    """
    Function to construct messages for sending DATA commands to MetaTrader
    """
    def _DWX_MTX_SEND_MARKETDATA_REQUEST_(self,
                                 _symbol='EURUSD',
                                 _timeframe=1,
                                 _start='2019.01.04 17:00:00',
                                 _end=Timestamp.now().strftime('%Y.%m.%d %H:%M:00')):
                                 #_end='2019.01.04 17:05:00'):

        _msg = "{};{};{};{};{}".format('DATA',
                                     _symbol,
                                     _timeframe,
                                     _start,
                                     _end)
        # Send via PUSH Socket
        self.remote_send(self._PUSH_SOCKET, _msg)

    ##########################################################################
    """
    Function to construct messages for sending Trade commands to MetaTrader
    """
    def _DWX_MTX_SEND_COMMAND_(self, _action='OPEN', _type=0,
                                 _symbol='EURUSD', _price=0.0,
                                 _SL=50, _TP=50, _comment="Python-to-MT",
                                 _lots=0.01, _magic=123456, _ticket=0):

        _msg = "{};{};{};{};{};{};{};{};{};{};{}".format('TRADE',_action,_type,
                                                         _symbol,_price,
                                                         _SL,_TP,_comment,
                                                         _lots,_magic,
                                                         _ticket)

        # Send via PUSH Socket
        self.remote_send(self._PUSH_SOCKET, _msg)

        """
         compArray[0] = TRADE or DATA
         compArray[1] = ACTION (e.g. OPEN, MODIFY, CLOSE)
         compArray[2] = TYPE (e.g. OP_BUY, OP_SELL, etc - only used when ACTION=OPEN)

         For compArray[0] == DATA, format is:
             DATA|SYMBOL|TIMEFRAME|START_DATETIME|END_DATETIME

         // ORDER TYPES:
         // https://docs.mql4.com/constants/tradingconstants/orderproperties

         // OP_BUY = 0
         // OP_SELL = 1
         // OP_BUYLIMIT = 2
         // OP_SELLLIMIT = 3
         // OP_BUYSTOP = 4
         // OP_SELLSTOP = 5

         compArray[3] = Symbol (e.g. EURUSD, etc.)
         compArray[4] = Open/Close Price (ignored if ACTION = MODIFY)
         compArray[5] = SL
         compArray[6] = TP
         compArray[7] = Trade Comment
         compArray[8] = Lots
         compArray[9] = Magic Number
         compArray[10] = Ticket Number (MODIFY/CLOSE)
         """
        # pass

    ##########################################################################

    """
    Function to check Poller for new reponses (PULL) and market data (SUB)
    """

    def _DWX_ZMQ_Poll_Data_(self,
                           string_delimiter=';',
                           poll_timeout=1000):
        """
        Worker loop: poll the registered sockets until self._ACTIVE is falsy.

        string_delimiter -- separator between BID and ASK in market data
        poll_timeout     -- poller timeout in milliseconds

        On each pass:
          * PULL socket: the message is evaluated and stored in
            self._thread_data_output (printed when verbose).
          * market-data SUB socket: a "SYMBOL BID<delim>ASK" message is
            stored in self._Market_Data_DB[SYMBOL][timestamp] = (bid, ask).
          * account SUB socket: the message is parsed with
            ast.literal_eval() and stored in self._ACC_Data_DB.

        Messages that cannot be parsed are skipped so that one bad message
        does not end the loop.
        """

        while self._ACTIVE:

            sleep(self._sleep_delay) # poll timeout is in ms, sleep() is s.

            sockets = dict(self._poller.poll(poll_timeout))

            # Process response to commands sent to MetaTrader
            if sockets.get(self._PULL_SOCKET) == zmq.POLLIN:

                if self._PULL_SOCKET_STATUS['state'] == True:
                    try:

                        msg = self.remote_recv(self._PULL_SOCKET)

                        # Skip empty messages and "nothing received" (None)
                        if msg is not None and msg != '':

                            try:
                                # SECURITY NOTE(review): eval() executes
                                # whatever text arrives on the socket. It is
                                # kept because the message format produced by
                                # the server is not visible here; switch to
                                # ast.literal_eval() (or json.loads) once the
                                # payload is confirmed to be a plain literal.
                                _data = eval(msg)

                                self._thread_data_output = _data
                                if self._verbose:
                                    print(_data) # default logic

                            except Exception as ex:
                                _exstr = "Exception Type {0}. Args:\n{1!r}"
                                _msg = _exstr.format(type(ex).__name__, ex.args)
                                print(_msg)

                    except zmq.error.Again:
                        pass # resource temporarily unavailable, nothing to print
                    except ValueError:
                        pass # No data returned, passing iteration.
                    except UnboundLocalError:
                        pass # _symbol may sometimes get referenced before being assigned.

                else:
                    print('\r[KERNEL] NO HANDSHAKE on PULL SOCKET.. Cannot READ data.', end='', flush=True)

            # Receive new market data from MetaTrader
            if sockets.get(self._SUB_SOCKET) == zmq.POLLIN:

                try:
                    msg = self._SUB_SOCKET.recv_string(zmq.DONTWAIT)

                    if msg != "":
                        _symbol, _data = msg.split(" ")
                        _bid, _ask = _data.split(string_delimiter)
                        _timestamp = str(Timestamp.now('UTC'))[:-6]

                        if self._verbose:
                            print("\n[" + _symbol + "] " + _timestamp + " (" + _bid + "/" + _ask + ") BID/ASK")

                        # Update Market Data DB
                        if _symbol not in self._Market_Data_DB:
                            self._Market_Data_DB[_symbol] = {}

                        self._Market_Data_DB[_symbol][_timestamp] = (float(_bid), float(_ask))

                except zmq.error.Again:
                    pass # resource temporarily unavailable, nothing to print
                except ValueError:
                    pass # Message did not have the expected shape; skip it.
                except UnboundLocalError:
                    pass # _symbol may sometimes get referenced before being assigned.

            # Receive new account / open-order data from MetaTrader
            if sockets.get(self._SUB_SOCKET_ACC) == zmq.POLLIN:

                try:
                    msg = self._SUB_SOCKET_ACC.recv_string(zmq.DONTWAIT)

                    if msg != "":

                        if self._verbose:
                            print(msg)

                        # Update Account Data DB
                        self._ACC_Data_DB = ast.literal_eval(msg)

                except zmq.error.Again:
                    pass # resource temporarily unavailable, nothing to print
                except (ValueError, SyntaxError):
                    # literal_eval() raises SyntaxError for text that is not
                    # valid Python syntax and ValueError for non-literal
                    # content; either way the message is skipped and the
                    # previous account snapshot is kept.
                    pass
                except UnboundLocalError:
                    pass # kept for parity with the other branches

        print("\n++ [KERNEL] _DWX_ZMQ_Poll_Data_() Signing Out ++")

    ##########################################################################

    """
    Function to subscribe to given Symbol's BID/ASK feed from MetaTrader
    """
    def _DWX_MTX_SUBSCRIBE_MARKETDATA_(self,
                                       _symbol='EURUSD',
                                       string_delimiter=';',
                                       poll_timeout=1000):
        """
        Add _symbol as a subscription topic on the market-data SUB socket.

        string_delimiter and poll_timeout are accepted but not used here.
        """
        notice = "[KERNEL] Subscribed to {} BID/ASK updates. See self._Market_Data_DB."

        self._SUB_SOCKET.setsockopt_string(zmq.SUBSCRIBE, _symbol)
        print(notice.format(_symbol))

    """
    Function to subscribe to Account Data
    """
    def _DWX_MTX_SUBSCRIBE_ACCOUNT_(self, poll_timeout=1000):
        """
        Subscribe the account SUB socket with the empty topic string.

        poll_timeout is accepted but not used here.
        """
        every_topic = ''
        self._SUB_SOCKET_ACC.setsockopt_string(zmq.SUBSCRIBE, every_topic)

        print("[KERNEL] Subscribed to ACCOUNT updates. See self._ACC_Data_DB.")

    """
    Function to unsubscribe to given Symbol's BID/ASK feed from MetaTrader
    """
    def _DWX_MTX_UNSUBSCRIBE_MARKETDATA_(self, _symbol):
        """Remove the _symbol subscription topic from the market-data SUB socket."""
        market_feed = self._SUB_SOCKET
        market_feed.setsockopt_string(zmq.UNSUBSCRIBE, _symbol)
        print("\n**\n[KERNEL] Unsubscribing from " + _symbol + "\n**\n")

    """
    Function to unsubscribe to Account Data
    """
    def _DWX_MTX_UNSUBSCRIBE_ACCOUNT_(self, _symbol=''):
        """
        Remove a subscription topic from the account SUB socket.

        _symbol -- topic to remove. Defaults to the empty string, which is the
                   topic _DWX_MTX_SUBSCRIBE_ACCOUNT_() subscribes with, so a
                   call without arguments undoes that subscription. Callers
                   that pass a topic explicitly keep working as before.
        """
        self._SUB_SOCKET_ACC.setsockopt_string(zmq.UNSUBSCRIBE, _symbol)
        print("\n**\n[KERNEL] Unsubscribing from Account Data\n**\n")

    """
    Function to unsubscribe from ALL MetaTrader Symbols
    """
    def _DWX_MTX_UNSUBSCRIBE_ALL_MARKETDATA_REQUESTS_(self):

        # 31-07-2019 12:22 CEST
        for _symbol in self._Market_Data_DB.keys():
            self._DWX_MTX_UNSUBSCRIBE_MARKETDATA_(_symbol=_symbol)

    ##########################################################################

    def _DWX_ZMQ_EVENT_MONITOR_(self,
                                socket_name,
                                monitor_socket):
        """
        Worker loop: watch a socket's monitor socket and keep the matching
        status dict (self._PUSH_SOCKET_STATUS / self._PULL_SOCKET_STATUS)
        up to date.

        socket_name    -- "PUSH" or "PULL"; selects which status dict is updated
        monitor_socket -- monitor socket of the socket being watched

        For every event received, the status 'state' is set to True on a
        successful handshake (event value 4096) and to False on any other
        event; 'latest_event' records the event name. When the monitor
        reports EVENT_MONITOR_STOPPED a new monitor socket is requested.
        The monitor socket is closed once self._ACTIVE becomes falsy.
        """

        # 05-08-2019 11:21 CEST
        while self._ACTIVE:

            sleep(self._sleep_delay) # poll timeout is in ms, sleep() is s.

            # Drain events for as long as poll() keeps reporting one within
            # the timeout; this inner loop does not re-check self._ACTIVE.
            # while monitor_socket.poll():
            while monitor_socket.poll(self._poll_timeout):

                try:
                    evt = recv_monitor_message(monitor_socket, zmq.DONTWAIT)
                    # Attach the event name looked up from the map built in __init__
                    evt.update({'description': self._MONITOR_EVENT_MAP[evt['event']]})

                    # print(f"\r[{socket_name} Socket] >> {evt['description']}", end='', flush=True)
                    print(f"\n[{socket_name} Socket] >> {evt['description']}")

                    # Set socket status on HANDSHAKE
                    if evt['event'] == 4096:        # EVENT_HANDSHAKE_SUCCEEDED

                        if socket_name == "PUSH":
                            self._PUSH_SOCKET_STATUS['state'] = True
                            self._PUSH_SOCKET_STATUS['latest_event'] = 'EVENT_HANDSHAKE_SUCCEEDED'

                        elif socket_name == "PULL":
                            self._PULL_SOCKET_STATUS['state'] = True
                            self._PULL_SOCKET_STATUS['latest_event'] = 'EVENT_HANDSHAKE_SUCCEEDED'

                        # print(f"\n[{socket_name} Socket] >> ..ready for action!\n")

                    else:
                        # Any other event marks the socket as not ready and
                        # records the event name in 'latest_event'
                        if socket_name == "PUSH":
                            self._PUSH_SOCKET_STATUS['state'] = False
                            self._PUSH_SOCKET_STATUS['latest_event'] = evt['description']

                        elif socket_name == "PULL":
                            self._PULL_SOCKET_STATUS['state'] = False
                            self._PULL_SOCKET_STATUS['latest_event'] = evt['description']

                    if evt['event'] == zmq.EVENT_MONITOR_STOPPED:

                        # Reinitialize the socket
                        if socket_name == "PUSH":
                            monitor_socket = self._PUSH_SOCKET.get_monitor_socket()
                        elif socket_name == "PULL":
                            monitor_socket = self._PULL_SOCKET.get_monitor_socket()

                except Exception as ex:
                    # Report the problem and keep monitoring
                    _exstr = "Exception Type {0}. Args:\n{1!r}"
                    _msg = _exstr.format(type(ex).__name__, ex.args)
                    print(_msg)

        # Close Monitor Socket
        monitor_socket.close()

        print(f"\n++ [KERNEL] {socket_name} _DWX_ZMQ_EVENT_MONITOR_() Signing Out ++")

    ##########################################################################

    def _DWX_ZMQ_HEARTBEAT_(self):
        self.remote_send(self._PUSH_SOCKET, "HEARTBEAT;")

    ##########################################################################

##############################################################################

def _DWX_ZMQ_CLEANUP_(_name='DWX_ZeroMQ_Connector',
                      _globals=globals(),
                      _locals=locals()):

    print('\n++ [KERNEL] Initializing ZeroMQ Cleanup.. if nothing appears below, no cleanup is necessary, otherwise please wait..')
    try:
        _class = _globals[_name]
        _locals = list(_locals.items())

        for _func, _instance in _locals:
            if isinstance(_instance, _class):
                print(f'\n++ [KERNEL] Found & Destroying {_func} object before __init__()')
                eval(_func)._DWX_ZMQ_SHUTDOWN_()
                print('\n++ [KERNEL] Cleanup Complete -> OK to initialize DWX_ZeroMQ_Connector if NETSTAT diagnostics == True. ++\n')

    except Exception as ex:

        _exstr = "Exception Type {0}. Args:\n{1!r}"
        _msg = _exstr.format(type(ex).__name__, ex.args)

        if 'KeyError' in _msg:
            print('\n++ [KERNEL] Cleanup Complete -> OK to initialize DWX_ZeroMQ_Connector. ++\n')
        else:
            print(_msg)

##############################################################################

AND the server.mql:

//+--------------------------------------------------------------+

//|     DWX_ZeroMQ_Server_v2.0.1_RC8.mq4
//|     @author: Darwinex Labs (www.darwinex.com)
//|
//|     Last Updated: September 14, 2019
//|
//|     Copyright (c) 2017-2019, Darwinex. All rights reserved.
//|
//|     Licensed under the BSD 3-Clause License, you may not use this file except
//|     in compliance with the License.
//|
//|     You may obtain a copy of the License at:
//|     https://opensource.org/licenses/BSD-3-Clause
//+--------------------------------------------------------------+
#property copyright "Copyright 2017-2019, Darwinex Labs."
#property link      "https://www.darwinex.com/"
#property version   "2.0.1"
#property strict

// Required: MQL-ZMQ from https://github.com/dingmaotu/mql-zmq

#include <Zmq/Zmq.mqh>

// --- Connection inputs ---
// PROJECT_NAME is passed to the ZeroMQ Context constructor.
extern string PROJECT_NAME = "DWX_ZeroMQ_MT4_Server";
// Protocol, host and ports are combined as "<protocol>://<host>:<port>" when binding.
extern string ZEROMQ_PROTOCOL = "tcp";
extern string HOSTNAME = "127.0.0.1";
// Port the server's PULL socket binds to (the client pushes commands here).
extern int PUSH_PORT = 1000;
// Port the server's PUSH socket binds to (the client pulls responses here).
extern int PULL_PORT = 1001;
// Port of the market-data PUB socket (bound only when Publish_MarketData is true).
extern int PUB_PORT = 1002;
// Port of the open-orders PUB socket (bound only when Publish_OpenOrders is true).
extern int PUB_ACCOUNT_ORDERS = 1003;
// Interval passed to EventSetMillisecondTimer() in OnInit().
extern int MILLISECOND_TIMER = 1;

// NOTE(review): the trading parameters below are not used in the part of the
// file visible here; their exact effect should be confirmed where orders are sent.
extern string t0 = "--- Trading Parameters ---";
extern int MagicNumber = 123456;
extern int MaximumOrders = 1;
extern double MaximumLotSize = 0.01;
extern int MaximumSlippage = 3;
extern bool DMA_MODE = true;

extern string t1 = "--- ZeroMQ Configuration ---";
// Enable binding/unbinding of the market-data PUB socket.
extern bool Publish_MarketData = false;
// Enable binding/unbinding of the open-orders PUB socket.
extern bool Publish_OpenOrders = false;

// Symbols to publish -- presumably iterated when market data is sent; verify in OnTick().
string Publish_Symbols[1] =
  {
   "EURUSD"
  };

/*
string Publish_Symbols[28] = {
   "EURUSD","EURGBP","EURAUD","EURNZD","EURJPY","EURCHF","EURCAD",
   "GBPUSD","AUDUSD","NZDUSD","USDJPY","USDCHF","USDCAD","GBPAUD",
   "GBPNZD","GBPJPY","GBPCHF","GBPCAD","AUDJPY","CHFJPY","CADJPY",
   "AUDNZD","AUDCHF","AUDCAD","NZDJPY","NZDCHF","NZDCAD","CADCHF"
};
*/

// CREATE ZeroMQ Context (shared by every socket below)
Context context(PROJECT_NAME);

// CREATE ZMQ_PUSH SOCKET (bound to PULL_PORT in OnInit)
Socket pushSocket(context, ZMQ_PUSH);

// CREATE ZMQ_PULL SOCKET (bound to PUSH_PORT in OnInit)
Socket pullSocket(context, ZMQ_PULL);

// CREATE ZMQ_PUB SOCKET (bound to PUB_PORT when Publish_MarketData is true)
Socket pubSocket(context, ZMQ_PUB);

// CREATE ZMQ_PUB SOCKET FOR OPEN ORDERS (bound to PUB_ACCOUNT_ORDERS when Publish_OpenOrders is true)
Socket pubSocketOpenOrders(context, ZMQ_PUB);

// VARIABLES FOR LATER
uchar _data[];
ZmqMsg request;

//+------------------------------------------------------------------+
//| Expert initialization function                                   |
//+------------------------------------------------------------------+
int OnInit()
  {
//---
// Starts the millisecond timer, configures the sockets and binds them.
// Returns INIT_SUCCEEDED when every requested socket was bound, and
// INIT_FAILED (after logging the endpoint) as soon as one bind fails, so
// that a server which cannot be reached is not reported as running.

   EventSetMillisecondTimer(MILLISECOND_TIMER);     // Set Millisecond Timer to get client socket input

   context.setBlocky(false);

   /* Set Socket Options */

// Send responses to PULL_PORT that client is listening on.
   string pushEndpoint = StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PULL_PORT);
   pushSocket.setSendHighWaterMark(1);
   pushSocket.setLinger(0);
   Print("[PUSH] Binding MT4 Server to Socket on Port " + IntegerToString(PULL_PORT) + "..");
   if(!pushSocket.bind(pushEndpoint))
     {
      Print("[PUSH] ERROR: could not bind to " + pushEndpoint);
      return(INIT_FAILED);
     }

// Receive commands from PUSH_PORT that client is sending to.
   string pullEndpoint = StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT);
   pullSocket.setReceiveHighWaterMark(1);
   pullSocket.setLinger(0);
   Print("[PULL] Binding MT4 Server to Socket on Port " + IntegerToString(PUSH_PORT) + "..");
   if(!pullSocket.bind(pullEndpoint))
     {
      Print("[PULL] ERROR: could not bind to " + pullEndpoint);
      return(INIT_FAILED);
     }

   if(Publish_MarketData == true)
     {
      // Send new market data to PUB_PORT that client is subscribed to.
      string pubEndpoint = StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUB_PORT);
      pubSocket.setSendHighWaterMark(1);
      pubSocket.setLinger(0);
      Print("[PUB] Binding MT4 Server to Socket on Port " + IntegerToString(PUB_PORT) + "..");
      if(!pubSocket.bind(pubEndpoint))
        {
         Print("[PUB] ERROR: could not bind to " + pubEndpoint);
         return(INIT_FAILED);
        }
     }

   if(Publish_OpenOrders == true)
     {
      // Send open orders data to PUB_ACCOUNT_ORDERS that client is subscribed to.
      string ordersEndpoint = StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUB_ACCOUNT_ORDERS);
      pubSocketOpenOrders.setSendHighWaterMark(1);
      pubSocketOpenOrders.setLinger(0);
      Print("[PUB] Binding MT4 Server to Socket on Port " + IntegerToString(PUB_ACCOUNT_ORDERS) + "..");
      if(!pubSocketOpenOrders.bind(ordersEndpoint))
        {
         Print("[PUB] ERROR: could not bind to " + ordersEndpoint);
         return(INIT_FAILED);
        }
     }
//---
   return(INIT_SUCCEEDED);
  }
//+------------------------------------------------------------------+
//| Expert deinitialization function                                 |
//+------------------------------------------------------------------+
// Unbinds and disconnects every socket that OnInit() bound (the PUB sockets
// only when their Publish_* switch is on), destroys the ZeroMQ context and
// finally stops the timer. The `reason` argument is not used.
void OnDeinit(const int reason)
  {
   string push_endpoint = StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PULL_PORT);
   Print("[PUSH] Unbinding MT4 Server from Socket on Port " + IntegerToString(PULL_PORT) + "..");
   pushSocket.unbind(push_endpoint);
   pushSocket.disconnect(push_endpoint);

   string pull_endpoint = StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT);
   Print("[PULL] Unbinding MT4 Server from Socket on Port " + IntegerToString(PUSH_PORT) + "..");
   pullSocket.unbind(pull_endpoint);
   pullSocket.disconnect(pull_endpoint);

   if(Publish_MarketData)
     {
      string pub_endpoint = StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUB_PORT);
      Print("[PUB] Unbinding MT4 Server from Socket on Port " + IntegerToString(PUB_PORT) + "..");
      pubSocket.unbind(pub_endpoint);
      pubSocket.disconnect(pub_endpoint);
     }

   if(Publish_OpenOrders)
     {
      string orders_endpoint = StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUB_ACCOUNT_ORDERS);
      Print("[PUB] Unbinding MT4 Server from Socket on Port " + IntegerToString(PUB_ACCOUNT_ORDERS) + "..");
      pubSocketOpenOrders.unbind(orders_endpoint);
      pubSocketOpenOrders.disconnect(orders_endpoint);
     }

// Destroy ZeroMQ Context
   context.destroy(0);

   EventKillTimer();
  }

//+------------------------------------------------------------------+
//| Expert tick function                                            |
//+------------------------------------------------------------------+
// On every tick, publishes "<symbol> <bid>;<ask>" on the market-data PUB
// socket for each entry of Publish_Symbols. Nothing is published when the
// server status check fails or when Publish_MarketData is off.
void OnTick()
  {
   if(!CheckServerStatus())
      return;

   if(!Publish_MarketData)
      return;

   int symbol_count = ArraySize(Publish_Symbols);
   for(int idx = 0; idx < symbol_count; idx++)
     {
      string sym = Publish_Symbols[idx];
      string quote = GetBidAsk(sym);

      Print("Sending " + sym + " " + quote + " to PUB Socket");
      InformPullClient(pubSocket, StringFormat("%s %s", sym, quote));
     }
  }

//+------------------------------------------------------------------+
//| Expert timer function                                            |
//+------------------------------------------------------------------+
// Runs on every timer event and does two things:
//   1) When Publish_OpenOrders is on, publishes one snapshot on
//      pubSocketOpenOrders shaped as
//         {'_account': {...}, '_trades': {'<ticket>': {...}, ...}}
//      The '_trades' entries carry the same fields as the GET_OPEN_TRADES reply.
//   2) Polls the PULL socket (non-blocking) and hands any received command
//      to MessageHandler().
void OnTimer()
  {
   if(Publish_OpenOrders == true)
     {
      // Account section. Key names match the GET_ACCOUNT_INFO reply, including
      // its existing '_leverange' spelling, so clients can share one parser.
      string account_info = "'_account': {'_balance': " + DoubleToString(AccountInfoDouble(ACCOUNT_BALANCE),2)
                            + ", '_profit': " + DoubleToString(AccountInfoDouble(ACCOUNT_PROFIT),2)
                            + ", '_equity': " + DoubleToString(AccountInfoDouble(ACCOUNT_EQUITY),2)
                            + ", '_margin': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN),2)
                            + ", '_free_margin': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN_FREE),2)
                            + ", '_margin_level': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN_LEVEL),2)
                            + ", '_margin_call': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN_SO_CALL),2)
                            + ", '_stop_out_percent': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN_SO_SO),2)
                            + ", '_account_number': " + IntegerToString(AccountNumber())
                            + ", '_leverange': " + IntegerToString(AccountInfoInteger(ACCOUNT_LEVERAGE))
                            + ", '_credit': " + DoubleToString(AccountInfoDouble(ACCOUNT_CREDIT))
                            + ", '_max_orders': " + IntegerToString(AccountInfoInteger(ACCOUNT_LIMIT_ORDERS))
                            + "}";

      // Orders section. The separator is written BEFORE every entry except the
      // first, so the result stays well-formed even when OrderSelect() fails
      // for some position (including the last one visited).
      string trades = "";
      for(int i = OrdersTotal() - 1; i >= 0; i--)
        {
         if(!OrderSelect(i, SELECT_BY_POS))
            continue;

         if(trades != "")
            trades = trades + ", ";

         trades = trades + "'" + IntegerToString(OrderTicket()) + "': {"
                  + "'_magic': " + IntegerToString(OrderMagicNumber())
                  + ", '_symbol': '" + OrderSymbol()
                  + "', '_lots': " + DoubleToString(OrderLots())
                  + ", '_type': " + IntegerToString(OrderType())
                  + ", '_open_price': " + DoubleToString(OrderOpenPrice())
                  + ", '_open_time': '" + TimeToStr(OrderOpenTime(), TIME_DATE|TIME_SECONDS)
                  + "', '_SL': " + DoubleToString(OrderStopLoss())
                  + ", '_TP': " + DoubleToString(OrderTakeProfit())
                  + ", '_pnl': " + DoubleToString(OrderProfit())
                  + ", '_comment': '" + OrderComment() + "'}";
        }

      string snapshot = "{" + account_info + ", '_trades': {" + trades + "}}";

      InformPullClient(pubSocketOpenOrders, snapshot);
      Print("Pushing Live Trades from  " + IntegerToString(AccountNumber()) + " to PUB Socket");
     }

   if(CheckServerStatus() == true)
     {
      // Get client's request, but don't block.
      pullSocket.recv(request, true);

      if(request.size() > 0)
         MessageHandler(request);
     }
  }
//+------------------------------------------------------------------+

// Decodes one received ZeroMQ message into text, logs it, splits it into
// its fields and dispatches the command; replies go out on the global
// pushSocket. Empty messages are ignored.
void MessageHandler(ZmqMsg &_request)
  {
   if(!(_request.size() > 0))
      return;

// Copy the raw payload into the shared byte buffer and turn it into a string.
   ArrayResize(_data, _request.size());
   _request.getData(_data);
   string raw_command = CharArrayToString(_data);
   Print(raw_command);

// Split into fields, then act on them.
   string fields[11];
   ParseZmqMessage(raw_command, fields);
   InterpretZmqMessage(pushSocket, fields);
  }

// Interpret a parsed command and perform the requested action.
//
// pSocket   : socket every reply is sent on (via InformPullClient).
// compArray : command fields as produced by ParseZmqMessage(), i.e. the raw
//             message split on ';'.
//
// Field layout used by the code below:
//
//    compArray[0] = HEARTBEAT, TRADE or DATA
//
//    If TRADE ->
//       compArray[1]  = ACTION: OPEN, MODIFY, CLOSE, CLOSE_PARTIAL, CLOSE_MAGIC,
//                       CLOSE_ALL, GET_OPEN_TRADES, GET_ACCOUNT_INFO, GET_MARKETINFO
//       compArray[2]  = TYPE (order type, see below)
//       compArray[3]  = Symbol (e.g. EURUSD, etc.)
//       compArray[4]  = Price
//       compArray[5]  = SL
//       compArray[6]  = TP
//       compArray[7]  = Trade Comment
//       compArray[8]  = Lots
//       compArray[9]  = Magic Number
//       compArray[10] = Ticket Number (MODIFY/CLOSE/CLOSE_PARTIAL)
//
//       ORDER TYPES:
//       https://docs.mql4.com/constants/tradingconstants/orderproperties
//       OP_BUY = 0, OP_SELL = 1, OP_BUYLIMIT = 2, OP_SELLLIMIT = 3,
//       OP_BUYSTOP = 4, OP_SELLSTOP = 5
//
//    If DATA ->
//       compArray[1] = Symbol, [2] = Timeframe, [3] = Start datetime,
//       compArray[4] = End datetime
void InterpretZmqMessage(Socket &pSocket, string &compArray[])
  {
   int switch_action = 0;

   /* 02-08-2019 10:41 CEST - HEARTBEAT */
   if(compArray[0] == "HEARTBEAT")
      InformPullClient(pSocket, "{'_action': 'heartbeat', '_response': 'loud and clear!'}");

   /* Map the command onto an action code */
   if(compArray[0] == "TRADE")
     {
      string action = compArray[1];

      if(action == "OPEN")
         switch_action = 1;
      else if(action == "MODIFY")
         switch_action = 2;
      else if(action == "CLOSE")
         switch_action = 3;
      else if(action == "CLOSE_PARTIAL")
         switch_action = 4;
      else if(action == "CLOSE_MAGIC")
         switch_action = 5;
      else if(action == "CLOSE_ALL")
         switch_action = 6;
      else if(action == "GET_OPEN_TRADES")
         switch_action = 7;
      else if(action == "GET_ACCOUNT_INFO")
         switch_action = 9;
      // The symbol is read from index 3, so at least four fields are required.
      else if(action == "GET_MARKETINFO" && ArraySize(compArray) >= 4)
         switch_action = 10;
     }
   else if(compArray[0] == "DATA")
      switch_action = 8;

   /* Setup processing variables */
   string zmq_ret = "";
   int ticket = -1;
   bool ans = false;

   /****************************
    * PERFORM SOME CHECKS HERE *
    ****************************/
   if(CheckOpsStatus(pSocket, switch_action) == true)
     {
      switch(switch_action)
        {
         case 1: // OPEN TRADE

            zmq_ret = "{";

            // SL/TP are parsed as doubles: DWX_OpenOrder hands them to
            // OrderSend/OrderModify as prices, so integer parsing would
            // truncate e.g. 1.1234 to 1. Whole-number inputs are unaffected.
            ticket = DWX_OpenOrder(compArray[3], StrToInteger(compArray[2]), StrToDouble(compArray[8]),
                                   StrToDouble(compArray[4]), StrToDouble(compArray[5]), StrToDouble(compArray[6]),
                                   compArray[7], StrToInteger(compArray[9]), zmq_ret);

            // Send TICKET back as JSON
            InformPullClient(pSocket, zmq_ret + "}");

            break;

         case 2: // MODIFY SL/TP

            zmq_ret = "{'_action': 'MODIFY'";

            // retries = 3 means a failed modify never triggers the
            // close-at-market fallback inside DWX_SetSLTP (that needs 0).
            ans = DWX_SetSLTP(StrToInteger(compArray[10]), StrToDouble(compArray[5]), StrToDouble(compArray[6]),
                              StrToInteger(compArray[9]), StrToInteger(compArray[2]), StrToDouble(compArray[4]),
                              compArray[3], 3, zmq_ret);

            InformPullClient(pSocket, zmq_ret + "}");

            break;

         case 3: // CLOSE TRADE

            zmq_ret = "{";

            DWX_CloseOrder_Ticket(StrToInteger(compArray[10]), zmq_ret);

            InformPullClient(pSocket, zmq_ret + "}");

            break;

         case 4: // CLOSE PARTIAL

            zmq_ret = "{";

            ans = DWX_ClosePartial(StrToDouble(compArray[8]), zmq_ret, StrToInteger(compArray[10]));

            InformPullClient(pSocket, zmq_ret + "}");

            break;

         case 5: // CLOSE MAGIC

            zmq_ret = "{";

            DWX_CloseOrder_Magic(StrToInteger(compArray[9]), zmq_ret);

            InformPullClient(pSocket, zmq_ret + "}");

            break;

         case 6: // CLOSE ALL ORDERS

            zmq_ret = "{";

            DWX_CloseAllOrders(zmq_ret);

            InformPullClient(pSocket, zmq_ret + "}");

            break;

         case 7: // GET OPEN ORDERS

            zmq_ret = "{";

            DWX_GetOpenOrders(zmq_ret);

            InformPullClient(pSocket, zmq_ret + "}");

            break;

         case 8: // DATA REQUEST

            zmq_ret = "{";

            DWX_GetData(compArray, zmq_ret);

            InformPullClient(pSocket, zmq_ret + "}");

            break;

         case 9: // GET ACCOUNT INFO
           {
            string send = "{'_account': {'_balance': " + DoubleToString(AccountInfoDouble(ACCOUNT_BALANCE),2)
                          + ", '_profit': " + DoubleToString(AccountInfoDouble(ACCOUNT_PROFIT),2)
                          + ", '_equity': " + DoubleToString(AccountInfoDouble(ACCOUNT_EQUITY),2)
                          + ", '_margin': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN),2)
                          + ", '_free_margin': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN_FREE),2)
                          + ", '_margin_level': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN_LEVEL),2)
                          + ", '_margin_call': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN_SO_CALL),2)
                          + ", '_stop_out_percent': " + DoubleToString(AccountInfoDouble(ACCOUNT_MARGIN_SO_SO),2)
                          + ", '_account_number': " + IntegerToString(AccountNumber())
                          + ", '_leverange': " + IntegerToString(AccountInfoInteger(ACCOUNT_LEVERAGE))
                          + ", '_credit': " + DoubleToString(AccountInfoDouble(ACCOUNT_CREDIT))
                          + ", '_max_orders': " + IntegerToString(AccountInfoInteger(ACCOUNT_LIMIT_ORDERS))
                          + "} }";

            InformPullClient(pSocket, send);
            break;
           }

         case 10: // GET MARKET INFO
           {
            string symbol = compArray[3];
            string send2 = "{'_market_info': {'" + symbol + "': {"
                           + "'_point_size': " + DoubleToString(MarketInfo(symbol,MODE_POINT))
                           + ", '_precision': " + DoubleToString(MarketInfo(symbol,MODE_DIGITS))
                           + ", '_swap_long': " + DoubleToString(MarketInfo(symbol,MODE_SWAPLONG))
                           + ", '_swap_short': " + DoubleToString(MarketInfo(symbol,MODE_SWAPSHORT))
                           + ", '_trade_allowed': " + DoubleToString(MarketInfo(symbol,MODE_TRADEALLOWED))
                           + ", '_min_lot': " + DoubleToString(MarketInfo(symbol,MODE_MINLOT))
                           + ", '_min_lot_step': " + DoubleToString(MarketInfo(symbol,MODE_LOTSTEP))
                           + ", '_max_lot': " + DoubleToString(MarketInfo(symbol,MODE_MAXLOT)) + "} } }";

            InformPullClient(pSocket, send2);
            break;
           }

         default:
            break;
        }
     }
  }

// Check whether the terminal currently permits the requested operation.
// Action codes above 6 skip every check. For codes up to and including 6 the
// conditions are tested in order; the first one that fails is reported to the
// client on pSocket and false is returned. Returns true otherwise.
// NOTE(review): code 0 (heartbeat / unrecognised command) also satisfies
// "<= 6" and is therefore checked too - confirm that is intended.
bool CheckOpsStatus(Socket &pSocket, int sFlag)
  {
   if(sFlag > 6)
      return(true);

   if(!IsTradeAllowed())
     {
      InformPullClient(pSocket, "{'_response': 'TRADING_IS_NOT_ALLOWED__ABORTED_COMMAND'}");
      return(false);
     }

   if(!IsExpertEnabled())
     {
      InformPullClient(pSocket, "{'_response': 'EA_IS_DISABLED__ABORTED_COMMAND'}");
      return(false);
     }

   if(IsTradeContextBusy())
     {
      InformPullClient(pSocket, "{'_response': 'TRADE_CONTEXT_BUSY__ABORTED_COMMAND'}");
      return(false);
     }

   if(!IsDllsAllowed())
     {
      InformPullClient(pSocket, "{'_response': 'DLLS_DISABLED__ABORTED_COMMAND'}");
      return(false);
     }

   if(!IsLibrariesAllowed())
     {
      InformPullClient(pSocket, "{'_response': 'LIBS_DISABLED__ABORTED_COMMAND'}");
      return(false);
     }

   if(!IsConnected())
     {
      InformPullClient(pSocket, "{'_response': 'NO_BROKER_CONNECTION__ABORTED_COMMAND'}");
      return(false);
     }

   return(true);
  }

// Split a raw command string on ';' into retArray.
// The number of fields reported by StringSplit() is not used further.
void ParseZmqMessage(string& message, string& retArray[])
  {
   ushort delimiter = StringGetCharacter(";", 0);
   int field_count = StringSplit(message, delimiter, retArray);
  }

//+------------------------------------------------------------------+
// Build the "<bid>;<ask>" string for a symbol from its latest tick.
// Returns an empty string when no tick could be read.
string GetBidAsk(string symbol)
  {
   MqlTick tick;

   if(!SymbolInfoTick(symbol, tick))
      return "";

   return(StringFormat("%f;%f", tick.bid, tick.ask));
  }

// Append close prices for the requested date range to zmq_ret.
//
// compArray : DATA;SYMBOL;TIMEFRAME;START_DATETIME;END_DATETIME
//             (fields 1..4 are read here).
// zmq_ret   : reply being built by the caller. This function appends
//             "'_action': 'DATA'" followed by either
//             ", '_data': {'<bar time>': <close>, ...}" or
//             ", '_response': 'NOT_AVAILABLE'".
void DWX_GetData(string& compArray[], string& zmq_ret)
  {
   double price_array[];
   datetime time_array[];

   string   symbol     = compArray[1];
   int      timeframe  = StrToInteger(compArray[2]);
   datetime start_time = StrToTime(compArray[3]);
   datetime end_time   = StrToTime(compArray[4]);

// Get prices
   int price_count = CopyClose(symbol, timeframe, start_time, end_time, price_array);

// Get timestamps
   int time_count = CopyTime(symbol, timeframe, start_time, end_time, time_array);

// The two copies are independent calls and either can fail or return fewer
// elements, so only index what BOTH arrays are known to contain.
   int bar_count = (price_count < time_count) ? price_count : time_count;

   zmq_ret = zmq_ret + "'_action': 'DATA'";

   if(bar_count > 0)
     {
      zmq_ret = zmq_ret + ", '_data': {";

      for(int i = 0; i < bar_count; i++)
        {
         if(i > 0)
            zmq_ret = zmq_ret + ", ";

         zmq_ret = zmq_ret + "'" + TimeToString(time_array[i]) + "': " + DoubleToString(price_array[i]);
        }

      zmq_ret = zmq_ret + "}";
     }
   else
     {
      zmq_ret = zmq_ret + ", " + "'_response': 'NOT_AVAILABLE'";
     }
  }

// Send a text message to the client on the given socket.
// The send is issued with the non-blocking flag set; its result is not checked.
void InformPullClient(Socket& pSocket, string message)
  {
   string payload = StringFormat("%s", message);
   ZmqMsg outgoing(payload);

   pSocket.send(outgoing, true); // NON-BLOCKING
  }

/*
 ############################################################################
 ############################################################################
 ############################################################################
*/

// OPEN NEW ORDER
//
// Sends the order and appends the outcome to zmq_ret as comma-separated
// key/value pairs. The caller supplies the opening "{" and appends the closing
// "}", so this function must never add a brace of its own.
//
// _symbol  : symbol to trade; the literal "NULL" means the chart symbol.
// _SL, _TP : passed to OrderSend / DWX_SetSLTP unchanged.
// In DMA_MODE the order is sent without SL/TP, which are then applied through
// DWX_SetSLTP in up to three attempts.
//
// Returns the ticket on success, -1 when the lot size exceeds MaximumLotSize,
// -error when OrderSend fails, -11111 when SL/TP could not be set on the last
// attempt, and 0 when the attempts ran out without a result.
int DWX_OpenOrder(string _symbol, int _type, double _lots, double _price, double _SL, double _TP, string _comment, int _magic, string &zmq_ret)
  {
   zmq_ret = zmq_ret + "'_action': 'EXECUTION'";

   if(_lots > MaximumLotSize)
     {
      zmq_ret = zmq_ret + ", " + "'_response': 'LOT_SIZE_ERROR', 'response_value': 'MAX_LOT_SIZE_EXCEEDED'";
      return(-1);
     }

// In DMA mode stops are attached after the order exists.
   double sl = DMA_MODE ? 0.0 : _SL;
   double tp = DMA_MODE ? 0.0 : _TP;

   string order_symbol = (_symbol == "NULL") ? Symbol() : _symbol;

   int ticket = OrderSend(order_symbol, _type, _lots, _price, MaximumSlippage, sl, tp, _comment, _magic);

   if(ticket < 0)
     {
      // Failure
      int error = GetLastError();
      zmq_ret = zmq_ret + ", " + "'_response': '" + IntegerToString(error) + "', 'response_value': '" + ErrorDescription(error) + "'";
      return(-1*error);
     }

   int tmpRet = OrderSelect(ticket, SELECT_BY_TICKET, MODE_TRADES);

   zmq_ret = zmq_ret + ", " + "'_magic': " + IntegerToString(_magic) + ", '_ticket': " + IntegerToString(OrderTicket()) + ", '_open_time': '" + TimeToStr(OrderOpenTime(),TIME_DATE|TIME_SECONDS) + "', '_open_price': " + DoubleToString(OrderOpenPrice());

   if(!DMA_MODE)
      return(ticket);

// DMA mode: apply SL/TP. `retries` counts down 2, 1, 0; DWX_SetSLTP closes the
// position at market itself when it fails with retries == 0.
   for(int retries = 2; retries >= 0; retries--)
     {
      if((_SL == 0 && _TP == 0) || (OrderStopLoss() == _SL && OrderTakeProfit() == _TP))
         return(ticket);

      if(DWX_IsTradeAllowed(30, zmq_ret) == 1)
        {
         if(DWX_SetSLTP(ticket, _SL, _TP, _magic, _type, _price, _symbol, retries, zmq_ret))
            return(ticket);

         if(retries == 0)
           {
            zmq_ret = zmq_ret + ", '_response': 'ERROR_SETTING_SL_TP'";
            return(-11111);
           }
        }

      Sleep(MILLISECOND_TIMER);
     }

// Every attempt was skipped because trading never became available.
   return(0);
  }

// SET SL/TP
//
// Selects the order by ticket and modifies its stop loss / take profit to
// _SL / _TP, which are passed to OrderModify unchanged. The outcome is
// appended to zmq_ret.
//
// retries : when 0 and the modification fails, the selected order is closed
//           at market through DWX_CloseAtMarket.
// _magic, _type, _price and _symbol are accepted for interface compatibility
// and are not used.
//
// Returns true when OrderModify succeeds; false when the ticket cannot be
// selected or the modification fails.
bool DWX_SetSLTP(int ticket, double _SL, double _TP, int _magic, int _type, double _price, string _symbol, int retries, string &zmq_ret)
  {
   if(!OrderSelect(ticket, SELECT_BY_TICKET))
     {
      zmq_ret = zmq_ret + ", '_response': 'NOT_FOUND'";
      return(false);
     }

   if(OrderModify(ticket, OrderOpenPrice(), _SL, _TP, 0, 0))
     {
      zmq_ret = zmq_ret + ", '_sl': " + DoubleToString(_SL) + ", '_tp': " + DoubleToString(_TP);
      return(true);
     }

   int error = GetLastError();

// Report the values that were actually submitted to OrderModify.
   zmq_ret = zmq_ret + ", '_response': '" + IntegerToString(error) + "', '_response_value': '" + ErrorDescription(error) + "', '_sl_attempted': " + DoubleToString(_SL) + ", '_tp_attempted': " + DoubleToString(_TP);

   if(retries == 0)
     {
      RefreshRates();
      DWX_CloseAtMarket(-1, zmq_ret);
     }

   return(false);
  }

// CLOSE AT MARKET
//
// Makes up to three attempts to close the currently selected order through
// DWX_ClosePartial(size). An attempt is skipped when DWX_IsTradeAllowed does
// not report 1. The error of each failed close is appended to zmq_ret.
// Returns true as soon as one close succeeds, false when all attempts are used.
bool DWX_CloseAtMarket(double size, string &zmq_ret)
  {
   for(int attempt = 0; attempt < 3; attempt++)
     {
      if(DWX_IsTradeAllowed(30, zmq_ret) != 1)
         continue;

      if(DWX_ClosePartial(size, zmq_ret))
         return(true);   // trade successfully closed

      int error = GetLastError();
      zmq_ret = zmq_ret + ", '_response': '" + IntegerToString(error) + "', '_response_value': '" + ErrorDescription(error) + "'";
     }

   return(false);
  }

// CLOSE PARTIAL SIZE
//
// size    : lots to close. Values below 0.01 or above the order's lots close
//           the whole order.
// zmq_ret : reply being built; the outcome is appended to it.
// ticket  : 0 (default) operates on the order that is already selected.
//           A non-zero ticket is selected here and the reply header is written
//           first, which is how the CLOSE_PARTIAL command uses this function.
//
// Returns the result of OrderClose. Returns true without doing anything for
// pending orders, and false when a non-zero ticket cannot be selected.
bool DWX_ClosePartial(double size, string &zmq_ret, int ticket = 0)
  {
   if(ticket != 0)
     {
      zmq_ret = zmq_ret + "'_action': 'CLOSE', '_ticket': " + IntegerToString(ticket);

      // Without this check a failed select would leave whichever order was
      // selected earlier as the current one, and THAT order would be closed.
      if(!OrderSelect(ticket, SELECT_BY_TICKET))
        {
         zmq_ret = zmq_ret + ", '_response': 'NOT_FOUND'";
         return(false);
        }

      zmq_ret = zmq_ret + ", '_response': 'CLOSE_PARTIAL'";
     }

   RefreshRates();
   double priceCP;

// Buys close at the bid, sells at the ask; pending orders are left alone.
   if(OrderType() == OP_BUY)
      priceCP = DWX_GetBid(OrderSymbol());
   else if(OrderType() == OP_SELL)
      priceCP = DWX_GetAsk(OrderSymbol());
   else
      return(true);

   ticket = OrderTicket();

   if(size < 0.01 || size > OrderLots())
      size = OrderLots();

   bool close_ret = OrderClose(ticket, size, priceCP, MaximumSlippage);

   if(close_ret == true)
      zmq_ret = zmq_ret + ", '_close_price': " + DoubleToString(priceCP) + ", '_close_lots': " + DoubleToString(size);
   else
     {
      int error = GetLastError();
      zmq_ret = zmq_ret + ", '_response': '" + IntegerToString(error) + "', '_response_value': '" + ErrorDescription(error) + "'";
     }

   return(close_ret);
  }

// CLOSE ORDER (by Magic Number)
//
// Walks the order pool from the last position down to 0 and, for every order
// whose magic number equals _magic, closes it at market (buy/sell) or deletes
// it (pending). One entry per handled order is appended to '_responses' in
// zmq_ret, followed by 'NOT_FOUND' when nothing matched or 'SUCCESS' otherwise.
void DWX_CloseOrder_Magic(int _magic, string &zmq_ret)
  {
   bool any_match = false;

   zmq_ret = zmq_ret + "'_action': 'CLOSE_ALL_MAGIC'";
   zmq_ret = zmq_ret + ", '_magic': " + IntegerToString(_magic);
   zmq_ret = zmq_ret + ", '_responses': {";

   for(int pos = OrdersTotal()-1; pos >= 0; pos--)
     {
      if(!OrderSelect(pos, SELECT_BY_POS) || OrderMagicNumber() != _magic)
         continue;

      any_match = true;

      zmq_ret = zmq_ret + IntegerToString(OrderTicket()) + ": {'_symbol':'" + OrderSymbol() + "'";

      if(OrderType() == OP_BUY || OrderType() == OP_SELL)
        {
         DWX_CloseAtMarket(-1, zmq_ret);
         zmq_ret = zmq_ret + ", '_response': 'CLOSE_MARKET'";
        }
      else
        {
         zmq_ret = zmq_ret + ", '_response': 'CLOSE_PENDING'";
         int tmpRet = OrderDelete(OrderTicket());
        }

      // Entry terminator: a separator follows unless this was pool position 0.
      zmq_ret = zmq_ret + ((pos != 0) ? "}, " : "}");
     }

   zmq_ret = zmq_ret + "}";

   if(any_match)
      zmq_ret = zmq_ret + ", '_response_value': 'SUCCESS'";
   else
      zmq_ret = zmq_ret + ", '_response': 'NOT_FOUND'";
  }

// CLOSE ORDER (by Ticket)
//
// Scans the order pool for _ticket. A buy/sell order is closed at market, any
// other order type is deleted. Appends the action header and the outcome to
// zmq_ret: 'NOT_FOUND' when no order carries the ticket, 'SUCCESS' otherwise.
void DWX_CloseOrder_Ticket(int _ticket, string &zmq_ret)
  {
   bool matched = false;

   zmq_ret = zmq_ret + "'_action': 'CLOSE', '_ticket': " + IntegerToString(_ticket);

   for(int pos = 0; pos < OrdersTotal(); pos++)
     {
      if(!OrderSelect(pos, SELECT_BY_POS) || OrderTicket() != _ticket)
         continue;

      matched = true;

      if(OrderType() == OP_BUY || OrderType() == OP_SELL)
        {
         DWX_CloseAtMarket(-1, zmq_ret);
         zmq_ret = zmq_ret + ", '_response': 'CLOSE_MARKET'";
        }
      else
        {
         zmq_ret = zmq_ret + ", '_response': 'CLOSE_PENDING'";
         int tmpRet = OrderDelete(OrderTicket());
        }
     }

   if(matched)
      zmq_ret = zmq_ret + ", '_response_value': 'SUCCESS'";
   else
      zmq_ret = zmq_ret + ", '_response': 'NOT_FOUND'";
  }

// CLOSE ALL ORDERS
//
// Walks the order pool from the last position down to 0, closing buy/sell
// orders at market and deleting all other order types. One entry per handled
// order is appended to '_responses' in zmq_ret, followed by 'NOT_FOUND' when
// no order could be selected or 'SUCCESS' otherwise.
void DWX_CloseAllOrders(string &zmq_ret)
  {
   bool any_order = false;

   zmq_ret = zmq_ret + "'_action': 'CLOSE_ALL'";
   zmq_ret = zmq_ret + ", '_responses': {";

   for(int pos = OrdersTotal()-1; pos >= 0; pos--)
     {
      if(!OrderSelect(pos, SELECT_BY_POS))
         continue;

      any_order = true;

      zmq_ret = zmq_ret + IntegerToString(OrderTicket()) + ": {'_symbol':'" + OrderSymbol() + "', '_magic': " + IntegerToString(OrderMagicNumber());

      if(OrderType() == OP_BUY || OrderType() == OP_SELL)
        {
         DWX_CloseAtMarket(-1, zmq_ret);
         zmq_ret = zmq_ret + ", '_response': 'CLOSE_MARKET'";
        }
      else
        {
         zmq_ret = zmq_ret + ", '_response': 'CLOSE_PENDING'";
         int tmpRet = OrderDelete(OrderTicket());
        }

      // Entry terminator: a separator follows unless this was pool position 0.
      zmq_ret = zmq_ret + ((pos != 0) ? "}, " : "}");
     }

   zmq_ret = zmq_ret + "}";

   if(any_order)
      zmq_ret = zmq_ret + ", '_response_value': 'SUCCESS'";
   else
      zmq_ret = zmq_ret + ", '_response': 'NOT_FOUND'";
  }

// GET OPEN ORDERS
//
// Appends "'_action': 'OPEN_TRADES', '_trades': {...}" to zmq_ret, with one
// "'<ticket>': {...}" entry per order that can be selected. The caller supplies
// the outermost braces.
//
// The '_trades' dictionary is always closed here, including when there are no
// orders at all or when OrderSelect() fails for some position, so the reply
// stays well-formed in every case.
void DWX_GetOpenOrders(string &zmq_ret)
  {
   zmq_ret = zmq_ret + "'_action': 'OPEN_TRADES'";
   zmq_ret = zmq_ret + ", '_trades': {";

   bool first_entry = true;

   for(int i = OrdersTotal()-1; i >= 0; i--)
     {
      if(!OrderSelect(i, SELECT_BY_POS))
         continue;

      // Separator goes BEFORE every entry except the first, so a skipped
      // order can never leave a dangling or missing delimiter.
      if(!first_entry)
         zmq_ret = zmq_ret + ", ";
      first_entry = false;

      zmq_ret = zmq_ret + "'" + IntegerToString(OrderTicket()) + "': {";

      zmq_ret = zmq_ret + "'_magic': " + IntegerToString(OrderMagicNumber()) + ", '_symbol': '" + OrderSymbol() + "', '_lots': " + DoubleToString(OrderLots()) + ", '_type': " + IntegerToString(OrderType()) + ", '_open_price': " + DoubleToString(OrderOpenPrice()) + ", '_open_time': '" + TimeToStr(OrderOpenTime(),TIME_DATE|TIME_SECONDS) + "', '_SL': " + DoubleToString(OrderStopLoss()) + ", '_TP': " + DoubleToString(OrderTakeProfit()) + ", '_pnl': " + DoubleToString(OrderProfit()) + ", '_comment': '" + OrderComment() + "'";

      zmq_ret = zmq_ret + "}";
     }

// Close '_trades'.
   zmq_ret = zmq_ret + "}";
  }

// CHECK IF TRADE IS ALLOWED
//
// Returns 1 immediately when IsTradeAllowed() is true. Otherwise waits up to
// MaxWaiting_sec seconds for it to become true, appending progress to zmq_ret.
//
// Returns:
//    1  trading is allowed (immediately, or after waiting)
//   -1  the EA was stopped while waiting
//   -2  the wait limit was exceeded
int DWX_IsTradeAllowed(int MaxWaiting_sec, string &zmq_ret)
  {
   if(IsTradeAllowed())
      return(1);

   uint wait_started = GetTickCount();
   zmq_ret = zmq_ret + ", " + "'_response': 'TRADE_CONTEXT_BUSY'";

   while(true)
     {
      if(IsStopped())
        {
         zmq_ret = zmq_ret + ", " + "'_response_value': 'EA_STOPPED_BY_USER'";
         return(-1);
        }

      // Subtract as unsigned so the elapsed time survives a tick-counter wrap.
      int waited_ms = (int)(GetTickCount() - wait_started);
      if(waited_ms > MaxWaiting_sec * 1000)
        {
         zmq_ret = zmq_ret + ", '_response': 'WAIT_LIMIT_EXCEEDED', '_response_value': " + IntegerToString(MaxWaiting_sec);
         return(-2);
        }

      // if the trade context has become free,
      if(IsTradeAllowed())
        {
         zmq_ret = zmq_ret + ", '_response': 'TRADE_CONTEXT_NOW_FREE'";
         RefreshRates();
         return(1);
        }

      // Yield between polls instead of spinning at full CPU for the whole wait.
      Sleep(100);
     }

   return(1);
  }

//+------------------------------------------------------------------+
//| Server status check                                              |
//+------------------------------------------------------------------+
// Returns false, after notifying the client, when the EA has been asked to
// stop; returns true otherwise.
bool CheckServerStatus()
  {
// If the stop flag is set, inform the client application.
// The notice goes out on pushSocket, the socket every other reply uses:
// a PULL socket is receive-only, so sending on pullSocket can never deliver it.
   if(IsStopped())
     {
      InformPullClient(pushSocket, "{'_response': 'EA_IS_STOPPED'}");
      return(false);
     }

// Default
   return(true);
  }

//+------------------------------------------------------------------+
//| Translate a numeric error code into a short English description. |
//| Codes without an entry below yield "unknown error".              |
//+------------------------------------------------------------------+
string ErrorDescription(int error_code)
  {
   switch(error_code)
     {
      //---- codes returned from trade server
      case 0:
      case 1:    return("no error");
      case 2:    return("common error");
      case 3:    return("invalid trade parameters");
      case 4:    return("trade server is busy");
      case 5:    return("old version of the client terminal");
      case 6:    return("no connection with trade server");
      case 7:    return("not enough rights");
      case 8:    return("too frequent requests");
      case 9:    return("malfunctional trade operation (never returned error)");
      case 64:   return("account disabled");
      case 65:   return("invalid account");
      case 128:  return("trade timeout");
      case 129:  return("invalid price");
      case 130:  return("invalid stops");
      case 131:  return("invalid trade volume");
      case 132:  return("market is closed");
      case 133:  return("trade is disabled");
      case 134:  return("not enough money");
      case 135:  return("price changed");
      case 136:  return("off quotes");
      case 137:  return("broker is busy (never returned error)");
      case 138:  return("requote");
      case 139:  return("order is locked");
      case 140:  return("long positions only allowed");
      case 141:  return("too many requests");
      case 145:  return("modification denied because order too close to market");
      case 146:  return("trade context is busy");
      case 147:  return("expirations are denied by broker");
      case 148:  return("amount of open and pending orders has reached the limit");
      case 149:  return("hedging is prohibited");
      case 150:  return("prohibited by FIFO rules");
      //---- mql4 errors
      case 4000: return("no error (never generated code)");
      case 4001: return("wrong function pointer");
      case 4002: return("array index is out of range");
      case 4003: return("no memory for function call stack");
      case 4004: return("recursive stack overflow");
      case 4005: return("not enough stack for parameter");
      case 4006: return("no memory for parameter string");
      case 4007: return("no memory for temp string");
      case 4008: return("not initialized string");
      case 4009: return("not initialized string in array");
      case 4010: return("no memory for array\' string");
      case 4011: return("too long string");
      case 4012: return("remainder from zero divide");
      case 4013: return("zero divide");
      case 4014: return("unknown command");
      case 4015: return("wrong jump (never generated error)");
      case 4016: return("not initialized array");
      case 4017: return("dll calls are not allowed");
      case 4018: return("cannot load library");
      case 4019: return("cannot call function");
      case 4020: return("expert function calls are not allowed");
      case 4021: return("not enough memory for temp string returned from function");
      case 4022: return("system is busy (never generated error)");
      case 4050: return("invalid function parameters count");
      case 4051: return("invalid function parameter value");
      case 4052: return("string function internal error");
      case 4053: return("some array error");
      case 4054: return("incorrect series array using");
      case 4055: return("custom indicator error");
      case 4056: return("arrays are incompatible");
      case 4057: return("global variables processing error");
      case 4058: return("global variable not found");
      case 4059: return("function is not allowed in testing mode");
      case 4060: return("function is not confirmed");
      case 4061: return("send mail error");
      case 4062: return("string parameter expected");
      case 4063: return("integer parameter expected");
      case 4064: return("double parameter expected");
      case 4065: return("array as parameter expected");
      case 4066: return("requested history data in update state");
      case 4099: return("end of file");
      case 4100: return("some file error");
      case 4101: return("wrong file name");
      case 4102: return("too many opened files");
      case 4103: return("cannot open file");
      case 4104: return("incompatible access to a file");
      case 4105: return("no order selected");
      case 4106: return("unknown symbol");
      case 4107: return("invalid price parameter for trade function");
      case 4108: return("invalid ticket");
      case 4109: return("trade is not allowed in the expert properties");
      case 4110: return("longs are not allowed in the expert properties");
      case 4111: return("shorts are not allowed in the expert properties");
      case 4200: return("object is already exist");
      case 4201: return("unknown object property");
      case 4202: return("object is not exist");
      case 4203: return("unknown object type");
      case 4204: return("no object name");
      case 4205: return("object coordinates error");
      case 4206: return("no specified subwindow");
     }

// Any code not listed above.
   return("unknown error");
  }

//+------------------------------------------------------------------+

//+------------------------------------------------------------------+
//| Return the ask price for the given symbol. The literal string    |
//| "NULL" selects the predefined Ask variable instead of a          |
//| MarketInfo() lookup.                                             |
//+------------------------------------------------------------------+
double DWX_GetAsk(string symbol)
  {
// Explicit symbol: query the terminal for that symbol's ask.
   if(symbol != "NULL")
      return(MarketInfo(symbol,MODE_ASK));

   return(Ask);
  }

//+------------------------------------------------------------------+

//+------------------------------------------------------------------+
//| Return the bid price for the given symbol. The literal string    |
//| "NULL" selects the predefined Bid variable instead of a          |
//| MarketInfo() lookup.                                             |
//+------------------------------------------------------------------+
double DWX_GetBid(string symbol)
  {
// Explicit symbol: query the terminal for that symbol's bid.
   if(symbol != "NULL")
      return(MarketInfo(symbol,MODE_BID));

   return(Bid);
  }
//+------------------------------------------------------------------+

Sometimes I got the "Resource timeout.. please try again." error after I edited server.mql or client.py. It works again once I restart my computer. I guess there are some socket-binding or unclean-exit problems with ZeroMQ or Windows, but I don't know for sure.

Please leave a comment and have a good time.