pyiron / pyiron_base

Core components of the pyiron integrated development environment (IDE) for computational materials science
https://pyiron-base.readthedocs.io
BSD 3-Clause "New" or "Revised" License
19 stars 13 forks source link

Status and fetching of remote jobs #791

Open Leimeroth opened 1 year ago

Leimeroth commented 1 year ago

Jobs executed on a remote cluster can not run collect, because they are not found in the filetable. So jobs get the status "initialized", independent if they ran successfully or not. Also update_from_remote() and wait_for_jobs are not fetching the jobs because of this issue. Instead they have to be manually downloaded. The manual download also comes with several issues because the jobs status does not work. It is hard to filter the jobs that already finished, when everything has the status "intialized". If a job is fetched that is not finished yet it will delete the working directory of the job on the cluster, so the job will fail completely.

IMO a more robust way to fix the issue of fetching is to simply download everything that can't be found in the remote squeue job table anymore instead of looking at the jobs status, which f.e. also breaks when the job runs into a time out. Would it be possible to create a db table "is_remote" that simply stores all ids of jobs that have been transferred? When they are fetched back they could be deleted from the table so no complex status checking on the cluster would be required, but instead a simple check if the job is still listed with squeue would suffice. This could probably even be implemented as part of pysqa so that it does not mess with anything pyiron specific.

Error backtrace:

2022-07-12 21:19:06,128 - pyiron_log - INFO - run job: Cook1_1000 id: 749443, status: collect
Traceback (most recent call last):
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 735, in run
    self._run_if_collect()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 1303, in _run_if_collect
    run_job_with_status_collect(job=self)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/runfunction.py", line 194, in run_job_with_status_collect
    job.project.db.item_update(job._runtime(), job.job_id)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 1375, in _runtime
    start_time = self.project.db.get_item_by_id(self.job_id)["timestart"]
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 118, in get_item_by_id
    return {
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 119, in <dictcomp>
    k: list(v.values())[0]
IndexError: list index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nl77pupy/miniconda3/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/nl77pupy/miniconda3/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/__main__.py", line 3, in <module>
    main()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/__init__.py", line 63, in main
    args.cli(args)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/wrapper.py", line 31, in main
    job_wrapper_function(
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/wrapper.py", line 171, in job_wrapper_function
    job.run()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/wrapper.py", line 124, in run
    self.job.run_static()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 769, in run_static
    execute_job_with_external_executable(job=self)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/runfunction.py", line 508, in execute_job_with_external_executable
    job.run()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/generic/util.py", line 213, in decorated
    return function(*args, **kwargs)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 749, in run
    self.drop_status_to_aborted()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 1238, in drop_status_to_aborted
    self.refresh_job_status()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/job/generic.py", line 455, in refresh_job_status
    initial_status=self.project.db.get_job_status(self.job_id),
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 317, in get_job_status
    return self._job_table[self._job_table.id == job_id].status.values[0]
IndexError: index 0 is out of bounds for axis 0 with size 0
Leimeroth commented 1 year ago

Extending on the id of the pysqa based fetching: A database with 3 columns: directory, remote_id, local_id would allow to implement everything in a more or less pyiron independent way in pysqa directory could store the working directory that is extended with the local and remote paths stored in config, local id in this case the pyiron id and remote id the queueing system id. Than all the filtering that is currently done and causing so many errors is not necessary anymore. Instead project can just dispatch fetching completely to pysqa which could a return a list with all fetched local ids that can be used to update the status in the local database.

ligerzero-ai commented 1 year ago

I can imagine that this stuff is a significant obstacle to the usability of pyiron in this configuration that users will notice very quickly. I'll work on this soon, since I'll also be using this in the somewhat near future.

I suppose the task list to completely unconvolute the job mess that the remote cluster configuration experiences would be something like this:

Enable a complete separate set of job status code which:

This squeue query will contain information on: 1) The number of jobs running on the system 2) The name/ID of jobs running/queued on the system 3) The time this file was created

Things that would be nice to have:

Let me know if I missed anything or if you see any huge barriers to implementation before I start in a few weeks.

jan-janssen commented 1 year ago

@Leimeroth I made some progress in this direction with https://github.com/pyiron/pyiron_base/pull/878 and https://github.com/pyiron/pysqa/pull/143 Can you check if this solves this issue?

Leimeroth commented 1 year ago

The errors are still the same on the cluster IndexError: list index out of range because get_item_by_id returns a probably empty dict and IndexError: index 0 is out of bounds for axis 0 with size 0 because the job is not found in the get_job_status function.

ligerzero-ai commented 1 year ago

Per our meeting on Monday, @jan-janssen 's explanation of why the implementation that @Leimeroth has suggested is incompatible with pyiron is because there are certain types of jobs that can spawn additional jobs in pyiron.

