mbraceproject / MBrace.Core

MBrace Core Libraries & Runtime Foundations
http://mbrace.io/
Apache License 2.0

Could not serialize data dependencies: MBrace.Thespian.ThespianCluster cluster #182

Open SiavashBabaei opened 6 years ago

SiavashBabaei commented 6 years ago

Executing the "Example: Running an iterative algorithm at scale with incremental notifications" tutorial at http://mbrace.io/starterkit/HandsOnTutorial.FSharp/examples/200-kmeans-clustering-example.html:

#load "ThespianCluster.fsx"

#load @"lib\utils.fsx"
#load @"lib\sieve.fsx"

open System
open MBrace.Core
open MBrace.Thespian
open Nessos.Streams

let cluster = 
    ThespianCluster.InitOnCurrentMachine( workerCount = Environment.ProcessorCount, 
                                          logger = ConsoleLogger(), 
                                          logLevel = LogLevel.Info )

let dim = 2 // point dimensions: we use 2 dimensions so we can chart the results
let numCentroids = 5 // The number of centroids to find
let partitions = 12 // The number of point partitions
let pointsPerPartition = 50000 // The number of points per partition
let epsilon = 0.1

/// Represents a multi-dimensional point.
type Point = float[]

/// Generates a set of points via a random walk from the origin, using provided seed.
let generatePoints dim numPoints seed : Point[] =
    let rand = Random(seed * 2003 + 22)
    let prev = Array.zeroCreate dim

    let nextPoint () =
        let arr = Array.zeroCreate dim
        for i = 0 to dim - 1 do 
            arr.[i] <- prev.[i] + rand.NextDouble() * 40.0 - 20.0
            prev.[i] <- arr.[i]
        arr

    [| for i in 1 .. numPoints -> nextPoint() |]

let randPoints = Array.init partitions (generatePoints dim pointsPerPartition)

let point2d (p:Point) = p.[0], p.[1]

[<AutoOpen>]
module KMeansHelpers =

    /// Calculates the squared Euclidean distance between two points.
    let dist (p1 : Point) (p2 : Point) = 
        Array.fold2 (fun acc e1 e2 -> acc + pown (e1 - e2) 2) 0.0 p1 p2

    /// Assigns a point to the correct centroid, and returns the index of that centroid.
    let findCentroid (p: Point) (centroids: Point[]) : int =
        let mutable mini = 0
        let mutable min = Double.PositiveInfinity
        for i = 0 to centroids.Length - 1 do
            let dist = dist p centroids.[i]
            if dist < min then
                min <- dist
                mini <- i

        mini

    /// Given a set of points, calculates the number of points assigned to each centroid.
    let kmeansLocal (points : Point[]) (centroids : Point[]) : (int * (int * Point))[] =
        let lens = Array.zeroCreate centroids.Length
        let sums = 
            Array.init centroids.Length (fun _ -> Array.zeroCreate centroids.[0].Length)

        for point in points do
            let cent = findCentroid point centroids
            lens.[cent] <- lens.[cent] + 1
            for i = 0 to point.Length - 1 do
                sums.[cent].[i] <- sums.[cent].[i] + point.[i]

        Array.init centroids.Length (fun i -> (i, (lens.[i], sums.[i])))

    /// Sums a collection of points
    let sumPoints (pointArr : Point []) dim : Point =
        let sum = Array.zeroCreate dim
        for p in pointArr do
            for i = 0 to dim - 1 do
                sum.[i] <- sum.[i] + p.[i]
        sum

    /// Scalar division of a point
    let divPoint (point : Point) (x : float) : Point =
        Array.map (fun p -> p / x) point

let rec KMeansCloudIterate (partitionedPoints, epsilon, centroids, iteration, emit) = cloud {

    // Stage 1: map computations to each worker per point partition
    let! clusterParts =
        partitionedPoints
        |> Array.map (fun (p:CloudArray<_>, w) -> cloud { return kmeansLocal p.Value centroids }, w)
        |> Cloud.ParallelOnSpecificWorkers

    // Stage 2: reduce computations to obtain the new centroids
    let dim = centroids.[0].Length
    let newCentroids =
        clusterParts
        |> Array.concat
        |> ParStream.ofArray
        |> ParStream.groupBy fst
        |> ParStream.sortBy fst
        |> ParStream.map snd
        |> ParStream.map (fun clp -> clp |> Seq.map snd |> Seq.toArray |> Array.unzip)
        |> ParStream.map (fun (ns,points) -> Array.sum ns, sumPoints points dim)
        |> ParStream.map (fun (n, sum) -> divPoint sum (float n))
        |> ParStream.toArray

    // Stage 3: check convergence and decide whether to continue iteration
    let diff = Array.map2 dist newCentroids centroids |> Array.max

    do! Cloud.Logf "KMeans: iteration [#%d], diff %A with centroids /n%A" iteration diff centroids

    // emit an observation
    emit(DateTimeOffset.UtcNow,iteration,diff,centroids)

    if diff < epsilon then
        return newCentroids
    else
        return! KMeansCloudIterate (partitionedPoints, epsilon, newCentroids, iteration+1, emit)
}

let KMeansCloud(points, numCentroids, epsilon, emit) = cloud {  

    let initCentroids = points |> Seq.concat |> Seq.take numCentroids |> Seq.toArray

    let! workers = Cloud.GetAvailableWorkers()
    do! Cloud.Logf "KMeans: persisting partitioned point data to store."

    // Divide the points
    let! partitionedPoints = 
        points 
        |> Seq.mapi (fun i p -> 
            local { 
                // always schedule the same subset of points to the same worker
                // for caching performance gains
                let! ca = CloudValue.NewArray(p, StorageLevel.MemoryAndDisk) 
                return ca, workers.[i % workers.Length] }) 
        |> Local.Parallel

    do! Cloud.Logf "KMeans: persist completed, starting iteration."

    return! KMeansCloudIterate(partitionedPoints, epsilon, initCentroids, 1, emit) 
}

let kmeansTask = 
    KMeansCloud(randPoints, numCentroids, epsilon*10000.0, ignore) 
    |> cluster.Run

kmeansTask.Result

yields:

INFO : Uploading 'MBrace.Flow, Version=1.5.4.0, Culture=neutral, PublicKeyToken=null' [IMG 392.50 KiB, PDB 0.51 MiB]
INFO : Uploading 'Streams, Version=0.4.1.0, Culture=neutral, PublicKeyToken=null' [IMG 338.00 KiB, PDB 421.50 KiB]
INFO : Uploading 'FSI-ASSEMBLY_d731fef8-8866-4a4a-9eb4-8e10af24184f_1, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' [IMG 58.00 KiB]
INFO : Uploading data dependency 'Double[][][] randPoints@' [8.82 MiB]
WARNING : Could not serialize data dependencies: MBrace.Thespian.ThespianCluster cluster@
INFO : Posted CloudProcess<float [] []> '7a5bbe93-7e2b-4944-b989-371111226649'.
val kmeansTask : MBrace.Runtime.CloudProcess<Point []>
System.NullReferenceException: Object reference not set to an instance of an object.
   at System.Tuple`2.get_Item2()
   at FSI_0005.newCentroids@104-31.Invoke(Tuple`2 tupledArg)
   at FSI_0005.newCentroids@104-33.Invoke(Tuple`2 value)
   at Nessos.Streams.Stream.bulk@174[T](T[] source, FSharpFunc`2 iterf, StreamCancellationTokenSource cts, FSharpFunc`2 complete, Unit unitVar0)
   at Nessos.Streams.Stream.toArray[T](Stream`1 stream)
   at FSI_0005.KMeansCloudIterate@97-3.Invoke(Tuple`2[][] _arg1)
   at MBrace.Core.Builders.Bind@337-1.Invoke(ExecutionContext ctx, T t) in C:\Users\eirik.tsarpalis\devel\mbrace\MBrace.Core\src\MBrace.Core\Continuation\Builders.fs:line 337
--- End of stack trace from previous location where exception was thrown ---
   at <StartupCode$MBrace-Runtime>.$CloudProcess.AwaitResult@211-2.Invoke(CloudProcessResult _arg2) in C:\Users\eirik.tsarpalis\devel\mbrace\MBrace.Core\src\MBrace.Runtime\Runtime\CloudProcess.fs:line 211
   at Microsoft.FSharp.Control.AsyncBuilderImpl.args@506-1.Invoke(a a)
   at MBrace.Core.Internals.AsyncExtensions.Async.RunSync[T](FSharpAsync`1 workflow, FSharpOption`1 cancellationToken) in C:\Users\eirik.tsarpalis\devel\mbrace\MBrace.Core\src\MBrace.Core\Utils\AsyncExtensions.fs:line 99
   at <StartupCode$FSI_0007>.$FSI_0007.main@()
Stopped due to error
SiavashBabaei commented 6 years ago

The issue seems to be with ParStream and tuples (fst and snd functions). If I change the code to:

    let newCentroids =
        clusterParts
        |> Array.concat
        |> ParStream.ofArray
        |> ParStream.groupBy fst
        |> ParStream.toArray
        |> Array.sortBy fst
        |> ParStream.ofArray
        |> ParStream.map snd
        |> ParStream.map (fun clp -> clp |> Seq.map snd |> Seq.toArray |> Array.unzip)
        |> ParStream.map (fun (ns,points) -> Array.sum ns, sumPoints points dim)
        |> ParStream.map (fun (n, sum) -> divPoint sum (float n))
        |> ParStream.toArray

Then it works OK!
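
If that is the case, presumably the ParStream reduce could be dropped for this stage altogether. A rough equivalent using plain Array combinators (an untested sketch, reusing the same bindings as above) would be:

    // Untested sketch: the Stage 2 reduce written with plain Array combinators,
    // avoiding ParStream (and its fst/snd closures) entirely.
    let newCentroids =
        clusterParts
        |> Array.concat
        |> Array.groupBy fst
        |> Array.sortBy fst
        |> Array.map (fun (_, clp) -> clp |> Array.map snd |> Array.unzip)
        |> Array.map (fun (ns, points) -> Array.sum ns, sumPoints points dim)
        |> Array.map (fun (n, sum) -> divPoint sum (float n))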

SiavashBabaei commented 6 years ago

Rechecking with much simpler code:

let temp2 =
    cloud {
        let res =
            [| (1, 3); (2, 2); (3, 1) |]
            |> ParStream.ofArray
            |> ParStream.sortBy fst
            |> ParStream.toArray

        return res
    } |> cluster.Run

gives val it : (int * seq<int * int>) [] = [|null; null; null|] rather than the expected val it : (int * int) [] = [|(1, 3); (2, 2); (3, 1)|], so it seems it cannot handle tuples at all. Moreover, the following snippet:

cloud {
    let res =
        [| 1; 2; 3 |]
        |> ParStream.ofArray
        |> ParStream.sortBy ( fun it -> it % 3 )
        |> ParStream.toArray

    return res
} |> cluster.Run

gives val it : int [] = [|0; 0; 0|] rather than the expected val it : int [] = [|3; 1; 2|]!!!
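
For comparison, I would expect the same pipeline evaluated locally, outside the cloud block, to return the correct result; if it does, ParStream itself is presumably fine and the failure is introduced somewhere in how the FSI-generated closures are serialized for the cluster. A quick check (untested sketch):

// Untested sketch: the same ParStream pipeline run in the local process, without cluster.Run.
// If this prints [|3; 1; 2|], the sorting works locally and the problem is presumably
// in how the closures are shipped to the workers.
open Nessos.Streams

let localRes =
    [| 1; 2; 3 |]
    |> ParStream.ofArray
    |> ParStream.sortBy (fun it -> it % 3)
    |> ParStream.toArray

printfn "%A" localRes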