lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

Question: Trying to find the best way to join multiple topics #463

Open Quinoa3 opened 1 week ago

Quinoa3 commented 1 week ago

I have three topics carrying distinct message types, all keyed by event ID. The goal is to aggregate the messages and, once all three have arrived, emit a combined message onto a new stream. Currently I have one group per topic, each with a process callback defined as follows. All three are similar.

func processPageView(ctx goka.Context, msg interface{}) {
    // The input codec decodes every message on this stream into *message.PageViewMessage.
    pageView := msg.(*message.PageViewMessage)
    baseEventValue := ctx.Join(baseEventTable)
    productInfoValue := ctx.Join(productInfoTable)
    if baseEventValue != nil && productInfoValue != nil {
        fullSignal := &message.FullSignal{}
        fullSignal.ProductInfoMessage = productInfoValue.(*message.ProductInfoMessage)
        fullSignal.BaseEventMessage = baseEventValue.(*message.BaseEventMessage)
        fullSignal.PageViewMessage = pageView
        ctx.Emit(fullSignalStream, ctx.Key(), fullSignal)
    } else {
        ctx.SetValue(pageView)
    }
}

func definePageViewGroup() *goka.GroupGraph {
    group := goka.DefineGroup(baseEventGroup,
        goka.Input(pageViewStream, new(codec.PageViewMessageCodec), processPageView),
        goka.Output(fullSignalStream, new(codec.FullSignalCodec)),
        goka.Join(baseEventTable, new(codec.BaseEventMessageCodec)),
        goka.Join(productInfoTable, new(codec.ProductInfoMessageCodec)),
        goka.Persist(new(codec.PageViewMessageCodec)),
    )
    return group
}

Is there a better way to do this? My last issue: if one of the messages never arrives, how can I still emit a message with the data that does exist? Thanks.

Quinoa3 commented 6 days ago

Also, what is the best way to delete keys and values from other tables/groups?