chrislusf / glow

Glow is an easy-to-use distributed computation system written in Go, similar to Hadoop Map Reduce, Spark, Flink, Storm, etc. I am also working on another similar pure Go system, https://github.com/chrislusf/gleam , which is more flexible and more performant.
3.2k stars 248 forks source link

centos6.4 word count in Distributed Mode run error #35

Closed adeagle closed 8 years ago

adeagle commented 8 years ago

////////////////////////////Distributed Mode Conf////////////////////////////
./glow master
./glow agent --dir data --max.executors=16 --memory=2048 --master="localhost:8930" --port 8931
./glow agent --dir data1 --max.executors=16 --memory=2048 --master="localhost:8930" --port 8932
go run word_count.go -glow -glow.leader="localhost:8930" -glow.related.files="passwd"

///////////////////////////////word_count.go///////////////////////////////////////// `package main

import ( "flag" "fmt" "strings" "strconv" "time" "sync" "encoding/gob" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" )

type WordCountResult struct { Addr string Info MemInfo }

// MemInfo is one record parsed from an input line and also the accumulator
// that ReduceByKey combines. The field order is significant: the program
// builds values with positional composite literals.
type MemInfo struct {
	Addr  string // text before the first ':' on the input line
	Size  int    // second ':'-separated field parsed as hexadecimal; summed on reduce
	Count int    // 1 for each parsed line; summed on reduce
}

// init registers the MemInfo value type with encoding/gob.
// NOTE(review): presumably required so glow can gob-encode MemInfo values
// exchanged between processes — confirm.
func init() {
	var sample MemInfo
	gob.Register(sample)
}

// goStart runs fn in a new goroutine tracked by wg.
//
// wg.Add(1) is called before the goroutine is started, and wg.Done is
// deferred inside it, so a caller's wg.Wait returns only after fn has
// returned.
func goStart(wg *sync.WaitGroup, fn func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		fn()
	}()
}

// testWordCount1 builds a flow over the text file "passwd" (2 shards),
// aggregates the records per address, prints each aggregated record, and
// finally prints the elapsed time in milliseconds.
//
// Each input line is split on ':'. The first field is used as the address
// and the second field is parsed as a hexadecimal size. Lines that have no
// ':' or whose second field is not valid hexadecimal are skipped.
func testWordCount1() {
	println("testWordCount1")

	flowOut1 := make(chan WordCountResult)
	f1 := flow.New()
	f1.TextFile(
		"passwd", 2,
	).Map(func(line string, ch chan MemInfo) {
		words := strings.Split(line, ":")
		// A line without a ':' separator (blank line, truncated record)
		// has no size field; skip it instead of panicking on words[1].
		if len(words) < 2 {
			return
		}
		if s, err := strconv.ParseInt(words[1], 16, 0); err == nil {
			ch <- MemInfo{words[0], int(s), 1}
		}
	}).Map(func(ws MemInfo) (string, MemInfo) {
		// Key each record by its address for the reduction below.
		return ws.Addr, ws
	}).ReduceByKey(func(x MemInfo, y MemInfo) MemInfo {
		return MemInfo{x.Addr, x.Size + y.Size, x.Count + y.Count}
	}).AddOutput(flowOut1)

	flow.Ready()

	startTime := time.Now()
	var wg sync.WaitGroup

	// Run the flow and drain its output concurrently: flowOut1 is
	// unbuffered, so results must be consumed while the flow is running.
	goStart(&wg, func() {
		f1.Run()
	})

	// NOTE(review): this loop ends only when flowOut1 is closed; that is
	// presumably done by glow once the flow completes — confirm.
	goStart(&wg, func() {
		for t := range flowOut1 {
			fmt.Printf("%s size:%-8d count:%-8d\n",
				t.Info.Addr, t.Info.Size, t.Info.Count)
		}
	})

	wg.Wait()

	fmt.Printf("UseTime:%d\n", int64(time.Since(startTime)/time.Millisecond))
}

