ktbyers / netmiko

Multi-vendor library to simplify Paramiko SSH connections to network devices
MIT License
3.57k stars 1.3k forks source link

hp_comware redispatch to cisco_iso_telnet #3276

Open hgqcs opened 1 year ago

hgqcs commented 1 year ago

hi ktbyers: i login device with hp_comware and redispatch with cisco_iso_telnet(i execute 'telnet IP' command to jump other device , they not the same prompt ), but it raise an readtimeout. i found the cisco_iso_telnet's session_preparation execute disable_paging before, and it execute method with read_until_prompt, it found the old prompt, so it raise readtimeout. i rewrite session_preparation, and change set_base_prompt before than the disable_paging, it ok.

ktbyers commented 1 year ago

@hgqcs You need to post your code and the exception stack trace.

hgqcs commented 1 year ago

@ktbyers my code:

from netmiko import ConnectHandler, redispatch
import copy
import inspect
import re
import time
from typing import (
    Callable,
    Any,
    TypeVar,
)

from netmiko.exceptions import (
    NetmikoAuthenticationException, )
from netmiko.utilities import m_exec_time  # noqa
# import logging
#
# logging.basicConfig(filename="/tmp/test.debug", level=logging.DEBUG)
# logger = logging.getLogger("netmiko")
# For decorators
F = TypeVar("F", bound=Callable[..., Any])

