DAMPEEU / DmpWorkflow

Workflow framework for DAMPE remote computing & accounting

improvements on run_cached #41

Open zimmerst opened 8 years ago

zimmerst commented 8 years ago

The new run_cached method uses Popen together with two temporary file objects, one caching stdout and one caching stderr. Interleaving the two streams would be nice, but does not work yet: the following version does not record the correct logging information:

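import logging
from select import poll as spoll, POLLIN, POLLHUP
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile

logger = logging.getLogger(__name__)


def run_cached(cmd_args, interleaved=True, chunksize=36, cachedir="/tmp"):
    # inspired from http://tinyurl.com/hslhjfe (StackOverflow)
    """ returns file objects to output & error caching the output of a running process """
    if not isinstance(cmd_args, list):
        raise RuntimeError('must be list to be called')
    logger.info("attempting to run: %s", str(cmd_args))
    args = [[], []]  # first is output, second is errors (never filled below)
    # text mode on both ends so that str lines can be written on Python 3 too
    tsk = Popen(cmd_args, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    poll = spoll()
    poll.register(tsk.stdout, POLLIN | POLLHUP)
    poll.register(tsk.stderr, POLLIN | POLLHUP)
    pollc = 2
    events = poll.poll()
    tmp_out = NamedTemporaryFile(mode="w+", dir=cachedir, delete=True)
    tmp_err = NamedTemporaryFile(mode="w+", dir=cachedir, delete=True)
    chunk = []
    chunk_err = []
    # NOTE: waiting before the pipes are drained can deadlock once the
    # child fills the OS pipe buffer
    rc = tsk.wait()
    while pollc > 0 and len(events) > 0:
        if len(chunk) > chunksize:
            # NOTE: no trailing newline, so the last line of one chunk runs
            # into the first line of the next
            tmp_out.write("\n".join(chunk))
            tmp_err.write("\n".join(chunk_err))
            tmp_err.flush()
            tmp_out.flush()
            chunk = []
            chunk_err = []
        for rfd, event in events:
            if event & POLLIN:
                # NOTE: readline() may pull several lines into Python's own
                # read buffer, where poll() can no longer see them
                if rfd == tsk.stdout.fileno():
                    line = tsk.stdout.readline()
                    if len(line) > 0:
                        val = str(line[:-1])
                        chunk.append(val)
                if rfd == tsk.stderr.fileno():
                    line = tsk.stderr.readline()
                    if len(line) > 0:
                        chunk_err.append(line[:-1])
                        # the args[1][-1] branch never runs: chunk_err was just appended to
                        val = chunk_err[-1] if len(chunk_err) else args[1][-1]
                        if interleaved:
                            chunk.append(val)
            if event & POLLHUP:
                # NOTE: unregistering here without reading to EOF can drop
                # lines still waiting in the pipe or in the read buffer
                poll.unregister(rfd)
                pollc -= 1
            # NOTE: this re-polls once per event, inside the for loop
            if pollc > 0:
                events = poll.poll()
    # NOTE: lines still held in chunk / chunk_err are never written out
    # must rewind tmp_out & tmp_err
    tmp_out.seek(0)
    tmp_err.seek(0)
    return tmp_out, tmp_err, rc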

This is an enhancement and is not needed for functionality.
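A minimal sketch of one way to make the interleaved version keep every line, assuming Python 3 on a POSIX system (`select.poll` is not available on Windows). It reads raw bytes with `os.read()` on the pipe descriptors instead of `readline()`, so that `poll()` and the reads never disagree about buffered data; it keeps reading each pipe until EOF instead of stopping at the first `POLLHUP`; it calls `wait()` only after both pipes are drained; and it writes out the last, partially filled chunk. The signature and return values mirror `run_cached` above, but this is an illustration, not the DmpWorkflow implementation.

```python
import logging
import os
import select
import subprocess
from tempfile import NamedTemporaryFile

logger = logging.getLogger(__name__)


def run_cached(cmd_args, interleaved=True, chunksize=36, cachedir="/tmp"):
    """Run cmd_args and cache its stdout/stderr in two temporary files.

    Returns (tmp_out, tmp_err, rc) with both files rewound. If interleaved
    is True, stderr lines are also copied into tmp_out in the order in
    which the parent read them.
    """
    if not isinstance(cmd_args, list):
        raise RuntimeError("must be list to be called")
    logger.info("attempting to run: %s", cmd_args)
    tmp_out = NamedTemporaryFile(mode="w+", dir=cachedir, delete=True)
    tmp_err = NamedTemporaryFile(mode="w+", dir=cachedir, delete=True)
    chunk, chunk_err = [], []

    def flush():
        # each chunk ends with a newline so consecutive chunks do not merge
        for lines, fobj in ((chunk, tmp_out), (chunk_err, tmp_err)):
            if lines:
                fobj.write("\n".join(lines) + "\n")
                fobj.flush()
                lines.clear()

    tsk = subprocess.Popen(cmd_args, stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
    streams = {tsk.stdout.fileno(): "out", tsk.stderr.fileno(): "err"}
    partial = {"out": b"", "err": b""}  # bytes after the last newline seen
    poller = select.poll()
    for fd in streams:
        poller.register(fd, select.POLLIN | select.POLLHUP)

    while streams:
        for fd, _event in poller.poll():
            name = streams[fd]
            data = os.read(fd, 65536)  # unbuffered: poll() sees all pending data
            if data:
                *lines, partial[name] = (partial[name] + data).split(b"\n")
            else:  # EOF: the child closed this pipe
                lines = [partial[name]] if partial[name] else []
                partial[name] = b""
                poller.unregister(fd)
                del streams[fd]
            for raw in lines:
                text = raw.decode(errors="replace")
                if name == "out":
                    chunk.append(text)
                else:
                    chunk_err.append(text)
                    if interleaved:
                        chunk.append(text)
        if len(chunk) >= chunksize or len(chunk_err) >= chunksize:
            flush()

    rc = tsk.wait()  # safe now: both pipes have been read to EOF
    flush()  # write whatever is left in the last chunk
    tsk.stdout.close()
    tsk.stderr.close()
    tmp_out.seek(0)
    tmp_err.seek(0)
    return tmp_out, tmp_err, rc
```

With two pipes, the interleaved order is the order in which the parent read the lines, which need not match the order in which the child wrote them (the child may also buffer its stdout when it is not a terminal). If a separate stderr file is not required, passing `stderr=subprocess.STDOUT` lets the operating system interleave both streams into one pipe in write order.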

zimmerst commented 8 years ago

To add to this thread: we should investigate how the log file can be recovered if the parent process receives a SIGTERM. Some pointers here: http://stackoverflow.com/questions/18499497/how-to-process-sigterm-signal-gracefully
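One common Python pattern is to install a SIGTERM handler that raises an exception, so that `try`/`finally` cleanup still runs when the process is asked to stop. A minimal sketch along those lines, assuming Python 3 on POSIX and a call from the main thread (`signal.signal()` only works there); `run_and_keep_log`, `Terminated` and the `logfile` parameter are hypothetical names, not part of DmpWorkflow. To stay short it merges stderr into stdout and lets the child write directly into the temporary file, so whatever the child has already written survives the interruption.

```python
import shutil
import signal
import subprocess
import tempfile


class Terminated(Exception):
    """Raised by the SIGTERM handler so that try/finally blocks still run."""


def _raise_terminated(signum, frame):
    raise Terminated(signum)


def run_and_keep_log(cmd_args, logfile):
    """Hypothetical helper: run cmd_args with stdout and stderr merged into a
    temporary file and copy that file to logfile when the call ends, whether
    the child finished or the parent received SIGTERM."""
    # must be called from the main thread
    previous = signal.signal(signal.SIGTERM, _raise_terminated)
    tmp = tempfile.NamedTemporaryFile(mode="w+b", delete=True)
    tsk = None
    try:
        tsk = subprocess.Popen(cmd_args, stdout=tmp, stderr=subprocess.STDOUT)
        return tsk.wait()
    finally:
        if tsk is not None and tsk.poll() is None:
            # interrupted while the child was still running: stop it first
            tsk.terminate()
            tsk.wait()
        # the child wrote straight to the file, so everything it had written
        # (not merely buffered in its own process) is already there
        tmp.seek(0)
        with open(logfile, "wb") as dst:
            shutil.copyfileobj(tmp, dst)
        tmp.close()  # delete=True removes the temporary file
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)
```

Because the temporary file is created with `delete=True`, the copy to `logfile` is what lets the log outlive the process. A handler that only sets a flag, checked inside a polling loop such as the one in `run_cached`, would be an alternative to raising; neither variant protects against a second SIGTERM arriving during the cleanup itself.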