modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

Read compressed data from s3 in parallel #4565

Open · alejandro-ponder opened this issue 2 years ago

alejandro-ponder commented 2 years ago

**Is your feature request related to a problem? Please describe.**

If I try to read compressed data (in my case gzip) from S3, Modin doesn't read it in parallel.

Can reproduce with the following:

pd.read_csv("https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",compression='gzip',header=0,sep="\t")

mvashishtha commented 2 years ago

@alejandro-ponder thank you for reporting this issue! Modin is actually defaulting to pandas for a different reason: it thinks that the file doesn't exist. pandas can start reading the file from the HTTPS URL (though I haven't been able to finish after several minutes), but Modin doesn't allow that.

We are tracking support for reading data from an HTTPS URL in #3170.

But even when I change your path to the `s3://` format, I get a different error ending in `ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"` (shown below). The error originates here in `FileDispatcher.file_exists`. We need to investigate that. When I catch the `ConnectTimeoutError` there, I get another error when we actually open the file (second stack trace below), also ending in `ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"`.
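For context, here is a minimal sketch of the kind of broadened exception handling `file_exists` could use, based only on the `except NoCredentialsError` clause visible in the stack trace below. This is an assumption on my part, not the actual fix; the standalone helper and its anonymous-retry behavior are hypothetical.

```python
import s3fs
from botocore.exceptions import ConnectTimeoutError, NoCredentialsError

def file_exists_tolerant(file_path: str) -> bool:
    """Check S3 object existence, tolerating missing credentials and metadata-endpoint timeouts."""
    try:
        return s3fs.S3FileSystem().exists(file_path)
    except (NoCredentialsError, ConnectTimeoutError):
        # Fall back to an anonymous client; public buckets are readable without credentials.
        try:
            return s3fs.S3FileSystem(anon=True).exists(file_path)
        except ConnectTimeoutError:
            return False
```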

`ConnectTimeoutError` stack trace for reading from `s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz` ``` --------------------------------------------------------------------------- CancelledError Traceback (most recent call last) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs) 985 async with ceil_timeout(timeout.sock_connect): --> 986 return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa 987 except cert_errors as exc: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:1050, in BaseEventLoop.create_connection(self, protocol_factory, host, port, ssl, family, proto, flags, sock, local_addr, server_hostname, ssl_handshake_timeout, happy_eyeballs_delay, interleave) 1049 try: -> 1050 sock = await self._connect_sock( 1051 exceptions, addrinfo, laddr_infos) 1052 break File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:961, in BaseEventLoop._connect_sock(self, exceptions, addr_info, local_addr_infos) 960 raise my_exceptions.pop() --> 961 await self.sock_connect(sock, address) 962 return sock File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/selector_events.py:499, in BaseSelectorEventLoop.sock_connect(self, sock, address) 498 self._sock_connect(fut, sock, address) --> 499 return await fut CancelledError: During handling of the above exception, another exception occurred: TimeoutError Traceback (most recent call last) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:535, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize) 534 assert self._connector is not None --> 535 conn = await self._connector.connect( 536 req, traces=traces, timeout=real_timeout 537 ) 538 except asyncio.TimeoutError as exc: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:542, in BaseConnector.connect(self, req, traces, timeout) 541 try: --> 542 proto = await self._create_connection(req, traces, timeout) 543 if self._closed: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:907, in TCPConnector._create_connection(self, req, traces, timeout) 906 else: --> 907 _, proto = await self._create_direct_connection(req, traces, timeout) 909 return proto File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:1175, in TCPConnector._create_direct_connection(self, req, traces, timeout, client_error) 1174 try: -> 1175 transp, proto = await self._wrap_create_connection( 1176 self._factory, 1177 host, 1178 port, 1179 timeout=timeout, 1180 ssl=sslcontext, 1181 family=hinfo["family"], 1182 proto=hinfo["proto"], 1183 flags=hinfo["flags"], 1184 server_hostname=hinfo["hostname"] if sslcontext else None, 1185 local_addr=self._local_addr, 1186 req=req, 1187 client_error=client_error, 1188 ) 1189 except ClientConnectorError as exc: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs) 985 async with ceil_timeout(timeout.sock_connect): --> 986 return await 
self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa 987 except cert_errors as exc: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:129, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb) 123 async def __aexit__( 124 self, 125 exc_type: Optional[Type[BaseException]], 126 exc_val: Optional[BaseException], 127 exc_tb: Optional[TracebackType], 128 ) -> Optional[bool]: --> 129 self._do_exit(exc_type) 130 return None File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:212, in Timeout._do_exit(self, exc_type) 211 self._timeout_handler = None --> 212 raise asyncio.TimeoutError 213 # timeout has not expired TimeoutError: The above exception was the direct cause of the following exception: ServerTimeoutError Traceback (most recent call last) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:178, in AIOHTTPSession.send(self, request) 177 url = URL(url, encoded=True) --> 178 response = await self._session.request( 179 request.method, url=url, headers=headers_, data=data, proxy=proxy_url, 180 proxy_headers=proxy_headers 181 ) 183 http_response = aiobotocore.awsrequest.AioAWSResponse( 184 str(response.url), 185 response.status, 186 response.headers, 187 response 188 ) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:539, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize) 538 except asyncio.TimeoutError as exc: --> 539 raise ServerTimeoutError( 540 "Connection timeout " "to host {}".format(url) 541 ) from exc 543 assert conn.transport is not None ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token During handling of the above exception, another exception occurred: ConnectTimeoutError Traceback (most recent call last) Input In [1], in () ----> 1 import modin.pandas as pd; pd.read_csv("s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",compression='gzip',header=0,sep="\t") File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator..decorator..run_and_log(*args, **kwargs) 50 """ 51 Compute function with logging if Modin logging is enabled. 52 (...) 
62 Any 63 """ 64 if LogMode.get() == "disable": ---> 65 return f(*args, **kwargs) 67 logger = get_logger() 68 try: File ~/software_sources/modin/modin/pandas/io.py:140, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, skipfooter, doublequote, delim_whitespace, low_memory, memory_map, float_precision, storage_options) 138 _, _, _, f_locals = inspect.getargvalues(inspect.currentframe()) 139 kwargs = {k: v for k, v in f_locals.items() if k in _pd_read_csv_signature} --> 140 return _read(**kwargs) File ~/software_sources/modin/modin/pandas/io.py:61, in _read(**kwargs) 58 from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher 60 squeeze = kwargs.pop("squeeze", False) ---> 61 pd_obj = FactoryDispatcher.read_csv(**kwargs) 62 # This happens when `read_csv` returns a TextFileReader object for iterating through 63 if isinstance(pd_obj, pandas.io.parsers.TextFileReader): File ~/software_sources/modin/modin/core/execution/dispatching/factories/dispatcher.py:185, in FactoryDispatcher.read_csv(cls, **kwargs) 182 @classmethod 183 @_inherit_docstrings(factories.BaseFactory._read_csv) 184 def read_csv(cls, **kwargs): --> 185 return cls.__factory._read_csv(**kwargs) File ~/software_sources/modin/modin/core/execution/dispatching/factories/factories.py:217, in BaseFactory._read_csv(cls, **kwargs) 209 @classmethod 210 @doc( 211 _doc_io_method_template, (...) 215 ) 216 def _read_csv(cls, **kwargs): --> 217 return cls.io_cls.read_csv(**kwargs) File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator..decorator..run_and_log(*args, **kwargs) 50 """ 51 Compute function with logging if Modin logging is enabled. 52 (...) 62 Any 63 """ 64 if LogMode.get() == "disable": ---> 65 return f(*args, **kwargs) 67 logger = get_logger() 68 try: File ~/software_sources/modin/modin/core/io/file_dispatcher.py:153, in FileDispatcher.read(cls, *args, **kwargs) 129 @classmethod 130 @logger_decorator("PANDAS-API", "FileDispatcher.read", "INFO") 131 def read(cls, *args, **kwargs): 132 """ 133 Read data according passed `args` and `kwargs`. 134 (...) 151 postprocessing work on the resulting query_compiler object. 152 """ --> 153 query_compiler = cls._read(*args, **kwargs) 154 # TODO (devin-petersohn): Make this section more general for non-pandas kernel 155 # implementations. 156 if StorageFormat.get() == "Pandas": File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator..decorator..run_and_log(*args, **kwargs) 50 """ 51 Compute function with logging if Modin logging is enabled. 52 (...) 
62 Any 63 """ 64 if LogMode.get() == "disable": ---> 65 return f(*args, **kwargs) 67 logger = get_logger() 68 try: File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:1009, in TextFileDispatcher._read(cls, filepath_or_buffer, **kwargs) 1000 ( 1001 skiprows_md, 1002 pre_reading, 1003 skiprows_partitioning, 1004 ) = cls._manage_skiprows_parameter(skiprows, header_size) 1005 should_handle_skiprows = skiprows_md is not None and not isinstance( 1006 skiprows_md, int 1007 ) -> 1009 use_modin_impl = cls.check_parameters_support( 1010 filepath_or_buffer, 1011 kwargs, 1012 skiprows_md, 1013 header_size, 1014 ) 1015 if not use_modin_impl: 1016 return cls.single_worker_read( 1017 filepath_or_buffer, callback=cls.read_callback, **kwargs 1018 ) File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator..decorator..run_and_log(*args, **kwargs) 50 """ 51 Compute function with logging if Modin logging is enabled. 52 (...) 62 Any 63 """ 64 if LogMode.get() == "disable": ---> 65 return f(*args, **kwargs) 67 logger = get_logger() 68 try: File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:656, in TextFileDispatcher.check_parameters_support(cls, filepath_or_buffer, read_kwargs, skiprows_md, header_size) 654 skiprows = read_kwargs.get("skiprows") 655 if isinstance(filepath_or_buffer, str): --> 656 if not cls.file_exists(filepath_or_buffer): 657 return False 658 elif not cls.pathlib_or_pypath(filepath_or_buffer): File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator..decorator..run_and_log(*args, **kwargs) 50 """ 51 Compute function with logging if Modin logging is enabled. 52 (...) 62 Any 63 """ 64 if LogMode.get() == "disable": ---> 65 return f(*args, **kwargs) 67 logger = get_logger() 68 try: File ~/software_sources/modin/modin/core/io/file_dispatcher.py:271, in FileDispatcher.file_exists(cls, file_path) 269 exists = False 270 try: --> 271 exists = s3fs.exists(file_path) or exists 272 except NoCredentialsError: 273 pass File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:86, in sync_wrapper..wrapper(*args, **kwargs) 83 @functools.wraps(func) 84 def wrapper(*args, **kwargs): 85 self = obj or args[0] ---> 86 return sync(self.loop, func, *args, **kwargs) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:66, in sync(loop, func, timeout, *args, **kwargs) 64 raise FSTimeoutError from return_result 65 elif isinstance(return_result, BaseException): ---> 66 raise return_result 67 else: 68 return return_result File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:26, in _runner(event, coro, result, timeout) 24 coro = asyncio.wait_for(coro, timeout=timeout) 25 try: ---> 26 result[0] = await coro 27 except Exception as ex: 28 result[0] = ex File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:888, in S3FileSystem._exists(self, path) 886 return False 887 try: --> 888 await self._info(path, bucket, key, version_id=version_id) 889 return True 890 except FileNotFoundError: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1140, in S3FileSystem._info(self, path, bucket, key, refresh, version_id) 1138 if key: 1139 try: -> 1140 out = await self._call_s3( 1141 "head_object", 1142 self.kwargs, 1143 Bucket=bucket, 1144 Key=key, 1145 **version_id_kw(version_id), 1146 **self.req_kw, 1147 ) 1148 return { 1149 "ETag": out.get("ETag", ""), 1150 "LastModified": out["LastModified"], (...) 
1156 "ContentType": out.get("ContentType"), 1157 } 1158 except FileNotFoundError: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:325, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs) 324 async def _call_s3(self, method, *akwarglist, **kwargs): --> 325 await self.set_session() 326 s3 = await self.get_s3(kwargs.get("Bucket")) 327 method = getattr(s3, method) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:473, in S3FileSystem.set_session(self, refresh, kwargs) 469 else: 470 s3creator = self.session.create_client( 471 "s3", config=conf, **init_kwargs, **client_kwargs 472 ) --> 473 self._s3 = await s3creator.__aenter__() 475 self._s3creator = s3creator 476 # the following actually closes the aiohttp connection; use of privates 477 # might break in the future, would cause exception at gc time File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:22, in ClientCreatorContext.__aenter__(self) 21 async def __aenter__(self) -> AioBaseClient: ---> 22 self._client = await self._coro 23 return await self._client.__aenter__() File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:102, in AioSession._create_client(self, service_name, region_name, api_version, use_ssl, verify, endpoint_url, aws_access_key_id, aws_secret_access_key, aws_session_token, config) 97 raise PartialCredentialsError( 98 provider='explicit', 99 cred_var=self._missing_cred_vars(aws_access_key_id, 100 aws_secret_access_key)) 101 else: --> 102 credentials = await self.get_credentials() 103 endpoint_resolver = self._get_internal_component('endpoint_resolver') 104 exceptions_factory = self._get_internal_component('exceptions_factory') File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:133, in AioSession.get_credentials(self) 131 async def get_credentials(self): 132 if self._credentials is None: --> 133 self._credentials = await (self._components.get_component( 134 'credential_provider').load_credentials()) 135 return self._credentials File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:814, in AioCredentialResolver.load_credentials(self) 812 for provider in self.providers: 813 logger.debug("Looking for credentials via: %s", provider.METHOD) --> 814 creds = await provider.load() 815 if creds is not None: 816 return creds File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:486, in AioInstanceMetadataProvider.load(self) 484 async def load(self): 485 fetcher = self._role_fetcher --> 486 metadata = await fetcher.retrieve_iam_role_credentials() 487 if not metadata: 488 return None File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:175, in AioInstanceMetadataFetcher.retrieve_iam_role_credentials(self) 173 async def retrieve_iam_role_credentials(self): 174 try: --> 175 token = await self._fetch_metadata_token() 176 role_name = await self._get_iam_role(token) 177 credentials = await self._get_credentials(role_name, token) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:88, in AioIMDSFetcher._fetch_metadata_token(self) 86 for i in range(self._num_attempts): 87 try: ---> 88 response = await session.send(request.prepare()) 89 if response.status_code == 200: 90 return await response.text File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:210, in 
AIOHTTPSession.send(self, request) 208 except ServerTimeoutError as e: 209 if str(e).lower().startswith('connect'): --> 210 raise ConnectTimeoutError(endpoint_url=request.url, error=e) 211 else: 212 raise ReadTimeoutError(endpoint_url=request.url, error=e) ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token" ```
Stack trace after catching ConnectionTimeoutError ``` --------------------------------------------------------------------------- CancelledError Traceback (most recent call last) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs) 985 async with ceil_timeout(timeout.sock_connect): --> 986 return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa 987 except cert_errors as exc: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:1050, in BaseEventLoop.create_connection(self, protocol_factory, host, port, ssl, family, proto, flags, sock, local_addr, server_hostname, ssl_handshake_timeout, happy_eyeballs_delay, interleave) 1049 try: -> 1050 sock = await self._connect_sock( 1051 exceptions, addrinfo, laddr_infos) 1052 break File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:961, in BaseEventLoop._connect_sock(self, exceptions, addr_info, local_addr_infos) 960 raise my_exceptions.pop() --> 961 await self.sock_connect(sock, address) 962 return sock File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/selector_events.py:499, in BaseSelectorEventLoop.sock_connect(self, sock, address) 498 self._sock_connect(fut, sock, address) --> 499 return await fut CancelledError: During handling of the above exception, another exception occurred: TimeoutError Traceback (most recent call last) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:535, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize) 534 assert self._connector is not None --> 535 conn = await self._connector.connect( 536 req, traces=traces, timeout=real_timeout 537 ) 538 except asyncio.TimeoutError as exc: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:542, in BaseConnector.connect(self, req, traces, timeout) 541 try: --> 542 proto = await self._create_connection(req, traces, timeout) 543 if self._closed: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:907, in TCPConnector._create_connection(self, req, traces, timeout) 906 else: --> 907 _, proto = await self._create_direct_connection(req, traces, timeout) 909 return proto File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:1175, in TCPConnector._create_direct_connection(self, req, traces, timeout, client_error) 1174 try: -> 1175 transp, proto = await self._wrap_create_connection( 1176 self._factory, 1177 host, 1178 port, 1179 timeout=timeout, 1180 ssl=sslcontext, 1181 family=hinfo["family"], 1182 proto=hinfo["proto"], 1183 flags=hinfo["flags"], 1184 server_hostname=hinfo["hostname"] if sslcontext else None, 1185 local_addr=self._local_addr, 1186 req=req, 1187 client_error=client_error, 1188 ) 1189 except ClientConnectorError as exc: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs) 985 async with ceil_timeout(timeout.sock_connect): --> 986 return await self._loop.create_connection(*args, **kwargs) # type: 
ignore[return-value] # noqa 987 except cert_errors as exc: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:129, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb) 123 async def __aexit__( 124 self, 125 exc_type: Optional[Type[BaseException]], 126 exc_val: Optional[BaseException], 127 exc_tb: Optional[TracebackType], 128 ) -> Optional[bool]: --> 129 self._do_exit(exc_type) 130 return None File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:212, in Timeout._do_exit(self, exc_type) 211 self._timeout_handler = None --> 212 raise asyncio.TimeoutError 213 # timeout has not expired TimeoutError: The above exception was the direct cause of the following exception: ServerTimeoutError Traceback (most recent call last) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:178, in AIOHTTPSession.send(self, request) 177 url = URL(url, encoded=True) --> 178 response = await self._session.request( 179 request.method, url=url, headers=headers_, data=data, proxy=proxy_url, 180 proxy_headers=proxy_headers 181 ) 183 http_response = aiobotocore.awsrequest.AioAWSResponse( 184 str(response.url), 185 response.status, 186 response.headers, 187 response 188 ) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:539, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize) 538 except asyncio.TimeoutError as exc: --> 539 raise ServerTimeoutError( 540 "Connection timeout " "to host {}".format(url) 541 ) from exc 543 assert conn.transport is not None ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token During handling of the above exception, another exception occurred: ConnectTimeoutError Traceback (most recent call last) Input In [1], in () ----> 1 import modin.pandas as pd; pd.read_csv("s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",compression='gzip',header=0,sep="\t") File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator..decorator..run_and_log(*args, **kwargs) 50 """ 51 Compute function with logging if Modin logging is enabled. 52 (...) 
62 Any 63 """ 64 if LogMode.get() == "disable": ---> 65 return f(*args, **kwargs) 67 logger = get_logger() 68 try: File ~/software_sources/modin/modin/pandas/io.py:140, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, skipfooter, doublequote, delim_whitespace, low_memory, memory_map, float_precision, storage_options) 138 _, _, _, f_locals = inspect.getargvalues(inspect.currentframe()) 139 kwargs = {k: v for k, v in f_locals.items() if k in _pd_read_csv_signature} --> 140 return _read(**kwargs) File ~/software_sources/modin/modin/pandas/io.py:61, in _read(**kwargs) 58 from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher 60 squeeze = kwargs.pop("squeeze", False) ---> 61 pd_obj = FactoryDispatcher.read_csv(**kwargs) 62 # This happens when `read_csv` returns a TextFileReader object for iterating through 63 if isinstance(pd_obj, pandas.io.parsers.TextFileReader): File ~/software_sources/modin/modin/core/execution/dispatching/factories/dispatcher.py:185, in FactoryDispatcher.read_csv(cls, **kwargs) 182 @classmethod 183 @_inherit_docstrings(factories.BaseFactory._read_csv) 184 def read_csv(cls, **kwargs): --> 185 return cls.__factory._read_csv(**kwargs) File ~/software_sources/modin/modin/core/execution/dispatching/factories/factories.py:217, in BaseFactory._read_csv(cls, **kwargs) 209 @classmethod 210 @doc( 211 _doc_io_method_template, (...) 215 ) 216 def _read_csv(cls, **kwargs): --> 217 return cls.io_cls.read_csv(**kwargs) File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator..decorator..run_and_log(*args, **kwargs) 50 """ 51 Compute function with logging if Modin logging is enabled. 52 (...) 62 Any 63 """ 64 if LogMode.get() == "disable": ---> 65 return f(*args, **kwargs) 67 logger = get_logger() 68 try: File ~/software_sources/modin/modin/core/io/file_dispatcher.py:153, in FileDispatcher.read(cls, *args, **kwargs) 129 @classmethod 130 @logger_decorator("PANDAS-API", "FileDispatcher.read", "INFO") 131 def read(cls, *args, **kwargs): 132 """ 133 Read data according passed `args` and `kwargs`. 134 (...) 151 postprocessing work on the resulting query_compiler object. 152 """ --> 153 query_compiler = cls._read(*args, **kwargs) 154 # TODO (devin-petersohn): Make this section more general for non-pandas kernel 155 # implementations. 156 if StorageFormat.get() == "Pandas": File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator..decorator..run_and_log(*args, **kwargs) 50 """ 51 Compute function with logging if Modin logging is enabled. 52 (...) 
62 Any 63 """ 64 if LogMode.get() == "disable": ---> 65 return f(*args, **kwargs) 67 logger = get_logger() 68 try: File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:1027, in TextFileDispatcher._read(cls, filepath_or_buffer, **kwargs) 1021 # In these cases we should pass additional metadata 1022 # to the workers to match pandas output 1023 pass_names = names in [None, lib.no_default] and ( 1024 skiprows is not None or kwargs["skipfooter"] != 0 1025 ) -> 1027 pd_df_metadata = cls.read_callback( 1028 filepath_or_buffer, 1029 **dict(kwargs, nrows=1, skipfooter=0, index_col=index_col), 1030 ) 1031 column_names = pd_df_metadata.columns 1032 column_widths, num_splits = cls._define_metadata(pd_df_metadata, column_names) File ~/software_sources/modin/modin/core/io/text/csv_dispatcher.py:40, in CSVDispatcher.read_callback(*args, **kwargs) 24 def read_callback(*args, **kwargs): 25 """ 26 Parse data on each partition. 27 (...) 38 Function call result. 39 """ ---> 40 return pandas.read_csv(*args, **kwargs) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/util/_decorators.py:311, in deprecate_nonkeyword_arguments..decorate..wrapper(*args, **kwargs) 305 if len(args) > num_allow_args: 306 warnings.warn( 307 msg.format(arguments=arguments), 308 FutureWarning, 309 stacklevel=stacklevel, 310 ) --> 311 return func(*args, **kwargs) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:680, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, delim_whitespace, low_memory, memory_map, float_precision, storage_options) 665 kwds_defaults = _refine_defaults_read( 666 dialect, 667 delimiter, (...) 676 defaults={"delimiter": ","}, 677 ) 678 kwds.update(kwds_defaults) --> 680 return _read(filepath_or_buffer, kwds) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:575, in _read(filepath_or_buffer, kwds) 572 _validate_names(kwds.get("names", None)) 574 # Create the parser. 
--> 575 parser = TextFileReader(filepath_or_buffer, **kwds) 577 if chunksize or iterator: 578 return parser File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:933, in TextFileReader.__init__(self, f, engine, **kwds) 930 self.options["has_index_names"] = kwds["has_index_names"] 932 self.handles: IOHandles | None = None --> 933 self._engine = self._make_engine(f, self.engine) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:1217, in TextFileReader._make_engine(self, f, engine) 1213 mode = "rb" 1214 # error: No overload variant of "get_handle" matches argument types 1215 # "Union[str, PathLike[str], ReadCsvBuffer[bytes], ReadCsvBuffer[str]]" 1216 # , "str", "bool", "Any", "Any", "Any", "Any", "Any" -> 1217 self.handles = get_handle( # type: ignore[call-overload] 1218 f, 1219 mode, 1220 encoding=self.options.get("encoding", None), 1221 compression=self.options.get("compression", None), 1222 memory_map=self.options.get("memory_map", False), 1223 is_text=is_text, 1224 errors=self.options.get("encoding_errors", "strict"), 1225 storage_options=self.options.get("storage_options", None), 1226 ) 1227 assert self.handles is not None 1228 f = self.handles.handle File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/common.py:670, in get_handle(path_or_buf, mode, encoding, compression, memory_map, is_text, errors, storage_options) 667 codecs.lookup_error(errors) 669 # open URLs --> 670 ioargs = _get_filepath_or_buffer( 671 path_or_buf, 672 encoding=encoding, 673 compression=compression, 674 mode=mode, 675 storage_options=storage_options, 676 ) 678 handle = ioargs.filepath_or_buffer 679 handles: list[BaseBuffer] File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/common.py:385, in _get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode, storage_options) 382 pass 384 try: --> 385 file_obj = fsspec.open( 386 filepath_or_buffer, mode=fsspec_mode, **(storage_options or {}) 387 ).open() 388 # GH 34626 Reads from Public Buckets without Credentials needs anon=True 389 except tuple(err_types_to_retry_with_anon): File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/core.py:141, in OpenFile.open(self) 133 def open(self): 134 """Materialise this as a real open file without context 135 136 The file should be explicitly closed to avoid enclosed file (...) 139 been deleted; but a with-context is better style. 
140 """ --> 141 out = self.__enter__() 142 closer = out.close 143 fobjects = self.fobjects.copy()[:-1] File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/core.py:104, in OpenFile.__enter__(self) 101 def __enter__(self): 102 mode = self.mode.replace("t", "").replace("b", "") + "b" --> 104 f = self.fs.open(self.path, mode=mode) 106 self.fobjects = [f] 108 if self.compression is not None: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1037, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs) 1035 else: 1036 ac = kwargs.pop("autocommit", not self._intrans) -> 1037 f = self._open( 1038 path, 1039 mode=mode, 1040 block_size=block_size, 1041 autocommit=ac, 1042 cache_options=cache_options, 1043 **kwargs, 1044 ) 1045 if compression is not None: 1046 from fsspec.compression import compr File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:605, in S3FileSystem._open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, cache_options, **kwargs) 602 if cache_type is None: 603 cache_type = self.default_cache_type --> 605 return S3File( 606 self, 607 path, 608 mode, 609 block_size=block_size, 610 acl=acl, 611 version_id=version_id, 612 fill_cache=fill_cache, 613 s3_additional_kwargs=kw, 614 cache_type=cache_type, 615 autocommit=autocommit, 616 requester_pays=requester_pays, 617 cache_options=cache_options, 618 ) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1911, in S3File.__init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays, cache_options) 1909 self.details = s3.info(path) 1910 self.version_id = self.details.get("VersionId") -> 1911 super().__init__( 1912 s3, 1913 path, 1914 mode, 1915 block_size, 1916 autocommit=autocommit, 1917 cache_type=cache_type, 1918 cache_options=cache_options, 1919 ) 1920 self.s3 = self.fs # compatibility 1922 # when not using autocommit we want to have transactional state to manage File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1385, in AbstractBufferedFile.__init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, size, **kwargs) 1383 self.size = size 1384 else: -> 1385 self.size = self.details["size"] 1386 self.cache = caches[cache_type]( 1387 self.blocksize, self._fetch_range, self.size, **cache_options 1388 ) 1389 else: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1398, in AbstractBufferedFile.details(self) 1395 @property 1396 def details(self): 1397 if self._details is None: -> 1398 self._details = self.fs.info(self.path) 1399 return self._details File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:86, in sync_wrapper..wrapper(*args, **kwargs) 83 @functools.wraps(func) 84 def wrapper(*args, **kwargs): 85 self = obj or args[0] ---> 86 return sync(self.loop, func, *args, **kwargs) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:66, in sync(loop, func, timeout, *args, **kwargs) 64 raise FSTimeoutError from return_result 65 elif isinstance(return_result, BaseException): ---> 66 raise return_result 67 else: 68 return return_result File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:26, in _runner(event, coro, result, timeout) 24 coro = asyncio.wait_for(coro, timeout=timeout) 25 try: ---> 26 result[0] = await 
coro 27 except Exception as ex: 28 result[0] = ex File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1140, in S3FileSystem._info(self, path, bucket, key, refresh, version_id) 1138 if key: 1139 try: -> 1140 out = await self._call_s3( 1141 "head_object", 1142 self.kwargs, 1143 Bucket=bucket, 1144 Key=key, 1145 **version_id_kw(version_id), 1146 **self.req_kw, 1147 ) 1148 return { 1149 "ETag": out.get("ETag", ""), 1150 "LastModified": out["LastModified"], (...) 1156 "ContentType": out.get("ContentType"), 1157 } 1158 except FileNotFoundError: File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:325, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs) 324 async def _call_s3(self, method, *akwarglist, **kwargs): --> 325 await self.set_session() 326 s3 = await self.get_s3(kwargs.get("Bucket")) 327 method = getattr(s3, method) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:473, in S3FileSystem.set_session(self, refresh, kwargs) 469 else: 470 s3creator = self.session.create_client( 471 "s3", config=conf, **init_kwargs, **client_kwargs 472 ) --> 473 self._s3 = await s3creator.__aenter__() 475 self._s3creator = s3creator 476 # the following actually closes the aiohttp connection; use of privates 477 # might break in the future, would cause exception at gc time File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:22, in ClientCreatorContext.__aenter__(self) 21 async def __aenter__(self) -> AioBaseClient: ---> 22 self._client = await self._coro 23 return await self._client.__aenter__() File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:102, in AioSession._create_client(self, service_name, region_name, api_version, use_ssl, verify, endpoint_url, aws_access_key_id, aws_secret_access_key, aws_session_token, config) 97 raise PartialCredentialsError( 98 provider='explicit', 99 cred_var=self._missing_cred_vars(aws_access_key_id, 100 aws_secret_access_key)) 101 else: --> 102 credentials = await self.get_credentials() 103 endpoint_resolver = self._get_internal_component('endpoint_resolver') 104 exceptions_factory = self._get_internal_component('exceptions_factory') File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:133, in AioSession.get_credentials(self) 131 async def get_credentials(self): 132 if self._credentials is None: --> 133 self._credentials = await (self._components.get_component( 134 'credential_provider').load_credentials()) 135 return self._credentials File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:814, in AioCredentialResolver.load_credentials(self) 812 for provider in self.providers: 813 logger.debug("Looking for credentials via: %s", provider.METHOD) --> 814 creds = await provider.load() 815 if creds is not None: 816 return creds File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:486, in AioInstanceMetadataProvider.load(self) 484 async def load(self): 485 fetcher = self._role_fetcher --> 486 metadata = await fetcher.retrieve_iam_role_credentials() 487 if not metadata: 488 return None File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:175, in AioInstanceMetadataFetcher.retrieve_iam_role_credentials(self) 173 async def retrieve_iam_role_credentials(self): 174 try: --> 175 token = await self._fetch_metadata_token() 176 role_name = await self._get_iam_role(token) 
177 credentials = await self._get_credentials(role_name, token) File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:88, in AioIMDSFetcher._fetch_metadata_token(self) 86 for i in range(self._num_attempts): 87 try: ---> 88 response = await session.send(request.prepare()) 89 if response.status_code == 200: 90 return await response.text File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:210, in AIOHTTPSession.send(self, request) 208 except ServerTimeoutError as e: 209 if str(e).lower().startswith('connect'): --> 210 raise ConnectTimeoutError(endpoint_url=request.url, error=e) 211 else: 212 raise ReadTimeoutError(endpoint_url=request.url, error=e) ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token" ```
mvashishtha commented 2 years ago

@alejandro-ponder never mind, that seems to be some kind of network error on my machine. Even the pandas read `import pandas as pd; pd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2009-01.csv", nrows=10)` works on another machine but not on mine. Could you please try the `s3://` path instead of HTTPS?
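For reference, the `s3://` variant of the original reproducer might look like this; passing `storage_options={"anon": True}` is my assumption for reading the public bucket without credentials, not something Modin requires.

```python
import modin.pandas as pd

df = pd.read_csv(
    "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",
    compression="gzip",
    header=0,
    sep="\t",
    storage_options={"anon": True},  # assumption: anonymous access to the public bucket
)
```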

alejandro-ponder commented 2 years ago

Tried changing the path to `s3://`, and the read takes about 7 min 13 s with Modin, while pandas takes about 3 min 35 s for the same workload.

Note, I'm also using the `error_bad_lines=False` argument. Not sure if that could be affecting anything?
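A minimal sketch of how the two readers might be timed on this workload, assuming each run happens in a fresh interpreter session; the helper name `timed_read` is hypothetical.

```python
import time

def timed_read(read_csv):
    """Time one read of the gzipped TSV; run Modin and pandas in separate sessions."""
    start = time.time()
    df = read_csv(
        "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",
        compression="gzip",
        header=0,
        sep="\t",
        error_bad_lines=False,
    )
    return df, time.time() - start

# In one session:   import modin.pandas as pd; df, secs = timed_read(pd.read_csv)
# In a fresh one:   import pandas; df, secs = timed_read(pandas.read_csv)
```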

devin-petersohn commented 2 years ago

@alejandro-ponder is this in a fresh environment? Your machine may not have enough memory to run both Modin and pandas in the same notebook/interpreter environment.

alejandro-ponder commented 2 years ago

It should be. I restarted the kernel between the two runs.

pyrito commented 2 years ago

@prutskov this issue should be handled now, right?

prutskov commented 2 years ago

Yes, Modin now handles HTTPS-like addresses. But I would prefer to run the reproducer before closing the issue.
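One way to re-run the check and see whether Modin still falls back to pandas is to surface warnings during the read; that a fallback `UserWarning` would appear here is my assumption about the warning behavior, so treat this as a sketch.

```python
import warnings

import modin.pandas as pd

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    df = pd.read_csv(
        "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",
        compression="gzip",
        header=0,
        sep="\t",
    )

# Print any warnings emitted during the read (e.g. a fallback-to-pandas notice, if any).
for w in caught:
    print(w.category.__name__, w.message)
```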

vnlitvinov commented 2 years ago

@alejandro-ponder could you please re-check if this is still happening?