raybellwaves closed this issue 4 years ago.
This is the first report I've gotten of this error. Noted mdurant's suggestion, which seems the most likely explanation. Can I assume your filepath is formatted as: "abfs://{filesystem_name}/file.parquet"? Also, which Azure region are you in?
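For reference, the expected form looks like this (a minimal sketch; the account credentials go through storage_options, as in your SO question):

import dask.dataframe as dd

df = dd.read_parquet('abfs://{filesystem_name}/file.parquet',
                     storage_options={'account_name': 'ACCOUNT_NAME',
                                      'account_key': 'ACCOUNT_KEY'})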
I was actually doing abfs://{filesystem_name}/file, but I updated my code (and the SO question) to abfs://{filesystem_name}/file.parquet. However, I still get the AzureHttpError.
I'm in East US 2.
For reference, I'm working in East US 2 daily without issue, so I would assume it's not an availability problem. Can you answer a few other questions?
Thanks for the prompt for an MCVE.
- What package versions are you running? (adlfs, fsspec, dask, and azure-storage-blob).
Windows 10
adlfs==0.2.0, fsspec==0.6.2, dask==2.10.1, azure-storage-blob==2.1.0
Further details below.
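For completeness, these can be gathered with a short snippet (a sketch using pkg_resources, which ships with setuptools):

import pkg_resources

for pkg in ('adlfs', 'fsspec', 'dask', 'azure-storage-blob'):
    print(pkg, pkg_resources.get_distribution(pkg).version)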
- Are you running Dask locally or distributed? If distributed, what version.
distributed (2.10.1) using a LocalCluster:
from dask.distributed import Client
client = Client()
- Is this parquet file one that was written to abfs with Dask? If no, does a simple read-write operation with another file work, and how was the existing parquet file created? If yes, does a read-write operation to-from CSV work successfully?
- Have you recreated the problem with a minimal working example (small example dummy dataframe)? If so can you share that example so I can try to re-create your issue?
Good questions. I tackle them both in the MCVE code below. I get EmptyDataError: No columns to parse from file with the CSV files and AzureHttpError: Server encountered an internal error with the parquet file.
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
df = pd.DataFrame(data=d)
ddf = dd.from_pandas(df, npartitions=2)
STORAGE_OPTIONS = {'account_name': 'ACCOUNT_NAME',
                   'account_key': 'ACCOUNT_KEY'}

# This works fine and I see the files in Microsoft Azure Storage Explorer
dd.to_csv(df=ddf,
          filename='abfs://BLOB/FILE/*.csv',
          storage_options=STORAGE_OPTIONS)
df = dd.read_csv('abfs://tmp/tmp2/*.csv', storage_options=STORAGE_OPTIONS)
---------------------------------------------------------------------------
EmptyDataError Traceback (most recent call last)
<ipython-input-33-4ef0af5e9369> in <module>
----> 1 df = dd.read_csv('abfs://tmp/tmp2/*.csv', storage_options=STORAGE_OPTIONS)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\csv.py in read(urlpath, blocksize, collection, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
576 storage_options=storage_options,
577 include_path_column=include_path_column,
--> 578 **kwargs
579 )
580
~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\csv.py in read_pandas(reader, urlpath, blocksize, collection, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
442
443 # Use sample to infer dtypes and check for presence of include_path_column
--> 444 head = reader(BytesIO(b_sample), **kwargs)
445 if include_path_column and (include_path_column in head.columns):
446 raise ValueError(
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\parsers.py in parser_f(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, dialect, error_bad_lines, warn_bad_lines, delim_whitespace, low_memory, memory_map, float_precision)
674 )
675
--> 676 return _read(filepath_or_buffer, kwds)
677
678 parser_f.__name__ = name
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\parsers.py in _read(filepath_or_buffer, kwds)
446
447 # Create the parser.
--> 448 parser = TextFileReader(fp_or_buf, **kwds)
449
450 if chunksize or iterator:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\parsers.py in __init__(self, f, engine, **kwds)
878 self.options["has_index_names"] = kwds["has_index_names"]
879
--> 880 self._make_engine(self.engine)
881
882 def close(self):
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\parsers.py in _make_engine(self, engine)
1112 def _make_engine(self, engine="c"):
1113 if engine == "c":
-> 1114 self._engine = CParserWrapper(self.f, **self.options)
1115 else:
1116 if engine == "python":
~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandas\io\parsers.py in __init__(self, src, **kwds)
1889 kwds["usecols"] = self.usecols
1890
-> 1891 self._reader = parsers.TextReader(src, **kwds)
1892 self.unnamed_cols = self._reader.unnamed_cols
1893
pandas\_libs\parsers.pyx in pandas._libs.parsers.TextReader.__cinit__()
EmptyDataError: No columns to parse from file
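The EmptyDataError means pandas was handed an empty byte sample, which suggests the CSV parts read back empty. A quick check of what actually landed in the container (a sketch; the filesystem is built by hand with the same credentials, and the part-file name 0.csv is my guess at how dask expands the * pattern):

from adlfs import AzureBlobFileSystem

fs = AzureBlobFileSystem(account_name='ACCOUNT_NAME', account_key='ACCOUNT_KEY')
print(fs.ls('tmp/tmp2'))  # list the written part files
with fs.open('tmp/tmp2/0.csv', 'rb') as f:  # hypothetical part-file name
    print(f.read(100))  # empty bytes here would explain the error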
# This works and I see it in Microsoft Azure Storage Explorer
dd.to_parquet(df=ddf,
              path='abfs://BLOB/FILE.parquet',
              storage_options=STORAGE_OPTIONS)

df = dd.read_parquet('abfs://tmp/tmp.parquet',
                     storage_options=STORAGE_OPTIONS)
ERROR:azure.storage.common.storageclient:Client-Request-ID=fe8a8c36-8120-11ea-a33c-a0afbd853445 Retry policy did not allow for a retry: Server-Timestamp=Sat, 18 Apr 2020 03:03:08 GMT, Server-Request-ID=a5160140-d01e-006b-642d-1518c8000000, HTTP status code=500, Exception=Server encountered an internal error. Please try again after some time. ErrorCode: InternalError<?xml version="1.0" encoding="utf-8"?><Error><Code>InternalError</Code><Message>Server encountered an internal error. Please try again after some time.RequestId:a5160140-d01e-006b-642d-1518c8000000Time:2020-04-18T03:03:09.2047334Z</Message></Error>.
AzureHttpError Traceback (most recent call last)
<ipython-input-35-0b3e24138208> in <module>
1 df = dd.read_parquet('abfs://tmp/tmp.parquet',
----> 2 storage_options=STORAGE_OPTIONS)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, chunksize, **kwargs)
231 filters=filters,
232 split_row_groups=split_row_groups,
--> 233 **kwargs
234 )
235 if meta.index.name is not None:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py in read_metadata(fs, paths, categories, index, gather_statistics, filters, **kwargs)
176 # correspond to a row group (populated below).
177 parts, pf, gather_statistics, fast_metadata = _determine_pf_parts(
--> 178 fs, paths, gather_statistics, **kwargs
179 )
180
~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
127 open_with=fs.open,
128 sep=fs.sep,
--> 129 **kwargs.get("file", {})
130 )
131 if gather_statistics is None:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\fastparquet\api.py in __init__(self, fn, verify, open_with, root, sep)
109 fn2 = join_path(fn, '_metadata')
110 self.fn = fn2
--> 111 with open_with(fn2, 'rb') as f:
112 self._parse_header(f, verify)
113 fn = fn2
~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in open(self, path, mode, block_size, cache_options, **kwargs)
722 autocommit=ac,
723 cache_options=cache_options,
--> 724 **kwargs
725 )
726 if not ac:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in _open(self, path, mode, block_size, autocommit, cache_options, **kwargs)
552 autocommit=autocommit,
553 cache_options=cache_options,
--> 554 **kwargs,
555 )
556
~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
582 cache_type=cache_type,
583 cache_options=cache_options,
--> 584 **kwargs,
585 )
586
~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
954 if mode == "rb":
955 if not hasattr(self, "details"):
--> 956 self.details = fs.info(path)
957 self.size = self.details["size"]
958 self.cache = caches[cache_type](
~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in info(self, path, **kwargs)
499 if out:
500 return out[0]
--> 501 out = self.ls(path, detail=True, **kwargs)
502 path = path.rstrip("/")
503 out1 = [o for o in out if o["name"].rstrip("/") == path]
~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in ls(self, path, detail, invalidate_cache, delimiter, **kwargs)
446 # then return the contents
447 elif self._matches(
--> 448 container_name, path, as_directory=True, delimiter=delimiter
449 ):
450 logging.debug(f"{path} appears to be a directory")
~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in _matches(self, container_name, path, as_directory, delimiter)
386 prefix=path,
387 delimiter=delimiter,
--> 388 num_results=None,
389 )
390
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\blob\baseblobservice.py in list_blob_names(self, container_name, prefix, num_results, include, delimiter, marker, timeout)
1360 '_context': operation_context,
1361 '_converter': _convert_xml_to_blob_name_list}
-> 1362 resp = self._list_blobs(*args, **kwargs)
1363
1364 return ListGenerator(resp, self._list_blobs, args, kwargs)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\blob\baseblobservice.py in _list_blobs(self, container_name, prefix, marker, max_results, include, delimiter, timeout, _context, _converter)
1435 }
1436
-> 1437 return self._perform_request(request, _converter, operation_context=_context)
1438
1439 def get_blob_account_information(self, container_name=None, blob_name=None, timeout=None):
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
444 status_code,
445 exception_str_in_one_line)
--> 446 raise ex
447 finally:
448 # If this is a location locked operation and the location is not set,
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
372 except AzureException as ex:
373 retry_context.exception = ex
--> 374 raise ex
375 except Exception as ex:
376 retry_context.exception = ex
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
358 # and raised as an azure http exception
359 _http_error_handler(
--> 360 HTTPError(response.status, response.message, response.headers, response.body))
361
362 # Parse the response
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\_error.py in _http_error_handler(http_error)
113 ex.error_code = error_code
114
--> 115 raise ex
116
117
AzureHttpError: Server encountered an internal error. Please try again after some time. ErrorCode: InternalError
<?xml version="1.0" encoding="utf-8"?><Error><Code>InternalError</Code><Message>Server encountered an internal error. Please try again after some time.
RequestId:a5160140-d01e-006b-642d-1518c8000000
Time:2020-04-18T03:03:09.2047334Z</Message></Error>
I've just attempted to reproduce your example, but it worked on my end. Below is my code and results:
import pandas as pd
import dask.dataframe as dd
from distributed import Client
client = Client()
storage_options = <DEFINED>
d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
df = pd.DataFrame(data=d)
ddf = dd.from_pandas(df, npartitions=2)
dd.to_csv(df=ddf,
          filename='abfs://<container>/test_csvfile/*.csv',
          storage_options=storage_options)

df2 = dd.read_csv("abfs://datascience-dev/test_csvfile/*.csv", storage_options=storage_options)
df2.head()  # returns successfully in Jupyter Notebook

dd.to_parquet(ddf,
              'abfs://datascience-dev/testfile.parquet',
              storage_options=storage_options)

df3 = dd.read_parquet("abfs://datascience-dev/testfile.parquet",
                      storage_options=storage_options)
df3.head()  # returns successfully in Jupyter Notebook
This was run on Linux with Anaconda Python. Python v3.6.7. Confirmed it works on my Windows 10 as well.
Versions of adlfs and fsspec match yours; azure-storage-blob==2.1.0, azure-common==1.1.24, and azure-datalake-store==0.0.48. I see that you have azure-core installed, which I do not have installed and which is not a dependency; you may want to try removing it. Looking through other packages that are logical suspects, I also have requests 2.23 rather than 2.22.
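For example, to drop it (assuming pip manages the env):

> pip uninstall azure-core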
I will investigate further later today.
Thanks a lot for running it. Regarding the packages, I'll try them in a new env.
Here's a new environment. Slightly different error message, but the same sort of thing: file(s) not found.
Create new env:
> conda create -n adlfs python=3.8
> conda activate adlfs
> pip install adlfs
> conda install -c conda-forge dask fastparquet ipython
Check packages:
Setup code:
import pandas as pd
import dask.dataframe as dd
from distributed import Client
client = Client()
storage_options = <DEFINED>
d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
df = pd.DataFrame(data=d)
ddf = dd.from_pandas(df, npartitions=2)
dd.to_csv(df=ddf,
          filename='abfs://<container>/test_csvfile/*.csv',
          storage_options=storage_options)

df2 = dd.read_csv('abfs://<container>/test_csvfile/*.csv',
                  storage_options=storage_options)
Error message:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\csv.py", line 566, in read
return read_pandas(
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\csv.py", line 398, in read_pandas
b_out = read_bytes(
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\adlfs\lib\site-packages\dask\bytes\core.py", line 96, in read_bytes
raise IOError("%s resolved to no files" % urlpath)
OSError: abfs://<container>/test_csvfile/*.csv resolved to no files
Print a few things using %debug:
ipdb> urlpath
'abfs://tmp/test_csvfile/*.csv'
ipdb> paths
[]
ipdb> b_lineterminator
b'\n'
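An empty paths means the glob matched nothing even though the writes appeared to succeed. This can be checked directly against adlfs, independent of dask (a sketch using the same credentials as storage_options):

from adlfs import AzureBlobFileSystem

fs = AzureBlobFileSystem(account_name='ACCOUNT_NAME', account_key='ACCOUNT_KEY')
print(fs.ls('tmp/test_csvfile'))  # do the written part files show up at all?
print(fs.glob('tmp/test_csvfile/*.csv'))  # [] here reproduces 'resolved to no files'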
dd.to_parquet(ddf,
              'abfs://<container>/testfile.parquet',
              storage_options=storage_options)

df3 = dd.read_parquet("abfs://<container>/testfile.parquet",
                      storage_options=storage_options)
Error message:
>>> df3 = dd.read_parquet("abfs://<container>/testfile.parquet", storage_options=storage_options)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\parquet\core.py", line 225, in read_parquet
meta, statistics, parts = engine.read_metadata(
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py", line 202, in read_metadata
parts, pf, gather_statistics, fast_metadata = _determine_pf_parts(
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py", line 147, in _determine_pf_parts
base, fns = _analyze_paths(paths, fs)
File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\parquet\utils.py", line 405, in _analyze_paths
basepath = path_parts_list[0][:-1]
IndexError: list index out of range
Print a few things using %debug:
ipdb> path_parts_list
[]
ipdb> file_list
[]
ipdb> paths
[]
ipdb> fs
<adlfs.core.AzureBlobFileSystem object at 0x0000019872422C70>
> c:\users\131416\appdata\local\continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\parquet\core.py(225)read_parquet()
223 index = [index]
224
--> 225 meta, statistics, parts = engine.read_metadata(
226 fs,
227 paths,
ipdb> fs
<adlfs.core.AzureBlobFileSystem object at 0x0000019872422C70>
ipdb> paths
['tmp/testfile.parquet']
ipdb> gather_statistics
ipdb>
> c:\users\131416\appdata\local\continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py(147)_determine_pf_parts()
145 # This is a directory, check for _metadata, then _common_metadata
146 paths = fs.glob(paths[0] + fs.sep + "*")
--> 147 base, fns = _analyze_paths(paths, fs)
148 if "_metadata" in fns:
149 # Using _metadata file (best-case scenario)
ipdb> paths
[]
ipdb> u
> c:\users\131416\appdata\local\continuum\anaconda3\envs\adlfs\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py(202)read_metadata()
200 # then each part will correspond to a file. Otherwise, each part will
201 # correspond to a row group (populated below).
--> 202 parts, pf, gather_statistics, fast_metadata = _determine_pf_parts(
203 fs, paths, gather_statistics, **kwargs
204 )
ipdb> paths
['tmp/testfile.parquet']
ipdb> paths[0]
'tmp/testfile.parquet'
It seems paths moves from ['tmp/testfile.parquet'] to [] at some point. I think it happens around https://github.com/dask/dask/blob/master/dask/dataframe/io/parquet/fastparquet.py#L146
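That dask line calls fs.glob(paths[0] + fs.sep + "*"), so the empty result should be reproducible without dask at all (a sketch, building the filesystem by hand with the same credentials):

from adlfs import AzureBlobFileSystem

fs = AzureBlobFileSystem(account_name='ACCOUNT_NAME', account_key='ACCOUNT_KEY')
# what _determine_pf_parts asks for; [] here would explain the IndexError upstream
print(fs.glob('tmp/testfile.parquet' + fs.sep + '*'))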
I'll try pyarrow
Create new env:
> conda create -n adlfs-pa python=3.8
> conda activate adlfs-pa
> pip install adlfs
> conda install -c conda-forge dask pyarrow ipython
Check packages:
Setup code:
import pandas as pd
import dask.dataframe as dd
from distributed import Client
client = Client()
storage_options = <DEFINED>
d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
df = pd.DataFrame(data=d)
ddf = dd.from_pandas(df, npartitions=2)
dd.to_csv(df=ddf,
          filename='abfs://tmp/test_csvfile/*.csv',
          storage_options=storage_options)

df2 = dd.read_csv('abfs://tmp/test_csvfile/*.csv',
                  storage_options=storage_options)
Same error as above
dd.to_parquet(ddf,
              'abfs://tmp/testfile.parquet',
              storage_options=storage_options)

df3 = dd.read_parquet("abfs://tmp/testfile.parquet",
                      storage_options=storage_options)
Same error as above
Some output of %debug:
> c:\users\131416\appdata\local\continuum\anaconda3\envs\adlfs-pa\lib\site-packages\dask\dataframe\io\parquet\utils.py(405)_analyze_paths()
403 path_parts_list = [_join_path(fn).split("/") for fn in file_list]
404 if root is False:
--> 405 basepath = path_parts_list[0][:-1]
406 for i, path_parts in enumerate(path_parts_list):
407 j = len(path_parts) - 1
ipdb> path_parts_list
[]
> c:\users\131416\appdata\local\continuum\anaconda3\envs\adlfs-pa\lib\site-packages\dask\dataframe\io\parquet\arrow.py(129)_determine_dataset_parts()
127 # This is a directory, check for _metadata, then _common_metadata
128 allpaths = fs.glob(paths[0] + fs.sep + "*")
--> 129 base, fns = _analyze_paths(allpaths, fs)
130 if "_metadata" in fns and "validate_schema" not in dataset_kwargs:
131 dataset_kwargs["validate_schema"] = False
ipdb> allpaths
[]
> c:\users\131416\appdata\local\continuum\anaconda3\envs\adlfs-pa\lib\site-packages\dask\dataframe\io\parquet\arrow.py(220)read_metadata()
218 # then each part will correspond to a file. Otherwise, each part will
219 # correspond to a row group (populated below)
--> 220 parts, dataset = _determine_dataset_parts(
221 fs, paths, gather_statistics, filters, kwargs.get("dataset", {})
222 )
ipdb> paths
['tmp/testfile.parquet']
ipdb> parts
*** NameError: name 'parts' is not defined
ipdb> dataset
*** NameError: name 'dataset' is not defined
ipdb> fs
<adlfs.core.AzureBlobFileSystem object at 0x0000020136448D60>
ipdb> gather_statistics
ipdb> filters
ipdb>
Switching from fastparquet to pyarrow doesn't seem to matter.
Just tested reading the CSV file and it worked on my Linux machine, although I got the AzureHttpError for the parquet file. I was also curious about path:
> /home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/fsspec/spec.py(542)info()
540 if out:
541 return out[0]
--> 542 out = self.ls(path, detail=True, **kwargs)
543 path = path.rstrip("/")
544 out1 = [o for o in out if o["name"].rstrip("/") == path]
ipdb> path
'tmp/testfile.parquet/_metadata/_metadata'
> /home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/adlfs/core.py(576)__init__()
574 self.blob = blob
575
--> 576 super().__init__(
577 fs=fs,
578 path=path,
ipdb> fs
<adlfs.core.AzureBlobFileSystem object at 0x7efdfca6fe80>
ipdb> path
'tmp/testfile.parquet/_metadata/_metadata'
ipdb>
> /home/ray/local/bin/anaconda3/envs/adlfs/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py(202)read_metadata()
200 # then each part will correspond to a file. Otherwise, each part will
201 # correspond to a row group (populated below).
--> 202 parts, pf, gather_statistics, fast_metadata = _determine_pf_parts(
203 fs, paths, gather_statistics, **kwargs
204 )
ipdb> paths
['tmp/testfile.parquet']
Reading the CSV file worked fine on my Mac. Same AzureHttpError on the parquet file.
I see there are two things here: the CSV read failing on Windows, and the AzureHttpError on the parquet read.
I've spent some time on this today. I can replicate your issue on my Windows machine, but it works as expected on Ubuntu and my Mac. I've found one compatibility issue with the 0.7.2 release of fsspec, which I will work on fixing tomorrow. Currently comparing package dependencies between Windows and Linux.
I just uploaded v0.2.2. Give it a shot and let me know if it works for you. There was an issue with parsing container names on Windows, which should now be fixed. I also found a change in fsspec v0.6.3 that causes adlfs to fail one of its unit tests; I need to verify everything is OK before allowing fsspec >= 0.6.3, so fsspec is pinned to 0.6.0 through 0.6.2. Let me know if it solves your issue.
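To pick up the new release (a sketch, assuming pip manages the env):

> pip install --upgrade adlfs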
Thanks. I'll try it tomorrow.
Thanks @hayesgb! I was able to read in the CSV file on my Windows machine.
Going to move the parquet file read to a separate issue.
Following up from an SO question here: https://stackoverflow.com/questions/61220615/dask-read-parquet-from-azure-blob-azurehttperror/61229497#61229497
Unfortunately, I'm still getting an AzureHttpError. Not sure if anyone here has encountered this? It's persistent for me.