YotpoLtd / metorikku

A simplified, lightweight ETL Framework based on Apache Spark
https://yotpoltd.github.io/metorikku/
MIT License

Using the jdbc input to load an Oracle data source throws NPE #498

Open superzwl opened 1 year ago

superzwl commented 1 year ago

Hi, I built an ETL job that reads data from Oracle and writes data back to Oracle. With any metorikku version after v0.0.146 it throws an NPE.

Oracle version: 12c
Oracle jar: ojdbc8.jar
metorikku version: 0.0.151

The jdbc input config in job.yaml:

inputs:
  inpuacle_1661762624454:
    jdbc:
      password: xxxx
      options:
        driver: oracle.jdbc.driver.OracleDriver
      connectionUrl: "jdbc:oracle:thin:@//xxxx:1521/xxx"
      user: xxx
      table: TEST1
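
For comparison, here is a minimal sketch of the Spark read this config should be equivalent to, assuming the keys map onto the standard jdbc data source options (connectionUrl -> url, table -> dbtable); the url, table and credentials are the placeholders from the config above:

import org.apache.spark.sql.SparkSession

object JdbcReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // Equivalent of the jdbc input above: every key/value pair here ends up
    // in Spark's JDBCOptions, which copies them into a java.util.Properties.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//xxxx:1521/xxx")
      .option("dbtable", "TEST1")
      .option("user", "xxx")
      .option("password", "xxxx")
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .load()

    df.printSchema()
  }
}

If this direct read works against the same database and driver jar, the problem would sit in how metorikku assembles the option map rather than in the Oracle driver itself.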

When executing this job, the following exception occurs:

Exception in thread "main" java.lang.NullPointerException
        at java.util.Hashtable.put(Hashtable.java:460)
        at java.util.Properties.setProperty(Properties.java:166)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$1.apply(JDBCOptions.scala:48)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$1.apply(JDBCOptions.scala:48)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:48)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at com.yotpo.metorikku.input.readers.jdbc.JDBCInput.read(JDBCInput.scala:52)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:75)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:73)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.yotpo.metorikku.Job.registerDataframes(Job.scala:73)
        at com.yotpo.metorikku.Job.<init>(Job.scala:52)
        at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:14)
        at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:7)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:7)
        at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
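
The failure is inside java.util.Properties.setProperty, which is backed by Hashtable and rejects null keys and values. The frames at JDBCOptions.scala:48 show Spark copying each reader option into a Properties object, so a single option whose value arrives as null is enough to produce exactly this NPE. A minimal, self-contained sketch of that mechanism (the null "driver" value is hypothetical, only to illustrate the failure mode):

import java.util.Properties

object JdbcOptionsNpeSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors what the trace shows JDBCOptions doing with the reader options:
    // copy each key/value into a java.util.Properties.
    val options: Map[String, String] = Map(
      "user"   -> "xxx",
      "driver" -> null // hypothetical: any option value that resolves to null
    )

    val props = new Properties()
    options.foreach { case (k, v) =>
      // Properties extends Hashtable, which throws NullPointerException
      // at Hashtable.put when the value is null -- the same top frames
      // as in the stack trace above.
      props.setProperty(k, v)
    }
  }
}

This suggests that somewhere between parsing job.yaml and calling DataFrameReader.load, one of the jdbc options ends up with a null value in versions after v0.0.146, though I have not confirmed which one.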

When I use version v0.0.146, it works fine.