typelevel / doobie

Functional JDBC layer for Scala.
MIT License

aws Athena JDBC driver #525

Closed by netanelrabinowitz 7 years ago

netanelrabinowitz commented 7 years ago

Is there a way to query AWS Athena using doobie?

wedens commented 7 years ago

Yes, if it has a JDBC driver. You can use any JDBC driver with doobie; just configure the Transactor to use it.

Note that a driver's implementation can have some quirks (e.g. https://github.com/tpolecat/doobie/issues/339).

tpolecat commented 7 years ago

Yeah, people use doobie with weird drivers and it works fine, but you sometimes need a reconfigured transactor, because the first thing to disappear in these cloud databases is transactions.

tpolecat commented 7 years ago

Let us know what happens and we can help you get it working.

netanelrabinowitz commented 7 years ago

Thanks @tpolecat !! I created the following transactor:

import java.sql.Connection

import cats.Monad
import cats.implicits._
import doobie.free.connection.ConnectionIO
import doobie.free.drivermanager.{DriverManagerIO, delay, getConnection}
import doobie.util.transactor.Transactor
import fs2.util.{Catchable, Suspendable}

abstract class AthenaTransactor[M[_] : Catchable : Suspendable] extends Transactor[M] {

  private val noop = Monad[ConnectionIO].pure(())

  override def before = noop

  override def oops = noop

  override def after = noop
}

object AthenaTransactor {
  def create[M[_] : Catchable : Suspendable](driver: String, conn: DriverManagerIO[Connection]): Transactor[M] =
    new AthenaTransactor[M] {
      val connect: M[Connection] =
        (delay(Class.forName(driver)) *> conn).trans[M]
    }

  def apply[M[_] : Catchable : Suspendable](driver: String, url: String, info: java.util.Properties): Transactor[M] =
    create(driver, getConnection(url, info))
}

I then tried to run a simple query:

    sql"select eventid from nglogs.clicks limit 10"
      .query[String]
      .list
      .transact(xa1)
      .unsafePerformIO
      .take(5)
      .foreach(println)

and got the following exception:

Exception in thread "main" com.amazonaws.athena.jdbc.NotImplementedException: Method Connection.prepareStatement is not yet implemented
    at com.amazonaws.athena.jdbc.AthenaConnection.prepareStatement(AthenaConnection.java:114)
    at doobie.free.connection$ConnectionOp$PrepareStatement.$anonfun$defaultTransK$35(connection.scala:226)
    at doobie.free.connection$ConnectionOp.$anonfun$primitive$2(connection.scala:89)
    at fs2.util.Suspendable.$anonfun$delay$1(Suspendable.scala:22)
    at doobie.util.iolite$IOInstances0$$anon$2.$anonfun$suspend$1(iolite.scala:111)
    at doobie.util.iolite$IOLite$$anon$3$$anon$4.$anonfun$start$1(iolite.scala:28)
    at doobie.util.iolite$IOLite$Compute.loop$1(iolite.scala:75)
    at doobie.util.iolite$IOLite$Compute.unsafePerformIO(iolite.scala:86)
    at doobie.util.iolite$IOLite.$anonfun$attempt$1(iolite.scala:41)
    at doobie.util.iolite$IOLite$Primitive.unsafePerformIO(iolite.scala:59)
    at doobie.util.iolite$IOLite$Compute.loop$1(iolite.scala:78)
    at doobie.util.iolite$IOLite$Compute.unsafePerformIO(iolite.scala:86)
    at doobie.util.iolite$IOLite.$anonfun$attempt$1(iolite.scala:41)
    at doobie.util.iolite$IOLite$Primitive.unsafePerformIO(iolite.scala:59)
    at doobie.util.iolite$IOLite$Compute.loop$1(iolite.scala:78)
    at doobie.util.iolite$IOLite$Compute.unsafePerformIO(iolite.scala:86)
    at Athena$.main(Playground.scala:64)
    at Athena.main(Playground.scala)

This seems to be the same problem as with Presto (#339). On that issue, you asked for an example that works with raw JDBC so you could show how to do it with doobie. Let's take this query as the example, so I can try to take it from there:


import java.sql.{Connection, DriverManager}

// `info` is the java.util.Properties carrying the driver settings (defined elsewhere)
Class.forName("com.amazonaws.athena.jdbc.AthenaDriver")
val connection: Connection = DriverManager.getConnection("jdbc:awsathena://athena.us-east-1.amazonaws.com:443/", info)
val statement = connection.createStatement
val sql = "select eventid from nglogs.clicks limit 10"
val rs = statement.executeQuery(sql)

wedens commented 7 years ago

You need to use a slightly lower-level API:

HC.createStatement(HS.executeQuery("select eventid from nglogs.clicks limit 10")(HRS.list[String])).transact(xa)

tpolecat commented 7 years ago

Yeah, you can't use the sql interpolators unless the driver supports prepared statements. It looks like Athena's driver is extremely minimal.
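
To make the distinction concrete, here is a minimal plain-JDBC sketch with no doobie involved. It probes whether a connection can create a PreparedStatement at all, and shows the plain Statement path that the lower-level call above relies on. The object and method names (JdbcProbe, supportsPreparedStatements, queryStrings) and the "select 1" probe query are hypothetical, chosen only for illustration, and scala.util.Using assumes Scala 2.13 or later.

```scala
import java.sql.Connection
import scala.util.{Try, Using}

// Hypothetical helper, not part of doobie or of any JDBC driver.
object JdbcProbe {

  // True if the driver can build (and close) a PreparedStatement for `probeSql`.
  // Minimal drivers may throw here instead, as in the stack trace earlier in this thread.
  def supportsPreparedStatements(conn: Connection, probeSql: String = "select 1"): Boolean =
    Try(conn.prepareStatement(probeSql)).map(_.close()).isSuccess

  // Runs `sql` through a plain Statement and reads the first column of every row as a String.
  // Both the Statement and the ResultSet are closed when the block exits.
  def queryStrings(conn: Connection, sql: String): List[String] =
    Using.resource(conn.createStatement()) { statement =>
      Using.resource(statement.executeQuery(sql)) { rs =>
        Iterator.continually(rs).takeWhile(_.next()).map(_.getString(1)).toList
      }
    }
}
```

If the probe returns false, anything built on PreparedStatement, including the sql interpolator, will likely fail in the same way, and the plain Statement path is the one to try.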

netanelrabinowitz commented 7 years ago

Thanks a lot!

calvinlfer commented 9 months ago

Hello, I just wanted to report that this works great with doobie 1.0.0-RC5, and even with Hikari, using the Athena JDBC 3.x driver.

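Here is the code:

import cats.effect.{IO, Resource}
import com.amazon.athena.jdbc.AthenaDataSource // package name as of the Athena JDBC 3.x driver
import com.zaxxer.hikari.HikariConfig
import doobie.*
import doobie.hikari.HikariTransactor
import doobie.util.transactor.Strategy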

object AthenaTransactor:
  val transactor: Resource[IO, Transactor[IO]] = {
    val athenaDataSource: IO[AthenaDataSource] = IO.delay {
      // https://docs.aws.amazon.com/athena/latest/ug/jdbc-v3-driver-getting-started.html#jdbc-v3-driver-configuring-the-driver
      val base = new AthenaDataSource()
      base.setWorkGroup("primary")
      base.setCredentialsProvider("DefaultChain")
      base.setOutputLocation("s3://satcal-data-lake/athena-output/")
      base.setRegion("us-east-1")
      base.setDatabase("event-lake")
      base
    }

    def hikariConfig(athenaDataSource: AthenaDataSource): IO[HikariConfig] = IO.delay {
      val base = new HikariConfig()
      base.setMinimumIdle(1)
      base.setMaximumPoolSize(4)
      base.setDataSource(athenaDataSource)
      base.setAutoCommit(true)
      base
    }

    for
      athenaDataSource <- Resource.eval(athenaDataSource)
      config           <- Resource.eval(hikariConfig(athenaDataSource))
      originalXa       <- HikariTransactor.fromHikariConfig[IO](config)
      xa = Transactor.strategy.set(
             originalXa,
             Strategy(
               before = FC.unit,
               after = FC.unit,
               oops = FC.unit,
               always = FC.setAutoCommit(true)
             )
           )
    yield xa
  }

This lets you use the high-level API as well, which is super nice:

AthenaTransactor.transactor.use { xa =>
  for
    result <- sql"SELECT event_value FROM example_table_iceberg LIMIT 100"
                .query[String]
                .to[List]
                .transact(xa)

    _ <- result.traverse_(Console[IO].println(_))
  yield ExitCode.Success
}

You can use prepared statements as well 😺

    for
      xa        <- AthenaTransactor.transactor
      userLimit  = 10
      // transact yields an IO, so lift it into the surrounding Resource
      result    <- Resource.eval(
                     sql"SELECT event_value FROM login_success_athena_iceberg LIMIT $userLimit"
                       .query[String]
                       .to[List]
                       .transact(xa)
                   )
    yield ()