ig-python / trading-ig

A lightweight Python wrapper for the IG Markets API
https://trading-ig.readthedocs.io/
BSD 3-Clause "New" or "Revised" License
313 stars 197 forks source link

Unable to get OPU via streaming API #289

Closed MilindRCodes closed 1 year ago

MilindRCodes commented 1 year ago

I am using the streaming API and the issue I am facing is for working orders.

1) An OPU is not received for every working order that is filled: I receive it for some orders but not for others. 2) OPUs are received with a delay rather than immediately.

Is anyone else facing a similar issue?

I am sharing below the code I am using for the streaming API connection.

class StreamingData:
    """Forward IG streaming updates onto a shared queue.

    Every listener wraps the raw update in a ``{'type': ..., 'data': ...}``
    record and puts it on ``shared_queue``, so a single consumer can drain the
    queue and dispatch on the ``'type'`` tag.

    NOTE(review): ``Subscription`` must be importable where this class is
    defined; presumably it comes from the trading_ig Lightstreamer client --
    confirm against the module's imports.
    """

    def __init__(self, ig_stream_service, shared_queue, contract_epic: str, ig_account_id: str, logger) -> None:
        """Store the collaborators used by the listeners and subscribe methods.

        Args:
            ig_stream_service: Object exposing ``ls_client.subscribe(...)``.
            shared_queue: Object exposing ``put(record)``; receives every update.
            contract_epic: Epic to stream prices for, without the ``L1:`` prefix.
            ig_account_id: Account id used to build the ``ACCOUNT:`` / ``TRADE:`` items.
            logger: Logger used to report subscription failures.
        """
        self.ig_stream_service = ig_stream_service
        self.ig_account_id = ig_account_id
        self.shared_queue = shared_queue
        self.contract_epic = contract_epic
        self.logger = logger
        # Values returned by ls_client.subscribe(), keyed by stream name, kept
        # so callers can refer to a subscription later instead of losing it.
        self.subscription_keys = {}

    # ------------------------------------------------------------------------------------------------------------------
    # Subscription listeners
    # ------------------------------------------------------------------------------------------------------------------

    def _enqueue(self, record_type: str, payload) -> None:
        """Put ``payload`` on the shared queue, tagged with ``record_type``."""
        self.shared_queue.put({'type': record_type, 'data': payload})

    def on_prices_update(self, item_update) -> None:
        """Listener for the price subscription."""
        self._enqueue('streaming_prices', item_update)

    def on_account_update(self, balance_update) -> None:
        """Listener for the account-metrics subscription."""
        self._enqueue('streaming_balance_update', balance_update)

    def on_trade_confirmation(self, item_update) -> None:
        """Listener for the trade subscription (CONFIRMS / OPU / WOU fields)."""
        self._enqueue('streaming_trade_confirmation', item_update)

    # ------------------------------------------------------------------------------------------------------------------
    # Subscriptions
    # ------------------------------------------------------------------------------------------------------------------

    def _subscribe(self, name: str, subscription, listener):
        """Attach ``listener``, register ``subscription`` and remember its key."""
        subscription.addlistener(listener)
        key = self.ig_stream_service.ls_client.subscribe(subscription)
        self.subscription_keys[name] = key
        return key

    def get_streaming_price_and_account_updates(self) -> None:
        """Subscribe to level-1 prices for the epic and to account metrics.

        Both subscriptions use MERGE mode. Any exception is logged with its
        traceback and swallowed; a failure on the price subscription means the
        account subscription is not attempted, as both share one ``try``.
        """
        try:
            epic_with_level = 'L1:' + self.contract_epic  # e.g., "L1:IX.D.ASX.IFD.IP"

            # Price subscription in MERGE mode.
            subscription_prices = Subscription(
                mode="MERGE",
                items=[epic_with_level],
                fields=["UPDATE_TIME", "BID", "OFFER", "CHANGE", "MARKET_STATE"],
            )
            self._subscribe('prices', subscription_prices, self.on_prices_update)

            # Account-metrics subscription in MERGE mode.
            subscription_account = Subscription(
                mode="MERGE",
                items=["ACCOUNT:" + self.ig_account_id],
                fields=["PNL", "AVAILABLE_CASH", "FUNDS", "MARGIN", "AVAILABLE_TO_DEAL", "EQUITY", "EQUITY_USED"],
            )
            self._subscribe('account', subscription_account, self.on_account_update)
        except Exception:
            # Best-effort, as before, but keep the traceback so a failed
            # subscription can actually be diagnosed from the log.
            self.logger.exception('Failed to subscribe to price/account streams')

    def get_streaming_trade_and_order_updates(self) -> None:
        """Subscribe to trade confirmations and position / working-order updates.

        The subscription uses DISTINCT mode (not MERGE). Any exception is
        logged with its traceback and swallowed.
        """
        try:
            # Trade subscription in DISTINCT mode.
            subscription_trade = Subscription(
                mode="DISTINCT",
                items=["TRADE:" + self.ig_account_id],
                fields=["CONFIRMS", "OPU", "WOU"],
            )
            self._subscribe('trade', subscription_trade, self.on_trade_confirmation)
        except Exception:
            self.logger.exception('Failed to subscribe to trade/order streams')

In the main code, I am extracting the streaming data as follows:

        # ==========================================================================================================

        # SECTION 1: Stream Live Prices
        # ==========================================================================================================
        # Takes one record off the shared queue and dispatches on its 'type' tag.
        # NOTE(review): this is a fragment; the enclosing loop/function and the
        # definitions of shared_queue, logger and df_all_balances are not visible here.

        # presumably a queue.Queue, in which case get() blocks until a record arrives -- confirm
        record = shared_queue.get()
        # Logged before pop(), so this line still shows the 'type' key.
        logger.info('RECORD: %s' % str(record))
        # pop() removes 'type' from the record; only 'data' is read from it below.
        record_type = record.pop('type')

        if record_type == 'streaming_prices':
            live_prices = record['data']
            logger.info('streaming prices: %s' % str(live_prices))
            # bid / offer / mid_price are only (re)assigned when a payload is present.
            if live_prices is not None:
                # float() raises if BID/OFFER is missing or not numeric -- TODO confirm the stream always sends both
                bid, offer = float(live_prices['values']['BID']), float(live_prices['values']['OFFER'])
                mid_price = (bid + offer) / 2
        elif record_type == 'streaming_balance_update':
            latest_balance = record['data']
            logger.info('streaming balance: %s' % str(latest_balance))
            if latest_balance is not None:
                # TODO: CHECK IF WE NEED TO APPEND TO THE df_all_balances or these values can be kept
                #       separate.
                # Writes a [FUNDS, MARGIN, PNL] row at label len(df_all_balances).
                # assumes df_all_balances is a three-column pandas DataFrame in that column order -- confirm
                df_all_balances.loc[len(df_all_balances)] = [float(latest_balance['values']['FUNDS']),
                                                             float(latest_balance['values']['MARGIN']),
                                                             float(latest_balance['values']['PNL'])]
        elif record_type == 'streaming_trade_confirmation':
            # In the visible portion the trade update is only stored and logged.
            api_trade_confirmations_dict = record['data']
            logger.info('streaming trade updates: %s' % str(api_trade_confirmations_dict))