Open b-y-f opened 1 year ago
This looks like a bug in botocore. Could you raise a ticket there?
They will not fix it; they say it's an integration issue.
They seem to be pointing towards aiobotocore (or us) as being the issue.
This is the code snippet that seems to be failing for you.
import asyncio

from aiobotocore.session import get_session


async def run():
    session = get_session()
    async with session.create_client("iam") as iam:
        async for page in iam.get_paginator("list_roles").paginate():
            for role in page["Roles"]:
                print(await iam.list_role_tags(RoleName=role["RoleName"]))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()
Can you run this and see if you get the same error?
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In [2], line 13
11 if __name__ == "__main__":
12 loop = asyncio.get_event_loop()
---> 13 loop.run_until_complete(run())
14 loop.close()
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/asyncio/base_events.py:625, in BaseEventLoop.run_until_complete(self, future)
614 """Run until the Future is done.
615
616 If the argument is a coroutine, it is wrapped in a Task.
(...)
622 Return the Future's result, or raise its exception.
623 """
624 self._check_closed()
--> 625 self._check_running()
627 new_task = not futures.isfuture(future)
628 future = tasks.ensure_future(future, loop=self)
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/asyncio/base_events.py:584, in BaseEventLoop._check_running(self)
582 def _check_running(self):
583 if self.is_running():
--> 584 raise RuntimeError('This event loop is already running')
585 if events._get_running_loop() is not None:
586 raise RuntimeError(
587 'Cannot run the event loop while another loop is running')
RuntimeError: This event loop is already running
Where did you run that code? It seems like you already have an event loop running. Are you using IPython?
are you using IPython
Yes, inside a notebook.
This is the result from IPython:
<ipython-input-1-bfecc2c8c6bf>:12: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': '69a1aa2d-5faa-4999-ab64-2b8fe58a3c2e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '69a1aa2d-5faa-4999-ab64-2b8fe58a3c2e', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:13 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': '809deeb9-a4bf-4526-9f35-07dd1ce9d3b6', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '809deeb9-a4bf-4526-9f35-07dd1ce9d3b6', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:14 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': 'b3e7ca88-3bb6-42c9-9be5-504af39dad17', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'b3e7ca88-3bb6-42c9-9be5-504af39dad17', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:15 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': '724f1024-3674-42b3-9173-241d53aa5976', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '724f1024-3674-42b3-9173-241d53aa5976', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:15 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': 'b1c37c4e-c87f-42ab-bffb-5f765a2fccd4', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'b1c37c4e-c87f-42ab-bffb-5f765a2fccd4', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:16 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': '8e1be49f-c137-4add-bc4f-fcbdd7119f64', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '8e1be49f-c137-4add-bc4f-fcbdd7119f64', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:16 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': 'bd830aa1-924a-4e46-8798-ef16abd16492', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'bd830aa1-924a-4e46-8798-ef16abd16492', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:17 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': 'a212f0a0-77ea-412f-9052-957389a2393e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'a212f0a0-77ea-412f-9052-957389a2393e', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:17 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': 'e36039b0-952e-4cc6-897a-0d9f650158ac', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e36039b0-952e-4cc6-897a-0d9f650158ac', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:17 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': 'd30ac538-99b9-4731-81bd-804b5b563cf7', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd30ac538-99b9-4731-81bd-804b5b563cf7', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:18 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': '27b0a601-76a8-4bd8-bcef-4284f58028c0', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '27b0a601-76a8-4bd8-bcef-4284f58028c0', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:19 GMT'}, 'RetryAttempts': 0}}
{'Tags': [], 'IsTruncated': False, 'ResponseMetadata': {'RequestId': '7c8f8513-0c2f-408a-ac20-d7e6791d3eb8', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '7c8f8513-0c2f-408a-ac20-d7e6791d3eb8', 'content-type': 'text/xml', 'content-length': '300', 'date': 'Wed, 30 Nov 2022 10:48:19 GMT'}, 'RetryAttempts': 0}}
Ok, then you don't need to start the loop in the notebook. It already runs an event loop, so you can use await at the top level of a cell:
from aiobotocore.session import get_session

session = get_session()
async with session.create_client("iam") as iam:
    async for page in iam.get_paginator("list_roles").paginate():
        for role in page["Roles"]:
            print(await iam.list_role_tags(RoleName=role["RoleName"]))
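For code that has to run both as a script and inside a notebook, a minimal sketch of the distinction might look like the following. The `run()` body here is only a stand-in for the aiobotocore calls above, and `loop_is_running` and `main` are hypothetical helper names, not part of any library. It also avoids `asyncio.get_event_loop()`, which is what triggers the DeprecationWarning seen in the IPython output.

```python
import asyncio


async def run():
    # Stand-in for the aiobotocore calls in the snippet above.
    await asyncio.sleep(0)
    return "done"


def loop_is_running() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises when no loop is running in this thread.
        return False
    return True


def main():
    if loop_is_running():
        # Notebook case: asyncio.run() and run_until_complete() would raise
        # here, so point the caller at the direct form instead.
        raise RuntimeError(
            "An event loop is already running; use `await run()` instead."
        )
    # Script case: asyncio.run() creates a fresh loop and closes it afterwards.
    return asyncio.run(run())
```

From a plain script you would call `main()`; in a notebook cell you would write `await run()` directly.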
Ok then you don't need to start the loop.
Same with https://github.com/dask/dask-cloudprovider/issues/392#issuecomment-1331960754
Ok, but if you use FargateCluster you're still seeing the error?
In [4]: from dask_cloudprovider.aws import FargateCluster
...: cluster = FargateCluster(
...: image="daskdev/dask"
...: )
---------------------------------------------------------------------------
ParseError Traceback (most recent call last)
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/botocore/parsers.py:503, in BaseXMLResponseParser._parse_xml_string_to_dom(self, xml_string)
502 parser.feed(xml_string)
--> 503 root = parser.close()
504 except XMLParseError as e:
ParseError: no element found: line 1, column 0
During handling of the above exception, another exception occurred:
ResponseParserError Traceback (most recent call last)
Cell In [4], line 2
1 from dask_cloudprovider.aws import FargateCluster
----> 2 cluster = FargateCluster(
3 image="daskdev/dask"
4 )
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/dask_cloudprovider/aws/ecs.py:1489, in FargateCluster.__init__(self, **kwargs)
1488 def __init__(self, **kwargs):
-> 1489 super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/dask_cloudprovider/aws/ecs.py:800, in ECSCluster.__init__(self, fargate_scheduler, fargate_workers, fargate_spot, image, scheduler_cpu, scheduler_mem, scheduler_port, scheduler_timeout, scheduler_extra_args, scheduler_task_definition_arn, scheduler_task_kwargs, scheduler_address, worker_cpu, worker_nthreads, worker_mem, worker_gpu, worker_extra_args, worker_task_definition_arn, worker_task_kwargs, n_workers, workers_name_start, workers_name_step, cluster_arn, cluster_name_template, execution_role_arn, task_role_arn, task_role_policies, cloudwatch_logs_group, cloudwatch_logs_stream_prefix, cloudwatch_logs_default_retention, vpc, subnets, security_groups, environment, tags, skip_cleanup, aws_access_key_id, aws_secret_access_key, region_name, platform_version, fargate_use_private_ip, mount_points, volumes, mount_volumes_on_scheduler, **kwargs)
798 self._lock = asyncio.Lock()
799 self.session = get_session()
--> 800 super().__init__(**kwargs)
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/distributed/deploy/spec.py:275, in SpecCluster.__init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
273 if not called_from_running_loop:
274 self._loop_runner.start()
--> 275 self.sync(self._start)
276 try:
277 self.sync(self._correct_state)
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/distributed/utils.py:339, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
337 return future
338 else:
--> 339 return sync(
340 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
341 )
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/distributed/utils.py:406, in sync(loop, func, callback_timeout, *args, **kwargs)
404 if error:
405 typ, exc, tb = error
--> 406 raise exc.with_traceback(tb)
407 else:
408 return result
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/distributed/utils.py:379, in sync.<locals>.f()
377 future = asyncio.wait_for(future, callback_timeout)
378 future = asyncio.ensure_future(future)
--> 379 result = yield future
380 except Exception:
381 error = sys.exc_info()
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/tornado/gen.py:762, in Runner.run(self)
759 exc_info = None
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/dask_cloudprovider/aws/ecs.py:854, in ECSCluster._start(self)
852 # Cleanup any stale resources before we start
853 if not self._skip_cleanup:
--> 854 await _cleanup_stale_resources(
855 aws_access_key_id=self._aws_access_key_id,
856 aws_secret_access_key=self._aws_secret_access_key,
857 region_name=self._region_name,
858 )
860 if self.image is None:
861 if self._worker_gpu:
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/dask_cloudprovider/aws/ecs.py:1584, in _cleanup_stale_resources(**kwargs)
1580 for policy in attached_policies:
1581 await iam.detach_role_policy(
1582 RoleName=role["RoleName"], PolicyArn=policy["PolicyArn"]
1583 )
-> 1584 await iam.delete_role(RoleName=role["RoleName"])
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/aiobotocore/client.py:341, in AioBaseClient._make_api_call(self, operation_name, api_params)
339 else:
340 apply_request_checksum(request_dict)
--> 341 http, parsed_response = await self._make_request(
342 operation_model, request_dict, request_context
343 )
345 await self.meta.events.emit(
346 'after-call.{service_id}.{operation_name}'.format(
347 service_id=service_id, operation_name=operation_name
(...)
352 context=request_context,
353 )
355 if http.status_code >= 300:
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/aiobotocore/client.py:366, in AioBaseClient._make_request(self, operation_model, request_dict, request_context)
362 async def _make_request(
363 self, operation_model, request_dict, request_context
364 ):
365 try:
--> 366 return await self._endpoint.make_request(
367 operation_model, request_dict
368 )
369 except Exception as e:
370 await self.meta.events.emit(
371 'after-call-error.{service_id}.{operation_name}'.format(
372 service_id=self._service_model.service_id.hyphenize(),
(...)
376 context=request_context,
377 )
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/aiobotocore/endpoint.py:97, in AioEndpoint._send_request(self, request_dict, operation_model)
95 self._update_retries_context(context, attempts)
96 request = await self.create_request(request_dict, operation_model)
---> 97 success_response, exception = await self._get_response(
98 request, operation_model, context
99 )
100 while await self._needs_retry(
101 attempts,
102 operation_model,
(...)
105 exception,
106 ):
107 attempts += 1
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/aiobotocore/endpoint.py:139, in AioEndpoint._get_response(self, request, operation_model, context)
133 async def _get_response(self, request, operation_model, context):
134 # This will return a tuple of (success_response, exception)
135 # and success_response is itself a tuple of
136 # (http_response, parsed_dict).
137 # If an exception occurs then the success_response is None.
138 # If no exception occurs then exception is None.
--> 139 success_response, exception = await self._do_get_response(
140 request, operation_model, context
141 )
142 kwargs_to_emit = {
143 'response_dict': None,
144 'parsed_response': None,
145 'context': context,
146 'exception': exception,
147 }
148 if success_response is not None:
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/aiobotocore/endpoint.py:215, in AioEndpoint._do_get_response(self, request, operation_model, context)
211 parsed_response = await parser.parse(
212 response_dict, operation_model.output_shape
213 )
214 else:
--> 215 parsed_response = parser.parse(
216 response_dict, operation_model.output_shape
217 )
219 if http_response.status_code >= 300:
220 await self._add_modeled_error_fields(
221 response_dict,
222 parsed_response,
223 operation_model,
224 parser,
225 )
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/botocore/parsers.py:249, in ResponseParser.parse(self, response, shape)
247 return parsed
248 else:
--> 249 parsed = self._do_error_parse(response, shape)
250 else:
251 parsed = self._do_parse(response, shape)
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/botocore/parsers.py:556, in QueryParser._do_error_parse(self, response, shape)
554 def _do_error_parse(self, response, shape):
555 xml_contents = response['body']
--> 556 root = self._parse_xml_string_to_dom(xml_contents)
557 parsed = self._build_name_to_xml_node(root)
558 self._replace_nodes(parsed)
File /opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/site-packages/botocore/parsers.py:505, in BaseXMLResponseParser._parse_xml_string_to_dom(self, xml_string)
503 root = parser.close()
504 except XMLParseError as e:
--> 505 raise ResponseParserError(
506 "Unable to parse response (%s), "
507 "invalid XML received. Further retries may succeed:\n%s"
508 % (e, xml_string)
509 )
510 return root
ResponseParserError: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:
b''
Can you set skip_cleanup=True in the FargateCluster call?
Works!
In [1]: from dask_cloudprovider.aws import FargateCluster
...: cluster = FargateCluster(image="daskdev/dask", skip_cleanup=True )
/opt/homebrew/Caskroom/miniforge/base/envs/lake/lib/python3.10/contextlib.py:142: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources on AWS. Hang tight!
next(self.gen)
Hi @jacobtomlinson, but if I skip the cleanup, I have to manually delete the cluster.
No, you don't. When you create a cluster, it first tries to clean up any old broken clusters that may exist; that is the step you are skipping.
I'm getting the same error on an EC2 cluster. Any idea how to work around that? The skip_cleanup argument isn't present for the EC2Cluster object.
In case it's helpful, it only happens when I increase the cluster size from 100 to 1000 machines.
It sounds like AWS is returning an API response that botocore can't parse when a large number of API requests are being made. @secrettoad could you share the traceback so we can see which API call in EC2Cluster is causing it?
I sometimes get this error before the cluster starts, but after a few retries it finally succeeds.
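Since the error message itself says "Further retries may succeed", one possible stopgap for your own aiobotocore calls is a small retry wrapper with backoff. This is only a sketch: `retry_async` is a hypothetical helper, not part of aiobotocore or dask-cloudprovider, and it does not change how the cluster classes make their internal API calls.

```python
import asyncio
import random


async def retry_async(make_call, retry_on, attempts=5, base_delay=0.5):
    """Await make_call() up to `attempts` times, backing off between tries.

    make_call: zero-argument callable returning a fresh awaitable each time.
    retry_on:  exception class (or tuple of classes) treated as transient.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await make_call()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff with random jitter before the next try.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay))


# Hypothetical usage with the IAM client from earlier in the thread:
#
#   from botocore.parsers import ResponseParserError
#   tags = await retry_async(
#       lambda: iam.list_role_tags(RoleName=role["RoleName"]),
#       retry_on=ResponseParserError,
#   )
```

Passing a callable rather than a coroutine object matters here, because a coroutine can only be awaited once and each retry needs a fresh request.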