python-zk / kazoo

Kazoo is a high-level Python library that makes it easier to use Apache Zookeeper.
https://kazoo.readthedocs.io
Apache License 2.0

After trying to reconnect many times, client gets stuck #577

Open. saffroy opened this issue 5 years ago.

saffroy commented 5 years ago

Hello,

I run the test program below and, once it has connected to ZooKeeper, block access to port 2181 with iptables. After a while (~90 s) and a number of retries, the program gets stuck: it blocks while writing to a socketpair that appears to be full, apparently because no thread reads from it.

Test program:

#!/usr/bin/env python3

import kazoo.client
import logging
import queue
import time

logging.basicConfig(level=logging.INFO)

# very low timeouts to reproduce the issue faster
CONNECT_TIMEOUT = 0.2
RETRY_TIMEOUT = 0.1
SESSION_TIMEOUT = 0.2

ZK_HOSTS = 'localhost:2181'

class MyClient:
    def __init__(self):
        self.zk = kazoo.client.KazooClient(hosts=ZK_HOSTS,
                                           timeout=SESSION_TIMEOUT)
        self.mqueue = queue.Queue()
        self.zk.add_listener(self.session_listener)
        self.zk_state = kazoo.client.KazooState.LOST
        self.running = False

    def session_listener(self, state):
        self.mqueue.put((self.update_state, (state,)))

    def update_state(self, state):
        logging.info('state {} -> {}'.format(self.zk_state, state))
        self.zk_state = state

    def reconnect(self):
        if self.zk_state == kazoo.client.KazooState.CONNECTED:
            return
        try:
            logging.info('connecting')
            if self.running:
                self.zk.stop()
                self.running = False
            self.zk.start(timeout=CONNECT_TIMEOUT)
            self.zk_state = kazoo.client.KazooState.CONNECTED
            self.running = True
            logging.info('connected')
        except Exception as e:
            logging.error(repr(e))

def main():
    cl = MyClient()

    while True:
        while not cl.mqueue.empty():
            f, args = cl.mqueue.get()
            f(*args)
        cl.reconnect()
        time.sleep(RETRY_TIMEOUT)

if __name__ == '__main__':
    main()

iptables script:

#!/bin/bash
set -x

case $1 in
    block)
    echo blocking
    for port in 218{1..3}; do
        iptables -t filter -A INPUT  -p tcp --sport $port -j DROP
        iptables -t filter -A OUTPUT -p tcp --dport $port -j DROP
    done
    ;;
    unblock)
    echo unblocking
    for port in 218{1..3}; do
        iptables -t filter -D INPUT  -p tcp --sport $port -j DROP
        iptables -t filter -D OUTPUT -p tcp --dport $port -j DROP
    done
    ;;
    *)
    echo "unknown command: $1"
    exit 1
    ;;
esac

I also ran the program under python3-dbg so that I could attach gdb to it and get thread backtraces:

$ gdb python3-dbg $(pgrep python3-dbg )
...
(gdb) i threads
  Id   Target Id         Frame
* 1    Thread 0x7fd5da634740 (LWP 5426) "python3-dbg" 0x00007fd5da23995b in __libc_send (fd=8, buf=0x7fd5da477858, n=1, flags=-1) at ../sysdeps/unix/sysv/linux/x86_64/send.c:31
  2    Thread 0x7fd5d7238700 (LWP 6898) "python3-dbg" sem_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/sem_wait.S:85
  3    Thread 0x7fd5d6236700 (LWP 6899) "python3-dbg" sem_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/sem_wait.S:85
(gdb) thread apply all py-bt

Thread 3 (Thread 0x7fd5d6236700 (LWP 6899)):
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7fd5d74bcd50>
  File "/usr/lib/python3.4/threading.py", line 290, in wait
    waiter.acquire()
  File "/usr/lib/python3.4/queue.py", line 167, in get
    self.not_empty.wait()
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/handlers/threading.py", line 124, in _thread_worker
    func = queue.get()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 888, in _bootstrap
    self._bootstrap_inner()

Thread 2 (Thread 0x7fd5d7238700 (LWP 6898)):
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7fd5d74bc8d8>
  File "/usr/lib/python3.4/threading.py", line 290, in wait
    waiter.acquire()
  File "/usr/lib/python3.4/queue.py", line 167, in get
    self.not_empty.wait()
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/handlers/threading.py", line 124, in _thread_worker
    func = queue.get()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 888, in _bootstrap
    self._bootstrap_inner()

Thread 1 (Thread 0x7fd5da634740 (LWP 5426)):
Traceback (most recent call first):
  <built-in method send of socket object at remote 0x7fd5d724a178>
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/client.py", line 682, in stop
    self._connection._write_sock.send(b'\0')
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/client.py", line 630, in start
    self.stop()
  File "./test_31889.py", line 42, in reconnect
  File "./test_31889.py", line 56, in main
  File "./test_31889.py", line 60, in <module>
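If a debug interpreter and gdb are not at hand, the standard-library faulthandler module can produce similar per-thread Python tracebacks. A minimal sketch, assuming Python 3.3+ on a Unix system; `install_stack_dumper` is a hypothetical helper name and SIGUSR1 is an arbitrary choice of signal:

```python
import faulthandler
import signal
import sys


def install_stack_dumper(sig=signal.SIGUSR1, out=None):
    """Dump the Python stack of every thread to `out` whenever `sig` arrives.

    Unix only: SIGUSR1 does not exist on Windows. `out` must have a working
    fileno(), because faulthandler writes to the descriptor directly.
    """
    if out is None:
        out = sys.stderr
    faulthandler.register(sig, file=out, all_threads=True)


if __name__ == "__main__":
    install_stack_dumper()
    # Start the test program's main() here; when it hangs, run from a shell:
    #     kill -USR1 <pid>
```

Calling install_stack_dumper() at the top of the test program and sending SIGUSR1 once it hangs should print every thread's stack to stderr. The dump is written from a C-level signal handler, so it should still appear while the main thread is blocked in send().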

While the program runs, I monitor the state of its socketpairs with ss:

watch -n1 "ss -xp|awk 'NR==1 || /dbg/'"

When the program blocks, ss output always ends up in this state:

Netid  State      Recv-Q Send-Q   Local Address:Port       Peer Address:Port
u_str  ESTAB      0      213504               * 68192600               * 68192599 users:(("python3-dbg",5426,8))
u_str  ESTAB      278    0                    * 68192599               * 68192600 users:(("python3-dbg",5426,7))
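The Recv-Q of 278 on fd 7 is the backlog of unread bytes; fd 8 is the end that stop() writes to in the traceback above. The much larger Send-Q of 213504 on fd 8 probably reflects kernel buffer accounting: each one-byte send appears to be charged a whole buffer allocation (213504 / 278 = 768), which would explain why a few hundred bytes are enough to fill the pair. To read the unread-byte count from inside the process instead of with ss, one option on Linux is the FIONREAD ioctl. A sketch, where `pending_bytes` is a hypothetical helper that accepts a socket object or a raw descriptor number such as 7:

```python
import fcntl
import socket
import struct
import termios


def pending_bytes(sock_or_fd):
    """Return how many bytes are queued, unread, on a socket (Linux/Unix)."""
    # FIONREAD fills in a C int with the number of readable bytes; passing an
    # immutable bytes buffer makes ioctl() return the filled-in copy.
    raw = fcntl.ioctl(sock_or_fd, termios.FIONREAD, struct.pack("i", 0))
    return struct.unpack("i", raw)[0]


if __name__ == "__main__":
    read_end, write_end = socket.socketpair()
    for _ in range(3):
        write_end.send(b"\0")  # one wake-up byte per write, as stop() does
    print(pending_bytes(read_end))  # prints 3
    read_end.close()
    write_end.close()
```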

Is there anything wrong with my program, or is this a bug in Kazoo?

saffroy commented 5 years ago

If I change my code to always call zk.close() before zk.start(), the problem disappears: the socketpairs no longer get clogged with unread bytes, because they are closed and re-created on every connection attempt.
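A minimal sketch of that workaround, using only the stop(), close() and start(timeout=...) calls that appear in this thread. `restart` and `RecordingClient` are hypothetical names; the recording stand-in only shows the call order and is not part of kazoo:

```python
def restart(zk, was_running, timeout):
    """Restart a client the way suggested in this thread: stop, close, start.

    `zk` is expected to offer KazooClient-style stop(), close() and
    start(timeout=...) methods.
    """
    if was_running:
        zk.stop()
    # Per this thread, close() releases the connection's socketpair, so the
    # next start() begins with fresh sockets and no leftover wake-up bytes.
    zk.close()
    zk.start(timeout=timeout)


class RecordingClient:
    """Hypothetical stand-in that only records calls; not part of kazoo."""

    def __init__(self):
        self.calls = []

    def stop(self):
        self.calls.append("stop")

    def close(self):
        self.calls.append("close")

    def start(self, timeout=15):
        self.calls.append(("start", timeout))


if __name__ == "__main__":
    client = RecordingClient()
    restart(client, was_running=True, timeout=0.2)
    print(client.calls)  # ['stop', 'close', ('start', 0.2)]
```

In MyClient.reconnect() this would take the place of the existing stop()/start() calls, passing self.running as was_running.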

I think the bug is that, on failure, KazooClient.start() itself calls self.stop() but not self.close(). Since there are many failed connection attempts, bytes accumulate in the ConnectionHandler socketpair until it eventually fills up.

Adding this call to close() solves the problem for me; I'll open a PR.
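To see why the missing close() matters without a ZooKeeper server, here is a toy model of the failure. It is a hypothetical sketch, not kazoo's code: `ToyClient` and `attempts_before_stuck` only mimic the behaviour described above (each failed start() calls stop(), which writes one wake-up byte that nobody reads), and the write end is made non-blocking so that a full socketpair raises BlockingIOError instead of hanging:

```python
import socket


class ToyClient:
    """Hypothetical model of the failure described in this issue."""

    def __init__(self, close_on_failure):
        self.close_on_failure = close_on_failure
        self.read_sock = self.write_sock = None

    def stop(self):
        self.write_sock.send(b"\0")  # wake up a reader that never runs

    def close(self):
        # Drop both ends; the next start() creates a fresh, empty pair.
        for sock in (self.read_sock, self.write_sock):
            if sock is not None:
                sock.close()
        self.read_sock = self.write_sock = None

    def start(self):
        if self.read_sock is None:
            self.read_sock, self.write_sock = socket.socketpair()
            # Non-blocking, so a full pair raises instead of hanging forever.
            self.write_sock.setblocking(False)
        # Simulate a server that never answers: give up and stop.
        self.stop()
        if self.close_on_failure:
            self.close()  # the change proposed in this thread
        raise TimeoutError("Connection time-out")


def attempts_before_stuck(client, max_attempts):
    """Count failed start() calls until the socketpair fills (or max_attempts)."""
    for attempt in range(max_attempts):
        try:
            client.start()
        except TimeoutError:
            continue
        except BlockingIOError:
            return attempt  # the real client would block in send() here
    return max_attempts


if __name__ == "__main__":
    print("without close():", attempts_before_stuck(ToyClient(False), 1_000_000))
    print("with close():   ", attempts_before_stuck(ToyClient(True), 2_000))
```

On a typical Linux system the version without close() should give up far short of its limit (the exact count depends on the kernel's socket buffer settings), while the version with close() gets a fresh, empty socketpair on every attempt.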

BTW, the problem is even easier to reproduce by trying to connect to a non-existent ZK server (e.g. localhost:12345).

jeffwidman commented 5 years ago

PR opened in #579