// main parses the command-line flags and then runs testWordCount1.
// NOTE(review): the -glow* flags passed on the command line are presumably
// registered by the blank-imported glow driver package — confirm.
func main() { flag.Parse() testWordCount1() }`

/////////////////////////////////console output/////////////////////// [::1]:8931>2016/02/28 10:34:06 receive error:read tcp [::1]:46225->[::1]:8931: read: connection reset by peer

/////////////////////////////////passwd/////////////////////////////////// 0x001b8aa0:00000012 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8aa0:00000012 0x001b8aa0:00000400 0x001b8aa0:00000096 0x001b8aa0:00000064 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8aa0:00000008 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8aa0:00000016 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8aa0:00000016 0x001b8aa0:00000021 0x76fb9640:00000008 0x001b8aa0:00000020 0x001b8aa0:00000020 0x001b8aa0:00000020 0x001b8aa0:00000032 0x001b8aa0:00000016 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8ab8:00000512 0x76fb9640:00000008 0x76fb9640:00000008 0x76fb9640:00000008 0x76fb9640:00000008 0x7688b540:00000057 0x76fb9640:00000008 .......

chrislusf commented 8 years ago

It runs fine for me. This seems to be related to the master or agent setup.

chrislu-mbp:tmp chrislu$ go run wc.go -glow -glow.related.files=passwd
testWordCount1
2016/02/27 20:44:34 localhost:8930 allocated 1 executors.
2016/02/27 20:44:34 localhost:8930 allocated 1 executors.
2016/02/27 20:44:34 localhost:8930 allocated 1 executors.
2016/02/27 20:44:34 localhost:8930 allocated 1 executors.
[::1]:8931>testWordCount1
[::1]:8932>testWordCount1
[::1]:8932>testWordCount1
[::1]:8932>testWordCount1
0x001b8aa0 size:1813     count:24
0x001b8ab8 size:1298     count:1
0x7688b540 size:87       count:1
0x76fb9640 size:48       count:6
[::1]:8932>
[::1]:8932>
[::1]:8932>
[::1]:8932>
[::1]:8931>
UseTime:91
chrislusf commented 8 years ago

Does it always happen, or was it just a one-time error?

If always, please check for any firewall settings.

adeagle commented 8 years ago

word_count.zip

chrislusf commented 8 years ago

no issue using the zip file.

chrislu-mbp:word_count chrislu$ go run wc.go -glow -glow.related.files=passwd
testWordCount1
2016/02/28 00:59:28 localhost:8930 allocated 1 executors.
2016/02/28 00:59:28 localhost:8930 allocated 2 executors.
2016/02/28 00:59:28 localhost:8930 allocated 3 executors.
2016/02/28 00:59:28 localhost:8930 allocated 1 executors.
[::1]:8931>testWordCount1
[::1]:8932>testWordCount1
[::1]:8931>testWordCount1
[::1]:8931>testWordCount1
[::1]:8931>testWordCount1
[::1]:8931>testWordCount1
[::1]:8931>testWordCount1
0x001b8aa0 size:53821    count:2767
0x001b8ab8 size:1298     count:1
0x7688af58 size:23       count:1
0x7688b540 size:35156    count:4
0x76fb9640 size:48       count:6
[::1]:8932>
[::1]:8932>
[::1]:8932>
[::1]:8932>
[::1]:8932>
[::1]:8931>
[::1]:8931>
[::1]:8931>
[::1]:8931>
[::1]:8931>
[::1]:8931>
UseTime:224
chrislusf commented 8 years ago

Wait, it seems I can sometimes reproduce this on Linux...

adeagle commented 8 years ago

/////////////////////////////windows test script////////////////////////////

taskkill /im glow.exe
del data* -r

start glow.exe master
ping 127.0.0.1 -n 2 > null
start glow.exe agent --dir data --max.executors=16 --memory=2048 --master="localhost:8930" --port 8931
ping 127.0.0.1 -n 2 > null
start glow.exe agent --dir data1 --max.executors=16 --memory=2048 --master="localhost:8930" --port 8932
ping 127.0.0.1 -n 2 > null

for /l %%i in (1,1,10) do go run wc.go -glow -glow.related.files=passwd

taskkill /im glow.exe
del data* -r

///////////////////////////////console output///////////////////////////////// C:\GoWorkspace\word_count>go run wc.go -glow -glow.related.files=passwd testWordCount1 2016/02/28 17:46:42 localhost:8930 allocated 1 executors. 2016/02/28 17:46:42 localhost:8930 allocated 1 executors. 2016/02/28 17:46:42 localhost:8930 allocated 4 executors. 2016/02/28 17:46:42 localhost:8930 allocated 1 executors.

blocking....... (5 wc.exe processes with high CPU)

chrislusf commented 8 years ago

fixed in https://github.com/chrislusf/glow/commit/3876d2ffa56f022bafec39f1f3e3ee850a0f4dc7