miraisolutions / sparkbq

Sparklyr extension package to connect to Google BigQuery
GNU General Public License v3.0

spark_read_bigquery throws java.lang.NullPointerException #16

Open · gyorgyg opened this issue 4 years ago

gyorgyg commented 4 years ago

When calling spark_read_bigquery from RStudio Server on GCP Dataproc, I get the following error:

Error: java.lang.NullPointerException
    at java.io.FileInputStream.<init>(FileInputStream.java:130)
    at java.io.FileInputStream.<init>(FileInputStream.java:93)
    at com.miraisolutions.spark.bigquery.client.BigQueryClient$$anonfun$getBigQueryService$2.apply(BigQueryClient.scala:88)
    at com.miraisolutions.spark.bigquery.client.BigQueryClient$$anonfun$getBigQueryService$2.apply(BigQueryClient.scala:86)
    at scala.Option.fold(Option.scala:158)
    at com.miraisolutions.spark.bigquery.client.BigQueryClient.getBigQueryService(BigQueryClient.scala:86)
    at com.miraisolutions.spark.bigquery.client.BigQueryClient.<init>(BigQueryClient.scala:72)
    at com.miraisolutions.spark.bigquery.DefaultSource$.getBigQueryClient(DefaultSource.scala:95)
    at com.miraisolutions.spark.bigquery.DefaultSource$.com$miraisolutions$spark$bigquery$DefaultSource$$withBigQueryClient(DefaultSource.scala:195)
    at com.miraisolutions.spark.bigquery.DefaultSource.createRelation(DefaultSource.scala:47)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:147)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
    at sparklyr.StreamHandler.read(stream.scala:61)
    at sparklyr.BackendHandler$$anonfun$channelRead0$1.apply$mcV$sp(handler.scala:58)
    at scala.util.control.Breaks.breakable(Breaks.scala:38)
    at sparklyr.BackendHandler.channelRead0(handler.scala:38)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

Deployed Cloud Dataproc image version 1.3 (Debian 10, Hadoop 2.9, Spark 2.3). The issue is independent of the configured Spark master: the exception is thrown both with master = "local" and when using YARN.
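For reference, the YARN attempt differs only in the master argument; a minimal sketch (assuming the stock Dataproc YARN setup and the same config as in the RStudio code below):

sc <- spark_connect(master = "yarn", config = config)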

This is the code I run to set up the Dataproc master node via SSH:

sudo apt-get update
sudo apt-get upgrade
sudo apt-get install -y libcurl4-openssl-dev libssl-dev libxml2-dev
$SPARK_HOME/bin/spark-shell --packages miraisolutions:spark-bigquery:0.1.1-s_2.11

and this in RStudio:

### Install packages (run only once)
install.packages("sparklyr")
sparklyr::spark_install(version = "2.3")

### Load packages
library(sparklyr)
library(sparkbq)

### Set up the necessary environment variables
spark_home_set()
Sys.setenv(HADOOP_CONF_DIR = '/etc/hadoop/conf')
Sys.setenv(YARN_CONF_DIR = '/etc/hadoop/conf')

### Connect to Spark
config <- spark_config()
sc <- spark_connect(master = "local", config = config)
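
### Side note: I'm not sure whether the --packages flag from the spark-shell
### step above carries over to this session; if it doesn't, the spark-bigquery
### package could presumably be pinned via the sparklyr config instead
### (hypothetical sketch, same spark-packages coordinates as above):
# config[["sparklyr.defaultPackages"]] <- "miraisolutions:spark-bigquery:0.1.1-s_2.11"
# sc <- spark_connect(master = "local", config = config)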

### Read from BigQuery
# projectid and bucket_name are set earlier in my session
bigquery_defaults(
  billingProjectId = projectid,
  gcsBucket = bucket_name,
  datasetLocation = "US",
  type = "direct")

hamlet <-
  spark_read_bigquery(
    sc,
    name = "hamlet",
    billingProjectId = projectid,
    type = "direct",
    gcsBucket = bucket_name,
    projectId = "publicdata",
    datasetId = "samples",
    tableId = "shakespeare"
  )
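
For what it's worth, the trace points at the FileInputStream constructor inside BigQueryClient.getBigQueryService, which looks like the client opening a credentials file whose path is null. An untested guess at a workaround would be to pass an explicit service account key file via bigquery_defaults (the path below is a placeholder):

bigquery_defaults(
  billingProjectId = projectid,
  gcsBucket = bucket_name,
  datasetLocation = "US",
  serviceAccountKeyFile = "/path/to/keyfile.json",  # placeholder path
  type = "direct")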