SETIatHCRO / gr-ata

Threading pattern for observation blocks #1

Open dkozel opened 4 years ago

dkozel commented 4 years ago

Each observation block is responsible for running its own observation, including the timing between steps. Here's an example (pseudo Python) of how to launch a thread for this when the flowgraph starts.

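import threading
import time

import pmt

def __init__(self):
    # register message port
    self.message_port_register_out(pmt.intern("command"))

    # Sanity check inputs
    if len(source_list) != len(duration_list):
        raise ValueError("source_list and duration_list must have the same length")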

def trackscan_obs(self):
    # First command reserves the antennas and sets the frequency
    command = pmt.make_dict()
    command = pmt.dict_add(command, ant_key, ant_val)
    command = pmt.dict_add(command, freq_key, freq_val)
    self.message_port_pub(pmt.intern("command"), command)
    # Frequency only set once, antennas reused
    command = pmt.dict_delete(command, freq_key)

    for i, duration in enumerate(duration_list):
        # for az/el case
        command = pmt.dict_add(command, az_key, az_val[i])
        command = pmt.dict_add(command, el_key, el_val[i])
        self.message_port_pub(pmt.intern("command"), command)

        # Dwell on this pointing before moving to the next one
        time.sleep(duration)

    # End the observation with ONE of the two options below; keep only one.

    # Option 1: explicitly release antennas
    command = pmt.make_dict()
    command = pmt.dict_add(command, ant_key, ant_val)
    command = pmt.dict_add(command, reserve_key, pmt.intern("release"))

    # Option 2: tell command block to end observation and let it clean up
    command = pmt.make_dict()
    command = pmt.dict_add(command, status_key, pmt.intern("done"))

    # Publish whichever command was built last
    self.message_port_pub(pmt.intern("command"), command)

def start(self):
    # Launch a separate thread that will run the observation
    # Keep a reference on self so stop() can join it
    self.t = threading.Thread(target=self.trackscan_obs)
    self.t.start()

    return super().start()

def stop(self):
    # Wait for the observation to end
    # Ideally we'd let the observation thread know we want to exit immediately
    self.t.join()
    return super().stop()
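
To make the order of the published commands concrete, here is a rough sketch that builds the same sequence with plain Python dicts standing in for PMT dicts, so it runs without GNU Radio. The function name `build_commands`, the key names (`ant`, `freq`, `az`, `el`, `status`) and the sample antenna names and values are all assumptions for illustration; the real keys are not specified above. It ends the observation with Option 2.

```python
def build_commands(antennas, freq, az_list, el_list):
    """Return the commands trackscan_obs would publish, in order.

    Plain dicts stand in for PMT dicts; all key names are hypothetical.
    """
    if len(az_list) != len(el_list):
        raise ValueError("az_list and el_list must have the same length")

    commands = []
    # First command reserves the antennas and sets the frequency once
    command = {"ant": antennas, "freq": freq}
    commands.append(dict(command))
    # Later commands reuse the antennas but no longer carry the frequency
    del command["freq"]

    for az, el in zip(az_list, el_list):
        command["az"] = az
        command["el"] = el
        # Copy so later updates do not change commands already recorded
        commands.append(dict(command))

    # Option 2: tell the command block the observation is done
    commands.append({"status": "done"})
    return commands


# Made-up example inputs: two antennas, one frequency, two pointings
cmds = build_commands(["1a", "1c"], 1420.0, [10.0, 20.0], [45.0, 50.0])
for c in cmds:
    print(c)
```

Only the first command carries the frequency; each pointing command carries the antennas plus one az/el pair, and the final command is the end-of-observation marker.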

dkozel commented 4 years ago

For the note about gracefully ending an observation partway through, I found this, which looks reasonable. I don't know yet whether we need the entire thread class or can just use some of its ideas in the trackscan_obs function. This is mostly a note to myself.

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()
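
One possible way to borrow only the event idea, without the full thread subclass, is to keep a `threading.Event` on the block and replace `time.sleep(duration)` with `Event.wait(timeout=duration)`, which returns early as soon as the event is set. The sketch below is a stand-in with no GNU Radio dependency; the class name `ObservationRunner`, its `publish` callback (a placeholder for `message_port_pub`) and the dict keys are hypothetical.

```python
import threading


class ObservationRunner:
    """Hypothetical stand-in for the observation block (no GNU Radio)."""

    def __init__(self, durations, publish):
        self.durations = durations
        self.publish = publish  # placeholder for message_port_pub
        self._stop_event = threading.Event()
        self._thread = None

    def _run(self):
        for i, duration in enumerate(self.durations):
            self.publish({"step": i})
            # wait() returns True if the event was set before the timeout,
            # so a stop request interrupts the dwell right away
            if self._stop_event.wait(timeout=duration):
                break
        # Always signal the end of the observation, even when cut short
        self.publish({"status": "done"})

    def start(self):
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def stop(self):
        # Ask the observation thread to exit, then wait for it
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()


published = []
runner = ObservationRunner([30.0, 30.0], published.append)
runner.start()
runner.stop()  # returns promptly instead of waiting out both dwells
print(published)
```

With this approach `stop()` still joins the thread, but the join completes quickly because the dwell is interruptible, and the cleanup command is still published on the way out.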