However, I am not convinced that the assignment of pi_None as the jobname is necessary, which forces the workaround to go through the slurm SQUEUE command to do unique job matching through directory matching. Do note that this will not work for all queueing systems, anyway.

Why can't the jobnames have appended something in the case of jobs which spawn further jobs? For example, pi_JOBID_SUBJOBID in the case of remote jobs without a database? Is this not a reasonable solution to the problem of jobs which can spawn further jobs?

Then the only check that needs to occur is to see if there are multiple underscores in the job_name to reconcile it with the database.

ligerzero-ai commented 1 year ago

https://github.com/pyiron/pyiron_base/pull/1067/files Could you try with the changes made in this PR?

Leimeroth commented 1 year ago

Works at least for simple jobs :)

Leimeroth commented 1 year ago

Advanced jobs still fail. When I try to run a Murnaghan job for example:

ref= pr.create.job.Vasp("Ref")
ref.structure = pr.create.structure.bulk("Cu", cubic=True)
ref.server.cores = 12

murn = pr.create.job.Murnaghan("MurnTest", delete_aborted_job=True)
murn.ref_job = ref
murn.server.cores = 96
murn.server.queue = "short_l2"
murn.input["num_points"] = 8

murn.run()

the following error occurs:


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 68.28it/s]
/nfshome/leimeroth/git_projects/pysqa/pysqa/utils/remote.py:240: UserWarning: mkdir: missing operand
Try 'mkdir --help' for more information.

  warnings.warn(stderr.read().decode())
