astronomy-commons / hipscat-import

HiPSCat import - generate HiPSCat-partitioned catalogs
https://hipscat-import.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Errors inside futures are not mapped (makes it hard to debug) #282

Closed Schwarzam closed 3 months ago

Schwarzam commented 4 months ago

I ran into this generic error multiple times because one of my FITS catalogs was corrupted:

"Some mapping stages failed. See logs for details."

The problem is that the error occurs inside a worker thread, but it is only raised from PipelineResumePlan.wait_for_futures, not from the thread where it actually happened, which makes the real cause very hard to trace.
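To illustrate the difference, here is a minimal sketch using the standard library's concurrent.futures rather than Dask. The names read_catalog and run are hypothetical stand-ins, and this wait_for_futures is not the hipscat-import implementation (which is not shown in this issue). It only shows that chaining the stage-level error to the exception stored on the future keeps the worker's traceback attached.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def read_catalog(path):
    """Hypothetical stand-in for a mapping task such as mr.map_to_pixels."""
    if path.endswith("corrupted.fits"):
        raise OSError(f"could not read {path}")
    return path


def wait_for_futures(futures, stage_name):
    """Raise a stage-level error chained to the first worker exception found."""
    for future in as_completed(futures):
        error = future.exception()
        if error is not None:
            # "from error" keeps the worker's exception (and its traceback)
            # attached as __cause__ instead of discarding it.
            raise RuntimeError(f"Some {stage_name} stages failed.") from error


def run(paths):
    """Submit one task per path and wait for all of them."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(read_catalog, p) for p in paths]
        wait_for_futures(futures, "mapping")
```

With this pattern, the generic "stages failed" message would still be raised, but Python prints the worker's OSError and its traceback above it as the direct cause.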

The actual function that had a runtime error was mr.map_to_pixels:

def _map_pixels(args, client):
    """Generate a raw histogram of object counts in each healpix pixel"""

    if args.resume_plan.is_mapping_done():
        return

    reader_future = client.scatter(args.file_reader)
    futures = []
    for key, file_path in args.resume_plan.map_files:
        futures.append(
            client.submit(
                mr.map_to_pixels,  # the error is raised inside this function
                input_file=file_path,
                resume_path=args.resume_plan.tmp_path,
                file_reader=reader_future,
                mapping_key=key,
                highest_order=args.mapping_healpix_order,
                ra_column=args.ra_column,
                dec_column=args.dec_column,
                use_hipscat_index=args.use_hipscat_index,
            )
        )
    args.resume_plan.wait_for_mapping(futures)

One possible fix is to add something like the following to pipeline_resume_plan.py (this is how I found out that the real problem was a corrupted FITS file):

def worker_callbacks(f):
    """
    Called to create a trace of exceptions in threads.
    Formats the exception trace so that filenames and line numbers are clickable in VSCode.
    """
    e = f.exception()

    if e is None:
        return

    trace = []
    tb = e.__traceback__
    while tb is not None:
        trace.append({
            "filename": tb.tb_frame.f_code.co_filename,
            "name": tb.tb_frame.f_code.co_name,
            "lineno": tb.tb_lineno
        })
        tb = tb.tb_next

    trace_str = "\n"
    for i in trace:
        trace_str += f"File \"{i['filename']}\", line {i['lineno']}, in {i['name']}\n"

    # A logger would probably be better than print here
    print(f"{type(e).__name__}: {e}\nTraceback (most recent call last):{trace_str}")

@dataclass
class PipelineResumePlan:
    """Container class for holding the state of pipeline plan."""
    ...
    def wait_for_futures(self, futures, stage_name):
        """Wait for collected futures to complete.
        ...
        # Add a callback to trace exceptions in threads for debugging
        future.add_done_callback(worker_callbacks)
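A shorter variant of the same callback idea could lean on the standard traceback and logging modules instead of walking the traceback by hand. This is only a sketch under a few assumptions: it is written against concurrent.futures.Future, the logger name and both function names are made up for illustration, and whether it behaves the same way with Dask futures would need to be checked.

```python
import logging
import traceback
from concurrent.futures import Future

# Hypothetical logger name, chosen only for this example.
logger = logging.getLogger("hipscat_import.futures")


def format_future_error(future):
    """Return the formatted traceback of a failed future, or None otherwise."""
    if future.cancelled():
        # Calling exception() on a cancelled future would raise CancelledError.
        return None
    error = future.exception()
    if error is None:
        return None
    lines = traceback.format_exception(type(error), error, error.__traceback__)
    return "".join(lines)


def log_future_error(future):
    """Done-callback that logs the worker traceback instead of printing it."""
    message = format_future_error(future)
    if message is not None:
        logger.error("Task failed:\n%s", message)
```

It would be registered the same way as above, with future.add_done_callback(log_future_error). The output uses Python's standard traceback layout, so file names and line numbers should stay clickable in editors that recognize that format.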