radical-cybertools / radical.pilot

RADICAL-Pilot
http://radical-cybertools.github.io/radical-pilot/index.html

RAPTOR: worker registration timed out #3241

Open AymenFJA opened 1 month ago

AymenFJA commented 1 month ago

This issue is happening on RIVANNA with the latest radical-stack:

  python               : /sfs/gpfs/tardis/home/vaf8uz/ve/rct_latest/bin/python3
  pythonpath           : 
  version              : 3.11.6
  virtualenv           : /sfs/gpfs/tardis/home/vaf8uz/ve/rct_latest
  radical.gtod         : 1.81.0
  radical.pilot        : 1.83.0
  radical.utils        : 1.83.0

We are requesting 1 node with 1 GPU. Session logs are attached to @arupcsedu's comment below.

arupcsedu commented 1 month ago

rp.session.udc-aj34-36.djy8hg.020017.0004.zip

Each worker fails with the same traceback:

Traceback (most recent call last):
  File "/project/bii_dsc_community/djy8hg/arupcsedu/cylonplus/cy-rp-env/bin/radical-pilot-raptor-worker", line 52, in <module>
    run(sys.argv[1], sys.argv[2], sys.argv[3])
  File "/project/bii_dsc_community/djy8hg/arupcsedu/cylonplus/cy-rp-env/bin/radical-pilot-raptor-worker", line 30, in run
    worker = cls(raptor_id)
             ^^^^^^^^^^^^^^
  File "/sfs/gpfs/tardis/project/bii_dsc_community/djy8hg/arupcsedu/cylonplus/cy-rp-env/lib/python3.11/site-packages/radical/pilot/raptor/worker_default.py", line 42, in __init__
    super().__init__(manager=manager, rank=rank, raptor_id=raptor_id)
  File "/sfs/gpfs/tardis/project/bii_dsc_community/djy8hg/arupcsedu/cylonplus/cy-rp-env/lib/python3.11/site-packages/radical/pilot/raptor/worker.py", line 134, in __init__
    raise RuntimeError('registration with master timed out')
RuntimeError: registration with master timed out

andre-merzky commented 1 month ago

Thanks for opening the ticket. Could you also point me to the code you are running, please?

AymenFJA commented 1 month ago

@andre-merzky the code is below:

#!/usr/bin/env python3

import radical.pilot as rp

# Task state callback to print task status updates
def task_state_cb(task, state):
    print('  task %-30s: %s' % (task.uid, task.state))

# Define the NBEATS model training task
@rp.pythontask
def train_nbeats():
    import time
    from neuralforecast import NeuralForecast
    from neuralforecast.models import NBEATS
    from neuralforecast.utils import AirPassengersDF
    try:
        nf = NeuralForecast(
            models=[NBEATS(input_size=24, h=12, max_steps=100)],
            freq='M'
        )
        start_time = time.time()
        nf.fit(df=AirPassengersDF)
        nf.predict()
        end_time = time.time()

        task_time = end_time - start_time
        print(f"NBEATS model training completed in {task_time:.2f} seconds")
        return str(task_time)
    except Exception as e:
        print(f"Error during NBEATS model training: {str(e)}")
        return str(e)

# Define the DEEPAR model training task
@rp.pythontask
def train_deepar():
    import time
    from neuralforecast import NeuralForecast
    from neuralforecast.models import DEEPAR
    from neuralforecast.utils import AirPassengersDF
    try:
        nf = NeuralForecast(
            models=[DEEPAR(input_size=24, h=12, max_steps=100)],
            freq='M'
        )
        start_time = time.time()
        nf.fit(df=AirPassengersDF)
        nf.predict()
        end_time = time.time()

        task_time = end_time - start_time
        print(f"DEEPAR model training completed in {task_time:.2f} seconds")
        return str(task_time)
    except Exception as e:
        print(f"Error during DEEPAR model training: {str(e)}")
        return str(e)

# Define the THETA model training task
@rp.pythontask
def train_theta():
    import time
    from neuralforecast import NeuralForecast
    from neuralforecast.models import THETA
    from neuralforecast.utils import AirPassengersDF
    try:
        nf = NeuralForecast(
            models=[THETA(h=12)],
            freq='M'
        )
        start_time = time.time()
        nf.fit(df=AirPassengersDF)
        nf.predict()
        end_time = time.time()

        task_time = end_time - start_time
        print(f"THETA model training completed in {task_time:.2f} seconds")
        return str(task_time)
    except Exception as e:
        print(f"Error during THETA model training: {str(e)}")
        return str(e)

# Define the Transformer model training task
@rp.pythontask
def train_transformer():
    import time
    from neuralforecast import NeuralForecast
    from neuralforecast.models import Transformer
    from neuralforecast.utils import AirPassengersDF
    try:
        nf = NeuralForecast(
            models=[Transformer(input_size=24, h=12, max_steps=100)],
            freq='M'
        )
        start_time = time.time()
        nf.fit(df=AirPassengersDF)
        nf.predict()
        end_time = time.time()

        task_time = end_time - start_time
        print(f"Transformer model training completed in {task_time:.2f} seconds")
        return str(task_time)
    except Exception as e:
        print(f"Error during Transformer model training: {str(e)}")
        return str(e)

if __name__ == '__main__':

    session = rp.Session()
    try:
        # Pilot description
        pd = rp.PilotDescription()
        pd.access_schema = 'interactive' # <=======
        pd.resource = 'uva.rivanna' # <============ do not use local.localhost on HPC
        pd.runtime = 60  # pilot runtime (minutes)
        pd.cores = 30  # number of cores

        # Initialize PilotManager and TaskManager
        pmgr = rp.PilotManager(session=session)
        tmgr = rp.TaskManager(session=session)
        tmgr.register_callback(task_state_cb)

        # Submit the pilot
        pilot = pmgr.submit_pilots([pd])[0]
        tmgr.add_pilots(pilot)

        # Set up RAPTOR master and workers
        raptor = pilot.submit_raptors(rp.TaskDescription({'mode': rp.RAPTOR_MASTER, 'cpu_processes': 1}))[0] 
        workers = raptor.submit_workers(rp.TaskDescription({'mode': rp.RAPTOR_WORKER, 'cpu_processes': 27})) # add cpu_processes to worker

        # Define tasks for all four models
        tasks = []
        models = [train_nbeats, train_deepar, train_theta, train_transformer]
        for model_task in models:
            # Create multiple instances if needed
            for i in range(2):  # Adjust the range as needed for more tasks
                td = rp.TaskDescription({
                    'mode': rp.TASK_FUNCTION,
                    'function': model_task,  # Pass the function without calling it
                })
                tasks.append(td)

        # Submit tasks to RAPTOR
        submitted_tasks = raptor.submit_tasks(tasks)

        # Wait for tasks to complete
        tmgr.wait_tasks([task.uid for task in submitted_tasks])

        # Print task output (model training times)
        for task in submitted_tasks:
            print('%s [%s]: Task completed in %s seconds' % (task.uid, task.state, task.return_value))

        # Stop the RAPTOR master and wait for it to complete
        raptor.rpc('stop')
        tmgr.wait_tasks(raptor.uid)
        print('%s [%s]: %s' % (raptor.uid, raptor.state, raptor.stdout))

    finally:
        # Clean up the session
        session.close(download=False)
andre-merzky commented 1 month ago

Hmm, that code seems to work for me (or at least Raptor comes up), with that same stack. Aymen, any indication that MPI is acting up or something?

andre-merzky commented 1 month ago

BTW, this line:

    'function': model_task,  # Pass the function without calling it

should be

    'function': model_task(),  # Pass the function without calling it

Otherwise the decorator is not getting called. Apart from that change, the code seems to work with the same stack as yours, on my local machine.
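
To illustrate why the call matters, here is a minimal sketch. It assumes the decorator behaves like a wrapper that packs the call into a serialized payload instead of running the function. `sketch_pythontask` and `train_model` are hypothetical names used only for this illustration; this is not the radical.pilot implementation.

```python
import pickle


def sketch_pythontask(func):
    """Hypothetical stand-in for a task decorator (not the RP code).

    Calling the decorated function does not execute its body; it only
    packs the function name and arguments into a bytes payload.
    """
    def wrapper(*args, **kwargs):
        return pickle.dumps((func.__name__, args, kwargs))
    return wrapper


@sketch_pythontask
def train_model(steps=100):
    # The body would only run later, on the worker side.
    return steps


# Without the call: a plain callable, the wrapper body never ran.
not_called = train_model

# With the call: the wrapper ran and produced the serialized payload.
called = train_model()

print(type(not_called).__name__, type(called).__name__)
```

Under this assumption, passing `model_task` hands the task description a bare function object, while `model_task()` hands it the payload the wrapper produces.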

AymenFJA commented 1 week ago

Hey @andre-merzky, any updates on this?

andre-merzky commented 1 day ago

So, good news and bad news. The bad first: I have no clue what's up with your run. Good news: it works for me out of the box (tm) :-) Well, that is good news because I am confident that we'll get this to work for your case also.

Here is exactly what I tried:

The only change I made is to add "cores_per_node": 40 to the rivanna resource config so that we can specify the number of nodes in the pilot description. I'll create a pull request for that change.

raptor_simple.txt
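
As a rough illustration of why the resource config needs "cores_per_node" before a node count can be used, the sketch below converts between cores and nodes. Only the key "cores_per_node" and the value 40 come from this thread; the dictionary layout and the helper `nodes_for` are hypothetical and do not reflect how radical.pilot performs this calculation internally.

```python
import math

# Hypothetical fragment of a resource config entry. Only the
# "cores_per_node": 40 setting is taken from the thread.
rivanna_cfg = {"cores_per_node": 40}


def nodes_for(cores, cfg):
    """Return the number of whole nodes needed to provide `cores` cores."""
    return math.ceil(cores / cfg["cores_per_node"])


# The script above requests pd.cores = 30, which fits on a single node.
nodes_needed = nodes_for(30, rivanna_cfg)

# Conversely, one full node corresponds to 40 cores under this setting.
cores_for_one_node = 1 * rivanna_cfg["cores_per_node"]

print(nodes_needed, cores_for_one_node)
```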