NordicSemiconductor / pynrfjprog

Python wrapper around the nrfjprog dynamic link library (DLL)
Other
75 stars 29 forks source link

There is a memory leak in the window system. #18

Open w-xinjiang opened 3 years ago

w-xinjiang commented 3 years ago

In the demo below, when multiple threads are started, the program becomes very unstable. It will crash. After the program crashes and restarts, the memory will increase after a burn operation is completed, and there is a memory leak. Operating environment: python3.8.3 window7/10

No exception handling:

import threading
from pynrfjprog import HighLevel
import time

class myThread (threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print ("Starting " + self.name)
      update_fw(self.name)
      program_app(self.name)
      print ("Exiting " + self.name)

def update_fw(snr):
    print("updating FW in {}".format(snr))
    probe = HighLevel.IPCDFUProbe(api, int(snr), HighLevel.CoProcessor.CP_MODEM)
    print("{} probe initialized". format(snr))
    probe.program("mfw_nrf9160_1.2.2.zip")
    print("{} programmed".format(snr))
    probe.verify("mfw_nrf9160_1.2.2.zip")
    print("{} Modem verified".format(snr))

def program_app(snr):
    app_probe = HighLevel.DebugProbe(api, int(snr) )

    program_options = HighLevel.ProgramOptions(
        erase_action=HighLevel.EraseAction.ERASE_ALL,
        reset = HighLevel.ResetAction.RESET_SYSTEM,
        verify = HighLevel.VerifyAction.VERIFY_READ
    )
    app_probe.program( "merged.hex" , program_options=program_options )

    app_probe.verify( "merged.hex" , HighLevel.VerifyAction.VERIFY_READ )
    print("{} Application verified".format(snr))

    app_probe.reset()

# Create new threads
while True:
    api = HighLevel.API()
    api.open()
    devices = api.get_connected_probes()
    threads = list()
    print(devices)
    for idx, device in enumerate(devices):
        threads.append(myThread(idx, device, idx))

    # Start new Threads
    for thread in threads:
        thread.start()

    print("waiting for all threads to complete")
    #wait for threads to finish
    for t in threads:
        t.join()

    api.close()
    time.sleep(5)    
print("Exiting Main Thread")

With exception handling:

import threading
import time
from pynrfjprog import HighLevel

thread_pool = []
lock = threading.Lock()

#Burn the thread
class operationThread (threading.Thread):
    def __init__(self, d_name, modem_file, app_file, burn_modem, burn_app):
        threading.Thread.__init__(self)
        self.d_name = d_name
        self.modem_file = modem_file
        self.app_file = app_file
        self.burn_modem = burn_modem
        self.burn_app = burn_app
    def run(self):
        err = 0
        if self.burn_modem:
            print(self.d_name,':Writing modem FW')
            err = update_fw(self.d_name, self.modem_file)
            if err:
                return
            print(self.d_name,':Successfully write modem FW')
        if self.burn_app:
            print(self.d_name,':Writing app FW')
            err = program_app(self.d_name, self.app_file)
            if err:
                return
            print(self.d_name,':Successfully write app FW')

def update_fw(snr, m_file):
    try:
        with HighLevel.IPCDFUProbe(api, int(snr), HighLevel.CoProcessor.CP_MODEM) as probe:
            try:
                probe.program(m_file)
            except Exception:
                print(snr,": Wrinting modem FW error!")
                return -1
            try:
                probe.verify(m_file)
            except Exception:
                print(snr,":Modem FW verification error")
                return -1
            return 0
    except Exception:
        print(snr,":Jlink initialization failed!")
        return -1

def program_app(snr, a_file):
    try:
        with HighLevel.DebugProbe(api, int(snr)) as app_probe:
            program_options = HighLevel.ProgramOptions(
                erase_action=HighLevel.EraseAction.ERASE_ALL,
                reset = HighLevel.ResetAction.RESET_SYSTEM,
                verify = HighLevel.VerifyAction.VERIFY_READ
            )

            try:
                app_probe.program(a_file, program_options=program_options)
            except Exception: 
                print(snr,":Writing app FW error!")
                return -1

            try:
                app_probe.verify(a_file, HighLevel.VerifyAction.VERIFY_READ)
            except Exception:
                print(snr,":Application firmware verification error")      
                return -1        
            try:
                app_probe.reset()
            except Exception:
                return -1
            return 0
    except Exception:
        print(snr,":Jlink initialization failed!")
        return -1

if __name__ == '__main__':
    m_file ='./mfw_nrf9160_1.2.2.zip'
    a_file ='./merged.hex'
    while True:
        print(time.asctime(time.localtime(time.time()))) 
        time.sleep(5)
        with HighLevel.API() as api:
            # api.open()
            devices = api.get_connected_probes()
            for i in range(len(devices)):
                a = operationThread(devices[i], m_file, a_file, True, True)
                a.setDaemon(True)
                a.start()
                thread_pool.append(a)
            for t in thread_pool:
                t.join()
            thread_pool = []
simtind commented 3 years ago

Could you provide the following: