
BSD 3-Clause "New" or "Revised" License

Goka


Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting them in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

Documentation

This README provides a brief, high-level overview of the ideas behind Goka.

Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Configuration

Goka relies on Sarama to perform the actual communication with Kafka, which offers many configuration settings. The config is documented in Sarama's package documentation.

In most cases you will need to modify the config, e.g., to set the Kafka version:

cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)

This makes all goka components use the updated config.

If different components need different configurations, pass customized builders to the component's constructor, e.g.:

cfg := goka.DefaultConfig()
// modify the config with component-specific settings

// use the config by creating a builder that overrides the global config
goka.NewProcessor(// ...,
    goka.WithConsumerGroupBuilder(
        goka.ConsumerGroupBuilderWithConfig(cfg),
    ),
    // ...
)

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning. Its main building blocks are emitters, which deliver key-value messages into Kafka; processors, which consume input topics and maintain a persistent group table; and views, which provide read-only access to a group table.
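As a sketch of how a view exposes a group table, the snippet below reads the counter maintained by the processor in the Get Started example below. The broker address, group name, and key are taken from that example; a running Kafka cluster is assumed.

```go
package main

import (
	"context"
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	// A view gives read-only access to the processor's group table
	// ("example-group-table"), e.g. to serve its values over HTTP.
	view, err := goka.NewView(
		[]string{"localhost:9092"},
		goka.GroupTable(goka.Group("example-group")),
		new(codec.Int64),
	)
	if err != nil {
		log.Fatalf("error creating view: %v", err)
	}
	// Run keeps the view's local copy of the table up to date.
	go view.Run(context.Background())

	// Get returns the value stored for the key, or nil if absent.
	val, err := view.Get("some-key")
	if err != nil {
		log.Fatalf("error getting value: %v", err)
	}
	log.Printf("some-key has counter %v", val)
}
```

Note that Get may return nil until the view has caught up with the table topic.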

Get Started

An example Goka application could look like the following. An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic every second. A processor consumes the "example-stream" topic, counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To start dockerized ZooKeeper and Kafka instances locally, execute make start with the Makefile in the examples folder.

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/lovoo/goka"
    "github.com/lovoo/goka/codec"
)

var (
    brokers             = []string{"localhost:9092"}
    topic   goka.Stream = "example-stream"
    group   goka.Group  = "example-group"
)

// runEmitter emits one message every second, forever.
func runEmitter() {
    emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
    if err != nil {
        log.Fatalf("error creating emitter: %v", err)
    }
    defer emitter.Finish()
    for {
        time.Sleep(1 * time.Second)
        err = emitter.EmitSync("some-key", "some-value")
        if err != nil {
            log.Fatalf("error emitting message: %v", err)
        }
    }
}

// process messages until ctrl-c is pressed
func runProcessor() {
    // process callback is invoked for each message delivered from
    // "example-stream" topic.
    cb := func(ctx goka.Context, msg interface{}) {
        var counter int64
        // ctx.Value() returns the value stored in the group table
        // for the message's key.
        if val := ctx.Value(); val != nil {
            counter = val.(int64)
        }
        counter++
        // SetValue stores the incremented counter in the group table
        // for the message's key.
        ctx.SetValue(counter)
        log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
    }

    // Define a new processor group. The group defines all inputs, outputs, and
    // serialization formats. The group-table topic is "example-group-table".
    g := goka.DefineGroup(group,
        goka.Input(topic, new(codec.String), cb),
        goka.Persist(new(codec.Int64)),
    )

    p, err := goka.NewProcessor(brokers, g)
    if err != nil {
        log.Fatalf("error creating processor: %v", err)
    }
    ctx, cancel := context.WithCancel(context.Background())
    done := make(chan bool)
    go func() {
        defer close(done)
        if err = p.Run(ctx); err != nil {
            log.Fatalf("error running processor: %v", err)
        } else {
            log.Printf("Processor shutdown cleanly")
        }
    }()

    wait := make(chan os.Signal, 1)
    signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
    <-wait   // wait for SIGINT/SIGTERM
    cancel() // gracefully stop processor
    <-done
}

func main() {
    go runEmitter() // emits one message every second forever
    runProcessor()  // press ctrl-c to stop
}

A very similar example is also in 1-simplest. Just run go run examples/1-simplest/main.go.
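The codecs passed to DefineGroup above (codec.String, codec.Int64) serialize stream and table values. You can also supply your own by implementing goka's Codec interface, i.e. the two methods Encode and Decode; since Go interfaces are satisfied structurally, any such type works. A minimal sketch with a hypothetical User type encoded as JSON (the names User and UserCodec are illustrative, not part of goka):

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// User is an example value stored in a stream or group table.
type User struct {
	Name   string `json:"name"`
	Visits int64  `json:"visits"`
}

// UserCodec implements goka's Codec interface
// (Encode/Decode) for *User values, using JSON.
type UserCodec struct{}

// Encode serializes a *User to bytes.
func (c *UserCodec) Encode(value interface{}) ([]byte, error) {
	u, ok := value.(*User)
	if !ok {
		return nil, fmt.Errorf("codec: expected *User, got %T", value)
	}
	return json.Marshal(u)
}

// Decode deserializes bytes back into a *User.
func (c *UserCodec) Decode(data []byte) (interface{}, error) {
	var u User
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, fmt.Errorf("codec: decoding: %v", err)
	}
	return &u, nil
}

func main() {
	c := new(UserCodec)
	raw, err := c.Encode(&User{Name: "alice", Visits: 3})
	if err != nil {
		log.Fatal(err)
	}
	decoded, err := c.Decode(raw)
	if err != nil {
		log.Fatal(err)
	}
	u := decoded.(*User)
	fmt.Println(u.Name, u.Visits)
}
```

Such a codec could then replace codec.Int64 in the Persist call, e.g. goka.Persist(new(UserCodec)).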

Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.
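For instance, the group table from the example above could be created manually with Kafka's topic CLI. The topic name and broker address follow the example; the partition and replication counts are placeholders to adjust for your cluster:

```shell
# Create the group-table topic with log compaction enabled.
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic example-group-table \
  --partitions 10 --replication-factor 2 \
  --config cleanup.policy=compact
```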

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.