spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

Snowflake Support #5500

Open turb opened 1 month ago

turb commented 1 month ago

Hello there,

Apache Beam has Snowflake support, so it's already possible to use it from scio with:

    import org.apache.beam.sdk.io.snowflake.SnowflakeIO

    import com.spotify.scio.coders.CoderMaterializer
    import com.spotify.scio.values.SCollection

    // Map each CSV row returned by Snowflake to a Thing
    val thingMapper = new SnowflakeIO.CsvMapper[Thing] {
      override def mapRow(parts: Array[String]): Thing =
        Thing(Typed1(parts(0)), Typed2(parts(1)), ...)
    }

    val datasource = SnowflakeIO.DataSourceConfiguration.create()
      .withUsernamePasswordAuth(
        args("snowflake-user"), args("snowflake-password"))
      .withServerName("redacted.snowflakecomputing.com")
      .withDatabase("redacted_db")
      .withRole("redacted_role")
      .withWarehouse("redacted_warehouse")
      .withSchema("redacted_schema")

    val read = SnowflakeIO.read[Thing]()
      .withDataSourceConfiguration(datasource)
      .fromQuery("SELECT COLUMN1, COLUMN2... FROM THINGS")
      .withStagingBucketName("gs://paths") // staging bucket, here on Google Cloud
      .withStorageIntegrationName("redacted_integration")
      .withCsvMapper(thingMapper)
      .withCoder(CoderMaterializer.beam(sc, Thing.coder))

    val things: SCollection[Thing] = sc.customInput("things", read)

However, a proper scio integration would be great. I suppose deriving a SnowflakeIO.CsvMapper for Thing would first need a PR in magnolify?

I can work on it.
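
To make the target concrete: whatever the derivation mechanism, it would ultimately have to produce instances of Beam's CsvMapper interface. A minimal hand-rolled factory might look like this (csvMapper is a hypothetical helper, not an existing scio or magnolify API):

    import org.apache.beam.sdk.io.snowflake.SnowflakeIO

    // Hypothetical: wrap a plain row-parsing function into Beam's CsvMapper,
    // which is what a magnolify-style derivation would generate per case class
    def csvMapper[T](f: Array[String] => T): SnowflakeIO.CsvMapper[T] =
      new SnowflakeIO.CsvMapper[T] {
        override def mapRow(parts: Array[String]): T = f(parts)
      }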

RustedBones commented 1 month ago

For CSV, we've actually relied on the kantan library. It looks possible to implement the IO simply with a RowDecoder.
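
A rough sketch of what such an IO wrapper could look like, assuming a kantan RowDecoder in scope (the snowflakeQuery helper and its signature are hypothetical, not an existing scio API):

    import com.spotify.scio.ScioContext
    import com.spotify.scio.coders.{Coder, CoderMaterializer}
    import com.spotify.scio.values.SCollection
    import kantan.csv.RowDecoder
    import org.apache.beam.sdk.io.snowflake.SnowflakeIO

    // Hypothetical scio-level helper: read a Snowflake query into an
    // SCollection[T], decoding each CSV row with the implicit kantan RowDecoder
    def snowflakeQuery[T: RowDecoder: Coder](
      sc: ScioContext,
      name: String,
      config: SnowflakeIO.DataSourceConfiguration,
      query: String,
      stagingBucket: String,
      storageIntegration: String
    ): SCollection[T] = {
      // note: the captured decoder must be serializable for Beam to ship it to workers
      val decoder = implicitly[RowDecoder[T]]
      val mapper = new SnowflakeIO.CsvMapper[T] {
        override def mapRow(parts: Array[String]): T =
          decoder.unsafeDecode(parts.toSeq)
      }
      val read = SnowflakeIO.read[T]()
        .withDataSourceConfiguration(config)
        .fromQuery(query)
        .withStagingBucketName(stagingBucket)
        .withStorageIntegrationName(storageIntegration)
        .withCsvMapper(mapper)
        .withCoder(CoderMaterializer.beam(sc, Coder[T]))
      sc.customInput(name, read)
    }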

We'd be very happy to see a new contribution from your side!

turb commented 1 month ago

> For CSV, we've actually relied on the kantan library. It looks possible to implement the IO simply with a RowDecoder.

The parser is implemented on the Beam side, using opencsv: only the downstream mapper can be specified. So it would need a PR on Beam to allow specifying another parser.

RustedBones commented 1 month ago

I meant leveraging the decoding part of kantan, with something like:

    import kantan.csv.RowDecoder

    val thingMapper = new SnowflakeIO.CsvMapper[Thing] {
      override def mapRow(parts: Array[String]): Thing =
        implicitly[RowDecoder[Thing]].unsafeDecode(parts.toSeq)
    }
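
For this to compile, a RowDecoder[Thing] has to be in scope; kantan.csv.generic can derive one automatically for case classes. A quick sketch, with an illustrative Thing shape:

    import kantan.csv.RowDecoder
    import kantan.csv.generic._ // shapeless-based automatic derivation

    // Illustrative shape; the real Thing mirrors the query's columns
    case class Thing(column1: String, column2: Int)

    val decoder = implicitly[RowDecoder[Thing]]
    decoder.unsafeDecode(Seq("foo", "42")) // Thing("foo", 42)
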
turb commented 1 month ago

@RustedBones opened #5502