tomplus / kubernetes_asyncio

Python asynchronous client library for Kubernetes http://kubernetes.io/
Apache License 2.0

asyncio.TimeoutError when timeout_seconds is set on a Watch #259

Open rcaillon-Iliad opened 1 year ago

rcaillon-Iliad commented 1 year ago

When timeout_seconds is set to more than 300 (5 minutes) on a Watch, an asyncio.TimeoutError is raised after 5 minutes instead of the watch running for the requested duration.

import asyncio

from kubernetes_asyncio import client, config, watch

async def watch_pods():
    async with client.ApiClient() as api:
        v1 = client.CoreV1Api(api)
        async with watch.Watch().stream(v1.list_pod_for_all_namespaces, timeout_seconds=3600) as stream:
            async for event in stream:
                evt, obj = event['type'], event['object']
                print("{} pod {} in NS {}".format(evt, obj.metadata.name, obj.metadata.namespace))

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(config.load_kube_config())
    tasks = [
        asyncio.ensure_future(watch_pods()),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

if __name__ == '__main__':
    main()

ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-2' coro=<watch_pods() done, defined at <stdin>:1> exception=TimeoutError()>
Traceback (most recent call last):
  File "<stdin>", line 5, in watch_pods
  File "/home/rcaillon/.local/lib/python3.10/site-packages/kubernetes_asyncio/watch/watch.py", line 131, in __anext__
    return await self.next()
  File "/home/rcaillon/.local/lib/python3.10/site-packages/kubernetes_asyncio/watch/watch.py", line 152, in next
    line = await self.resp.content.readline()
  File "/home/rcaillon/.local/lib/python3.10/site-packages/aiohttp/streams.py", line 311, in readline
    return await self.readuntil()
  File "/home/rcaillon/.local/lib/python3.10/site-packages/aiohttp/streams.py", line 343, in readuntil
    await self._wait("readuntil")
  File "/home/rcaillon/.local/lib/python3.10/site-packages/aiohttp/streams.py", line 303, in _wait
    with self._timer:
  File "/home/rcaillon/.local/lib/python3.10/site-packages/aiohttp/helpers.py", line 721, in __exit__
    raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
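
The five-minute mark is consistent with a client-side timer that covers the whole request rather than each individual read. As far as I know, aiohttp's default ClientTimeout has a total of 300 seconds, and the traceback above ends in aiohttp's timer. The sketch below is not kubernetes_asyncio or aiohttp code. It uses only asyncio, with hypothetical names (slow_stream, consume) and durations scaled down to fractions of a second, to show how such a timer interrupts a stream that is still delivering events.

```python
import asyncio


async def slow_stream(interval: float, count: int):
    """Stand-in for a watch stream that emits one event every `interval` seconds."""
    for i in range(count):
        await asyncio.sleep(interval)
        yield f"event-{i}"


async def consume(total_timeout: float) -> list:
    """Collect events until a client-side total timer expires."""
    seen = []

    async def drain():
        # Left alone, this stream would run for about 5 seconds.
        async for event in slow_stream(interval=0.05, count=100):
            seen.append(event)

    try:
        # The timer covers the whole request, not each individual read,
        # so a healthy stream is still cut off once it expires.
        await asyncio.wait_for(drain(), timeout=total_timeout)
    except asyncio.TimeoutError:
        seen.append("TimeoutError")
    return seen


events = asyncio.run(consume(total_timeout=0.3))
print(events)
```

The stream is cut short by the timer even though it never stalls, which matches the symptom reported here: the server honours timeout_seconds, but the client gives up first.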

tomplus commented 1 year ago

What K8s cluster do you have? Is it provided by Google, AWS, ... or self-hosted?

rcaillon-Iliad commented 1 year ago

The cluster I use is provided by Scaleway (Kubernetes Kapsule)

ajinkya-takle commented 1 year ago

I am also hitting an asyncio timeout error when streaming logs from a pod. It times out after exactly 5 minutes. Did you find a workaround? We are using AWS-hosted Kubernetes. kubectl can stream the same logs for an hour without any issues.

rcaillon-Iliad commented 1 year ago

I don't have any workaround unfortunately... still hoping for a fix

ajinkya-takle commented 1 year ago

Just sharing in case it helps somebody: we are not using watch.Watch().stream. We make the raw API call and stream the response on to our client, setting the timeout on the API call itself. That works in our case, e.g.:

resp = await client.read_namespaced_pod_log(
    pod,
    namespace,
    container=container,
    follow=True,
    _preload_content=False,
    timestamps=True,
    tail_lines=0,
    _request_timeout=3600,
)

This allows us to stream logs without any connection hiccups for an hour.
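
For readers wondering how such a response might be consumed, here is a minimal sketch. It assumes the object returned with _preload_content=False exposes a content attribute with an awaitable readline(), as the traceback earlier in this thread suggests, and that readline() returns empty bytes at end of stream. iter_lines and demo are hypothetical names, and asyncio.StreamReader stands in for resp.content so the example runs without a cluster.

```python
import asyncio


async def iter_lines(content):
    """Yield decoded lines from a stream until it signals end of file."""
    while True:
        line = await content.readline()
        if not line:  # empty bytes means the stream has ended
            break
        yield line.decode("utf-8", errors="replace").rstrip("\n")


async def demo():
    # asyncio.StreamReader is only a stand-in for resp.content here.
    reader = asyncio.StreamReader()
    reader.feed_data(b"2024-01-01T00:00:00Z first\n2024-01-01T00:00:01Z second\n")
    reader.feed_eof()
    return [line async for line in iter_lines(reader)]


lines = asyncio.run(demo())
print(lines)
```

With a real response, the loop would presumably be `async for line in iter_lines(resp.content)`, forwarding each line as it arrives.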

fighterhit commented 1 month ago

Hi @rcaillon-Iliad @ajinkya-takle, I ran into the same problem (https://github.com/tomplus/kubernetes_asyncio/issues/325#issuecomment-2259998184). You could try passing _request_timeout to watch.Watch().stream alongside timeout_seconds, like below:

async def watch_endpoints():
    async with client.ApiClient() as api:
        v1 = client.CoreV1Api(api)
        async with watch.Watch().stream(v1.list_namespaced_endpoints, "XXX", _request_timeout=3600, timeout_seconds=3600) as stream:
            async for event in stream:
                evt, obj = event["type"], event["object"]
                ips = []
                if obj.subsets:
                    for ep in obj.subsets:
                        for addr in ep.addresses or []:  # addresses may be None
                            ips.append(addr.ip)
                    print(
                        "{} {}/{} endpoints {}".format(
                            evt, obj.metadata.namespace, obj.metadata.name, ips
                        )
                    )
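
Since the two values have to be kept in step, one option is to derive both from a single number. The helper below is a hypothetical sketch, not part of kubernetes_asyncio. The margin parameter is my own assumption: giving the client slightly longer than the server-side timeout_seconds should let the server close the watch first, so the stream ends normally rather than with a TimeoutError.

```python
def watch_timeouts(seconds: int, margin: int = 30) -> dict:
    """Build matching timeout keyword arguments for a watch (hypothetical helper).

    timeout_seconds is the server-side watch duration; _request_timeout is the
    client-side limit, set slightly higher so the server finishes first.
    """
    if seconds <= 0:
        raise ValueError("seconds must be positive")
    if margin < 0:
        raise ValueError("margin must not be negative")
    return {"timeout_seconds": seconds, "_request_timeout": seconds + margin}


kwargs = watch_timeouts(3600)
print(kwargs)
```

The result could then be unpacked into the call from the example above, for instance `watch.Watch().stream(v1.list_namespaced_endpoints, "XXX", **watch_timeouts(3600))`.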