samelamin / spark-bigquery

Google BigQuery support for Spark, Structured Streaming, SQL, and DataFrames with easy Databricks integration.
Apache License 2.0

Usage with RStudio's sparklyr #46

Closed · martinstuder closed this 6 years ago

martinstuder commented 6 years ago

I attempted to use your library with RStudio's sparklyr package. sparklyr's spark_read_source relies on the availability of a DefaultSource that implements org.apache.spark.sql.sources.RelationProvider. In a fork of your library I have extended DefaultSource as follows (abridged):

class DefaultSource extends .... with RelationProvider {
  ...

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    // Capture the outer SQLContext so it is not shadowed by the anonymous
    // BaseRelation's own sqlContext member below.
    val outerSqlContext = sqlContext
    new BaseRelation {
      override def schema: StructType = getConvertedSchema(outerSqlContext, parameters)
      override def sqlContext: SQLContext = outerSqlContext
    }
  }
}
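As background on what Spark needs from such a relation: a BaseRelation that only exposes schema and sqlContext gives the planner nothing to build a physical scan from, since DataSourceStrategy only matches relations that also mix in one of the scan interfaces (TableScan, PrunedScan, PrunedFilteredScan, or CatalystScan). A minimal sketch of a plannable relation, with loadTable as a hypothetical hook standing in for the connector's actual BigQuery read path (this is not part of the fork above):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

// Sketch only: mixing in TableScan gives DataSourceStrategy something to plan.
class BigQueryTableRelation(
    outerSqlContext: SQLContext,
    loadTable: SQLContext => DataFrame // hypothetical hook; the real connector would read from BigQuery here
) extends BaseRelation with TableScan {

  // Load once and reuse for both the schema and the scan.
  private lazy val df: DataFrame = loadTable(outerSqlContext)

  override def sqlContext: SQLContext = outerSqlContext
  override def schema: StructType = df.schema

  // buildScan is what the planner calls to produce the physical RDD of rows.
  override def buildScan(): RDD[Row] = df.rdd
}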

The corresponding R/sparklyr code is as follows (using a locally published version of your library):

library(sparklyr)

# https://developers.google.com/identity/protocols/application-default-credentials
Sys.setenv("GOOGLE_APPLICATION_CREDENTIALS" = "credentials.json")

config <- spark_config()
# Modified version in local ivy cache
config[["sparklyr.defaultPackages"]] <- c("com.github.samelamin:spark-bigquery_2.11:0.2.2")

sc <- spark_connect(master = "local", config = config)
my_table <-
  spark_read_source(
    sc,
    name = "mytable",
    source = "com.samelamin.spark.bigquery",
    options = list(
      "tableReferenceSource" = "my-project:my_dataset.my_table"
    )
  )

When trying to execute the above, I get the following exception:

Error: java.lang.AssertionError: assertion failed: No plan for Relation[name#10,latitude#11,longitude#12,bq_load_timestamp#13] com.samelamin.spark.bigquery.DefaultSource$$anon$1@429b3f9f

    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
    at org.apache.spark.sql.internal.CatalogImpl.cacheTable(CatalogImpl.scala:419)
    at org.apache.spark.sql.execution.command.CacheTableCommand.run(cache.scala:41)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:182)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke$.invoke(invoke.scala:102)
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
    at sparklyr.StreamHandler$.read(stream.scala:62)
    at sparklyr.BackendHandler.channelRead0(handler.scala:52)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:748)

It seems to successfully connect and determine the schema but then fails with "No plan for Relation". Any idea what could be causing this?

samelamin commented 6 years ago

Hey, I am not the strongest with R so I can't help you there, but looking at the stack trace it seems it's a Spark error rather than a specific problem with this connector. I will close for now as I can't help.

Sorry for that.

pcejrowski commented 6 years ago

@martinstuder How do you pass configuration? I've tried:

config <- spark_config()
config$`fs.gs.project.id` <- "my-project"

but it does not work.

martinstuder commented 6 years ago

@pcejrowski

You will need to pass them as Spark Hadoop configuration properties, i.e.

config$`spark.hadoop.fs.gs.project.id` <- "my-project"
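For example, a fuller connection setup along these lines should work; note that the keyfile property below is an assumption based on the standard GCS connector Hadoop settings, not something specific to this library:

library(sparklyr)

config <- spark_config()
# Hadoop-level properties take effect when prefixed with spark.hadoop.
config$`spark.hadoop.fs.gs.project.id` <- "my-project"
# Assumption: credentials via the GCS connector's service-account keyfile setting;
# setting GOOGLE_APPLICATION_CREDENTIALS as in the example above should work as well.
config$`spark.hadoop.google.cloud.auth.service.account.json.keyfile` <- "credentials.json"

sc <- spark_connect(master = "local", config = config)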

In case you are interested, you might also want to have a look at https://github.com/miraisolutions/sparkbq.

pcejrowski commented 6 years ago

Awesome! Many thanks @martinstuder