candlepin / rho

ssh scanner for your network
GNU General Public License v2.0

Leaked paramiko ssh connection threads prevent rho from exiting #15

Open rrauenza opened 8 years ago

rrauenza commented 8 years ago

rho seems to get stuck scanning some iLOs on our network. If I watch lsof I can see the connections, which eventually disappear. I get these messages as they do:

2015-12-17 10:43:20,575 paramiko.transport INFO - Disconnect (code 2): Protocol Timeout (transport/1428)
2015-12-17 10:43:21,670 paramiko.transport INFO - Disconnect (code 2): Protocol Timeout (transport/1428)
2015-12-17 10:43:22,949 paramiko.transport INFO - Disconnect (code 2): Protocol Timeout (transport/1428)
2015-12-17 10:43:24,369 paramiko.transport INFO - Disconnect (code 2): Protocol Timeout (transport/1428)
2015-12-17 10:44:24,480 paramiko.transport INFO - Disconnect (code 2): Protocol Timeout (transport/1428)
2015-12-17 10:44:25,398 paramiko.transport INFO - Disconnect (code 2): Protocol Timeout (transport/1428)

And then nothing else happens. rho hangs indefinitely after that -- it doesn't seem to recognize that the ssh jobs have expired.

strace doesn't help -- it just shows all threads hanging on futex()

I think this is happening while rho is trying to log in with different auths.

rrauenza commented 8 years ago

What's odd is that I think it gave up authenticating long before those errors:

2015-12-17 10:35:14,544 rho WARNING - Found ssh on 10.139.28.10:22, but no auths worked. (ssh_jobs/244)

jmrodri commented 8 years ago

Thanks for reporting the issue. Do you expect rho to be able to connect to these iLOs? I want to make sure I can somehow recreate a scenario where rho tries but can't log in and hangs.

1) What OS are you running rho from?
2) What version of paramiko are you using?
3) What version of rho are you using?
4) Can you run rho dumpconfig? Don't paste it here, as sensitive information like passwords will be printed out. I'm more interested in how many auths and profiles it needed to scan. If you could sanitize it, that would be helpful.

rrauenza commented 8 years ago

As a baseline, I'm running CentOS 7. I've pulled the latest rho from GitHub.

At the moment I'm stepping through a remote debugger with the network isolated to the single iLO. I've put a breakpoint here:

     def run(self):
          while not self.quitting:
             try:
+                import rpdb; rpdb.set_trace()
                 # grab a "ssh_job" off the q
                 ssh_job = self.ssh_queue.get()
                 self.get_transport(ssh_job)

Right now it is looping through about 12 auths (which will all likely fail, and that is fine/expected).

$ rpm -qi python-paramiko
Name    : python-paramiko
Version : 1.15.1
Release : 1.el7

The config is basically a single ip at the moment (it was a /24) and there are about 15 auths to try, including an ssh key one as the final attempt. The others are just root/password.

rrauenza commented 8 years ago

I lost connection with the debugger. I'm running it alone (no debugger) as a single ip to see what happens.

rrauenza commented 8 years ago

While running, each connection uses two sockets / fds. I assume this is expected.

Every 2.0s: sudo lsof -p `pgrep rho` | grep TCP                                                                                                                          
    rho     9496 rich    3u  IPv4 471098       0t0     TCP dhcp113:48175->xxxxx-ilo:ssh (ESTABLISHED)
    rho     9496 rich    4u  IPv4 469734       0t0     TCP dhcp113:48172->xxxxx-ilo:ssh (ESTABLISHED)

Looks like a new pair is created for each auth request. Each auth request is taking a long time, btw; I think the iLO is just slow to respond.

Ah, and it just finished.

2015-12-17 11:44:38,043 paramiko.transport DEBUG - starting thread (client mode): 0xe4b990L (transport/1428)
2015-12-17 11:44:38,185 paramiko.transport INFO - Connected (version 2.0, client RomSShell_4.62) (transport/1428)
2015-12-17 11:44:38,186 paramiko.transport DEBUG - kex algos:[u'diffie-hellman-group14-sha1', u'diffie-hellman-group1-sha1'] server key:[u'ssh-dss'] client encrypt:[u'aes256-cbc', u'aes192-cbc', u'aes128-cbc', u'3des-cbc'] server encrypt:[u'aes256-cbc', u'aes192-cbc', u'aes128-cbc', u'3des-cbc'] client mac:[u'hmac-sha1'] server mac:[u'hmac-sha1'] client compress:[u'none'] server compress:[u'none'] client lang:[u''] server lang:[u''] kex follows?False (transport/1428)
2015-12-17 11:44:38,186 paramiko.transport DEBUG - Ciphers agreed: local=aes128-cbc, remote=aes128-cbc (transport/1428)
2015-12-17 11:44:38,186 paramiko.transport DEBUG - using kex diffie-hellman-group14-sha1; server key type ssh-dss; cipher: local aes128-cbc, remote aes128-cbc; mac: local hmac-sha1, remote hmac-sha1; compression: local none, remote none (transport/1428)
2015-12-17 11:44:47,578 paramiko.transport DEBUG - Switch to new keys ... (transport/1428)
2015-12-17 11:44:47,588 paramiko.transport DEBUG - Adding ssh-dss host key for 10.139.28.10: 16f7a2dad98817eb046b7c482172d4d6 (transport/1428)
2015-12-17 11:44:47,624 paramiko.transport DEBUG - userauth is OK (transport/1428)
2015-12-17 11:45:36,877 paramiko.transport INFO - Authentication (password) failed. (transport/1428)
2015-12-17 11:45:36,925 paramiko.transport DEBUG - EOF in transport thread (transport/1428)
2015-12-17 11:45:36,944 rho ERROR - login failed (ssh_jobs/295)
2015-12-17 11:45:36,944 rho WARNING - Found ssh on 10.139.28.10:22, but no auths worked. (ssh_jobs/244)
2015-12-17 11:45:36,978 paramiko.transport DEBUG - EOF in transport thread (transport/1428)

and exited "successfully" -- meaning, no hang. So the hang only happens when we have more connections.

I'm going to try this http://stackoverflow.com/questions/132058/showing-the-stack-trace-from-a-running-python-application and run the full subnet again, to see if I can get a stack trace when it hangs.
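The linked approach installs a signal handler that dumps every thread's stack on demand. A minimal sketch of that idea (my own illustration, assuming the Python 2.7 environment used here, not code from rho):

    import signal
    import sys
    import traceback

    def dump_stacks(signum, frame):
        # Print the current stack of every thread to stderr.
        for thread_id, stack in sys._current_frames().items():
            print >> sys.stderr, "\n# Thread: %s" % thread_id
            traceback.print_stack(stack, file=sys.stderr)

    # Install on SIGUSR1; trigger with `kill -USR1 <pid>`.
    signal.signal(signal.SIGUSR1, dump_stacks)

Note that CPython only runs signal handlers on the main thread, so if the main thread is stuck in an uninterruptible C-level wait, the handler may never get a chance to fire.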

rrauenza commented 8 years ago

I end up entering a state where I get this output:

2015-12-17 12:23:42,335 rho WARNING - Found ssh on 10.139.28.7:22, but no auths worked. (ssh_jobs/244)

But I still have one socket connected per lsof -- same ip:

rho     13507 rich    5u  IPv4 585630       0t0     TCP dhcp113:51586->10.139.28.7:ssh (ESTABLISHED)

...which eventually disappears after a

2015-12-17 12:29:10,422 paramiko.transport INFO - Disconnect (code 2): Protocol Timeout (transport/1428)

But then rho doesn't exit.

I can't break in on SIGUSR1 to get the stack trace. If I use pstack on the pid, all threads are blocked on sem_wait().

rrauenza commented 8 years ago

I've added this to get periodic aggregated stack traces:

    import collections
    import sys
    import threading
    import traceback

    def stacktraces():
        print >> sys.stderr, "\n*** STACKTRACE - START ***\n"
        traces = collections.defaultdict(list)
        for threadId, stack in sys._current_frames().items():
            code = []
            # code.append("\n# ThreadID: %s" % threadId)
            for filename, lineno, name, line in traceback.extract_stack(stack):
                code.append('File: "%s", line %d, in %s' % (filename,
                                                            lineno, name))
                if line:
                    code.append("  %s" % (line.strip()))
            traces[tuple(code)].append("%s" % threadId)

        for k, v in traces.iteritems():
            print >> sys.stderr, "\n# ThreadIDs: %s" % v
            for line in k:
                print >> sys.stderr, line
        print >> sys.stderr, "\n*** STACKTRACE - END ***\n"

    def stacktrace_loop():
        import time
        while True:
            time.sleep(30)
            stacktraces()

    t = threading.Thread(target=stacktrace_loop)
    t.setDaemon(True)
    t.start()

And upped self.max_threads to 256 to shorten the run time.

rrauenza commented 8 years ago

Ok, as it finishes, I do have threads blocked on sockets:

# ThreadIDs: ['139730493523712', '139730644592384', '139730527094528', '139730451560192']
File: "/usr/lib64/python2.7/threading.py", line 784, in __bootstrap
  self.__bootstrap_inner()
File: "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
  self.run()
File: "/usr/lib/python2.7/site-packages/paramiko/transport.py", line 1590, in run
  ptype, m = self.packetizer.read_message()
File: "/usr/lib/python2.7/site-packages/paramiko/packet.py", line 341, in read_message
  header = self.read_all(self.__block_size_in, check_rekey=True)
File: "/usr/lib/python2.7/site-packages/paramiko/packet.py", line 204, in read_all
  x = self.__socket.recv(n)

And then when I'm in the "finished but hung" state, I get the following. We're blocked on Queues... is self.all_tasks_done.wait() missing a notification?

*** STACKTRACE - START ***

# ThreadIDs: ['139731685287680', '139730795661056', '139731425113856', '139731878319872', '139731164940032', ... (long list of worker thread IDs, truncated in the captured output)]
File: "/usr/lib64/python2.7/threading.py", line 784, in __bootstrap
  self.__bootstrap_inner()
File: "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
  self.run()
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 176, in run
  ssh_job = self.out_queue.get()
File: "/usr/lib64/python2.7/Queue.py", line 168, in get
  self.not_empty.wait()
File: "/usr/lib64/python2.7/threading.py", line 339, in wait
  waiter.acquire()

# ThreadIDs: ['139733503039232']
File: "/usr/lib64/python2.7/threading.py", line 784, in __bootstrap
  self.__bootstrap_inner()
File: "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
  self.run()
File: "/usr/lib64/python2.7/threading.py", line 764, in run
  self.__target(*self.__args, **self.__kwargs)
File: "rho.git/bin/rho", line 63, in stacktrace_loop
  stacktraces()
File: "rho.git/bin/rho", line 45, in stacktraces
  for filename, lineno, name, line in traceback.extract_stack(stack):

# ThreadIDs: ['139733494646528']
File: "/usr/lib64/python2.7/threading.py", line 784, in __bootstrap
  self.__bootstrap_inner()
File: "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
  self.run()
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 204, in run
  prog_buf = self.prog_queue.get()
File: "/usr/lib64/python2.7/Queue.py", line 168, in get
  self.not_empty.wait()
File: "/usr/lib64/python2.7/threading.py", line 339, in wait
  waiter.acquire()

# ThreadIDs: ['139733737269056']
File: "rho.git/bin/rho", line 71, in <module>
  CLI().main()
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/cli.py", line 97, in main
  cmd.main()
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/clicommands.py", line 327, in main
  self._do_command()
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/clicommands.py", line 568, in _do_command
  missing = self.scanner.scan_profiles(self.options.profiles)
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/scanner.py", line 97, in scan_profiles
  self._run_scan()
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/scanner.py", line 110, in _run_scan
  self.ssh_jobs.run_jobs(callback=self._callback)
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 415, in run_jobs
  self.ssh_queue.join()
File: "/usr/lib64/python2.7/Queue.py", line 82, in join
  self.all_tasks_done.wait()
File: "/usr/lib64/python2.7/threading.py", line 339, in wait
  waiter.acquire()

*** STACKTRACE - END ***
rrauenza commented 8 years ago

Ah, Queue24 is written as a compatibility class... but I'm using Python 2.7.x. I'm going to try running this with Queue24 as just a direct derivation of Queue, with no overrides.

If that doesn't fix it, then I wonder if an exception is causing a missing task_done message on one of the queues.

edit: Changing the Queue class didn't seem to fix it, but I've left it as a direct derivation for now.
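For illustration, here is a minimal sketch (not rho code) of how a single skipped task_done() leaves Queue.join() blocked forever, using the same Python 2.7 Queue module that shows up in the traces above:

    import Queue
    import threading

    q = Queue.Queue()

    def worker():
        while True:
            item = q.get()
            try:
                if item == "bad":
                    raise RuntimeError("simulated failure")
                # ... process item ...
            except Exception:
                # Bailing out here without calling q.task_done() means
                # unfinished_tasks never reaches zero.
                continue
            q.task_done()

    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()

    q.put("good")
    q.put("bad")
    q.join()  # blocks forever: "bad" was consumed but never marked done
    print "never reached"

If any code path between get() and task_done() can raise (or continue) without marking the task done, the final ssh_queue.join() in run_jobs() hangs exactly like this.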

rrauenza commented 8 years ago

I've found an exception I think isn't caught. Do you realize PkgInfoParseException is derived from BaseException?

> exception BaseException
> The base class for all built-in exceptions. It is not meant to be directly inherited by user-defined classes (for that, use Exception). If str() or unicode() is called on an instance of this class, the representation of the argument(s) to the instance are returned, or the empty string when there were no arguments.

This causes the exception to propagate all the way out of the thread, since the code only catches Exception:

Exception in thread rho_ssh_thread-131:                                         
Traceback (most recent call last):                                              
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner      
    self.run()                                                                  
  File "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 357, in run
    self.get_transport(ssh_job)                                                 
  File "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 342, in get_transport
    self.run_cmds(ssh_job)                                                      
  File "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 328, in run_cmds
    rho_cmd.populate_data(output)                                               
  File "/mts/home5/rich/redhat-scan/rho.git/src/rho/rho_cmds.py", line 62, in populate_data
    self.parse_data()                                                           
  File "/mts/home5/rich/redhat-scan/rho.git/src/rho/rho_cmds.py", line 147, in parse_data
    installed_packages = [PkgInfo(line, "|") for line in self.cmd_results[0][0].splitlines()]
  File "/mts/home5/rich/redhat-scan/rho.git/src/rho/rho_cmds.py", line 548, in __init__
    raise PkgInfoParseException()                                               
PkgInfoParseException            
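A tiny sketch (mine, not from rho) of why an "except Exception" handler misses a class derived directly from BaseException; the two class names below are illustrative stand-ins for the old and proposed definitions of PkgInfoParseException:

    class PkgInfoParseExceptionOld(BaseException):
        pass

    class PkgInfoParseExceptionNew(Exception):
        pass

    for exc_class in (PkgInfoParseExceptionOld, PkgInfoParseExceptionNew):
        try:
            raise exc_class("bad package line")
        except Exception:
            print "%s caught by 'except Exception'" % exc_class.__name__
        except BaseException:
            print "%s escaped 'except Exception'" % exc_class.__name__

Only the Exception-derived version is caught; the BaseException-derived one falls through to the outer handler, which in rho's case is threading's __bootstrap_inner, so the worker thread dies.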
rrauenza commented 8 years ago

Changing the parent class of PkgInfoParseException from BaseException to Exception seems to have cleared it up.

edit: nope. More info coming.

rrauenza commented 8 years ago

There still seems to be a problem of the ssh threads hanging on recv(). I think they aren't getting shut down after the exception.

I've tried adding a close:

     def run(self):
         while not self.quitting:
             try:
                 # grab a "ssh_job" off the q
                 ssh_job = self.ssh_queue.get()
                 self.get_transport(ssh_job)
                 self.out_queue.put(ssh_job)
                 self.ssh_queue.task_done()
             except Exception as e:
                 log.error("Exception: %s" % e)
                 log.error(traceback.print_tb(sys.exc_info()[2]))
+                if self.ssh:
+                    self.ssh.close()
                 self.ssh_queue.task_done()

and


     def get_transport(self, ssh_job):
         if ssh_job.ip is "":
             return None

         try:
             self.connect(ssh_job)
             if not self.ssh:
                 return

             # there was a connection/auth failure
             if ssh_job.error:
                 return
             self.run_cmds(ssh_job)
             self.ssh.close()

         except Exception as e:
             log.error("Exception on %s: %s" % (ssh_job.ip, e))
             log.error(sys.exc_info())
             log.error(traceback.print_tb(sys.exc_info()[2]))
             ssh_job.connection_result = False
             ssh_job.command_output = e
+            if self.ssh:
+                self.ssh.close()

There might be a smarter way to deal with it, like always closing at the end of run(), or in a "finally:" after the except block, maybe with the close itself also wrapped in a try block.
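A rough sketch of that shape (my restructuring of the run() loop quoted earlier, not the project's actual fix), where the close and the task_done both live in a finally: so every iteration cleans up no matter what happened:

    def run(self):
        while not self.quitting:
            try:
                # grab a "ssh_job" off the q
                ssh_job = self.ssh_queue.get()
                self.get_transport(ssh_job)
                self.out_queue.put(ssh_job)
            except Exception as e:
                log.error("Exception: %s" % e)
                log.error(traceback.print_tb(sys.exc_info()[2]))
            finally:
                # Always close the client so the paramiko transport thread
                # exits, and always mark the job done so ssh_queue.join()
                # can complete.
                try:
                    if self.ssh:
                        self.ssh.close()
                except Exception as e:
                    log.error("Error closing ssh connection: %s" % e)
                self.ssh_queue.task_done()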

I think what is happening is that SshThread isn't recycling self.ssh -- so it never gets GC'ed on the last iteration if an error happens.

This causes the underlying paramiko thread to never finish, which prevents the script from exiting.

Another alternative might be to make the paramiko threads daemon threads... but I think it would be better to ensure self.ssh is always closed at the end of every iteration.

Those additional changes above, including the s/BaseException/Exception/ change, seem to have improved reliability further.

rrauenza commented 8 years ago

Couple more places that might need the connection closed:


                 # Implies we've found an SSH server listening:
                 except paramiko.AuthenticationException as e:
                     # Because we stop checking ports once we find one where ssh
                     # is listening, we can report the error message here and it
                     # will end up in the final report correctly:
                     err = _("login failed")
                     log.error(err)
                     ssh_job.error = err
                     found_port = port
+                    if self.ssh:
+                        self.ssh.close()
                     continue

                 # No route to host:
                 except socket.error as e:
                     log.warn("No route to host, skipping port: %s" % debug_str)
                     ssh_job.error = str(e)
                     break

                 # TODO: Hitting a live port that isn't ssh will result in
                 # paramiko.SSHException, do we need to handle this explicitly?

                 # Something else happened:
                 except Exception as detail:
                     log.warn("Connection error: %s - %s" % (debug_str,
                                                             str(detail)))
                     ssh_job.error = str(detail)
+                    if self.ssh:
+                        self.ssh.close()
                     continue
rrauenza commented 8 years ago

Still getting hung/orphaned threads stuck on recv(). run() might also need to clean up at the end:

     def run(self):
         while not self.quitting:
             try:
+                try:
+                    if self.ssh:
+                        self.ssh.close()
+                except Exception as e:
+                    log.error("Exceptions: %s" % e)
+                    log.error(sys.exc_info())
+                    log.error(traceback.print_tb(sys.exc_info()[2]))
                 # grab a "ssh_job" off the q
                 ssh_job = self.ssh_queue.get()
                 self.get_transport(ssh_job)
                 self.out_queue.put(ssh_job)
                 self.ssh_queue.task_done()
             except Exception as e:
                 log.error("Exception: %s" % e)
                 log.error(traceback.print_tb(sys.exc_info()[2]))
+                if self.ssh:
+                    self.ssh.close()
                 self.ssh_queue.task_done()
+        if self.ssh:  # Cleanup before exiting.
+            self.ssh.close()
rrauenza commented 8 years ago

The tool still isn't completing because some systems are still stuck in auth -- after 12 hours:

# ThreadIDs: ['139837625333504', '139838028183296', '139838811657984', '139836517496576', '139837642118912', '139837482657536', '139837516228352']
File: "/usr/lib64/python2.7/threading.py", line 784, in __bootstrap
  self.__bootstrap_inner()
File: "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
  self.run()
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 374, in run
  self.get_transport(ssh_job)
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 343, in get_transport
  self.connect(ssh_job)
File: "/mts/home5/rich/redhat-scan/rho.git/src/rho/ssh_jobs.py", line 288, in connect
  timeout=ssh_job.timeout)
File: "/usr/lib/python2.7/site-packages/paramiko/client.py", line 307, in connect
  look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)
File: "/usr/lib/python2.7/site-packages/paramiko/client.py", line 510, in _auth
  self._transport.auth_password(username, password)
File: "/usr/lib/python2.7/site-packages/paramiko/transport.py", line 1166, in auth_password
  return self.auth_handler.wait_for_response(my_event)
File: "/usr/lib/python2.7/site-packages/paramiko/auth_handler.py", line 192, in wait_for_response
  event.wait(0.1)
File: "/usr/lib64/python2.7/threading.py", line 621, in wait
  self.__cond.wait(timeout, balancing)
File: "/usr/lib64/python2.7/threading.py", line 361, in wait
  _sleep(delay)
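One way to bound that auth wait in-process, sketched here as an assumption on my part rather than something rho does: arm a threading.Timer before connecting and close the client when the deadline passes. Closing the transport should make the blocked auth call raise instead of waiting forever, since paramiko's wait_for_response bails out once the transport is no longer active.

    import threading
    import paramiko

    def connect_with_deadline(host, username, password, port=22, deadline=60):
        # Attempt an SSH login, forcibly closing the client if the whole
        # connect/auth exchange exceeds `deadline` seconds.  Sketch only.
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # If auth hangs (as with these iLOs), closing the underlying
        # transport unblocks the auth wait with an exception.
        watchdog = threading.Timer(deadline, client.close)
        watchdog.start()
        try:
            client.connect(host, port=port, username=username,
                           password=password, timeout=10,
                           allow_agent=False, look_for_keys=False)
            return client
        finally:
            watchdog.cancel()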

rrauenza commented 8 years ago

As I said, I found that some connections hung overnight on authentication. So as a workaround, I'm running rho against a single IP at a time (the IPs found via nmap), with each rho invocation wrapped in the Linux 'timeout' tool. I wrote a bash script to parallelize them.
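A rough Python equivalent of that wrapper (the real workaround was a bash script; the rho command line below is a placeholder, not the project's documented CLI):

    import subprocess
    from multiprocessing.dummy import Pool  # thread pool; each worker just spawns a subprocess

    IPS = ["10.139.28.7", "10.139.28.10"]  # hosts nmap reported with port 22 open

    def scan_one(ip):
        # Placeholder invocation -- substitute however you run rho against a
        # single-IP profile.  `timeout 600` kills the scan if it wedges in auth.
        cmd = ["timeout", "600", "rho", "scan", "--profile", ip]
        return ip, subprocess.call(cmd)

    pool = Pool(8)  # up to 8 scans in flight
    for ip, rc in pool.imap_unordered(scan_one, IPS):
        print ip, "exit code", rc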

This tool really needs some testing on more diverse/realistic networks/environments.