class JumpLogin:
    pri_prompt_terminator: str = r"\#\s*$"
    alt_prompt_terminator: str = r">\s*$"
    username_pattern: str = r"(?:user:|username|login|user name)"
    pwd_pattern: str = r"assword"
    TELNET_RETURN = "\r\n"
    RETURN = '\n'

    def __init__(self):
        """

        :param conn: a inited object
        :param kwargs: other arguments
        """
        self.conn = ''
        self.kwargs = ''
        self.return_msg = ''
        self.return_symbol = '\n'
        self.double_check_re_rule = ''
        self.answer_key = {
            'checkSavePublicKey': 1, 'checkAuthenticateState': 1, 'checkConfigDialog': 1, 'checkNonePassword': 1,
            'password': 1, 'username': 1, 'checkPublicKeyType': 1, 'checkNoPassword': 1
        }

    def joinResponseMsg(self, msg: str):
        """"""
        self.return_msg += msg

    def paushSecond(self, second: Any = 3):
        """"""
        time.sleep(second)

    def writeChannel(self, cmd: str):
        """
        send a command
        """
        return self.conn.write_channel(cmd)

    def readChannel(self):
        """

        """
        return self.conn.read_channel()

    def getLoopNumber(self, kwargs: dict):
        """"""
        outer_loops = kwargs.get('outer_loops', 3)
        return outer_loops

    def deleteOuterLoops(self, kwargs: dict):
        """"""
        outer_loops = 'outer_loops'
        if outer_loops in kwargs:
            del kwargs['outer_loops']

    def execCommonAction(self, second: Any = 1):
        """"""
        self.paushSecond(second)
        output = self.readChannel()
        self.joinResponseMsg(output)

    def enterJumpMode(self, kwargs):
        """"""

    def getDeviceType(self, link_type):
        """
        """

    def getLoginUsername(self, kwargs):
        """"""
        username = kwargs.get('username')
        if username is not None and username.strip():
            return True
        return False

    def checkBasePrompt(self, kwargs):
        """
        switch#telnet 10.30.34.254
        Trying 10.30.34.254 ... Open

        [Connection to 10.30.34.254 closed by foreign host]
        """
        if self.return_msg is None or not self.return_msg.strip():
            return False
        prompt = kwargs.get('prompt')
        last_line = self.return_msg.splitlines()[-1].strip()
        if prompt in last_line:
            return False
        return True

    def checkUsernameAndPassword(self, kwargs):
        """
        1.
            NT-CORE-3F#ssh  ip

            Password:

            ******************************************************************************
            * Copyright (c) 2004-2018 New H3C Technologies Co., Ltd. All rights reserved.*
            * Without the owner's prior written consent,                                 *
            * no decompiling or reverse-engineering shall be allowed.                    *
            ******************************************************************************

            <nt-3f-h3c-ip150>
        2.
        [admin@localhost ccb]$telnet ip
          Trying 10.30.0.60...
            Connected to 10.30.0.60.
            Escape character is '^]'.

            User Access Verification

            Password:
        3.
        [admin@localhost ccb]$telnet ip
        Trying 10.30.0.1...
        Connected to 10.30.0.1.
        Escape character is '^]'.

        User Access Verification

        Username:
        Password:
        4.
        ssh -l admin  ip
        Password:
        5.
        ssh  ip
        username: admin
        Password:
        """
        outer_loops = self.getLoopNumber(kwargs)
        self.execCommonAction(1)
        for _ in range(outer_loops):
            username_input_state = self.answer_key['username']
            password_input_state = self.answer_key['password']
            self.execCommonAction(1)
            state = self.checkBasePrompt(kwargs)
            if not state:
                continue
            user_re_search_result = re.search(self.username_pattern, self.return_msg, flags=re.I)
            password_re_search_result = re.search(self.pwd_pattern, self.return_msg, flags=re.I)
            # if response not found `password` symbol and not send username before
            if user_re_search_result and not password_re_search_result and username_input_state:
                self.answer_key['username'] = 0
                username = kwargs['username']
                self.writeChannel(f'{username}{self.return_symbol}')
                continue
            # if response found `password` symbol and not send password before
            if password_re_search_result and password_input_state:
                # Sometimes username/password must be terminated with "\r" and not "\r\n"
                password = kwargs.get('password')
                send_password = f'{password}{self.return_symbol}'
                self.writeChannel(send_password)
                self.answer_key['password'] = 0
                break
            self.maybeDisappear(kwargs)

    def checkAuthenticateState(self, kwargs):
        """"""
        name = inspect.currentframe().f_code.co_name
        symbol = 'Continue? [Y/N]:'
        # not repeat sending
        if symbol.lower() in self.return_msg.lower() and self.answer_key.get(name):
            self.writeChannel(f'Y{self.return_symbol}')
            self.answer_key[name] = 0

    def checkSavePublicKey(self, kwargs):
        """"""
        name = inspect.currentframe().f_code.co_name
        symbol = 'public key? [Y/N]:'
        # not repeat sending
        if symbol.lower() in self.return_msg.lower() and self.answer_key.get(name):
            self.writeChannel(f'Y{self.return_symbol}')
            self.answer_key[name] = 0

    def checkLoginState(self):
        """

        """
        if re.search(
                self.pri_prompt_terminator, self.return_msg, flags=re.M
        ) or re.search(self.alt_prompt_terminator, self.return_msg, flags=re.M):
            return True
        return False

    def checkConfigDialog(self):
        """"""
        self.execCommonAction(1)
        if re.search(
                r"initial configuration dialog\? \[yes/no\]: ", self.return_msg
        ):
            self.writeChannel("no" + self.return_symbol)
            self.paushSecond(0.1)
            count = 0
            while count < 15:
                output = self.readChannel()
                self.joinResponseMsg(output)
                if re.search(r"ress RETURN to get started", self.return_msg):
                    self.writeChannel(self.return_symbol)
                    break
                self.paushSecond(0.01)
                count += 1

    def checkNonePassword(self, kwargs: dict):
        """"""
        name = inspect.currentframe().f_code.co_name
        self.execCommonAction(1)
        if re.search(r"assword required, but none set", self.return_msg):
            host = kwargs['ip']
            msg = f"Login failed - Password required, but none set: {host}"
            self.answer_key[name] = 0
            raise NetmikoAuthenticationException(msg)

    def checkOtherResponse(self, kwargs: dict):
        """
        different brand's different output
        """
        self.checkAuthenticateState(kwargs)
        self.checkSavePublicKey(kwargs)
        self.checkJumpLoginStatus(kwargs)
        self.checkConfigDialog()
        self.checkNonePassword(kwargs)

    def checkAfterWritePassword(self):
        """
        input password and wait for response,
        if not response, not successful to jump login device
        """
        current_return_msg = copy.deepcopy(self.return_msg)
        for _ in range(10):
            self.execCommonAction(1)
            if current_return_msg != self.return_msg:
                break

    def checkJumpLoginStatus(self, kwargs):
        """

        """
        prompt = kwargs.get('prompt')
        if prompt is not None and self.return_msg:
            last_line = self.return_msg.splitlines()[-1]
            if prompt in last_line:
                raise NetmikoAuthenticationException('jump login failed')

    def loginDevice(self, kwargs):
        """
        check response and send message
         authenticated. Continue? [Y/N]
         server public key? [Y/N]

        """
        command = kwargs.get('jump_command')
        self.writeChannel(f'{command}{self.return_symbol}')
        self.checkUsernameAndPassword(kwargs)
        self.checkAfterWritePassword()
        self.checkOtherResponse(kwargs)
        login_state = self.checkLoginState()
        if not login_state:
            host = kwargs['ip']
            msg = f"Login failed: {host}"
            raise NetmikoAuthenticationException(msg)

    def checkPublicKeyType(self, kwargs):
        """"""
        self.execCommonAction()
        symbol = 'Please select [R, D, Enter or Ctrl_C]'
        # check for repect send
        if symbol.lower() in self.return_msg.lower():
            self.writeChannel(f'R{self.return_symbol}')

    def maybeDisappear(self, kwargs):
        """
        maybe disappear responses and auto send some messages
        """
        self.checkAuthenticateState(kwargs)
        self.checkSavePublicKey(kwargs)

    def jumpLogin(self):
        """

        """
        first_connect = {
            'ip': '172.18.202.150', 'password': 'admin', 'username': 'admin', 'device_type': 'hp_comware',
            'session_log': '/tmp/netmiko.log'
        }
        self.conn = ConnectHandler(**first_connect)
        prompt = self.conn.find_prompt()
        jump_command_info = {
            'jump_command': 'telnet 172.18.202.149', 'username': 'admin', 'password': 'admin',
            'secret': 'admin', 'outer_loops': 15, 'prompt': prompt, 'device_type': 'cisco_ios_telnet',
        }
        self.loginDevice(jump_command_info)
        redispatch(self.conn, jump_command_info['device_type'])

