chrislusf / gleam

Fast, efficient, and scalable distributed map/reduce system, DAG execution, in memory or on disk, written in pure Go, runs standalone or distributedly.
Apache License 2.0

Some ideas in windows functions #110

Open qazqwe1596357 opened 6 years ago

qazqwe1596357 commented 6 years ago

Some ideas in windows functions

I would like to do some development on "window functions", as described above. Does that fit with your ideas? Does it conflict with your plans? Is there anything I should pay attention to?

qazqwe1596357 commented 6 years ago

Can I join gleam-dev on Slack?

chrislusf commented 6 years ago

If you plan to do the window functions, what are your thoughts on the underlying data structures?

I need your email to add you to Slack.

qazqwe1596357 commented 6 years ago

I will post some key data structures later.

qazqwe1596357 commented 6 years ago

Key idea for windowing functions

1-Window field selection: implementation and use in a user application

// selectWindowField is defined like a mapper
func selectWindowField(row []interface{}) int64 {
    timestampField := row[2].(int64)
    // round the timestamp down to the start of its 10-second window,
    // which matches the window values in the example output
    windowField := timestampField - timestampField%10
    return windowField
}

f := flow.New("kafkaSource").
          Read(kfkSource).
          Map("decoder", DecoderMapper_ID).
          // after SelectWindowField, the windowField is appended to the Row
          SelectWindowField("Select-Window-Field", selectWindowField_ID).
          // keep the top 2 rows of each window, as in the example output
          Top("top2", 2, flow.OrderBy(2, false)).
          // after AppendWindowFieldAt, the windowField is visible at Row.V[2]
          AppendWindowFieldAt(3).
          Printlnf("%s,%d,%d")

/*
Given input like
    a,1,,1516513411
    b,2,,1516513412
    c,3,,1516513413
    d,4,,1516513514
    e,5,,1516513515
    f,6,,1516513515
the output would look like
    c,3,1516513410
    b,2,1516513410
    f,6,1516513510
    e,5,1516513510

In this application the window field is the timestamp rounded down to a multiple of 10, so each 10-second span is one window
- How rows are split into windows is decided in the SelectWindowField function
- If the application needs the window field itself, it calls AppendWindowFieldAt to insert/append it into the Row
*/
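
To make the expected output above easier to check, here is a minimal standalone sketch of the same idea in plain Go, without gleam. It assumes tumbling 10-second windows and a top-2 per window; the `record` type and the `windowStart` and `topPerWindow` helpers are hypothetical names used only for illustration and are not part of gleam.

```go
package main

import (
    "fmt"
    "sort"
)

// record is a hypothetical decoded input row: name, value, unix timestamp.
type record struct {
    Name      string
    Value     int64
    Timestamp int64
}

// windowStart returns the start of the tumbling window of the given size
// (in seconds) that contains ts. It assumes ts is non-negative.
func windowStart(ts, size int64) int64 {
    return ts - ts%size
}

// topPerWindow groups records by window and keeps the n largest values in
// each window. Windows are emitted in ascending order, rows by value descending.
func topPerWindow(records []record, size int64, n int) []string {
    byWindow := map[int64][]record{}
    for _, r := range records {
        w := windowStart(r.Timestamp, size)
        byWindow[w] = append(byWindow[w], r)
    }

    windows := make([]int64, 0, len(byWindow))
    for w := range byWindow {
        windows = append(windows, w)
    }
    sort.Slice(windows, func(i, j int) bool { return windows[i] < windows[j] })

    var out []string
    for _, w := range windows {
        rs := byWindow[w]
        sort.Slice(rs, func(i, j int) bool { return rs[i].Value > rs[j].Value })
        if len(rs) > n {
            rs = rs[:n]
        }
        for _, r := range rs {
            out = append(out, fmt.Sprintf("%s,%d,%d", r.Name, r.Value, w))
        }
    }
    return out
}

func main() {
    records := []record{
        {"a", 1, 1516513411}, {"b", 2, 1516513412}, {"c", 3, 1516513413},
        {"d", 4, 1516513514}, {"e", 5, 1516513515}, {"f", 6, 1516513515},
    }
    for _, line := range topPerWindow(records, 10, 2) {
        fmt.Println(line)
    }
}
```

With the six sample rows this prints the four lines listed in the comment above. Rounding down with `ts - ts%size` is what gives every row in the same 10-second span the same window value.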

2-Row implementation for windowing

Row struct implementation

Current Row in gleam
type Row struct {
    K []interface{} `msg:"K"`
    V []interface{} `msg:"V"`
    T int64         `msg:"T"`
}
Row in windowing calculation
//
// Plan A
//

// one way is to append window-related fields to Row
type Row struct {
    K []interface{} `msg:"K"`
    V []interface{} `msg:"V"`
    W int64         `msg:"W"`  // window id, usually a timestamp
    S int64         `msg:"S"`  // row status
}

// in windowing mode, a Row can carry not only data
// but also control commands,
// such as an EOF status or window batch errors

// ================================================================

//
// Plan B
//

// the other way to think about the Row struct: append no extra fields
type Row struct {
    K []interface{} `msg:"K"`
    V []interface{} `msg:"V"`
}

// but we still need to transmit the command and window info. How?
// it could be something like
//     Row.V = []interface{}{S, W, originalValues...}
//     which puts the status (for commands) and the window field into the first two slots of Row.V,
//     with the real values appended after them
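
As a rough illustration of Plan B, the sketch below packs the status and window id into the first two slots of `V` and strips them again on the reading side. The `packWindow` and `unpackWindow` helpers and the `StatusOK`/`StatusEOF` values are assumptions made for this example, not existing gleam code.

```go
package main

import "fmt"

// Row mirrors the Plan B layout: no extra struct fields.
type Row struct {
    K []interface{}
    V []interface{}
}

// Hypothetical status codes carried in the first slot of V.
const (
    StatusOK  int64 = 0
    StatusEOF int64 = 1
)

// packWindow returns a copy of row whose V starts with (status, window),
// followed by the original values.
func packWindow(row Row, status, window int64) Row {
    v := make([]interface{}, 0, len(row.V)+2)
    v = append(v, status, window)
    v = append(v, row.V...)
    return Row{K: row.K, V: v}
}

// unpackWindow strips the two header slots and returns the plain row
// together with the status and window id.
func unpackWindow(row Row) (Row, int64, int64, error) {
    if len(row.V) < 2 {
        return Row{}, 0, 0, fmt.Errorf("row has %d values, want at least 2", len(row.V))
    }
    status, okStatus := row.V[0].(int64)
    window, okWindow := row.V[1].(int64)
    if !okStatus || !okWindow {
        return Row{}, 0, 0, fmt.Errorf("window header slots are not int64")
    }
    return Row{K: row.K, V: row.V[2:]}, status, window, nil
}

func main() {
    original := Row{K: []interface{}{"a"}, V: []interface{}{int64(1)}}
    packed := packWindow(original, StatusOK, 1516513410)
    plain, status, window, err := unpackWindow(packed)
    fmt.Println(plain.V, status, window, err)
}
```

The trade-off compared with Plan A is that the struct and its serialized form stay unchanged, but every reader has to know about the two header slots and remove them before user code sees the row.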

Row Read / Write

Read/write Rows through RowReader/RowWriter

RowReader implementation

//
// RowReader in Batch Mode
//
type RawRowReader struct {
    reader io.Reader
}

func (this *RawRowReader) ReadRow() (*util.Row, error) {
    return util.ReadRow(this.reader)
}

//
// RowReader in Windowing Mode
//
type WinRowReader struct {
    rowChan chan *util.Row
}

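func (this *WinRowReader) AddRow(row *util.Row) error {
    this.rowChan <- row
    return nil
}

func (this *WinRowReader) ReadRow() (*util.Row, error) {
    // block until the router delivers the next row for this window
    row := <-this.rowChan
    switch row.S {
    case ErrorOK:
        // if using the Plan B Row struct, strip the leading (S, W) fields from row.V before returning
        return row, nil
    case ErrorEOF:
        // return io.EOF so callers can treat the end of a window like the end of input
        return nil, io.EOF
    ...
    }
    // WinRowReader has no underlying io.Reader, so an unknown status is an error
    return nil, fmt.Errorf("unexpected row status %d", row.S)
}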
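
A self-contained sketch of the channel-backed reader may help to show the intended control flow. It assumes the Plan A `Row` layout, hypothetical `StatusOK`/`StatusEOF` values, and a hypothetical `NewWinRowReader` constructor; none of these exist in gleam today.

```go
package main

import (
    "fmt"
    "io"
)

// Row follows the Plan A layout from this proposal.
type Row struct {
    K []interface{}
    V []interface{}
    W int64 // window id
    S int64 // row status
}

// Hypothetical status values.
const (
    StatusOK  int64 = 0
    StatusEOF int64 = 1
)

// WinRowReader hands out the rows of one window as they arrive on a channel.
type WinRowReader struct {
    rowChan chan *Row
}

// NewWinRowReader creates a reader with a buffered channel.
func NewWinRowReader(buffer int) *WinRowReader {
    return &WinRowReader{rowChan: make(chan *Row, buffer)}
}

// AddRow queues a row; it blocks when the buffer is full.
func (r *WinRowReader) AddRow(row *Row) {
    r.rowChan <- row
}

// ReadRow blocks for the next row and maps control rows to errors.
func (r *WinRowReader) ReadRow() (*Row, error) {
    row, ok := <-r.rowChan
    if !ok {
        return nil, io.EOF // channel closed by the producer
    }
    switch row.S {
    case StatusOK:
        return row, nil
    case StatusEOF:
        return nil, io.EOF
    default:
        return nil, fmt.Errorf("unexpected row status %d", row.S)
    }
}

func main() {
    reader := NewWinRowReader(4)
    reader.AddRow(&Row{V: []interface{}{"a", int64(1)}, W: 1516513410, S: StatusOK})
    reader.AddRow(&Row{W: 1516513410, S: StatusEOF})
    for {
        row, err := reader.ReadRow()
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println("error:", err)
            return
        }
        fmt.Println(row.W, row.V)
    }
}
```

Because the EOF arrives as an ordinary row on the same channel, user functions can keep their usual "read until EOF" loop and do not need to know that they are running once per window.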

3-Router implementation

Router description

Router implementation

//
// Router in windowing Mode
//
func (r *localDriver) runTask(wg *sync.WaitGroup, task *Task) {
    step := task.Step
    if step.Function == nil {
        return
    }
    if step.winMode {
        // windowing mode: RowRouter manages one Function call per window
        rowRouter := RowRouter{readers: task.InputChans, writers: task.OutputShards, Function: step.Function, winTaskReg: map[int64]*WinTask{}}
        rowRouter.Run()
        return
    }
    // batch mode: call Function once
    // the slices use the RowReader/RowWriter interface types that Function expects
    rawRowReaders := []RowReader{}
    rawRowWriters := []RowWriter{}
    // init rawRowReaders / rawRowWriters
    ...
    step.Function(rawRowReaders, rawRowWriters, stat)
}

// WinTask
type WinTask struct {
    winRowReaders     []*WinRowReader
    winRowWriters     []*WinRowWriters
    Function          func([]RowReader, []RowWriter, *pb.InstructionStat) error
    timeBegin         time.Time
    timeTimeout       time.Time
    winFieldBind      int64
    instructionStat   *pb.InstructionStat
}

func (this *WinTask) RunFunc(row *Row) error {
    err := this.Function(this.winRowReaders, this.winRowWriters, this.instructionStat)
    // when WinTask.Function is done, send an EOF Row to every writer
    eofRow := &Row{S: ErrorEOF}
    for _, w := range this.winRowWriters {
        w.WriteRow(eofRow)
    }
    return err
}

// RowRouter: basic logic only; timeout control, error handling, and more still need to be added
type RowRouter struct {
    readers     []io.Reader
    writers     []io.Writer
    Function    func([]RowReader, []RowWriter, *pb.InstructionStat) error
    winTaskReg  map[int64]*WinTask
}

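func (this *RowRouter) Run() {
    for i, r := range this.readers {
        // pass i and r as arguments so each goroutine works on its own copy
        go func(i int, r io.Reader) {
            rawRowReader := &RawRowReader{reader: r}
            for {
                row, err := rawRowReader.ReadRow()
                if err != nil {
                    return // EOF or read error: stop routing this input
                }
                winTask, err := this.GetOrGenWinTask(row.W)
                if err != nil {
                    return
                }
                winTask.winRowReaders[i].AddRow(row)
            }
        }(i, r)
    }
}

// GetOrGenWinTask returns the task for a window, creating it on first use.
// winTaskReg must be initialized and, since all reader goroutines share it, guarded by a lock (omitted here).
func (this *RowRouter) GetOrGenWinTask(winField int64) (*WinTask, error) {
    winTask, ok := this.winTaskReg[winField]
    if !ok {
        // reader/writer setup and starting RunFunc are omitted
        winTask = &WinTask{Function: this.Function, winFieldBind: winField}
        this.winTaskReg[winField] = winTask
    }
    return winTask, nil
}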
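
The routing idea can also be sketched as a small runnable program. This is only an illustration under simplifying assumptions: rows arrive on Go channels instead of `io.Reader`s, each window is a channel drained by one worker goroutine, and windows are closed when all inputs end rather than on a timeout. The `router` type and its methods are hypothetical names.

```go
package main

import (
    "fmt"
    "sync"
)

// Row carries only what the router needs: values and a window id.
type Row struct {
    V []interface{}
    W int64
}

// router demultiplexes rows from several inputs into one channel per window.
type router struct {
    mu      sync.Mutex        // guards windows, which all readers share
    windows map[int64]chan Row
    workers sync.WaitGroup    // tracks the per-window worker goroutines
    process func(window int64, rows <-chan Row)
}

func newRouter(process func(int64, <-chan Row)) *router {
    return &router{windows: map[int64]chan Row{}, process: process}
}

// getOrCreate returns the channel of a window and starts its worker on first use.
func (r *router) getOrCreate(w int64) chan Row {
    r.mu.Lock()
    defer r.mu.Unlock()
    ch, ok := r.windows[w]
    if !ok {
        ch = make(chan Row, 16)
        r.windows[w] = ch
        r.workers.Add(1)
        go func() {
            defer r.workers.Done()
            r.process(w, ch)
        }()
    }
    return ch
}

// Run routes every row of every input, then closes all windows and waits
// for the workers to finish.
func (r *router) Run(inputs []<-chan Row) {
    var readers sync.WaitGroup
    for _, in := range inputs {
        readers.Add(1)
        go func(in <-chan Row) {
            defer readers.Done()
            for row := range in {
                r.getOrCreate(row.W) <- row
            }
        }(in)
    }
    readers.Wait()

    r.mu.Lock()
    for _, ch := range r.windows {
        close(ch) // plays the role of the EOF row
    }
    r.mu.Unlock()
    r.workers.Wait()
}

func main() {
    results := make(chan [2]int64, 8)
    r := newRouter(func(w int64, rows <-chan Row) {
        var n int64
        for range rows {
            n++
        }
        results <- [2]int64{w, n} // window id and number of rows seen
    })

    in := make(chan Row, 6)
    for _, ts := range []int64{1516513411, 1516513412, 1516513413, 1516513514, 1516513515, 1516513515} {
        in <- Row{W: ts - ts%10}
    }
    close(in)

    r.Run([]<-chan Row{in})
    close(results)
    for res := range results {
        fmt.Println("window", res[0], "rows", res[1])
    }
}
```

The mutex around the registry matters because every input has its own reader goroutine. In a real streaming job, closing a window would have to be driven by the timeout control mentioned above instead of by the end of input.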
chrislusf commented 6 years ago

The API changes have some good ideas, but need more thoughts on

Maybe check Apache Beam, Flink, and Google Cloud Dataflow.

qazqwe1596357 commented 6 years ago

ideas for the problems mentioned above

binyoucai commented 3 years ago

666

feiyangbeyond commented 11 months ago

Is windows supported now?