NAMD / pypelinin

Python library to distribute jobs and pipelines among a cluster
3 stars 5 forks source link

Create generic pipeline_manager.start() #54

Open andrebco opened 11 years ago

andrebco commented 11 years ago

Pipeliner Manager should have a method that starts all the pipelines from an iterable (or generator). This method should be done on pipeline manager in order to:

israelst commented 11 years ago

I'm working on a feature at MBJ, which can be the base to solve this issue. Code comes soon.

andrebco commented 11 years ago

This issue is also related to the reduction of memory usage on sendpipelines. This generic pipelinemanager.start() may consider having a memory usage strategy to avoid the high usage of memory while running many pipelines.

israelst commented 11 years ago

This is the runner, this is the sendpipeline

[UPDATE: This repo is private, sorry]

turicas commented 11 years ago

@israelst, this repo is private. :)

israelst commented 11 years ago

These files are under agressive refactoring, but...

Here comes the runner:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import os.path
import time
import sys
from importlib import import_module

def main():
    """Run the sendpipelines."""
    parser, args, rest = _parse_args()
    script = args.script
    if os.path.exists(script):
        sys.argv = [script] + rest
        sys.path.insert(0, os.path.dirname(script))
        module = import_module(os.path.basename(script)[:-3])
        send_pipelines(*module.main())
    else:
        parser.error('could not find script at "{}"'.format(script))

def _parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('script')
    args, rest = parser.parse_known_args()
    return parser, args, rest

def send_pipelines(pipeline_generator, manager, total):
    print 'Sending pipelines...'
    STATS_INTERVAL = 10
    POOL = 1000 # Workers per core X Brokers X Cores (What it means?)
    start_time = time.time()
    started = lambda: manager.started_pipelines
    finished = lambda: manager.finished_pipelines
    rate = lambda x: x / (time.time() - start_time)
    pipelines = pipeline_generator()
    def print_stats(interval):
        if started() % interval == 0 or\
           finished() % interval == 0:
            print '\rSent: {}/{} ({:10.5f}/s) Finished: {}/{}({:10.5f}/s)'\
                  .format(started(), total,
                          rate(started()), finished(),
                          total, rate(finished())),
            sys.stdout.flush()

    while manager.finished_pipelines < total:
        active_pipelines = started() - finished()
        while active_pipelines < POOL and started() < total:
            manager.start(pipelines.next())
            print_stats(STATS_INTERVAL)
        manager.update(timeout=1)
        print_stats(STATS_INTERVAL)

    end_time = time.time()
    total_time = end_time - start_time
    pipelines_per_second = total / (end_time - start_time)
    print 'Total time: {:10.5f} seconds'.format(total_time)
    print 'Pipelines per second: {:10.5f}'.format(pipelines_per_second)

main()

Here comes the sendpipelines:

#!/usr/bin/env python
# coding: utf-8

import argparse

from pypelinin import Job, Pipeline, PipelineManager
import pymongo

parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--host', default='127.0.0.1', help='Router host')
parser.add_argument('--mongo', default='127.0.0.1', help='MongoDB host')
parser.add_argument('--last', default=9999999, type=int, help='Upper bound')

def main():
    args = parser.parse_args()

    coll = pymongo.Connection(host=args.mongo)['my_db']['my_coll']

    _range = xrange(0 , args.last)

    def pipelines():
        pipeline_definition = {Job('Job1'): Job('Job2'),
                               Job('Job2'): Job('Job3')}
        for _id in _range:
            coll.insert({'_id': _id})
            yield Pipeline(pipeline_definition, data={'_id': _id})

    total = len(_range)
    manager = PipelineManager(api='tcp://{}:5550'.format(args.host),
                                       broadcast='tcp://{}:5551'.format(args.host))
    return pipelines, manager, total