chrislusf / gleam

Fast, efficient, and scalable distributed map/reduce system, DAG execution, in memory or on disk, written in pure Go, runs standalone or distributedly.
Apache License 2.0
3.46k stars 290 forks source link
distributed-computing distributed-systems golang map-reduce

Gleam

Build Status GoDoc Wiki Go Report Card codecov

Gleam is a high performance and efficient distributed execution system, and also simple, generic, flexible and easy to customize.

Gleam is built in Go, and the user defined computation can be written in Go, Unix pipe tools, or any streaming programs.

High Performance

Memory Efficient

Flexible

Easy to Customize

One Flow, Multiple ways to execute

Gleam code defines the flow, specifying each dataset(vertex) and computation step(edge), and build up a directed acyclic graph(DAG). There are multiple ways to execute the DAG.

The default way is to run locally. This works in most cases.

Here we mostly talk about the distributed mode.

Distributed Mode

The distributed mode has several names to explain: Master, Agent, Executor, Driver.

Gleam Driver

Gleam Master

Gleam Agent

Gleam Executor

Dataset

By leaving it in memory, the flow can have back pressure, and can support stream computation naturally.

Documentation

Word Count

Word Count

Basically, you need to register the Go functions first. It will return a mapper or reducer function id, which we can pass it to the flow.

package main

import (
    "flag"
    "strings"

    "github.com/chrislusf/gleam/distributed"
    "github.com/chrislusf/gleam/flow"
    "github.com/chrislusf/gleam/gio"
    "github.com/chrislusf/gleam/plugins/file"
)

var (
    isDistributed   = flag.Bool("distributed", false, "run in distributed or not")
    Tokenize  = gio.RegisterMapper(tokenize)
    AppendOne = gio.RegisterMapper(appendOne)
    Sum = gio.RegisterReducer(sum)
)

func main() {

    gio.Init()   // If the command line invokes the mapper or reducer, execute it and exit.
    flag.Parse() // optional, since gio.Init() will call this also.

    f := flow.New("top5 words in passwd").
        Read(file.Txt("/etc/passwd", 2)).  // read a txt file and partitioned to 2 shards
        Map("tokenize", Tokenize).    // invoke the registered "tokenize" mapper function.
        Map("appendOne", AppendOne).  // invoke the registered "appendOne" mapper function.
        ReduceByKey("sum", Sum).         // invoke the registered "sum" reducer function.
        Sort("sortBySum", flow.OrderBy(2, true)).
        Top("top5", 5, flow.OrderBy(2, false)).
        Printlnf("%s\t%d")

    if *isDistributed {
        f.Run(distributed.Option())
    } else {
        f.Run()
    }

}

func tokenize(row []interface{}) error {
    line := gio.ToString(row[0])
    for _, s := range strings.FieldsFunc(line, func(r rune) bool {
        return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
    }) {
        gio.Emit(s)
    }
    return nil
}

func appendOne(row []interface{}) error {
    row = append(row, 1)
    gio.Emit(row...)
    return nil
}

func sum(x, y interface{}) (interface{}, error) {
    return gio.ToInt64(x) + gio.ToInt64(y), nil
}

Now you can execute the binary directly or with "-distributed" option to run in distributed mode. The distributed mode would need a simple setup described later.

A bit more blown up example is here, using the predefined mapper or reducer: https://github.com/chrislusf/gleam/blob/master/examples/word_count_in_go/word_count_in_go.go

Word Count by Unix Pipe Tools

Here is another way to do the similar by unix pipe tools.

Unix Pipes are easy for sequential pipes, but limited to fan out, and even more limited to fan in.

With Gleam, fan-in and fan-out parallel pipes become very easy.

package main

import (
    "fmt"

    "github.com/chrislusf/gleam/flow"
    "github.com/chrislusf/gleam/gio"
    "github.com/chrislusf/gleam/gio/mapper"
    "github.com/chrislusf/gleam/plugins/file"
    "github.com/chrislusf/gleam/util"
)

func main() {

    gio.Init()

    flow.New("word count by unix pipes").
        Read(file.Txt("/etc/passwd", 2)).
        Map("tokenize", mapper.Tokenize).
        Pipe("lowercase", "tr 'A-Z' 'a-z'").
        Pipe("sort", "sort").
        Pipe("uniq", "uniq -c").
        OutputRow(func(row *util.Row) error {

            fmt.Printf("%s\n", gio.ToString(row.K[0]))

            return nil
        }).Run()

}

This example used OutputRow() to process the output row directly.

Join two CSV files.

Assume there are file "a.csv" has fields "a1, a2, a3, a4, a5" and file "b.csv" has fields "b1, b2, b3". We want to join the rows where a1 = b2. And the output format should be "a1, a4, b3".

package main

import (
    . "github.com/chrislusf/gleam/flow"
    "github.com/chrislusf/gleam/gio"
    "github.com/chrislusf/gleam/plugins/file"
)

func main() {

    gio.Init()

    f := New("join a.csv and b.csv by a1=b2")
    a := f.Read(file.Csv("a.csv", 1)).Select("select", Field(1,4)) // a1, a4
    b := f.Read(file.Csv("b.csv", 1)).Select("select", Field(2,3)) // b2, b3

    a.Join("joinByKey", b).Printlnf("%s,%s,%s").Run()  // a1, a4, b3

}

Distributed Computing

Setup Gleam Cluster Locally

Start a gleam master and several gleam agents

// start "gleam master" on a server
> go get github.com/chrislusf/gleam/distributed/gleam
> gleam master --address=":45326"

// start up "gleam agent" on some different servers or ports
> gleam agent --dir=2 --port 45327 --host=127.0.0.1
> gleam agent --dir=3 --port 45328 --host=127.0.0.1

Setup Gleam Cluster on Kubernetes

Install Kubernetes tools At the very least you will need a local K8s cluster, Docker & Kubectl. Docker Desktop provides all of this out the box.

Install Skaffold

Choose the appropriate binary here. For example, ARM64:

curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-darwin-arm64 && \
sudo install skaffold /usr/local/bin/

Run Latest Version

cd ./k8s
skaffold run --profile base 

Use skaffold delete --profile base to bring the cluster down.

Alternately Build & Run Local Version

You can build a local copy of gleam for development with hot reloading:

cd ./k8s
skaffold dev --profile dev 

Change Execution Mode.

After the flow is defined, the Run() function can be executed in local mode or distributed mode.

  f := flow.New("")
  ...
  // 1. local mode
  f.Run()

  // 2. distributed mode
  import "github.com/chrislusf/gleam/distributed"
  f.Run(distributed.Option())
  f.Run(distributed.Option().SetMaster("master_ip:45326"))

Important Features

Status

Gleam is just beginning. Here are a few todo items. Welcome any help!

Especially Need Help Now:

Please start to use it and give feedback. Help is needed. Anything is welcome. Small things count: fix documentation, adding a logo, adding docker image, blog about it, share it, etc.

License

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.