apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.87k stars 4.26k forks source link

[Bug]: Issues while reading from Pubsub emulator and using DirectRunner Golang #29741

Open yasser-chihab opened 11 months ago

yasser-chihab commented 11 months ago

What happened?

What happened?

We are exploring the use of GCP Dataflow alongside Apache Beam Go SDK, so while setuping a pipline that processes event from a pubsub subscription with Dataflow as a runner seems working without issues, but when trying to streamline the local development environment with Pubsub emulator and DirectRunner, we face some issues.

package main

import (
    "context"
    "flag"
    "fmt"
    "os"
    "strings"
    "time"

    "cloud.google.com/go/pubsub"
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

var gcpProject = "project-id"

type BqRow struct {
    Date      string
    Line      string
    WordCount int
}

func writeToBQ(s beam.Scope, table string, wordCountLine beam.PCollection) {
    bqRows := beam.ParDo(s, func(elem string, count int) BqRow {
        return BqRow{Date: time.Now().String(), Line: elem, WordCount: count}
    }, wordCountLine)
    bigqueryio.Write(s, gcpProject, table, bqRows)
}

func filterEmptyLines(line string, emit func(string)) {
    if len(line) > 0 {
        emit(line)
    }
}

func wordCountLine(line string, emit func(string, int)) {
    trimedLine := strings.TrimSpace(line)
    words := strings.Split(trimedLine, " ")
    emit(line, len(words))
}

func convertPubsubMsg(msg []byte) string {
    return (string)(msg)
}

func main() {
    ctx := context.Background()
    os.Setenv("local", "true")
    if local := os.Getenv("local"); local == "true" {
        flag.Set("runner", "DirectRunner")
        flag.Set("streaming", "true")
    } else {
        flag.Set("runner", "dataflow")
        flag.Set("project", gcpProject)
        flag.Set("region", "region")
        flag.Set("staging_location", "gs://stream-words/binaries")
    }

    pubsubSubId := "streamWord-sub"
    pubsubTopic := "streamWords"
    BQDatasetTable := "project-id:streamWords.stats"

    flag.Parse()
    beam.Init()

    //pubsub client
    pubClient, err := pubsub.NewClient(ctx, "gcp-project-id")
    if err != nil {
        fmt.Print(err)
    }
    defer pubClient.Close()

    // Create the Pipeline object and root scope.
    pipeline, scope := beam.NewPipelineWithRoot()

    // read stream of msg from pubsub
    streamWords := pubsubio.Read(scope, gcpProject, pubsubTopic, &pubsubio.ReadOptions{
        Subscription: pubsubSubId})

    //create fixed windows of 10s
    windowedStream := beam.WindowInto(scope, window.NewFixedWindows(time.Second*10), streamWords)

    // convert pubsub msg to string
    msg := beam.ParDo(scope, convertPubsubMsg, windowedStream)

    //filter empty lines
    nonEmptyLines := beam.ParDo(scope, filterEmptyLines, msg)

    //count words in each line
    wordCountLine := beam.ParDo(scope, wordCountLine, nonEmptyLines)

    // write to BQ
    writeToBQ(scope, table, wordCountLine)
    debug.Print(scope, wordCountLine)

    // Run the beam pipeline
    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

When running the following pipline with the DirectRunner and PubSub emulator is used, the pipline throws the following error:

2023/12/13 09:36:27 Executing pipeline with the direct runner.
2023/12/13 09:36:27 Pipeline:
2023/12/13 09:36:27 Nodes: {1: []uint8/bytes GLO:unbounded}
{2: []uint8/bytes FIX[10s]:unbounded}
{3: string/string FIX[10s]:unbounded}
{4: string/string FIX[10s]:unbounded}
{5: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}
{6: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}
Edges: 1: External [] -> [Out: []uint8 -> {1: []uint8/bytes GLO:unbounded}]
2: WindowInto [In(Main): []uint8 <- {1: []uint8/bytes GLO:unbounded}] -> [Out: []uint8 -> {2: []uint8/bytes FIX[10s]:unbounded}]
3: ParDo [In(Main): []uint8 <- {2: []uint8/bytes FIX[10s]:unbounded}] -> [Out: string -> {3: string/string FIX[10s]:unbounded}]
4: ParDo [In(Main): string <- {3: string/string FIX[10s]:unbounded}] -> [Out: string -> {4: string/string FIX[10s]:unbounded}]
5: ParDo [In(Main): string <- {4: string/string FIX[10s]:unbounded}] -> [Out: KV<string,int> -> {5: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}]
6: ParDo [In(Main): KV<X,Y> <- {5: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}] -> [Out: KV<X,Y> -> {6: KV<string,int>/KV<string,int[varintz]> FIX[10s]:unbounded}]
2023/12/13 09:36:27 Failed to execute job: translation failed
        caused by:
no root units
exit status 1

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

yasser-chihab commented 11 months ago

It is mentionned here https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio that pubsubio supports only Dataflow as a runner, any workaround to that situation 🤔