fsspec / adlfs

fsspec-compatible Azure Datalake and Azure Blob Storage access
BSD 3-Clause "New" or "Revised" License
178 stars 104 forks

Error: One of the request inputs is not valid. on mv or cp function #255

Open mattc-eostar opened 3 years ago

mattc-eostar commented 3 years ago

What happened:

Error occurred on move.

Checked the data lake and the files did copy to new location successfully but were not removed from previous location.

---------------------------------------------------------------------------
HttpResponseError                         Traceback (most recent call last)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py in start_copy_from_url(self, source_url, metadata, incremental_copy, **kwargs)
   1184                 return await self._client.page_blob.copy_incremental(**options)
-> 1185             return await self._client.blob.start_copy_from_url(**options)
   1186         except HttpResponseError as error:

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/_generated/aio/operations/_blob_operations.py in start_copy_from_url(self, copy_source, timeout, metadata, tier, rehydrate_priority, request_id_parameter, blob_tags_string, seal_blob, source_modified_access_conditions, modified_access_conditions, lease_access_conditions, **kwargs)
   2258             error = self._deserialize(_models.StorageError, response)
-> 2259             raise HttpResponseError(response=response, model=error)
   2260 

HttpResponseError: Operation returned an invalid status 'One of the request inputs is not valid.'

During handling of the above exception, another exception occurred:

HttpResponseError                         Traceback (most recent call last)
<command-2840456655888337> in <module>
     10 fs.mkdirs(f'{container_name}/RAW/Runs/{job_run_id}', exist_ok=True)
     11 # fs.mv(files, f'{container_name}/RAW/Runs/{job_run_id}/*', recursive=True)
---> 12 await fs.cp(f'{container_name}/RAW/Config/', f'{container_name}/RAW/Runs/{job_run_id}/', recursive=True)
     13 print(fs.ls(f'{container_name}/RAW/Config/'))
     14 print(fs.ls(f'{container_name}/RAW/Runs/{job_run_id}/'))

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/spec.py in cp(self, path1, path2, **kwargs)
   1163     def cp(self, path1, path2, **kwargs):
   1164         """Alias of :ref:`FilesystemSpec.copy`."""
-> 1165         return self.copy(path1, path2, **kwargs)
   1166 
   1167     def move(self, path1, path2, **kwargs):

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
     86     def wrapper(*args, **kwargs):
     87         self = obj or args[0]
---> 88         return sync(self.loop, func, *args, **kwargs)
     89 
     90     return wrapper

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     67         raise FSTimeoutError
     68     if isinstance(result[0], BaseException):
---> 69         raise result[0]
     70     return result[0]
     71 

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/asyn.py in _runner(event, coro, result, timeout)
     23         coro = asyncio.wait_for(coro, timeout=timeout)
     24     try:
---> 25         result[0] = await coro
     26     except Exception as ex:
     27         result[0] = ex

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/asyn.py in _copy(self, path1, path2, recursive, on_error, maxdepth, **kwargs)
    295             if on_error == "ignore" and isinstance(ex, FileNotFoundError):
    296                 continue
--> 297             raise ex
    298 
    299     async def _pipe(self, path, value=None, **kwargs):

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/adlfs/spec.py in _cp_file(self, path1, path2, **kwargs)
   1548             cc2 = self.service_client.get_container_client(container2)
   1549             blobclient2 = cc2.get_blob_client(blob=path2)
-> 1550         await blobclient2.start_copy_from_url(blobclient1.url)
   1551         self.invalidate_cache(container1)
   1552         self.invalidate_cache(container2)

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/core/tracing/decorator_async.py in wrapper_use_tracer(*args, **kwargs)
     72             span_impl_type = settings.tracing_implementation()
     73             if span_impl_type is None:
---> 74                 return await func(*args, **kwargs)
     75 
     76             # Merge span is parameter is set, but only if no explicit parent are passed

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py in start_copy_from_url(self, source_url, metadata, incremental_copy, **kwargs)
   1185             return await self._client.blob.start_copy_from_url(**options)
   1186         except HttpResponseError as error:
-> 1187             process_storage_error(error)
   1188 
   1189     @distributed_trace_async

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py in process_storage_error(storage_error)
    148     error.error_code = error_code
    149     error.additional_info = additional_data
--> 150     error.raise_with_traceback()
    151 
    152 

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/core/exceptions.py in raise_with_traceback(self)
    245     def raise_with_traceback(self):
    246         try:
--> 247             raise super(AzureError, self).with_traceback(self.exc_traceback)
    248         except AttributeError:
    249             self.__traceback__ = self.exc_traceback

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py in start_copy_from_url(self, source_url, metadata, incremental_copy, **kwargs)
   1183             if incremental_copy:
   1184                 return await self._client.page_blob.copy_incremental(**options)
-> 1185             return await self._client.blob.start_copy_from_url(**options)
   1186         except HttpResponseError as error:
   1187             process_storage_error(error)

/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/_generated/aio/operations/_blob_operations.py in start_copy_from_url(self, copy_source, timeout, metadata, tier, rehydrate_priority, request_id_parameter, blob_tags_string, seal_blob, source_modified_access_conditions, modified_access_conditions, lease_access_conditions, **kwargs)
   2257             map_error(status_code=response.status_code, response=response, error_map=error_map)
   2258             error = self._deserialize(_models.StorageError, response)
-> 2259             raise HttpResponseError(response=response, model=error)
   2260 
   2261         response_headers = {}

HttpResponseError: One of the request inputs is not valid.
RequestId:c8d8f1d3-601e-0006-1bab-79e110000000
Time:2021-07-15T19:01:25.5107678Z
ErrorCode:InvalidInput
Error:None

What you expected to happen:

The files within that directory should be moved to the new directory and then removed from the old directory.

Minimal Complete Verifiable Example:

fs = adlfs.AzureBlobFileSystem(
  account_name = storage_account_name,
  account_key = storage_account_access_key
)
container_name = "container1"

files = fs.ls(f'{container_name}/RAW/Config/')
fs.mkdirs(f'{container_name}/RAW/Runs/{job_run_id}', exist_ok=True)
fs.mv(f'{container_name}/RAW/Config/', f'{container_name}/RAW/Runs/{job_run_id}/', recursive=True)

Anything else we need to know?:

Environment: Databricks, adlfs==2021.7.0

aloysius-lim commented 3 years ago

I'm facing the same issue with adlfs 2021.10.0. It seems to be caused by Azure Blob Storage not supporting copying of directories (even if they are empty).

This is what seems to be happening:

fs.mv() calls fs.copy(), which in turn calls fs.expand_path() to get the list of files to copy. When recursive=True, this returns all the files under the path. The last line of fs.expand_path() sorts the list of paths before returning it:

        return list(sorted(out))

https://github.com/intake/filesystem_spec/blob/0cc4518fd9141c8894b022aad16acdc5049cc671/fsspec/spec.py#L907

So given this set of files and directories:

container/
    folder/
        sub1/
            file1.txt
            file2.txt
        sub2/
            file3.txt

and we call fs.mv('container/folder', 'container/folder2', recursive=True), then fs.expand_path() returns:

[
'container/folder',
'container/folder/sub1',
'container/folder/sub1/file1.txt',
'container/folder/sub1/file2.txt',
'container/folder/sub2',
'container/folder/sub2/file3.txt'
]
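
The ordering can be reproduced without touching a filesystem. This is only a sketch: it assumes the expanded paths are gathered into a set and then sorted, as the `return list(sorted(out))` line quoted above suggests, and it reuses the path names from the example tree.

```python
# Expanded paths as an unordered set (assumption: this mirrors what
# expand_path() collects before the final sort).
expanded = {
    "container/folder/sub2/file3.txt",
    "container/folder/sub1/file2.txt",
    "container/folder/sub2",
    "container/folder",
    "container/folder/sub1/file1.txt",
    "container/folder/sub1",
}

# Plain lexicographic sort, as in `list(sorted(out))`.
ordered = sorted(expanded)

# A directory is a strict string prefix of everything beneath it, so it
# always sorts ahead of its own children and ends up in the copy list.
for path in ordered:
    print(path)
```

Because the directory entries stay in the list, the copy loop sees them as if they were files.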

fs.copy() then iterates through this list to perform the copy operation:

        for p1, p2 in zip(paths, path2):
            try:
                self.cp_file(p1, p2, **kwargs)
            except FileNotFoundError:
                if on_error == "raise":
                    raise

https://github.com/intake/filesystem_spec/blob/0cc4518fd9141c8894b022aad16acdc5049cc671/fsspec/spec.py#L871-L876

The problem is that Azure Blob Storage does not support copying of directories. If we attempt the copy on each of the individual paths listed above, the copy succeeds on the files (e.g. container/folder/sub1/file1.txt) but fails on the directories.
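
One possible way to avoid the failing directory copies is to copy files only. The sketch below is not part of adlfs or fsspec: `copy_files_only` is a hypothetical helper name. It relies on `fs.find()`, which lists only files unless `withdirs=True` is passed, and on `fs.cp_file()`. It assumes that `src` and `dst` are given without a protocol prefix (so they match the paths `find()` returns) and that the backend creates parent directories implicitly when a file is written. I have not verified it against a real storage account.

```python
def copy_files_only(fs, src, dst):
    """Copy each file under src to the same relative path under dst.

    Directory entries are never passed to cp_file(), because find()
    returns files only by default.
    """
    src = src.rstrip("/")
    dst = dst.rstrip("/")
    copied = []
    for p1 in fs.find(src):
        # Swap the leading src prefix for dst; the rest of the path is kept.
        p2 = dst + p1[len(src):]
        fs.cp_file(p1, p2)
        copied.append((p1, p2))
    return copied
```

With an `AzureBlobFileSystem` instance this would be called as `copy_files_only(fs, 'container/folder', 'container/folder2')`. Removing the source afterwards is a separate step.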

akshayjain3450 commented 1 year ago

@mattc-eostar @aloysius-lim

Whenever you create a Storage Account on Microsoft Azure, there are three kinds of Azure Storage Account available. I encountered the same issue when writing to an account with kind = "StorageV2". This kind does not support the move and copy methods for moving data from temporary storage. I created another Storage Account with kind = "BlobStorage" and tried the same operation; this kind worked fine for me and supported the move and copy methods.

jagadeesanmuthuvel commented 1 month ago

How can this issue be fixed? It has been around for a long time with ADLS v2.

Danferno commented 1 month ago

It's not a great solution, but I wrote a custom function as a workaround that finds all the files first and then moves them. It uses the built-in concurrency to keep things somewhat performant.

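        # List files only; directory entries cannot be copied
        srcList = fs.find(src, withdirs=False)
        # Replace only the first (leading) occurrence of src in each path
        dstList = [file.replace(src, dst, 1) for file in srcList]
        fs.mv(srcList, dstList)
        # Remove whatever is left of the source tree
        fs.rm(src, recursive=True)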

fs.rm(recursive=True) also seems to struggle, so I wrote an fs_rm function:

        from collections import defaultdict

        def fs_rm(fs, path):
            # Delete all files first
            fileList = fs.find(path, withdirs=False)
            if fileList:
                fs.rm(fileList)

            # Split by level (cannot delete non-empty directories)
            dirList = fs.find(path, withdirs=True)
            if fs.exists(path) and path not in dirList:
                dirList.append(path)
            dirsByLevel = defaultdict(list)
            for folder in dirList:
                dirsByLevel[len(folder.split('/'))].append(folder)

            # Delete from deepest to shallowest
            for level in sorted(dirsByLevel.keys(), reverse=True):
                fs.rm(dirsByLevel[level])
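
To illustrate the depth grouping on its own, here is a filesystem-free sketch. `group_by_depth` is a hypothetical helper name, and the directory names are borrowed from the example tree earlier in the thread.

```python
from collections import defaultdict


def group_by_depth(paths):
    """Return batches of paths, deepest level first."""
    levels = defaultdict(list)
    for p in paths:
        # Depth is the number of '/'-separated components.
        levels[len(p.split("/"))].append(p)
    return [levels[depth] for depth in sorted(levels, reverse=True)]


batches = group_by_depth([
    "container/folder",
    "container/folder/sub1",
    "container/folder/sub2",
])
for batch in batches:
    print(batch)
```

Each batch contains only directories at the same depth, so every directory is already empty by the time its batch is deleted.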