mvantellingen / python-zeep

A Python SOAP client
http://docs.python-zeep.org

Repeatable ConnectionResetError #463

Closed alexdukemiller closed 7 years ago

alexdukemiller commented 7 years ago

Hello,

We are using a Python program as an Inmarsat multi-client server to handle inbound and outbound data.

Our issue is that while the code works initially, it consistently fails after a similar number of iterations of querying for inbound messages (between 385 and 440). We've tried polling periods ranging from 10 seconds to 2 minutes with the same results. We have also run the code with zeep versions 1.5.0, 1.6.0, and 2.0.0, again with the same results.

We have a simple test case which reliably reproduces the problem. This test case periodically queries the web service for inbound messages, and on the 441st iteration, the OS detects and raises a ConnectionResetError. Before the error is raised the querying thread appears to block for ~2.5 minutes on an invocation of GetReturnMessages(). The Skywave support team has assured us they are seeing nothing abnormal in our interaction with the web service. We therefore suspect that we are using the zeep interface incorrectly and wonder if you could provide some advice.

Also, through experimentation we discovered that resetting the zeep client before the error is detected allows the server to run continuously. So, presently, our code periodically (every 100 iterations) deletes the instance of zeep client we are using and creates a new one. In the attached test case, you can produce the error by setting RESET_COUNT to 10000. Unfortunately, it takes about 1.25 hours to produce the error because we cannot query the web service more frequently than once every 10 seconds.

We really appreciate any assistance you can provide for isolating the source of this problem.

Info:

  1. zeep version == 2.0.0
  2. WSDL == https://isatdatapro.skywave.com/GLGW/GWServices_v1/Messages.wsdl
  3. Example script with a SkyWave test account that can be freely used:

```python
# Configurable constants
INBOUND_RX_PERIOD = 10   # Period, in seconds, at which the modem gateway is polled for inbound messages (>10 sec)
STATISTICS_PERIOD = 30   # Period, in seconds, at which statistics are printed
RESET_COUNT = 100        # Number of iterations between resets of the zeep client

import socket
import threading
import time
import zeep


class ZeepClient:

    def __init__(self):

        # User account information for the http://isatdatapro.skywave.com gateway.
        # This data is taken from a SkyWave support email.
        self.account = "70000934"
        self.password = "password"
        self.modemid = "01097623SKY2C68"

        # Connect to the SkyWave server and initialize the SOAP interface.
        # Force HTTPS for the connection so username and password are not broadcast as plain text.
        print("SkyWave Server: https://isatdatapro.skywave.com/")
        self.client = zeep.Client('https://isatdatapro.swlab.ca:8143/GLGW/GWServices_v1/Messages.wsdl')

        # Get the SOAP type objects for everything we'll need.
        self.ForwardMessage_type = self.client.get_type('ns0:ForwardMessage')
        self.ArrayOfForwardMessage_type = self.client.get_type('ns0:ArrayOfForwardMessage')
        self.ReturnMessageFilter_type = self.client.get_type("ns0:ReturnMessageFilter")

    def GetReturnMessages(self, highwater):
        # Get inbound messages from gateway and forward them to each registered client.

        ReturnMessageFilter = self.ReturnMessageFilter_type(StartUTC=highwater,
                                                            MobileID=self.modemid,
                                                            StartMessageID=0,
                                                            IncludeRawPayload=True,
                                                            IncludeType=False)

        result = self.client.service.GetReturnMessages(self.account, self.password, ReturnMessageFilter)

        print("Highwater: {}".format(highwater))

        # If there are new inbound messages from the gateway, send each one to each registered client
        if (result.Messages is not None) and (result.ErrorID == 0):
            for rx_msg in result.Messages.ReturnMessage:
                self.handle_rx_msg(rx_msg)
        elif result.ErrorID != 0:
            print("Inbound message error:  {}".format(result.ErrorID))
        else:
            print("no rx messages")

        # Based on the server response, return the new "high watermark" for the next call.
        if result.NextStartUTC is None:
            return highwater
        else:
            return result.NextStartUTC

    def handle_rx_msg(self, msg):

        # Render the message payload as a hex string.
        raw_data = ''.join('{:02x} '.format(x) for x in msg.RawPayload)

        sin = msg.RawPayload[0]
        if (sin < 128):
            # Non-payload data that is not reported to mission control.  This is
            # typically a ping response, position response, or modem registration.
            # For now, use the Messenger application to decode the data.
            header = "<< RX  {:s}, {:s}, STATUS    :".format(msg.ReceiveUTC, msg.MobileID, sin)
            print(header, raw_data)
        else:
            # User payload data reported to mission control.  By our convention,
            # the SIN = 128 and MIN = 0 for all user payload data, and these
            # fields are not passed on to mission control.
            header = "<< RX  {:s}, {:s}, DATA[{:>4d}]:".format(msg.ReceiveUTC, msg.MobileID, len(msg.RawPayload))
            print(header, raw_data)

    @classmethod
    def reset_client(cls):
        # Discard the module-level client and build a fresh one.
        global zeep_client
        del zeep_client.client
        del zeep_client
        zeep_client = ZeepClient()


class InboundMessages(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.name = "InboundMessages"

    def run(self):
        # Listen for inbound messages from modem and send them to each registered client.

        global zeep_client

        # Get the start time for the high water mark.
        highwater = zeep_client.client.service.InfoUTC()
        print("SkyWave Server Version:", zeep_client.client.service.InfoVersion())
        print("SkyWave Server UTC Time:", highwater)

        iteration = 1
        reset_counter = 1

        while True:
            # Do not poll any more frequently than once every 10 seconds.
            time.sleep(INBOUND_RX_PERIOD)

            # Check for new messages and process them.
            try:
                highwater = zeep_client.GetReturnMessages(highwater)
            except OSError as err:
                print("Inbound: OSError: {}, resetting zeep client".format(err))
                ZeepClient.reset_client()

            print("Iteration: {}.".format(iteration))
            timestamp()

            iteration = iteration + 1

            if reset_counter == RESET_COUNT:
                print("Inbound:  Proactively resetting zeep client.")
                ZeepClient.reset_client()
                reset_counter = 1
            else:
                reset_counter = reset_counter + 1


def timestamp():
    ts = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(time.time()))
    print("Current UTC time: {}".format(ts))


# Create threads for the server.
inbound_thread = InboundMessages()

# Create zeep client to connect with the Skywave gateway
zeep_client = ZeepClient()

# Start all the threads.
inbound_thread.start()

# Wait until someone kills the server
while True:
    time.sleep(STATISTICS_PERIOD)
    print("Main thread is alive.")
    timestamp()
```


Here are the error messages we get in the terminal window when the code above fails on the 441st iteration with RESET_COUNT set to 10000:

Highwater: 2017-06-09 17:25:36
no rx messages
Iteration: 439
Main thread is alive.
Highwater: 2017-06-09 17:25:36
no rx messages
Iteration: 440
Main thread is alive.
Main thread is alive.
Main thread is alive.
Main thread is alive.
Main thread is alive.
Exception in thread InboundMessages:
Traceback (most recent call last):
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 386, in _make_request
    six.raise_from(e, None)
  File "<string>", line 2, in raise_from
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 382, in _make_request
    httplib_response = conn.getresponse()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 1331, in getresponse
    response.begin()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 297, in begin
    version, status, reason = self._read_status()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 258, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 1002, in recv_into
    return self.read(nbytes, buffer)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 865, in read
    return self._sslobj.read(len, buffer)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 625, in read
    v = self._sslobj.read(len, buffer)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\adapters.py", line 438, in send
    timeout=timeout
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 649, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\util\retry.py", line 357, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\packages\six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 386, in _make_request
    six.raise_from(e, None)
  File "<string>", line 2, in raise_from
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 382, in _make_request
    httplib_response = conn.getresponse()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 1331, in getresponse
    response.begin()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 297, in begin
    version, status, reason = self._read_status()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 258, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 1002, in recv_into
    return self.read(nbytes, buffer)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 865, in read
    return self._sslobj.read(len, buffer)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 625, in read
    v = self._sslobj.read(len, buffer)
requests.packages.urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "OC-test.py", line 113, in run
    highwater = zeep_client.GetReturnMessages(highwater)
  File "OC-test.py", line 51, in GetReturnMessages
    result = self.client.service.GetReturnMessages(self.account, self.password, ReturnMessageFilter)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\zeep\client.py", line 41, in __call__
    self._op_name, args, kwargs)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\zeep\wsdl\bindings\soap.py", line 112, in send
    options['address'], envelope, http_headers)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\zeep\transports.py", line 96, in post_xml
    return self.post(address, message, headers)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\zeep\transports.py", line 68, in post
    timeout=self.operation_timeout)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\sessions.py", line 565, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\sessions.py", line 518, in request
    resp = self.send(prep, **send_kwargs)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\sessions.py", line 639, in send
    r = adapter.send(request, **kwargs)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\adapters.py", line 488, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None))

mvantellingen commented 7 years ago

First guess is that this actually isn't a zeep issue but a requests / server config issue.

Couple of questions:

  1. Which requests version are you using?
  2. Which Python version is this?
  3. Does this problem occur only on Windows? Did you try Unix?
  4. Is there an HTTP proxy between client and server?

alexdukemiller commented 7 years ago

Hello,

  1. The requests version I have been using is 2.14.2.
  2. The Python version is 3.6.1.
  3. I ran the code on a Unix-based system (a Mac laptop) and got the same result as posted above.
  4. There is no HTTP proxy between client and server.

-Alex

mvantellingen commented 7 years ago

Reading the traceback, it seems that requests expected the connection to still be open, but the remote server had already closed it.

This seems to be an issue outside of zeep, so I'm afraid I can't really offer any help here. You could try passing a custom requests session to zeep, configured not to use keep-alive. See http://docs.python-zeep.org/en/master/transport.html#transports and the requests documentation.
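
A minimal sketch of that suggestion, not a confirmed fix for this gateway. It assumes that sending a `Connection: close` header on every request is enough to stop requests from reusing a pooled connection, and that the installed zeep's `Transport` accepts `session` and `operation_timeout` arguments (the traceback above shows `operation_timeout` being passed to `session.post`). The helper names `build_session` and `build_client` and the 60-second timeout are hypothetical choices for illustration.

```python
import requests


def build_session():
    """Return a requests session that asks for the connection to be closed after each response."""
    session = requests.Session()
    # "Connection: close" opts out of HTTP keep-alive, so a connection
    # should not be left idle in the pool between polls.
    session.headers["Connection"] = "close"
    return session


def build_client(wsdl_url, operation_timeout=60):
    """Create a zeep client whose SOAP calls go through the non-keep-alive session."""
    # zeep is imported here so build_session() can be used on its own.
    from zeep import Client
    from zeep.transports import Transport

    # operation_timeout bounds how long a single SOAP call may block
    # (the value of 60 seconds is an arbitrary assumption).
    transport = Transport(session=build_session(), operation_timeout=operation_timeout)
    return Client(wsdl_url, transport=transport)
```

In the test script this would replace the direct `zeep.Client(...)` call, for example `self.client = build_client('https://isatdatapro.swlab.ca:8143/GLGW/GWServices_v1/Messages.wsdl')`. Opening a new connection for every poll costs an extra TLS handshake, which should be negligible at one request every 10 seconds.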

alexdukemiller commented 7 years ago

Thank you Michael, I will definitely check out these resources. I appreciate you taking the time to look at the issue for me.

-Alex