Open yanghua opened 2 weeks ago
UT:
def test_find(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
    """Exercise ``tosfs.find`` argument validation and listing results.

    The test first checks that bucket-less paths and the combination of
    ``prefix`` with ``maxdepth`` raise ``ValueError``. It then checks the
    result sizes of several prefix searches, builds a small directory tree
    containing one file, searches it, and finally removes the tree together
    with the workspace directory.
    """
    # Each pair is (path passed to find, pattern the ValueError must match).
    rejected_paths = (
        ("", "Cannot access all of TOS via path ."),
        ("*", "Cannot access all of TOS via path *."),
        ("tos://", "Cannot access all of TOS via path tos://."),
        ("/", "Cannot access all of TOS without specify a bucket."),
    )
    for bad_path, expected_message in rejected_paths:
        with pytest.raises(ValueError, match=expected_message):
            tosfs.find(bad_path)

    assert len(tosfs.find(bucket, maxdepth=1)) > 0

    # 'prefix' and 'maxdepth' are rejected when given together.
    with pytest.raises(
        ValueError,
        match="Can not specify 'prefix' option " "alongside 'maxdepth' options.",
    ):
        tosfs.find(bucket, maxdepth=1, withdirs=True, prefix=temporary_workspace)

    workspace_path = f"{bucket}/{temporary_workspace}"

    files_under_workspace = tosfs.find(bucket, prefix=temporary_workspace)
    assert len(files_under_workspace) == 0

    files_under_random_prefix = tosfs.find(bucket, prefix=random_str())
    assert len(files_under_random_prefix) == 0

    detailed_listing = tosfs.find(
        bucket, prefix=temporary_workspace + "/", withdirs=True, detail=True
    )
    # The expected size is spelled as the length of a two-item list, i.e. 2.
    assert len(detailed_listing) == len([bucket, f"{bucket}/{temporary_workspace}/"])
    workspace_entry = detailed_listing[workspace_path]
    assert workspace_entry["name"] == workspace_path
    assert workspace_entry["type"] == "directory"

    # issue start
    shallow_listing = tosfs.find(
        f"{bucket}/{temporary_workspace}/", withdirs=True, maxdepth=1, detail=True
    )
    # This assertion passes locally, but GitHub CI keeps failing on it.
    assert len(shallow_listing) == 0
    # issue end

    dir_name = random_str()
    sub_dir_name = random_str()
    file_name = random_str()
    sub_file_name = random_str()

    dir_path = f"{bucket}/{temporary_workspace}/{dir_name}"
    sub_dir_path = f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}"
    file_path = f"{bucket}/{temporary_workspace}/{dir_name}/{file_name}"
    sub_file_path = (
        f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}"
    )

    # Only directories exist at this point, so a files-only search is empty.
    tosfs.makedirs(sub_dir_path)
    dirs_only_search = tosfs.find(workspace_path, prefix=dir_name, withdirs=False)
    assert len(dirs_only_search) == 0

    tosfs.touch(file_path)
    assert tosfs.exists(file_path)
    single_file_search = tosfs.find(dir_path, prefix=file_name, withdirs=False)
    assert len(single_file_search) == 1

    # Clean up from the deepest entry outwards.
    # NOTE(review): nothing in this test creates sub_file_path; presumably
    # rm_file tolerates a missing object - confirm.
    tosfs.rm_file(sub_file_path)
    tosfs.rmdir(sub_dir_path)
    tosfs.rm_file(file_path)
    tosfs.rmdir(dir_path)
    tosfs.rmdir(workspace_path)
This comes from PR #56.
UT: