kubernetes-client / python-base


Watch stream should handle HTTP error before unmarshaling event #57

Closed · salilgupta1 closed this 4 years ago

salilgupta1 commented 6 years ago

I could be mistaken, but looking at the infinite loop for the watch stream, it doesn't handle the case where you receive an event that has expired, i.e. an HTTP status code of 410.

        while True:
            resp = func(*args, **kwargs)
            try:
                for line in iter_resp_lines(resp):
                    yield self.unmarshal_event(line, return_type)
                    if self._stop:
                        break
            finally:
                kwargs['resource_version'] = self.resource_version
                resp.close()
                resp.release_conn()

Looking at the code, it seems that if the event is expired then the stream yields something along the lines of:

{'raw_object': {u'status': u'Failure', u'kind': u'Status', u'code': 410, u'apiVersion': u'v1', u'reason': u'Gone', u'message': u'too old resource version: 2428 (88826)', u'metadata': {}}, u'object': {'api_version': 'v1',
 'kind': 'Status',

And unmarshal_event should fail to deserialize the object and break, and self.resource_version should just be stuck on the resource_version of the event that had expired.
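
For reference, the raw line the watch receives in this case looks roughly like the following (reconstructed by hand from the object above; the exact message will differ):

    import json

    # Reconstructed example of the line the watch stream receives on expiry
    line = ('{"type": "ERROR", "object": {"kind": "Status", "apiVersion": "v1", '
            '"metadata": {}, "status": "Failure", "reason": "Gone", "code": 410, '
            '"message": "too old resource version: 2428 (88826)"}}')

    event = json.loads(line)
    # json parsing of the line succeeds; the payload is just a Status rather than
    # the watched kind, and its empty metadata carries no resourceVersion to resume from.
    print(event['object']['code'], event['object']['reason'])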

Am I missing something here?

Sturgelose commented 6 years ago

We are having exactly the issue that you described: our watch gets a 401, disconnects, and reconnects automatically, ending up in an undesired state.

Our case is:

  1. The Etcd/K8s API returns a 401 error with resource_version = None in its body (as you can see in the snippet in this issue: https://github.com/kubernetes-client/python/issues/403). The reason for the error is that we are making many calls to the API.
  2. Then, the watch handles the event and restarts the connection. However, it saves the resource_version value from the object, leaving it with a value of None.
  3. As a result, on the next request we get a 500 error, because resource_version in the request has a value of None and that is not allowed by the K8s API.

Possible solutions:

Stack trace:

Exception in thread Thread-15:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "./main.py", line 109, in listen_k8s_changes
    resource_version=0):
  File "/usr/local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 122, in stream
    resp = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/kubernetes/client/apis/batch_v1_api.py", line 643, in list_namespaced_job
    (data) = self.list_namespaced_job_with_http_info(namespace, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/kubernetes/client/apis/batch_v1_api.py", line 746, in list_namespaced_job_with_http_info
    collection_formats=collection_formats)
  File "/usr/local/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 321, in call_api
    _return_http_data_only, collection_formats, _preload_content, _request_timeout)
  File "/usr/local/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 155, in __call_api
    _request_timeout=_request_timeout)
  File "/usr/local/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 342, in request
    headers=headers)
  File "/usr/local/lib/python3.6/site-packages/kubernetes/client/rest.py", line 231, in GET
    query_params=query_params)
  File "/usr/local/lib/python3.6/site-packages/kubernetes/client/rest.py", line 222, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Date': 'Tue, 15 May 2018 15:39:15 GMT', 'Content-Length': '186'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"resourceVersion: Invalid value: \\"None\\": strconv.ParseUint: parsing \\"None\\": invalid syntax","code":500}\n'

Note that the only place where resource_version is managed inside the watch logic is watch.py. In the REST/HTTP logic, this field is not touched.

Which option should we take to fix this? I'd be willing to submit a PR to fix the issue.

roycaihw commented 6 years ago

The current watch client code is dynamic and assumes that the API server always responds with a correct API object to deserialize. When the API server responds with a 410 because the resource version is too old (e.g. https://github.com/kubernetes-client/python/issues/484#issuecomment-396775365) or a 401 in @Sturgelose's case, the errors are not raised/handled but get "deserialized".

I would suggest that we check whether the response is an error code before deserializing, and reason about the proper behavior for each error code.
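
For illustration, such a check could look something like this, done before the line is handed to the deserializer (the helper name and how it would plug into watch.py are just a sketch, not the actual client code):

    import json

    from kubernetes.client.rest import ApiException


    def check_for_error(line):
        """Raise instead of deserializing when a watch line is an error Status."""
        js = json.loads(line)
        if js.get('type') == 'ERROR':
            status = js.get('object', {})
            # Surface e.g. 410 Gone / 401 Unauthorized to the caller instead of
            # silently deserializing the Status into the watched kind.
            raise ApiException(status=status.get('code'),
                               reason=status.get('message'))
        return js

The caller (or the stream loop itself) could then decide per status code whether to re-list, re-authenticate, or give up.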

rocky4570 commented 6 years ago

Yeah, we have the same issue with timeouts from the AWS ELB, resulting in "Invalid value for spec, must not be None".

I was going to put in a PR for unmarshal_event to raise TimeoutError, so as to avoid too many changes affecting existing use cases:

Around line 82 of kubernetes/watch/watch.py:

        if js.get('type') == 'Error':
            if js.get('object', {}).get('reason') == 'Expired':
                raise TimeoutError(js['object']['message'])

so that any logging is a little clearer, rather than potentially looking like an issue elsewhere, depending on what is being watched.
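
With a change like that, a watcher could at least tell the expiry apart from other failures; roughly (a sketch assuming the raise above is added to unmarshal_event):

    from kubernetes import client, config, watch

    config.load_kube_config()
    batch = client.BatchV1Api()

    while True:
        w = watch.Watch()
        try:
            for event in w.stream(batch.list_namespaced_job, 'default'):
                print(event['type'], event['object'].metadata.name)
        except TimeoutError:
            # Resource version expired: reconnect and keep watching
            # (note: events that fired in between may be skipped).
            continue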

mitar commented 5 years ago

From API concepts:

A given Kubernetes server will only preserve a historical list of changes for a limited time. Clusters using etcd3 preserve changes in the last 5 minutes by default. When the requested watch operations fail because the historical version of that resource is not available, clients must handle the case by recognizing the status code 410 Gone, clearing their local cache, performing a list operation, and starting the watch from the resourceVersion returned by that new list operation. Most client libraries offer some form of standard tool for this logic. (In Go this is called a Reflector and is located in the k8s.io/client-go/cache package.)

It would be great if Python code could correctly support this automatically.
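
In the meantime, a rough sketch of that recovery pattern done by hand with this client, assuming the stream surfaces the 410 Gone as an ApiException (which is exactly what this issue is asking for):

    from kubernetes import client, config, watch
    from kubernetes.client.rest import ApiException

    config.load_kube_config()
    v1 = client.CoreV1Api()

    while True:
        # "list": refresh the local view and get a fresh resourceVersion
        pods = v1.list_namespaced_pod('default')
        resource_version = pods.metadata.resource_version

        w = watch.Watch()
        try:
            # "watch": resume from the version returned by the list call
            for event in w.stream(v1.list_namespaced_pod, 'default',
                                  resource_version=resource_version):
                print(event['type'], event['object'].metadata.name)
        except ApiException as e:
            if e.status == 410:  # history no longer available: re-list
                continue
            raise

The important part is that the watch always resumes from the resourceVersion returned by the fresh list, so no interval is left uncovered.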

mitar commented 5 years ago

Based on #102 I made an improved version:

This is almost what the API concepts describe, with one important difference: the API concepts and the Go library provide a function called "list and watch" which ties listing and watching together. This package (to my knowledge) does not provide that. Because of that, we have to throw an exception if the latest event is too old. Otherwise we could just retry the whole "list and watch".

The problem with the current approach (in my branch) is that if the code was not able to resume automatically (because the last event was too old), then just restarting the watch with resource_version set to None is not good enough. It could happen that you just missed some events. In some cases this might be OK (if you do not need to capture all changes), but to my understanding, to not miss any events you should then do "list and watch" to recover, and this currently has to be done manually on the outside. We might want to provide such a utility function covering all (edge) cases.

zw0610 commented 5 years ago

Is there any reason for not adding an except clause to the while loop within the stream method, as in:

        while True:
            resp = func(*args, **kwargs)
            try:
                for line in iter_resp_lines(resp):
                    yield self.unmarshal_event(line, return_type)
                    if self._stop:
                        break
            except Exception as e:
                pass
            finally:
                kwargs['resource_version'] = self.resource_version
                resp.close()
                resp.release_conn()

            if timeouts or self._stop:
                break
mitar commented 5 years ago

I do not think so. The only reason an exception is thrown there is that we access the wrong fields in the data object. Also, the snippet you made above swallows each and every exception, which is definitely not something you want to do.

mitar commented 5 years ago

I made #133 which should fix this for good.

fejta-bot commented 5 years ago

Issues go stale after 90d of inactivity. Mark the issue as fresh with /remove-lifecycle stale. Stale issues rot after an additional 30d of inactivity and eventually close.

If this issue is safe to close now please do so with /close.

Send feedback to sig-testing, kubernetes/test-infra and/or fejta. /lifecycle stale

mitar commented 5 years ago

/remove-lifecycle stale

fejta-bot commented 4 years ago

Issues go stale after 90d of inactivity. Mark the issue as fresh with /remove-lifecycle stale. Stale issues rot after an additional 30d of inactivity and eventually close.

If this issue is safe to close now please do so with /close.

Send feedback to sig-testing, kubernetes/test-infra and/or fejta. /lifecycle stale

mitar commented 4 years ago

/remove-lifecycle stale

fejta-bot commented 4 years ago

Issues go stale after 90d of inactivity. Mark the issue as fresh with /remove-lifecycle stale. Stale issues rot after an additional 30d of inactivity and eventually close.

If this issue is safe to close now please do so with /close.

Send feedback to sig-testing, kubernetes/test-infra and/or fejta. /lifecycle stale

mitar commented 4 years ago

/remove-lifecycle stale

fejta-bot commented 4 years ago

Issues go stale after 90d of inactivity. Mark the issue as fresh with /remove-lifecycle stale. Stale issues rot after an additional 30d of inactivity and eventually close.

If this issue is safe to close now please do so with /close.

Send feedback to sig-testing, kubernetes/test-infra and/or fejta. /lifecycle stale

mitar commented 4 years ago

/remove-lifecycle stale