if __name__ == '__main__':
    test_run = JumpLogin()
    test_run.jumpLogin()

and error data:

Connected to pydev debugger (build 202.7660.27)
Traceback (most recent call last):
  File "/usr/local/python/lib/python3.8/site-packages/netmiko/base_connection.py", line 904, in _try_session_preparation
    self.session_preparation()
  File "/usr/local/python/lib/python3.8/site-packages/netmiko/cisco/cisco_ios.py", line 20, in session_preparation
    self.disable_paging()
  File "/usr/local/python/lib/python3.8/site-packages/netmiko/base_connection.py", line 1227, in disable_paging
    output = self.read_until_prompt()
  File "/usr/local/python/lib/python3.8/site-packages/netmiko/base_connection.py", line 755, in read_until_prompt
    return self.read_until_pattern(
  File "/usr/local/python/lib/python3.8/site-packages/netmiko/base_connection.py", line 672, in read_until_pattern
    raise ReadTimeout(msg)
netmiko.exceptions.ReadTimeout: 

Pattern not detected: 'nt\\-3f\\-h3c\\-ip150' in output.

Things you might try to fix this:
1. Adjust the regex pattern to better identify the terminating string. Note, in
many situations the pattern is automatically based on the network device's prompt.
2. Increase the read_timeout to a larger value.

You can also look at the Netmiko session_log or debug log for more information.

the session log:


******************************************************************************
* Copyright (c) 2004-2018 New H3C Technologies Co., Ltd. All rights reserved.*
* Without the owner's prior written consent,                                 *
* no decompiling or reverse-engineering shall be allowed.                    *
******************************************************************************

<nt-3f-h3c-ip150>
<nt-3f-h3c-ip150>
<nt-3f-h3c-ip150>
<nt-3f-h3c-ip150>screen-length disable
% Screen-length configuration is disabled for current user.
<nt-3f-h3c-ip150>
<nt-3f-h3c-ip150>telnet 172.18.202.149
Trying 172.18.202.149 ...
Press CTRL+K to abort
Connected to 172.18.202.149 ...

User Access Verification

Username: admin
Password: 
Switch>
Switch>terminal width 511
Switch>terminal length 0
Switch>
Switch>exit
ktbyers commented 1 year ago

@hgqcs In your code here:

        self.loginDevice(jump_command_info)
        redispatch(self.conn, jump_command_info['device_type'])

Set self.conn.global_cmd_verify=None...I think that should fix it. So new code:

        self.loginDevice(jump_command_info)
                self.conn.global_cmd_verify = None
        redispatch(self.conn, jump_command_info['device_type'])

Basically it is a weird interaction between the Netmiko object settings from HP_Comware that were persisting when you tried to switch to Cisco IOS.

hgqcs commented 1 year ago

@ktbyers yeah, try to add self.conn.global_cmd_verify = None and code it's ok. i check for cisco_asa, cisco_nxos and cisco_xr, they are the same with cisco_iso. some of cisco's session_preparation, the disable_paging and set_base_prompt 's order different from cisco_iso, other brand's session_preparation also change their's order, if change their order would better then set global_cmd_verify's value? other demo: cisco_iso -> hp_comware : ok cisco_iso -> cisco_iso(cisco_nxos) : ok cisco_iso -> ruijie_os: ok

thank you!