0it [00:00, ?it/s]
/nfshome/leimeroth/git_projects/pysqa/pysqa/utils/remote.py:240: UserWarning: Traceback (most recent call last):
  File "/home/nl77pupy/miniconda3/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/home/nl77pupy/miniconda3/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/__main__.py", line 3, in <module>
    main()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/control.py", line 59, in main
    args.cli(args)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/cli/wrapper.py", line 31, in main
    job_wrapper_function(
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/job/wrapper.py", line 161, in job_wrapper_function
    job = JobWrapper(
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/job/wrapper.py", line 69, in __init__
    self.job = pr.load_from_jobpath(
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/project/generic.py", line 880, in load_from_jobpath
    job = job.to_object()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/job/core.py", line 500, in to_object
    return self.project_hdf5.to_object(object_type, **qwargs)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/storage/hdfio.py", line 1393, in to_object
    obj = self.create_instance(class_object, **qwargs)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/storage/hdfio.py", line 1352, in create_instance
    return cls(**init_args)
  File "/home/nl77pupy/git_projects/pyiron_atomistics/pyiron_atomistics/atomistics/master/murnaghan.py", line 657, in __init__
    super(Murnaghan, self).__init__(project, job_name)
  File "/home/nl77pupy/git_projects/pyiron_atomistics/pyiron_atomistics/atomistics/master/parallel.py", line 26, in __init__
    super(AtomisticParallelMaster, self).__init__(project, job_name=job_name)
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/master/parallel.py", line 74, in __init__
    self.refresh_submission_status()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/master/parallel.py", line 218, in refresh_submission_status
    self.submission_status.refresh()
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/jobs/master/submissionstatus.py", line 199, in refresh
    computer = self.database.get_item_by_id(self.job_id)["computer"]
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 190, in get_item_by_id
    return {
  File "/home/nl77pupy/git_projects/pyiron_base/pyiron_base/database/filetable.py", line 191, in <dictcomp>
    k: list(v.values())[0]
IndexError: list index out of range

  warnings.warn(stderr.read().decode())

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
Cell In[6], line 11
      8 murn.server.queue = "short_l2"
      9 murn.input["num_points"] = 8
---> 11 murn.run()

File ~/git_projects/pyiron_base/pyiron_base/utils/deprecate.py:171, in Deprecator.__deprecate_argument.<locals>.decorated(*args, **kwargs)
    161     if kw in self.arguments:
    162         warnings.warn(
    163             message_format.format(
    164                 "{}.{}({}={})".format(
   (...)
    169             stacklevel=2,
    170         )
--> 171 return function(*args, **kwargs)

File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:693, in GenericJob.run(self, delete_existing_job, repair, debug, run_mode, run_again)
    691     self._run_if_repair()
    692 elif status == "initialized":
--> 693     self._run_if_new(debug=debug)
    694 elif status == "created":
    695     self._run_if_created()

File ~/git_projects/pyiron_base/pyiron_base/jobs/master/parallel.py:362, in ParallelMaster._run_if_new(self, debug)
    354 """
    355 Internal helper function the run if new function is called when the job status is 'initialized'. It prepares
    356 the hdf5 file and the corresponding directory structure.
   (...)
    359     debug (bool): Debug Mode
    360 """
    361 self.submission_status.submitted_jobs = 0
--> 362 super()._run_if_new(debug=debug)

File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:1217, in GenericJob._run_if_new(self, debug)
   1209 def _run_if_new(self, debug=False):
   1210     """
   1211     Internal helper function the run if new function is called when the job status is 'initialized'. It prepares
   1212     the hdf5 file and the corresponding directory structure.
   (...)
   1215         debug (bool): Debug Mode
   1216     """
-> 1217     run_job_with_status_initialized(job=self, debug=debug)

File ~/git_projects/pyiron_base/pyiron_base/jobs/job/runfunction.py:76, in run_job_with_status_initialized(job, debug)
     74 else:
     75     job.save()
---> 76     job.run()

File ~/git_projects/pyiron_base/pyiron_base/utils/deprecate.py:171, in Deprecator.__deprecate_argument.<locals>.decorated(*args, **kwargs)
    161     if kw in self.arguments:
    162         warnings.warn(
    163             message_format.format(
    164                 "{}.{}({}={})".format(
   (...)
    169             stacklevel=2,
    170         )
--> 171 return function(*args, **kwargs)

File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:695, in GenericJob.run(self, delete_existing_job, repair, debug, run_mode, run_again)
    693     self._run_if_new(debug=debug)
    694 elif status == "created":
--> 695     self._run_if_created()
    696 elif status == "submitted":
    697     run_job_with_status_submitted(job=self)

File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:1228, in GenericJob._run_if_created(self)
   1219 def _run_if_created(self):
   1220     """
   1221     Internal helper function the run if created function is called when the job status is 'created'. It executes
   1222     the simulation, either in modal mode, meaning waiting for the simulation to finish, manually, or submits the
   (...)
   1226         int: Queue ID - if the job was send to the queue
   1227     """
-> 1228     return run_job_with_status_created(job=self)

File ~/git_projects/pyiron_base/pyiron_base/jobs/job/runfunction.py:109, in run_job_with_status_created(job)
    107     job.run_if_non_modal()
    108 elif job.server.run_mode.queue:
--> 109     job.run_if_scheduler()
    110 elif job.server.run_mode.interactive:
    111     job.run_if_interactive()

File ~/git_projects/pyiron_base/pyiron_base/jobs/job/generic.py:864, in GenericJob.run_if_scheduler(self)
    856 def run_if_scheduler(self):
    857     """
    858     The run if queue function is called by run if the user decides to submit the job to and queing system. The job
    859     is submitted to the queuing system using subprocess.Popen()
   (...)
    862         int: Returns the queue ID for the job.
    863     """
--> 864     return run_job_with_runmode_queue(job=self)

File ~/git_projects/pyiron_base/pyiron_base/jobs/job/runfunction.py:386, in run_job_with_runmode_queue(job)
    379 else:
    380     command = (
    381         "python -m pyiron_base.cli wrapper -p "
    382         + job.working_directory
    383         + " -j "
    384         + str(job.job_id)
    385     )
--> 386 que_id = state.queue_adapter.submit_job(
    387     queue=job.server.queue,
    388     job_name="pi_" + str(job.job_id),
    389     working_directory=job.project_hdf5.working_directory,
    390     cores=job.server.cores,
    391     run_time_max=job.server.run_time,
    392     memory_max=job.server.memory_limit,
    393     command=command,
    394 )
    395 if que_id is not None:
    396     job.server.queue_id = que_id

File ~/git_projects/pysqa/pysqa/queueadapter.py:158, in QueueAdapter.submit_job(self, queue, job_name, working_directory, cores, memory_max, run_time_max, dependency_list, command)
    131 def submit_job(
    132     self,
    133     queue=None,
   (...)
    140     command=None,
    141 ):
    142     """
    143     Submits command to the given queue.
    144 
   (...)
    156         int:
    157     """
--> 158     return self._adapter.submit_job(
    159         queue=queue,
    160         job_name=job_name,
    161         working_directory=working_directory,
    162         cores=cores,
    163         memory_max=memory_max,
    164         run_time_max=run_time_max,
    165         command=command,
    166     )

File ~/git_projects/pysqa/pysqa/utils/remote.py:60, in RemoteQueueAdapter.submit_job(self, queue, job_name, working_directory, cores, memory_max, run_time_max, dependency_list, command)
     58 self._transfer_data_to_remote(working_directory=working_directory)
     59 output = self._execute_remote_command(command=command)
---> 60 return int(output.split()[-1])

IndexError: list index out of range
``
ligerzero-ai commented 1 year ago

Hi Niklas, I'll try to look into this this week.

jan-janssen commented 1 year ago

To my knowledge the Murnaghan example already fails when running without database, unless the individual jobs are executed in interactive mode.