forcedotcom / phoenix

BSD 3-Clause "New" or "Revised" License

Lack of Batch Statement implementation breaks storage with Pig #602

Open damiencarol opened 10 years ago

damiencarol commented 10 years ago

When using Phoenix with Pig, trying to UPSERT values from a Pig script into Phoenix through the JDBC connector throws java.sql.SQLFeatureNotSupportedException.

This is because Pig uses batch statements (see the class "org.apache.pig.piggybank.storage.DBStorage").

    STORE some_table INTO 'some_table' USING org.apache.pig.piggybank.storage.DBStorage('com.salesforce.phoenix.jdbc.PhoenixDriver',
    'jdbc:phoenix:someserver', '', '',
    ' UPSERT INTO some_table (id, col1, col2, col3) VALUES (?, ?, ?, ?) ');

Exception:

java.io.IOException: java.lang.RuntimeException: JDBC error
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:469)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:432)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:404)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:256)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.RuntimeException: JDBC error
    at org.apache.pig.piggybank.storage.DBStorage.putNext(DBStorage.java:178)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:139)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:98)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:467)
    ... 11 more
Caused by: java.sql.SQLFeatureNotSupportedException
    at com.salesforce.phoenix.jdbc.PhoenixPreparedStatement.addBatch(PhoenixPreparedStatement.java:92)
    at org.apache.pig.piggybank.storage.DBStorage.putNext(DBStorage.java:157)
    ... 16 more

jtaylor-sfdc commented 10 years ago

To support this, there are three methods you need to implement in com.salesforce.phoenix.jdbc.PhoenixPreparedStatement, plus one metadata change:

  1. public void addBatch(String sql) throws SQLException. This would just add the statement to a List<String>.
  2. public void clearBatch() throws SQLException. This would just clear the list.
  3. public int[] executeBatch() throws SQLException. This would walk through the List<String> and call execute(String) on each element. See the javadoc for how to populate the returned int[]. Use getUpdateCount() after each execute to get the number of rows updated.
  4. Update PhoenixDatabaseMetaData.supportsBatchUpdates() to return true.

This will just cause each Statement to be executed in turn. Since Phoenix is an embedded driver, there's no performance penalty for implementing in this manner, as batching is not saving any network traffic.
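For illustration, here is a minimal sketch of the list-backed approach described above. It is not the actual Phoenix code: `BatchSketch` and `SqlExecutor` are hypothetical names, and `SqlExecutor` stands in for the statement's own execute(String) followed by getUpdateCount(). Emptying the list once executeBatch() finishes and letting a plain SQLException propagate on failure are assumptions of this sketch. It also covers only the String-based addBatch named in the list, not a variant that queues bound parameter sets for a prepared statement.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for execute(String) followed by getUpdateCount(). */
interface SqlExecutor {
    int executeAndCount(String sql) throws SQLException;
}

/** Sketch of the three batch methods backed by a simple List<String>. */
class BatchSketch {
    private final List<String> batch = new ArrayList<>();
    private final SqlExecutor executor;

    BatchSketch(SqlExecutor executor) {
        this.executor = executor;
    }

    /** Queues a statement; nothing is executed yet. */
    public void addBatch(String sql) throws SQLException {
        batch.add(sql);
    }

    /** Drops every queued statement. */
    public void clearBatch() throws SQLException {
        batch.clear();
    }

    /** Executes the queued statements in order and returns one update count per statement. */
    public int[] executeBatch() throws SQLException {
        int[] counts = new int[batch.size()];
        try {
            for (int i = 0; i < counts.length; i++) {
                counts[i] = executor.executeAndCount(batch.get(i));
            }
        } finally {
            // Assumption of this sketch: the batch is emptied whether or not execution succeeded.
            batch.clear();
        }
        return counts;
    }
}
```

Each queued statement runs one after another, so the returned array has one entry per addBatch call, in the order the statements were added.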

jtaylor-sfdc commented 10 years ago

Interested in contributing this, @damiencarol ?

damiencarol commented 10 years ago

Yep, of course!

My patch is coming, @jtaylor-sfdc.

damiencarol commented 10 years ago

@jtaylor-sfdc here is my first pull request