spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

Beam SQL not firing #2177

Closed jayadeep-jayaraman closed 5 years ago

jayadeep-jayaraman commented 5 years ago

Hi,

I am building a simple prototype that reads data from Pub/Sub and processes it with Beam SQL; the code snippet is below:

    // Assumed imports (not shown in the original snippet):
    // com.spotify.scio._, com.spotify.scio.values._, com.spotify.scio.sql._ (for tsql),
    // org.apache.beam.sdk.transforms.ParDo, org.joda.time.Duration
    val eventStream: SCollection[String] =
      sc.pubsubSubscription[String]("projects/jayadeep-etl-platform/subscriptions/orders-dataflow")
        .withFixedWindows(Duration.standardSeconds(10))

    val events: SCollection[DemoEvents] = eventStream.applyTransform(ParDo.of(new DoFnExample()))

    events.map(row => println("Input Stream:" + row))

    val pickup_events = SideOutput[DemoEvents]()
    val delivery_events = SideOutput[DemoEvents]()

    val (mainOutput: SCollection[DemoEvents], sideOutputs: SideOutputCollections) = events
      .withSideOutputs(pickup_events, delivery_events)
      .flatMap {
        case (evts, ctx) =>
          evts.eventType match {
            // Send to side outputs via `SideOutputContext`
            case "pickup"   => ctx.output(pickup_events, evts)
            case "delivery" => ctx.output(delivery_events, evts)
            case _          => () // default case avoids a MatchError on other event types
          }
          Some(evts)
      }

    val pickup: SCollection[DemoEvents] = sideOutputs(pickup_events)
    val dropoff = sideOutputs(delivery_events)

    pickup.map(row => println("Pickup:" + row))
    dropoff.map(row => println("Delivery:" + row))

    val consolidated_view =
      tsql"""select $pickup.order_id as orderId,
                    $pickup.area as pickup_location,
                    $dropoff.area as dropoff_location,
                    $pickup.restaurant_id as resturantId
             from $pickup as pickup
             left outer join $dropoff as dropoff
             on $pickup.order_id = $dropoff.order_id""".as[Output]

    consolidated_view.map(row => println("Output:" + row))

    sc.run().waitUntilFinish()

    ()

I am using the DirectRunner for local testing, and I can see the results right up until the Beam SQL step executes. The output from Beam SQL is never printed.

Input Stream:DemoEvents(false,pickup,Bangalore,Indiranagar,1566382242,49457442008,1566382242489,7106576,1566382242000,178258,7406545542,,false,null,htr23e22-329a-4b05-99c1-606a3ccf6a48,972)
Pickup:DemoEvents(false,pickup,Bangalore,Indiranagar,1566382242,49457442008,1566382242489,7106576,1566382242000,178258,7406545542,,false,null,htr23e22-329a-4b05-99c1-606a3ccf6a48,972)
Input Stream:DemoEvents(false,delivery,Bangalore,Indiranagar,1566382242,49457442008,2566382242489,7106576,1566382242000,178258,7406545542,,false,null,htr23e22-329a-4b05-99c1-606a3ccf6a48,972)
Delivery:DemoEvents(false,delivery,Bangalore,Indiranagar,1566382242,49457442008,2566382242489,7106576,1566382242000,178258,7406545542,,false,null,htr23e22-329a-4b05-99c1-606a3ccf6a48,972)
nevillelyh commented 5 years ago

You might have to save the output somewhere after the last .map? Otherwise the runner might ignore dummy transforms that don't end in PDone.
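
For illustration, a minimal sketch of that suggestion, assuming a hypothetical local output path; ending the graph with a real sink should stop the runner from pruning the upstream transforms:

    // A sketch, not from the thread: terminate the pipeline with a real
    // sink. The output path is hypothetical.
    consolidated_view
      .map(row => row.toString)
      .saveAsTextFile("/tmp/consolidated_view") // ends in PDone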

jayadeep-jayaraman commented 5 years ago

Thanks @nevillelyh. If I remove the Pub/Sub source and add some dummy data inline, the final output does get printed. Do you think the need to save the output somewhere is specific to a streaming pipeline?
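
For reference, a minimal sketch of the "dummy data inline" variant, with hypothetical payloads; sc.parallelize creates a bounded in-memory source in place of the unbounded Pub/Sub subscription:

    // A sketch with hypothetical payloads: swap the unbounded Pub/Sub
    // source for a bounded in-memory collection when testing locally.
    val eventStream: SCollection[String] =
      sc.parallelize(Seq("pickup-event-payload", "delivery-event-payload"))
        .withFixedWindows(Duration.standardSeconds(10))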

jayadeep-jayaraman commented 5 years ago

Hi @nevillelyh - I added the code below to write to BigQuery, but the job still does not finish and the table is not created in BQ.

    consolidated_view
      .map(kv => TableRow(
        "orderId" -> kv.orderId,
        "pickup_location" -> kv.pickup_location,
        "dropoff_location" -> kv.dropoff_location,
        "resturantId" -> kv.resturantId))
      .saveAsBigQuery("jayadeep-etl-platform:demo.orders", schema, WRITE_APPEND, CREATE_IF_NEEDED)
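
The schema value referenced above is not shown in the thread; a minimal sketch of what it could look like, assuming four STRING fields matching the TableRow keys:

    // A sketch of the `schema` referenced above: field names come from the
    // TableRow keys; the STRING types are assumptions.
    import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
    import java.util.Arrays.asList

    val schema: TableSchema = new TableSchema().setFields(
      asList(
        new TableFieldSchema().setName("orderId").setType("STRING"),
        new TableFieldSchema().setName("pickup_location").setType("STRING"),
        new TableFieldSchema().setName("dropoff_location").setType("STRING"),
        new TableFieldSchema().setName("resturantId").setType("STRING")
      )
    )
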
jayadeep-jayaraman commented 5 years ago

The issue was with the DirectRunner; when I changed it to the DataflowRunner, the problem was resolved.
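
For anyone hitting the same behavior, a sketch of how the runner is selected, assuming hypothetical flag values; ContextAndArgs parses the standard Beam pipeline options:

    // A sketch with hypothetical values: pass the standard Beam options at
    // launch time, e.g.
    //   --runner=DataflowRunner --project=my-gcp-project --region=us-central1
    val (sc, args) = ContextAndArgs(cmdlineArgs)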