CiscoTestAutomation / genielibs

genie.libs contains libraries for configuring, retrieving and testing topologies
Apache License 2.0

"Exception: The clean stage '' does not exist in the json file" while trying to use multiprocessing #163

Closed Sulray closed 4 months ago

Sulray commented 5 months ago

The error

Hi,

I was trying to make the execution of a script based on pyats/genie faster by using multiprocessing as recommended in the documentation.

So at some point, in a method called "runChecks", I replaced the following lines:

        for device in self.devices:
            # already connected to each device
            device.makeChecks(self.phase,self.name)

with this line: multiprocessing.Pool().map(self.runChecksDevice, self.devices), where the method is defined as:

    def runChecksDevice(self, device):
        device.makeChecks(self.phase,self.name)

So it's supposed to be exactly the same but with multiprocessing.
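It is worth spelling out what Pool.map actually does with that call: it pickles both the bound method self.runChecksDevice (which drags the entire self, including every device object, across the process boundary) and each element of self.devices. Below is a minimal stdlib-only sketch of that serialization step; the class name is made up and a threading.Lock stands in for a live device session:

```python
import pickle
import threading

class Operation:
    """Illustrative stand-in for MyOperation (not the real class)."""
    def __init__(self, devices):
        self.devices = devices

    def run_checks_device(self, device):
        return device["name"]

# A live session handle (a lock here) cannot cross process boundaries.
op = Operation([{"name": "a1aaa05", "session": threading.Lock()}])

# pool.map(op.run_checks_device, op.devices) must serialize exactly this pair:
try:
    pickle.dumps((op.run_checks_device, op.devices))
    failed = False
except Exception as exc:
    failed = True
    print("serialization failed:", exc)
```

If any attribute reachable from self or from a device is unpicklable, the map fails before any worker process runs a single line.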

But I get the following error (in debug mode):

2024-04-22 14:54:41,983 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-04-22 14:54:42,023 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
  File "main.py", line 74, in <module>
    operation.run()
  File "/package/path/my_operation.py", line 252, in run
    self.runChecks()
  File "/package/path/package/my_operation.py", line 332, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "src/genie/conf/base/api.py", line 581, in genie.conf.base.api.CleanAPI.__getattr__
  File "/package/path/.venv/lib/python3.8/site-packages/genie/libs/clean/utils.py", line 268, in get_clean_function
    raise Exception(f"The clean stage '{name}' does not exist in the json "
Exception: The clean stage '' does not exist in the json file

Before the change to add multiprocessing it was working perfectly, and I don't really understand the cause of the error, since it happens right at the start of the multiprocessing execution. The method raising the exception (get_clean_function) belongs to genielibs, so maybe you have more insight on this.
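One hint in the traceback: the debug log shows genie "retrieving" a stage literally named __getstate__, which is an attribute that pickle probes while serializing an object. A catch-all __getattr__ that raises a plain Exception instead of AttributeError (as get_clean_function does in the traceback above) turns any such probe into exactly this kind of error. A minimal, purely illustrative sketch of the pattern (not genielibs code), using hasattr as the prober:

```python
class Stages:
    """Illustrative catch-all __getattr__, mimicking the raise in the traceback."""
    def __getattr__(self, name):
        # Attribute machinery (hasattr, pickle's __getstate__ probe, ...)
        # expects AttributeError here; any other exception propagates.
        raise Exception(f"The stage '{name}' does not exist")

s = Stages()
try:
    hasattr(s, "some_stage")   # hasattr only swallows AttributeError
    intercepted = False
except Exception as exc:
    intercepted = True
    print(exc)
```

hasattr, like pickle's attribute lookups, only suppresses AttributeError; anything else escapes to the caller, which would explain the stage name '' / '__getstate__' showing up in a pickling traceback.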

Context

To give a little more context :

What could lead to this exception? Did I do something wrong in my attempt to use multiprocessing with pyats/genie?

Harishv01 commented 5 months ago

https://pubhub.devnetcloud.com/media/pyats/docs/async/multiprocessing.html

Kindly go through the above documentation for multiprocessing and try it.

Thank you!

Sulray commented 5 months ago

Thanks, but I've already read that page, plus some posts on Stack Overflow, and I still have the issue. Where does my case differ from the given example? The example, with f a method and range(20) an iterable:

with multiprocessing.Pool(2) as pool:
    result = pool.map(f, range(20))

My case, with self.runChecksDevice a method, self.devices an iterable: multiprocessing.Pool().map(self.runChecksDevice, self.devices)

Just in case, I also tried the following before sending my issue:

with multiprocessing.Pool(2) as pool:
    pool.map(self.runChecksDevice, self.devices)

with multiprocessing.Pool() as pool:  # supposed to use the maximum number of processes possible
    pool.map(self.runChecksDevice, self.devices)

Harishv01 commented 5 months ago

Okay, kindly give me some time. I will check and let you know.

Thank you

Harishv01 commented 5 months ago

The error message indicates that the name variable being passed to the runChecksDevice method is empty. This suggests that the self.name attribute might not be set correctly or is not accessible within the multiprocessing context. Please investigate and resolve this issue.

Additionally, could you provide information on how you are running the job? Are you passing any clean YAML while running?

Sulray commented 5 months ago

For the moment I've added some logs to check that the names are not empty. I first checked self.name (the name variable of my class in my_operation.py) and then device.name (the name variable of the class used for devices) for each device (2 devices in my case):

2024-04-24 09:22:45,408 - package.my_operation - INFO - Name of operation: test_multiprocessing (my_operation.py:324)
2024-04-24 09:22:45,408 - package.my_operation - INFO - One device name is:  a1aaa01-clients (my_operation.py:326)
2024-04-24 09:22:45,408 - package.my_operation - INFO - One device name is:  a1aaa05 (my_operation.py:326)
2024-04-24 09:22:45,423 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for  a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-04-24 09:22:45,465 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for  a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
  File "main.py", line 74, in <module>
    operation.run()
....... (then it's the same traceback)

So I'm still investigating, without really knowing why a "name" would be considered empty, since these are the only "name" variables I defined.

About how I run the job: it all starts with main.py, which initializes other classes and calls methods on them. I do not use aetest, so I do not use the "run(example.py)" method. I could show more of my code, but it would add even more complexity. The fact is that it was working before I tried to add the multiprocessing line, so whatever the issue is, it only prevents multiprocessing from working.

And yes, I'm generating/using a testbed.yaml with the devices I want to connect to inside. That's the only YAML I use.

devices:
  a1aaa01-clients:
    alias: a1aaa01-clients
    connections:
      rest:
        class: rest.connector.Rest
        ip: a1aaa01-clients
        port: 443
        protocol: https
    custom:
      abstraction:
        order:
        - os
    os: bigip
  a1aaa05:
    alias: a1aaa05
    connections:
      rest:
        class: rest.connector.Rest
        ip: a1aaa05
        port: 443
        protocol: https
    custom:
      abstraction:
        order:
        - os
    os: bigip
testbed:
  credentials:
    default:
      password: '%ASK{}'
      username: user_example
  name: test_multiprocessing
Harishv01 commented 5 months ago

Okay, please investigate and give me an update

Harishv01 commented 5 months ago

Hi, can you give me an update on the above?

Sulray commented 5 months ago

Hi, I haven't found out yet

Harishv01 commented 5 months ago

Can you share the debug working log from earlier when it was functioning properly? Also, could you please confirm if you are using a clean YAML file or a clean stage? Earlier, you mentioned a testbed YAML file, but I'm not referring to that.

Sulray commented 5 months ago

About the YAML: the testbed is the only YAML I'm using. I never tried a clean YAML file / clean stage or anything like that.

When it was functioning properly (with a for loop instead of multiprocessing), the logs about retrieving stage '__getstate__' were not there.

When I use a for loop:

...
2024-05-02 07:59:37,272 - rest.connector.libs.bigip.implementation - INFO - Connected successfully to 'a1aaa05' (implementation.py:217)
self.devices: %s [<package.device.DeviceF5 object at 0x7f99b12aadc0>, <package.device.DeviceF5 object at 0x7f99b12aae50>]
2024-05-02 07:59:37,272 - package.my_operation - INFO - Name of operation: test_f5 (my_operation.py:324)
2024-05-02 07:59:37,272 - package.my_operation - INFO - One device name is: a1aaa01-clients (my_operation.py:326)
2024-05-02 07:59:37,272 - package.my_operation - INFO - One device name is: a1aaa05 (my_operation.py:326)
2024-05-02 07:59:37,272 - package.my_operation - INFO - Starting makeChecks for device a1aaa01-clients (my_operation.py:333)
2024-05-02 07:59:37,272 - package.my_operation - INFO - device.name in for loop: a1aaa01-clients (my_operation.py:334)
2024-05-02 07:59:37,272 - rest.connector.libs.bigip.implementation - INFO - Sending GET to 'a1aaa01-clients': https://10.10.10.10:443/mgmt/tm/sys/version (implementation.py:309)
...

When I use multiprocessing as detailed previously:

...
2024-05-02 08:02:45,315 - rest.connector.libs.bigip.implementation - INFO - Connected successfully to 'a1aaa05' (implementation.py:217)
self.devices: %s [<package.device.DeviceF5 object at 0x7f643d9dfdf0>, <package.device.DeviceF5 object at 0x7f643d9dfe80>]
2024-05-02 08:02:45,315 - package.my_operation - INFO - Name of operation: test_f5 (my_operation.py:324)
2024-05-02 08:02:45,315 - package.my_operation - INFO - One device name is: a1aaa01-clients (my_operation.py:326)
2024-05-02 08:02:45,316 - package.my_operation - INFO - One device name is: a1aaa05 (my_operation.py:326)
2024-05-02 08:02:45,336 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-05-02 08:02:45,377 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
  File "main.py", line 74, in <module>
    operation.run()
  File "/package/path/my_operation.py", line 252, in run
    self.runChecks()
  File "/package/path/my_operation.py", line 330, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices) 
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "src/genie/conf/base/api.py", line 581, in genie.conf.base.api.CleanAPI.__getattr__
  File "/package/path/.venv/lib/python3.8/site-packages/genie/libs/clean/utils.py", line 268, in get_clean_function
    raise Exception(f"The clean stage '{name}' does not exist in the json "
Exception: The clean stage '' does not exist in the json file

According to these logs, it seems like the method I'm trying to pass to multiprocessing (self.runChecksDevice) does not even run its first line, which should log a line similar to the one in the for loop:

def runChecksDevice(self, device):
        logger.info(f"Starting makeChecks for device name {device.name}")
        device.makeChecks(self.phase,self.name)

So it should log:

package.my_operation - INFO - Starting makeChecks for device a1aaa01-clients (my_operation.py:333)
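The traceback is consistent with that reading: the exception is raised from _handle_tasks / reduction.py in the parent process, i.e. while pickling the task, before any worker gets a chance to execute runChecksDevice. A stdlib-only sketch of that step (illustrative names; a threading.Lock stands in for a live REST session):

```python
import pickle
import threading

class Device:
    """Illustrative stand-in: a 'connected' device holding an unpicklable handle."""
    def __init__(self, name):
        self.name = name
        self.session = threading.Lock()   # stands in for a live REST session

devices = [Device("a1aaa05")]
try:
    # This is the step the parent performs before any worker can run.
    pickle.dumps(devices)
    serialized = True
except Exception as exc:
    serialized = False
    print("failed in the parent, before any worker ran:", exc)
```

So the worker function's first log line is never reached: the task dies at serialization time in the parent.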

Harishv01 commented 5 months ago

Could you please share the code files so that I can debug it?

Harishv01 commented 5 months ago

Please share the code files to debug the issue.

Sulray commented 5 months ago

I'll post an update next Monday/Thursday; I'm currently not available. I think I'll make a minimal example of my case, because if I share all the current code it will add complexity.

Harishv01 commented 4 months ago

Kindly share the entire code file to debug the issue

Sulray commented 4 months ago

I managed to make a minimal version which produces the exact same output when trying to use multiprocessing. There are 4 Python files. Here is the code (a lot of it is not even linked to the issue, but at least you have everything as requested):

my_main.py :

from package.my_operation import MyOperation
import logging
import sys
import os
from datetime import datetime

stream_handler = logging.StreamHandler(stream=sys.stdout)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(module)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)

logging.getLogger().addHandler(stream_handler)
logger = logging.getLogger(__name__)
logging.getLogger().setLevel(logging.DEBUG)

operation = MyOperation()
operation.initiate()

input_choice=''
while input_choice not in ['1','2','3','4']:
    input_choice = input('Which operation do you want to perform? \n1) Pre Checks \n2) Post Checks \n3) Diff \n4) Exit the CLI \nChoice (type number): ').lower()
datetime_start = datetime.now()
if input_choice == '1':
    operation.phase = "preChecks"
    operation.loadTestbed()
elif input_choice == '2':
    operation.phase = "postChecks"
    operation.loadTestbed()
elif input_choice == '3':
    operation.phase = "Diff"        

operation.run()

my_operation.py

from genie.utils.diff import Diff
import json
import os
import yaml
from pyats.topology.loader import load
from package.my_device import MyDevice
from package.my_device_f5 import MyDeviceF5
from package.utils.config_loader import config_loader
import pathlib
from package import genie_digger
import logging
import multiprocessing

logger = logging.getLogger(__name__)

class MyOperation:

    def __init__(self):
        self.name = ''
        self.username = ''
        self.devices:list[MyDevice] = []
        self.testbed = ''
        self.root_path = pathlib.Path(__file__).resolve().parents[1]
        self.operations_path = f"{self.root_path}/output/operations"
        self.testbeds_path = f"{self.root_path}/output/testbeds"
        self.phase = None

    def initiate(self):
        with open( f'{self.testbeds_path}/testbed.yaml') as testbed_file:
            self.testbed = yaml.safe_load(testbed_file)

        self.name = self.testbed['testbed']['name']
        self.username = self.testbed['testbed']['credentials']['default']['username']

        for device in self.testbed['devices'].keys():                        
            if self.testbed['devices'][device]['os'] == 'bigip':                    
                self.devices.append(MyDeviceF5(device,self.operations_path))

    def run(self):
        logger.info("Starting run('%s')",self.phase)
        if not self.phase:
            raise Exception("No phase defined")
        self.buildTree()
        if self.phase=="Diff":
            self.runDiff()
        else: 
            self.runChecks()

    def buildTree(self):
        logger.info("Starting buildTree('%s')",self.phase)
        if not os.path.exists(f'{self.operations_path}/{self.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}')
        for device in self.devices:
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')

    def loadTestbed(self):
        try:
            self.testbed = load(self.testbed)
        except:
            print('host is not resolved')

    def runChecks(self):

        for device in self.devices:
                logger.info("name of device: %s",device.name)
                logger.info("self.testbed:\n%s",self.testbed)

                device.raiseSessionLimit(self.testbed , device.name)
                device.myConnect(self.testbed , device.name)

                if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):

                    if device.os not in ['bigip','fortios']:
                        device.getRunningConfig()
                        device.configAnalyzer(device.mapping_features , device.os)

                        print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                        for model in device.check_list:
                            print(f'- {model}')

                        not_checked_list = list(set(genie_digger.getFeatureList(device.os)) - set(device.check_list))

                        print(f'the following models have been ignored')
                        for model in not_checked_list:
                            print(f'- {model}')

                        with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                            check_list_file.write(str(json.dumps(device.check_list, indent=4)))

                    else:
                        device.configAnalyzer()
                        with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                            check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                else:
                    with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                        device.check_list = json.load(check_list_file)
        print("self.devices: %s", str(self.devices))

        logger.info(f"Name of operation: {self.name}")
        for device in self.devices:
            logger.info(f"One device name is: {device.name}")

        # with multiprocessing.Pool(2) as pool:
        #     pool.map(self.runChecksDevice, self.devices) 

        multiprocessing.Pool().map(self.runChecksDevice, self.devices) 

        for device in self.devices:
            logger.info(f"Starting makeChecks for device {device.name}")
            logger.info(f"device.name in for loop: {device.name}")
            # device.makeChecks(self.phase,self.name)
            device.restoreSessionLimit()
            device.pyats_device.disconnect()
            logger.info(f"Finished makeChecks and disconnected from device {device.name}")

    def runChecksDevice(self, device):
        logger.info(f"Starting makeChecks for device name {device.name}")
        device.makeChecks(self.phase,self.name)
        logger.info(f"Finished makeChecks and disconnected from device {device.name}")

    def runDiff(self):
        logging.debug("Running findDiff")
        for device in self.devices:
            logging.debug("runDiff on device %s", str(device.name))
            #Looping the list of uris
            if (device.check_list == []):
                with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                        device.check_list = json.load(check_list_file)
                logging.debug("device.checklist was empty, filled with check_list.json in device folder")
            logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
            for feature in device.check_list:
                logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
                pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
                post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
                if os.path.isfile(f"{pre_checks_path}.txt"): 
                    logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt' , 'r') as f_pre:
                        diff = f_pre.read()
                elif os.path.isfile(f"{post_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                    with open(f'{post_checks_path}.txt' , 'r') as f_post:
                        diff = f_post.read()
                else:
                    try:
                        with open(f'{pre_checks_path}.json' , 'r') as f_pre:
                            lines_pre = json.load(f_pre)
                        with open(f'{post_checks_path}.json' , 'r') as f_post:
                            lines_post = json.load(f_post)
                    except FileNotFoundError:
                        logger.error('file not found')
                        continue
                    logging.info("Opened the f_pre and f_post, now running Diff")
                    diff = Diff(lines_pre, lines_post)                    
                    logging.info("Running findDiff")
                    diff.findDiff()
                if len(str(diff)) > 0 :
                    with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?" , )[0]}.diff' , 'w') as f_diff:
                        f_diff.write(str(diff))
                    logging.info("Finished to write diff")

my_device.py

from abc import ABC , abstractmethod
from pyats.topology.loader import load

class MyDevice(ABC):
    def __init__(self,name,operations_path):
        self.name=name
        self.operations_path = operations_path

    @abstractmethod
    def makeChecks(self,phase,test_name):
        pass

    def myConnect(self,testbed,hostname):
        pass

    def raiseSessionLimit(self,testbed,hostname):
        pass

    def restoreSessionLimit(self):
        pass

my_device_F5.py

from package.my_device import MyDevice
import unicon
import json
from pyats.topology.loader import load
import icontrol
import logging 
import traceback

logger = logging.getLogger(__name__)

class MyDeviceF5(MyDevice):

    # Declare a list of uris which start by /mgmt/tm/sys
    uri_sys=[  
            '/mgmt/tm/sys/version',
            '/mgmt/tm/sys/hardware',
            '/mgmt/tm/sys/clock',
            '/mgmt/tm/sys/cpu',
            '/mgmt/tm/sys/license',
            '/mgmt/tm/sys/provision',
            '/mgmt/tm/sys/performance/connections/stats',
            '/mgmt/tm/sys/mcp-state/stats',
            '/mgmt/tm/sys/service/stats'
            ]

    # Declare a list of uris which start by /mgmt/tm/net
    uri_net=[  
            '/mgmt/tm/net/interface',
            '/mgmt/tm/net/trunk',
            '/mgmt/tm/net/vlan',
            '/mgmt/tm/net/self',
            '/mgmt/tm/net/arp',
            '/mgmt/tm/net/fdb',
            '/mgmt/tm/net/route',
            '/mgmt/tm/net/routing',
            '/mgmt/tm/net/interface/stats?$select=counters.dropsAll,counters.errorsAll,tmName,mediaActive,status',
            '/mgmt/tm/net/route-domain',
            '/mgmt/tm/net/routing/bgp'
            ]

    # Declare a list of uris which start by /mgmt/tm/gtm
    uri_gtm=[
            '/mgmt/tm/gtm/datacenter',
            '/mgmt/tm/gtm/server/',
            '/mgmt/tm/gtm/wideip/a',
            '/mgmt/tm/gtm/wideip/aaaa',
            '/mgmt/tm/gtm/pool/a',
            '/mgmt/tm/gtm/pool/aaaa',
            '/mgmt/tm/gtm/prober-pool',
            '/mgmt/tm/gtm/monitor',
            '/mgmt/tm/gtm/sync-status',
            '/mgmt/tm/gtm/iquery?$select=serverType,serverName,connectState',
            '/mgmt/tm/gtm/pool/a/stats?$select=status.availabilityState,status.enabledState,tmName',
            '/mgmt/tm/gtm/server/stats?$select=tmName,status.enabledState,status.availabilityState',
            '/mgmt/tm/gtm/datacenter/stats?$select=dcName,status.enabledState,status.availabilityState'
            ]

    # Declare a list of uris which start by /mgmt/tm/ltm
    uri_ltm=[
            '/mgmt/tm/ltm/virtual-address',
            '/mgmt/tm/ltm/virtual-address/stats?$select=tmName.description,status.availabilityState,status.enabledState,status.statusReason',
            '/mgmt/tm/ltm/virtual',
            '/mgmt/tm/ltm/virtual/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
            '/mgmt/tm/ltm/snatpool',
            '/mgmt/tm/ltm/snat-translation',
            '/mgmt/tm/ltm/snat',
            '/mgmt/tm/ltm/rule',
            '/mgmt/tm/ltm/pool',
            '/mgmt/tm/ltm/pool/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
            '/mgmt/tm/ltm/policy',
            '/mgmt/tm/ltm/node',
            '/mgmt/tm/ltm/node/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
            '/mgmt/tm/ltm/monitor',
            '/mgmt/tm/ltm/nat',
            '/mgmt/tm/ltm/persistence'
    ]

    def __init__(self,name,operations_path,protocol='https',port='443'):
        super().__init__(name,operations_path)
        self.protocol = protocol
        self.port = port
        self.os = 'bigip'
        self.running_config=''
        self.check_list=[]
        self.pyats_device = None

    def makeChecks(self,phase,test_name):
        for uri in self.check_list: 
            file_extension = "json"
            try:
                output = self.pyats_device.rest.get( api_url=uri , verbose=True ).text
                output = json.loads(output)
                output = self.format_json(output)
                output = json.dumps(output , indent=4)
            except icontrol.exceptions.iControlUnexpectedHTTPError:
                logger.error(uri +' does not exist')
                output = traceback.format_exc()
                file_extension = "txt"

            try :
                with open(f'{self.operations_path}/{test_name}/{self.name}/{phase}/{(uri.replace("/", "_")).split("?" , )[0]}.{file_extension}' , 'w') as output_file:
                    output_file.write(output)
            except Exception as e:
                logger.error(f'learning {uri} failed!! Exception following lines:\n{e}')

    def myConnect(self,testbed,hostname):
        logger.info("testbed in:\n%s",testbed)

        device=testbed.devices[hostname]
        try:

            device.connect( via='rest' , alias='rest'  )
            device.rest.connected
            self.pyats_device = device
        except unicon.core.errors.ConnectionError as e:
            logger.error("-- ERROR --")
            logger.error(f"  The error following line:\n{e}")
            logger.error(f"  Can't connect to {device.alias}")
            device.disconnect()
            return
        except RuntimeError:
            device.connect( connection_timeout=15,  learn_hostname=True , log_stdout=True )
            self.pyats_device = device

    def configAnalyzer(self):
        output_ltm = self.pyats_device.rest.get( api_url='/mgmt/tm/sys/provision/ltm' , verbose=True ).text
        output_gtm = self.pyats_device.rest.get( api_url='/mgmt/tm/sys/provision/gtm' , verbose=True ).text

        if '"level":"nominal"' in output_ltm :
            is_ltm = True
        else:
            is_ltm = False

        if '"level":"nominal"' in output_gtm :
            is_dns = True
        else:
            is_dns = False

        if is_dns and not is_ltm:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_gtm
        elif is_ltm and not is_dns:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_ltm
        elif is_dns and is_ltm:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_ltm + MyDeviceF5.uri_gtm

    def format_json(self, json_dict):
        if ("items" in json_dict):
            for item in json_dict["items"]:
                    if ("fullPath" in item):
                        # item_copy = copy.deepcopy(item)
                        new_item = {"partition_" + item["fullPath"] : item}
                        index = json_dict["items"].index(item)
                        if index in json_dict["items"]:
                            logger.warning("Overwritting data in format_json on key %s in items", index)
                        json_dict["items"][index] = new_item
        return json_dict
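A common workaround in this situation (sketched here with hypothetical names, not tested against the code above) is to hand the pool a module-level function and only plain, picklable data, and to (re)connect to the device inside the worker instead of shipping connected objects across processes:

```python
import pickle

def run_checks_worker(args):
    """Hypothetical module-level worker: it receives only plain data, so the
    task always pickles; connecting and makeChecks would happen in here."""
    device_name, phase, op_name = args
    # Load the testbed, connect, and run the checks here, then return plain data.
    return f"{op_name}/{device_name}/{phase}"

jobs = [("a1aaa01-clients", "preChecks", "test_f5"),
        ("a1aaa05", "preChecks", "test_f5")]

# Unlike a bound method on an object holding live sessions, these tasks
# serialize cleanly, so multiprocessing.Pool().map(run_checks_worker, jobs)
# could dispatch them; they are computed sequentially here for illustration.
pickle.dumps(jobs)
results = [run_checks_worker(j) for j in jobs]
print(results)
```

The trade-off is that each worker pays the cost of its own connection, but nothing unpicklable ever crosses the process boundary.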
Harishv01 commented 4 months ago

Thank you for sharing. Kindly give me some time to debug the issue.

Harishv01 commented 4 months ago

Can you share the working code files as well so that we can debug the issue?

Sulray commented 4 months ago

Just so you know, I am aware of your comments and requests when you post them. I have other things to manage in parallel; sorry for each delay.

I realized that the error is different when using 24.4 (I was on 24.3 when I had the error posted earlier: "clean stage '' does not exist in the json file"). In 24.4 I get the following one:

2024-05-16 12:44:21,106 - genie.conf.base.api - reduction - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None)
2024-05-16 12:44:21,329 - genie.conf.base.api - reduction - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None)
Traceback (most recent call last):
  File "my_main.py", line 31, in <module>
    operation.run()
  File "/package/path/package/my_operation.py", line 48, in run
    self.runChecks()
  File "/package/path/package/my_operation.py", line 109, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices) 
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
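This 24.4 message is CPython's generic error for serializing any object graph that contains a weak reference, and a connected pyATS device object typically holds such references internally, so it looks like the same root cause (pickling connected devices) surfacing with a different message. A stdlib-only illustration:

```python
import pickle
import weakref

class Target:
    pass

t = Target()
# A weak reference anywhere in the object graph makes the whole graph unpicklable.
holder = {"name": "a1aaa05", "backref": weakref.ref(t)}
try:
    pickle.dumps(holder)
    picklable = True
except Exception as exc:
    picklable = False
    print(exc)
```

Either way, the failure happens at task-serialization time, not inside the worker.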

The only difference from the working code is in my_operation.py. I made a little update to this file to skip a dependency which is not needed to test it on your side, so here are the two versions of this script:

logger = logging.getLogger(name)

class MyOperation:

def __init__(self):
    self.name = ''
    self.username = ''
    self.devices:list[MyDevice] = []
    self.testbed = ''
    self.root_path = pathlib.Path(__file__).resolve().parents[1]
    self.operations_path = f"{self.root_path}/output/operations"
    self.testbeds_path = f"{self.root_path}/output/testbeds"
    self.phase = None

def initiate(self):
    with open( f'{self.testbeds_path}/testbed.yaml') as testbed_file:
        self.testbed = yaml.safe_load(testbed_file)

    self.name = self.testbed['testbed']['name']
    self.username = self.testbed['testbed']['credentials']['default']['username']

    for device in self.testbed['devices'].keys():                        
        if self.testbed['devices'][device]['os'] == 'bigip':                    
            self.devices.append(MyDeviceF5(device,self.operations_path))

def run(self):
    datetime_start = datetime.now()
    logger.info("Starting run('%s')",self.phase)
    if not self.phase:
        raise Exception("No phase defined")
    self.buildTree()
    if self.phase=="Diff":
        self.runDiff()
    else: 
        self.runChecks()
    datetime_end = datetime.now()
    diff = datetime_end - datetime_start
    logger.info("Duration: %s (in hours)", (diff.total_seconds()/3600))

def buildTree(self):
    logger.info("Starting buildTree('%s')",self.phase)
    if not os.path.exists(f'{self.operations_path}/{self.name}'):
        os.makedirs(f'{self.operations_path}/{self.name}')
    for device in self.devices:
        if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
        if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
            os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')

def loadTestbed(self):
    try:
        self.testbed = load(self.testbed)
    except Exception:
        logger.error('host is not resolved')

def runChecks(self):
    for device in self.devices:
            logger.info("name of device: %s",device.name)
            logger.info("self.testbed:\n%s",self.testbed)

            device.raiseSessionLimit(self.testbed , device.name)
            device.myConnect(self.testbed , device.name)

            if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):

                if device.os not in ['bigip','fortios']:
                    device.getRunningConfig()
                    device.configAnalyzer(device.mapping_features , device.os)

                    print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                    for model in device.check_list:
                        print(f'- {model}')

                    with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                        check_list_file.write(str(json.dumps(device.check_list, indent=4)))

                else:
                    device.configAnalyzer()
                    with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                        check_list_file.write(str(json.dumps(device.check_list, indent=4)))
            else:
                with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
    logger.info("self.devices: %s", str(self.devices))

    logger.info(f"Name of operation: {self.name}")
    for device in self.devices:
        logger.info(f"One device name is: {device.name}")

    # with multiprocessing.Pool(2) as pool:
    #     pool.map(self.runChecksDevice, self.devices) 

    multiprocessing.Pool().map(self.runChecksDevice, self.devices) 

    for device in self.devices:
        logger.info(f"Starting makeChecks for device {device.name}")
        logger.info(f"device.name in for loop: {device.name}")
        # device.makeChecks(self.phase,self.name)
        device.restoreSessionLimit()
        device.pyats_device.disconnect()
        logger.info(f"Finished makeChecks and disconnected from device {device.name}")

def runChecksDevice(self, device:MyDevice):
    logger.info(f"Starting makeChecks for device name {device.name} in multiprocessing")
    device.makeChecks(self.phase,self.name)
    device.restoreSessionLimit()
    device.pyats_device.disconnect()
    logger.info(f"Finished makeChecks and disconnected from device {device.name} in multiprocessing")

def runDiff(self):
    logging.debug("Running findDiff")
    for device in self.devices:
        logging.debug("runDiff on device %s", str(device.name))
        #Looping the list of uris
        if (device.check_list == []):
            with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
            logging.debug("device.checklist was empty, filled with check_list.json in device folder")
        logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
        for feature in device.check_list:
            logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
            pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
            post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
            if os.path.isfile(f"{pre_checks_path}.txt"): 
                logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                with open(f'{pre_checks_path}.txt' , 'r') as f_pre:
                    diff = f_pre.read()
            elif os.path.isfile(f"{post_checks_path}.txt"):
                logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                with open(f'{post_checks_path}.txt' , 'r') as f_post:
                    diff = f_post.read()
            else:
                try:
                    with open(f'{pre_checks_path}.json' , 'r') as f_pre:
                        lines_pre = json.load(f_pre)
                    with open(f'{post_checks_path}.json' , 'r') as f_post:
                        lines_post = json.load(f_post)
                except FileNotFoundError:
                    logger.error('file not found')
                    continue
                logging.info("Opened the f_pre and f_post, now running Diff")
                diff = Diff(lines_pre, lines_post)                    
                logging.info("Running findDiff")
                diff.findDiff()
            if len(str(diff)) > 0 :
                with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?" , )[0]}.diff' , 'w') as f_diff:
                    f_diff.write(str(diff))
                logging.info("Finished writing diff")

- Script my_operation.py using a for loop only (working): 

from genie.utils.diff import Diff
import json
import os
import yaml
from pyats.topology.loader import load
from package.my_device import MyDevice
from package.my_device_f5 import MyDeviceF5
import pathlib
import logging
import multiprocessing
from datetime import datetime

logger = logging.getLogger(__name__)

class MyOperation:

def __init__(self):
    self.name = ''
    self.username = ''
    self.devices:list[MyDevice] = []
    self.testbed = ''
    self.root_path = pathlib.Path(__file__).resolve().parents[1]
    self.operations_path = f"{self.root_path}/output/operations"
    self.testbeds_path = f"{self.root_path}/output/testbeds"
    self.phase = None

def initiate(self):
    with open( f'{self.testbeds_path}/testbed.yaml') as testbed_file:
        self.testbed = yaml.safe_load(testbed_file)

    self.name = self.testbed['testbed']['name']
    self.username = self.testbed['testbed']['credentials']['default']['username']

    for device in self.testbed['devices'].keys():                        
        if self.testbed['devices'][device]['os'] == 'bigip':                    
            self.devices.append(MyDeviceF5(device,self.operations_path))

def run(self):
    datetime_start = datetime.now()
    logger.info("Starting run('%s')",self.phase)
    if not self.phase:
        raise Exception("No phase defined")
    self.buildTree()
    if self.phase=="Diff":
        self.runDiff()
    else: 
        self.runChecks()
    datetime_end = datetime.now()
    diff = datetime_end - datetime_start
    logger.info("Duration: %s (in hours)", (diff.total_seconds()/3600))

def buildTree(self):
    logger.info("Starting buildTree('%s')",self.phase)
    if not os.path.exists(f'{self.operations_path}/{self.name}'):
        os.makedirs(f'{self.operations_path}/{self.name}')
    for device in self.devices:
        if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
        if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
            os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')

def loadTestbed(self):
    try:
        self.testbed = load(self.testbed)
    except Exception:
        logger.error('host is not resolved')

def runChecks(self):
    for device in self.devices:
            logger.info("name of device: %s",device.name)
            logger.info("self.testbed:\n%s",self.testbed)

            device.raiseSessionLimit(self.testbed , device.name)
            device.myConnect(self.testbed , device.name)

            if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):

                if device.os not in ['bigip','fortios']:
                    device.getRunningConfig()
                    device.configAnalyzer(device.mapping_features , device.os)

                    print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                    for model in device.check_list:
                        print(f'- {model}')

                    with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                        check_list_file.write(str(json.dumps(device.check_list, indent=4)))

                else:
                    device.configAnalyzer()
                    with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                        check_list_file.write(str(json.dumps(device.check_list, indent=4)))
            else:
                with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
    logger.info("self.devices: %s", str(self.devices))

    logger.info(f"Name of operation: {self.name}")
    for device in self.devices:
        logger.info(f"One device name is: {device.name}")

    # with multiprocessing.Pool(2) as pool:
    #     pool.map(self.runChecksDevice, self.devices) 

    # multiprocessing.Pool().map(self.runChecksDevice, self.devices) 

    for device in self.devices:
        logger.info(f"Starting makeChecks for device {device.name}")
        logger.info(f"device.name in for loop: {device.name}")
        device.makeChecks(self.phase,self.name)
        device.restoreSessionLimit()
        device.pyats_device.disconnect()
        logger.info(f"Finished makeChecks and disconnected from device {device.name}")

# def runChecksDevice(self, device:MyDevice):
#     logger.info(f"Starting makeChecks for device name {device.name} in multiprocessing")
#     device.makeChecks(self.phase,self.name)
#     device.restoreSessionLimit()
#     device.pyats_device.disconnect()
#     logger.info(f"Finished makeChecks and disconnected from device {device.name} in multiprocessing")

def runDiff(self):
    logging.debug("Running findDiff")
    for device in self.devices:
        logging.debug("runDiff on device %s", str(device.name))
        #Looping the list of uris
        if (device.check_list == []):
            with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
            logging.debug("device.checklist was empty, filled with check_list.json in device folder")
        logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
        for feature in device.check_list:
            logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
            pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
            post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
            if os.path.isfile(f"{pre_checks_path}.txt"): 
                logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                with open(f'{pre_checks_path}.txt' , 'r') as f_pre:
                    diff = f_pre.read()
            elif os.path.isfile(f"{post_checks_path}.txt"):
                logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                with open(f'{post_checks_path}.txt' , 'r') as f_post:
                    diff = f_post.read()
            else:
                try:
                    with open(f'{pre_checks_path}.json' , 'r') as f_pre:
                        lines_pre = json.load(f_pre)
                    with open(f'{post_checks_path}.json' , 'r') as f_post:
                        lines_post = json.load(f_post)
                except FileNotFoundError:
                    logger.error('file not found')
                    continue
                logging.info("Opened the f_pre and f_post, now running Diff")
                diff = Diff(lines_pre, lines_post)                    
                logging.info("Running findDiff")
                diff.findDiff()
            if len(str(diff)) > 0 :
                with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?" , )[0]}.diff' , 'w') as f_diff:
                    f_diff.write(str(diff))
                logging.info("Finished writing diff")


You can copy either version into my_operation.py to try the working and non-working variants; the only difference is at the end of the runChecks method, where different lines are commented out.

About the expected structure of files being used:

- my_main.py
- _package_ 
     -  my_device.py
     - my_device_f5.py
     - my_operation.py
- _output_
     - _testbeds_
          - testbed.yaml

I already sent you an example of the testbed I'm using (you just need to replace placeholders such as hostnames and the username).
Harishv01 commented 4 months ago

Okay, will check and let you know

Harishv01 commented 4 months ago

The error is not coming from the pyats. It is coming from your code. Kindly fix it. For your reference, kindly check this link: https://stackoverflow.com/questions/71945399/python-3-8-multiprocessing-typeerror-cannot-pickle-weakref-object

Sulray commented 4 months ago

I saw this page during my own research, but I never specify "spawn" mode for multiprocessing in my code, unlike the examples there.

To verify that the default isn't "spawn" mode either, I added the following log to my my_main.py script:

logger.warning("start_method: " + str(multiprocessing.get_start_method()))

The result:

2024-05-22 14:53:35,999 - __main__ - my_main - WARNING - start_method: fork

So I'm apparently in "fork" mode, which is not addressed in this stackoverflow issue.

I'm on Python 3.8.10 with pyATS 24.4 for this log.
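For context, here is a quick way to check the start method, along with a note on why "fork" mode does not avoid the error. This is a minimal sketch, not tied to the scripts in this thread:

```python
import multiprocessing

# On Linux, Python 3.8 defaults to the "fork" start method; macOS and
# Windows default to "spawn".
print(multiprocessing.get_start_method())

# Even under "fork", Pool.map serializes the callable and each item of the
# iterable with pickle before sending them to the workers, so unpicklable
# objects (device connections, weakrefs, ...) fail either way.
```

In other words, the start method only changes how the worker processes are created; the arguments passed through `Pool.map` go through pickle in both modes.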

Harishv01 commented 4 months ago

The issue might be with the objects in the self.devices list. They might contain attributes that cannot be pickled. Check the pickle-ability of your MyDevice objects. If your MyDevice class contains complex objects, try to simplify it. For instance, if it has a connection attribute, you might want to remove it or replace it with a simpler representation before passing it to the multiprocessing pool. Modify the runChecks method to handle multiprocessing correctly:
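A small helper (hypothetical, not part of pyATS or the scripts above) can be used to check pickle-ability before handing objects to the pool:

```python
import pickle

def is_picklable(obj):
    """Return True if obj survives pickle.dumps, else False."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False

# A plain dict pickles fine; a lambda (like any local function) does not.
print(is_picklable({"host": "a1aaa01-clients", "os": "bigip"}))  # True
print(is_picklable(lambda x: x))                                 # False
```

Attributes that commonly fail this check on device objects are live connections, open sockets or file handles, and weakrefs created by the connection libraries.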

Sulray commented 4 months ago

I will try to see if I can get rid of complex objects. But do you know why the error is not the same in 24.3? ("clean stage '' does not exist in the json file")

Harishv01 commented 4 months ago

Hi, in version 24.3 we saw some bugs, but we fixed them in version 24.4. That's why you aren't seeing the same error anymore.

Harishv01 commented 4 months ago

Have you fixed the issue and tried again? Can you give an update?

Sulray commented 4 months ago

Hi, I haven't found a solution in the time I've spent on it so far.

Do you think it could be related to the fact that the method I'm calling through multiprocessing is a class method? I need to call it as multiprocessing.Pool().map(self.runChecksDevice, self.devices) (with self.) to be able to use instance variables.

I haven't found examples of this in the documentation, even though I think it's a common case.
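Calling pool.map(self.runChecksDevice, ...) is indeed part of the problem: pickling a bound method pickles the whole instance it is bound to (self), and everything hanging off it. A minimal illustration with a hypothetical class holding one unpicklable attribute:

```python
import os
import pickle

class Operation:
    def __init__(self):
        # Stand-in for an unpicklable attribute such as a live device
        # connection: an open file handle cannot be pickled.
        self.handle = open(os.devnull)

    def work(self, x):
        return x * 2

op = Operation()
try:
    # Pickling the bound method drags op (and op.handle) along with it.
    pickle.dumps(op.work)
except TypeError as exc:
    print(exc)  # cannot pickle '_io.TextIOWrapper' object
```

So even if the worker method itself is trivial, every attribute of the instance must be picklable for `pool.map(self.method, ...)` to work.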

Sulray commented 4 months ago

As a test I replaced the runChecks method of my script my_operation.py by the following:

def runChecks(self):
    def methodTest1(x: int):
        logger.info("x² is: %s", x ** 2)

    multiprocessing.Pool().map(methodTest1, range(10))

And I have the following error logs:

Traceback (most recent call last):
  File "my_main.py", line 42, in <module>
    operation.run()
  File "/package/path/package/my_operation.py", line 48, in run
    self.runChecks()
  File "/package/path/package/my_operation.py", line 112, in runChecks
    multiprocessing.Pool().map(methodTest1, range(10)) 
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'MyOperation.runChecks.<locals>.methodTest1'

But if I add global methodTest1 it works...

So apparently the function needs to be top-level to be usable with multiprocessing, which could explain why it's not working in my original case. It might not even be related to a "complex object".
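That matches how pickle handles functions: they are serialized by reference (module name plus qualified name), so only module-level functions can be looked up again on the worker side. A minimal sketch:

```python
import pickle

def square(x):            # module-level: picklable by reference
    return x ** 2

def make_local():
    def local_square(x):  # defined inside a function: not picklable
        return x ** 2
    return local_square

pickle.dumps(square)      # works

try:
    pickle.dumps(make_local())
except (pickle.PicklingError, AttributeError) as exc:
    print(exc)  # Can't pickle local object 'make_local.<locals>.local_square'
```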

Harishv01 commented 4 months ago

The error is not coming from the pyats. It is coming from your code. Kindly fix it. For your reference, kindly check this link: https://stackoverflow.com/questions/72766345/attributeerror-cant-pickle-local-object-in-multiprocessing

Harishv01 commented 4 months ago

Hi,

Can you please give me an update on the above?

Thank you.

Sulray commented 4 months ago

I'll update you when I have news.

Sulray commented 4 months ago

For the moment, I moved the runChecksDevice method from being a class method to a module-level function. All the arguments that were taken from self are now explicit parameters, leading to this signature: def runChecksDevice(device:MyDevice, testbed, operations_path, name, phase)

To use this module-level function from my class, I've added this code:

        processes = []
        for device in self.devices:
            p = multiprocessing.Process(target=runChecksDevice,args=(device, self.testbed, self.operations_path, self.name, self.phase))
            processes.append(p)
        for process in processes:
            process.start()
        for process in processes:
            process.join()

and it seems to work correctly.

I don't know if it's the best way, but at least it seems to work. Thank you for the support.
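For reference, the pattern above can be reduced to a self-contained sketch (device objects replaced with plain strings, and a Queue used to collect results; the names here are illustrative, not from the original script):

```python
import multiprocessing

def run_checks_device(device_name, phase, results):
    # Module-level worker: picklable by reference, and it receives only
    # picklable arguments. In the real script this is where
    # device.makeChecks(phase, name) would run.
    results.put(f"{device_name}:{phase}")

def run_all(device_names, phase):
    results = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(
            target=run_checks_device, args=(name, phase, results)
        )
        for name in device_names
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return sorted(results.get() for _ in device_names)

if __name__ == "__main__":
    print(run_all(["dev-a", "dev-b"], "preChecks"))
```

Passing explicit, picklable arguments instead of self also makes it clear what state each worker process actually needs.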

Harishv01 commented 4 months ago

After using the above method, this is now working. Hence, I am closing the ticket. Thank you.