ssl-hep / ServiceX_frontend

Client access library for ServiceX
BSD 3-Clause "New" or "Revised" License
5 stars 11 forks source link

Total number of files in progress bars is fixed at 3 #329

Open kyungeonchoi opened 11 months ago

kyungeonchoi commented 11 months ago

Tested version: servicex==3.0.0-alpha.6

Screenshot 2023-10-25 at 1 36 42 PM
BenGalewsky commented 11 months ago

It seems that sometimes we get an incorrect report of the total number of files from the ServiceX status message. We are rewriting this code, so I'm inclined to leave this until then.

Curious that it only downloaded three files. I would have expected the downloads to be 21/3 as well... were there transform errors?

kyungeonchoi commented 11 months ago

The transformation ran just fine (no errors); the total number of files in the progress bar simply was not showing the correct value. I just tested again on the same input dataset, and now it shows two instead of three.

The download continued past three files. However, it crashed just after the transformation finished. I guess this is a separate problem, but I'll put the error message here rather than in another issue, since the two might be solved together.

Transform ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 81/2 00:30
Download  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 24/2 01:05
ServiceX Transform 4d971481-dce4-424b-88b3-dbcbee46caaf
Transforms completed successfully
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
Cell In[7], line 1
----> 1 sx3 = ds.with_uproot_function(run_query).as_files()

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/make_it_sync/func_wrapper.py:63, in make_sync.<locals>.wrapped_call(*args, **kwargs)
     61 @wraps(fn)
     62 def wrapped_call(*args, **kwargs):
---> 63     return _sync_version_of_function(fn, *args, **kwargs)

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/make_it_sync/func_wrapper.py:26, in _sync_version_of_function(fn, *args, **kwargs)
     23 exector = ThreadPoolExecutor(max_workers=1)
     24 future = exector.submit(get_data_wrapper, *args, **kwargs)
---> 26 return future.result()

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/concurrent/futures/_base.py:456, in Future.result(self, timeout)
    454     raise CancelledError()
    455 elif self._state == FINISHED:
--> 456     return self.__get_result()
    457 else:
    458     raise TimeoutError()

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/make_it_sync/func_wrapper.py:21, in _sync_version_of_function.<locals>.get_data_wrapper(*args, **kwargs)
     19 asyncio.set_event_loop(loop)
     20 assert not loop.is_running()
---> 21 return loop.run_until_complete(fn(*args, **kwargs))

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/asyncio/base_events.py:653, in BaseEventLoop.run_until_complete(self, future)
    650 if not future.done():
    651     raise RuntimeError('Event loop stopped before Future completed.')
--> 653 return future.result()

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/servicex/dataset.py:440, in Dataset.as_files_async(self, display_progress, provided_progress)
    435 r"""
    436 Submit the transform and request all the resulting files to be downloaded
    437 :return: TransformResult instance with the list of complete paths to the downloaded files
    438 """
    439 with ExpandableProgress(display_progress, provided_progress) as progress:
--> 440     return await self.submit_and_download(signed_urls_only=False,
    441                                           expandable_progress=progress)

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/servicex/dataset.py:263, in Dataset.submit_and_download(self, signed_urls_only, expandable_progress)
    261 signed_urls = []
    262 downloaded_files = []
--> 263 download_result = await download_files_task
    264 if signed_urls_only:
    265     signed_urls = download_result

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/servicex/dataset.py:428, in Dataset.download_files(self, signed_urls_only, progress, download_progress, cached_record)
    425         break
    427 # Now just wait until all of our tasks complete
--> 428 await asyncio.gather(*download_tasks)
    429 return result_uris

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/servicex/dataset.py:370, in Dataset.download_files.<locals>.download_file(minio, filename, progress, download_progress, shorten_filename)
    363 async def download_file(
    364     minio: MinioAdapter,
    365     filename: str,
   (...)
    368     shorten_filename: bool = False,
    369 ):
--> 370     downloaded_filename = await minio.download_file(
    371         filename, self.download_path, shorten_filename=shorten_filename
    372     )
    373     result_uris.append(downloaded_filename.as_posix())
    374     progress.advance(download_progress)

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/servicex/minio_adapter.py:96, in MinioAdapter.download_file(self, object_name, local_dir, shorten_filename)
     86 os.makedirs(local_dir, exist_ok=True)
     87 path = Path(
     88     os.path.join(
     89         local_dir,
   (...)
     93     )
     94 )
---> 96 _ = await self.minio.fget_object(
     97     bucket_name=self.bucket, object_name=object_name, file_path=path.as_posix()
     98 )
     99 return path.resolve()

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/miniopy_async/api.py:1489, in Minio.fget_object(self, bucket_name, object_name, file_path, request_headers, ssec, version_id, extra_query_params, tmp_file_path)
   1478 response = await self.get_object(
   1479     bucket_name,
   1480     object_name,
   (...)
   1486     extra_query_params=extra_query_params,
   1487 )
   1488 async with aiofile.async_open(tmp_file_path, "wb") as tmp_file:
-> 1489     async for data in response.content.iter_chunked(n=1024 * 1024):
   1490         await tmp_file.write(data)
   1491 if os.path.exists(file_path):

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/aiohttp/streams.py:35, in AsyncStreamIterator.__anext__(self)
     33 async def __anext__(self) -> _T:
     34     try:
---> 35         rv = await self.read_func()
     36     except EofStream:
     37         raise StopAsyncIteration

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/aiohttp/streams.py:385, in StreamReader.read(self, n)
    381 # TODO: should be `if` instead of `while`
    382 # because waiter maybe triggered on chunk end,
    383 # without feeding any data
    384 while not self._buffer and not self._eof:
--> 385     await self._wait("read")
    387 return self._read_nowait(n)

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/aiohttp/streams.py:303, in StreamReader._wait(self, func_name)
    301 try:
    302     if self._timer:
--> 303         with self._timer:
    304             await waiter
    305     else:

File ~/opt/anaconda3/envs/servicex3.0/lib/python3.11/site-packages/aiohttp/helpers.py:725, in TimerContext.__exit__(self, exc_type, exc_val, exc_tb)
    722     self._tasks.pop()
    724 if exc_type is asyncio.CancelledError and self._cancelled:
--> 725     raise asyncio.TimeoutError from None
    726 return None

TimeoutError: