CODAIT / spark-netezza

Netezza Connector for Apache Spark
Apache License 2.0

Fetch data from Netezza using a query. #8

Open sureshthalamati opened 8 years ago

sureshthalamati commented 8 years ago

Add support for fetching data from Netezza using a query.

Example:

    val opts = Map(
      "url" -> testURL,
      "query" -> "select cust.id as id, cust.name, ordrs.itemname from orders ordrs, customers cust where ordrs.id = cust.id",
      "partitioncol" -> "id",
      "numPartitions" -> "10")

    val testDf = sqlContext.read.format("com.ibm.spark.netezza").options(opts).load()
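For comparison, Spark's built-in JDBC data source already supports this pattern by accepting an aliased subquery in dbtable (a minimal sketch, not using this connector; testURL and the partition bounds are placeholders, and the Netezza JDBC driver must be on the classpath):

    // Sketch: the same query through Spark's generic JDBC source.
    // A partitioned JDBC read also requires numeric lower/upper bounds
    // for the partition column; the bound values below are placeholders.
    val jdbcOpts = Map(
      "url" -> testURL,
      "driver" -> "org.netezza.Driver",
      "dbtable" -> "(select cust.id as id, cust.name, ordrs.itemname from orders ordrs, customers cust where ordrs.id = cust.id) t",
      "partitionColumn" -> "id",
      "lowerBound" -> "1",
      "upperBound" -> "1000000",
      "numPartitions" -> "10")
    val jdbcDf = sqlContext.read.format("jdbc").options(jdbcOpts).load()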

gowtham-kanagodu commented 7 years ago

We can do the following to use a query:

    val opts = Map(
      "url" -> testURL,
      "dbtable" -> "(select cust.id as id, cust.name, ordrs.itemname from orders ordrs, customers cust where ordrs.id = cust.id) X",
      "partitioncol" -> "id",
      "numPartitions" -> "10")

By placing the SQL in parentheses and giving it an alias, the load works. But when I try a distinct, or any operation other than count, it fails with the error below. The user ID I am using to connect has CREATE EXTERNAL TABLE privileges. Any suggestions? (A possible workaround is sketched after the stack trace.)

Caused by: java.lang.RuntimeException: Error creating external table pipe:org.netezza.error.NzSQLException: ERROR: Attribute 'DATASLICEID' not found

    at com.ibm.spark.netezza.NetezzaDataReader.getNextLine(NetezzaDataReader.scala:149)
    at com.ibm.spark.netezza.NetezzaDataReader.hasNext(NetezzaDataReader.scala:124)
    at com.ibm.spark.netezza.NetezzaRDD$$anon$1.getNext(NetezzaRDD.scala:76)
    at com.ibm.spark.netezza.NetezzaRDD$$anon$1.hasNext(NetezzaRDD.scala:106)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.sca
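The "Attribute 'DATASLICEID' not found" error suggests the connector partitions its reads on Netezza's DATASLICEID column, which exists on base tables but not on an aliased subquery. Under that assumption, a possible workaround is to load the base tables separately and perform the join and distinct in Spark (a sketch only; table and column names follow the query above and are not verified against the connector):

    // Sketch: read each base table through the connector, which can
    // partition on DATASLICEID, then join and deduplicate in Spark.
    val orders = sqlContext.read.format("com.ibm.spark.netezza")
      .options(Map("url" -> testURL, "dbtable" -> "orders", "numPartitions" -> "10"))
      .load()
    val customers = sqlContext.read.format("com.ibm.spark.netezza")
      .options(Map("url" -> testURL, "dbtable" -> "customers", "numPartitions" -> "10"))
      .load()

    val joined = orders.join(customers, orders("id") === customers("id"))
      .select(customers("id"), customers("name"), orders("itemname"))
    joined.distinct().show()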