mbraceproject / MBrace.StarterKit

A collection of demos and tutorials for MBrace
http://mbrace.io

Is there a way to filter the worker? #16

Closed: soloman817 closed this issue 9 years ago

soloman817 commented 9 years ago

Hi guys,

I have another question regarding the usage of MBrace.

First, this is my current worker list:

```
 Workers

 Id                                Hostname        % CPU / Cores  % Memory / Total(MB)  Network(ul/dl : kbps)   Jobs   Process Id  Initialization Time       Heartbeat                 Is active
 --                                --------        -------------  --------------------  ---------------------   ----   ----------  -------------------       ---------                 ---------
 MBraceWorkerRole_IN_0             RD00155D4A335A    1.19 / 2       29.11 / 3583.00       1206.70 / 1209.70    0 / 16        2872  2015/4/27 4:53:30 +00:00  2015/4/28 7:52:43 +00:00  True
 MBraceWorkerRole_IN_1             RD00155D4A25CE    1.58 / 2       30.73 / 3583.00       1085.06 / 1394.36    0 / 16        3000  2015/4/27 4:53:31 +00:00  2015/4/28 7:52:42 +00:00  True
 c51eb57013714c47bdd95d5ffe0f7339  MACXIANG          3.11 / 8       28.82 / 16292.00       114.96 / 150.54     0 / 8        19672  2015/4/28 7:30:43 +00:00  2015/4/28 7:52:43 +00:00  True
```

The first 2 workers were created by following the guide and run on Azure. These workers have no GPU device attached, but for some cloud computations I want to use a GPU. So I attached a local worker (the 3rd worker), which has a GPU.

So I want to route my cloud tasks to the workers that have a GPU attached, but I didn't find a solution in the starter kit. What I have done is to write an ugly function that covers all 3 workers and uses the hostname to check whether a GPU is attached:

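```fsharp
open System.Net

// Workaround: spread a 3-element flow across the cluster and keep only the
// result computed on the GPU host. Note that CloudFlow does not guarantee
// which worker processes each element.
let doJob (f : unit -> 'T) =
    let job =
        [| 1 .. 3 |]
        |> CloudFlow.ofArray
        // Run f only when this element is processed on the GPU host.
        |> CloudFlow.map (fun _ ->
            let hostname = Dns.GetHostName()
            if hostname = "MACXIANG" then Some (f ())
            else None)
        |> CloudFlow.toArray
        |> cluster.CreateProcess

    cluster.ShowProcesses()
    let results = job.AwaitResult()
    // Keep the GPU-host results and return the first one
    // (throws if no element ran on MACXIANG).
    let result = results |> Array.choose id
    result.[0]
```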

Is there a way to filter the workers by the hardware capabilities of each worker's host?

Regards, Xiang.

eiriktsarpalis commented 9 years ago

Hi Xiang,

I would not recommend using CloudFlow for this requirement, since there is no guarantee that the computation will be scheduled on the intended machine(s).

You could achieve this using the WorkerRef abstraction. Running

```fsharp
cluster.GetWorkers()
```

will return an array of references to all workers in the cluster, which can be used to guide computation to specific machines. For example,

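```fsharp
// Obtain references to all workers from the client.
let workers = cluster.GetWorkers()
cloud {
    // Select the GPU-equipped worker by hostname (Array.find throws if it is absent).
    let macXiang = workers |> Array.find (fun w -> w.HostName = "MACXIANG")
    // Schedule the computation on that specific worker.
    return! Cloud.StartAsCloudTask(cloud { ... }, target = macXiang)
} |> cluster.Run
```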

If you need to divert computation to multiple machines, you can always make use of the following primitive:

```fsharp
Cloud.Parallel : (Cloud<'T> * IWorkerRef) [] -> Cloud<'T []>
```

This tells MBrace to run the given computations in parallel, with each task assigned to a specific worker. You could use this to define the following general-purpose combinator:

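```fsharp
// Run `computation` once on every available worker that satisfies `filter`.
let runFilteredParallel (filter : IWorkerRef -> bool) (computation : Cloud<'T>) = cloud {
    let! allWorkers = Cloud.GetAvailableWorkers()
    // Pair the computation with each worker that passes the filter.
    let jobs = allWorkers |> Array.choose (fun w -> if filter w then Some (computation, w) else None)
    return! Cloud.Parallel jobs
}
```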

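and then run it, filtering on the hostname (in the worker table above, the local worker's Id is a GUID, not its hostname):

```fsharp
runFilteredParallel (fun w -> w.HostName.StartsWith "MAC") (cloud { .. }) |> cluster.Run
```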
soloman817 commented 9 years ago

@eiriktsarpalis Thanks, that is very helpful. I have some further questions.

  1. Maybe we need to extend the WorkerRef type with more properties for filtering, such as environment variables, so that we could filter with something like fun workerref -> workerref.Env.["HasGPU"] = "1".
  2. runFilteredParallel dispatches multiple cloud tasks, but I have only one cloud task. I want to create a cluster subset of the workers that meet some constraint (such as worker.hasgpu = true) and then dispatch this one cloud task within that subset, with some load balancing.

I will of course use your first suggestion to clean up my code, but I would also like to hear your opinions on these two points.

soloman817 commented 9 years ago

@eiriktsarpalis

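For point 2, I would expect something like the following code:

```fsharp
let cluster = Runtime.GetHandle(config)
// Proposed (hypothetical) API: a handle restricted to the workers matching the predicate.
let gpuCluster = cluster |> Runtime.Subset (fun worker -> worker.env.["hasgpu"] = "1")
cloud { blablabla } |> gpuCluster.Run
```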
eiriktsarpalis commented 9 years ago

For point (1) you could do the following:

```fsharp
cloud {
    let isGpuEnabled () = cloud {
        let! w = Cloud.CurrentWorker
        return (w, System.Environment.GetEnvironmentVariable "HasGPU" = "1")
    }
    // run computation once in every machine
    let! results = Cloud.ParallelEverywhere (isGpuEnabled())
    return results |> Map.ofArray
} |> cluster.Run
```

This builds a map by querying each worker, which can then be used in the filtering lambda of the previous combinator.
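To illustrate how such a map plugs into a filter lambda without a running cluster, here is a minimal local sketch. It assumes a hypothetical WorkerInfo record in place of the real worker references, and keys the map by worker Id rather than by the reference itself; the map contents are filled in by hand from the worker table in the question instead of coming from Cloud.ParallelEverywhere.

```fsharp
// Hypothetical stand-in for a worker reference: only the two fields used here.
type WorkerInfo = { Id : string; Hostname : string }

// Hypothetical result of the per-worker "HasGPU" query, keyed by worker Id
// (flags taken from the question: only the local worker has a GPU).
let gpuMap : Map<string, bool> =
    Map.ofList
        [ ("MBraceWorkerRole_IN_0", false)
          ("MBraceWorkerRole_IN_1", false)
          ("c51eb57013714c47bdd95d5ffe0f7339", true) ]

// Filter predicate: a worker counts as GPU-enabled only if the query
// reported it as such; workers absent from the map are excluded.
let isGpuWorker (w : WorkerInfo) =
    Map.tryFind w.Id gpuMap = Some true

let workers =
    [| { Id = "MBraceWorkerRole_IN_0"; Hostname = "RD00155D4A335A" }
       { Id = "MBraceWorkerRole_IN_1"; Hostname = "RD00155D4A25CE" }
       { Id = "c51eb57013714c47bdd95d5ffe0f7339"; Hostname = "MACXIANG" } |]

// Only the MACXIANG worker passes the filter.
let gpuWorkers = workers |> Array.filter isGpuWorker
```

Treating missing entries as "no GPU" keeps the filter conservative if a worker joins after the query was run.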

As for point (2), I think it is difficult to implement given the current MBrace.Azure implementation. It is also something that probably cannot be implemented in its most general form with user-defined combinators, but it is definitely worth investigating.
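One possible client-side approximation of point (2), short of a real cluster subset, is to choose a single worker from those meeting the constraint using whatever load figures are available, and pass it as the target of Cloud.StartAsCloudTask as in the earlier example. The following is a minimal sketch under stated assumptions: WorkerLoad and pickFromSubset are hypothetical names, and the load numbers are a static snapshot copied from the worker table rather than live data.

```fsharp
// Hypothetical per-worker load snapshot, loosely modelled on the columns
// of the worker table (% CPU, active jobs). This is not an MBrace type.
type WorkerLoad =
    { Id : string
      Hostname : string
      HasGpu : bool
      CpuPercent : float
      ActiveJobs : int }

/// Restrict to workers that satisfy `meetsConstraint`, then pick the one with
/// the fewest active jobs, breaking ties by lower CPU usage.
/// Returns None when no worker satisfies the constraint.
let pickFromSubset (meetsConstraint : WorkerLoad -> bool) (workers : WorkerLoad[]) =
    let subset = workers |> Array.filter meetsConstraint
    if Array.isEmpty subset then None
    else Some (subset |> Array.minBy (fun w -> w.ActiveJobs, w.CpuPercent))

// Usage with the three workers from the table: only MACXIANG has a GPU.
let snapshot =
    [| { Id = "MBraceWorkerRole_IN_0"; Hostname = "RD00155D4A335A"; HasGpu = false; CpuPercent = 1.19; ActiveJobs = 0 }
       { Id = "MBraceWorkerRole_IN_1"; Hostname = "RD00155D4A25CE"; HasGpu = false; CpuPercent = 1.58; ActiveJobs = 0 }
       { Id = "c51eb57013714c47bdd95d5ffe0f7339"; Hostname = "MACXIANG"; HasGpu = true; CpuPercent = 3.11; ActiveJobs = 0 } |]

let target = pickFromSubset (fun w -> w.HasGpu) snapshot
```

Ranking by active jobs first and CPU second is only one heuristic, and a snapshot taken on the client can go stale immediately, so this does not provide the runtime-managed load balancing that point (2) asks for.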

soloman817 commented 9 years ago

@eiriktsarpalis For point 1, your code is very clever :) I'll close this issue then.