springml / spark-salesforce

Spark data source for Salesforce
Apache License 2.0
80 stars, 67 forks

Unable to write dataframe into salesforce object. ERROR: Can not instantiate Stax reader for XML source type class org.codehaus.stax2.io.Stax2ByteArraySource #77

Open dinesh1393 opened 1 year ago

dinesh1393 commented 1 year ago

Hi Team,

I am trying to write a Spark dataframe into a Salesforce object, but it is failing with the error below:

java.lang.IllegalArgumentException: Can not instantiate Stax reader for XML source type class org.codehaus.stax2.io.Stax2ByteArraySource (unrecognized type)

I am able to read the data; only the write operation is failing. I am not sure whether I need to add an additional dependency.

Dependencies used:

- spark-salesforce_2.12-1.1.4.jar
- force-partner-api-40.0.0.jar
- force-wsc-40.0.0.jar
- jackson-dataformat-xml-2.4.4.jar
- java-sizeof_2.11-0.1.jar
- mockito-core-2.0.31-beta.jar
- salesforce-wave-api-1.0.10.jar
- scala-library-2.12.10.jar
- woodstox-core-asl-4.4.0.jar
- stax2-api-4.2.1

PySpark version: 3.3.0

Below is my code.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("salesforce_connect").master("local[*]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Read from Salesforce via SOQL
soql = "SELECT name, industry, type, billingaddress, sic, Phone, YearStarted FROM Account"
df = spark.read.format("com.springml.spark.salesforce") \
    .option("username", "dinesh123@force.com") \
    .option("password", "password-securityToken") \
    .option("soql", soql) \
    .option("inferSchema", "true") \
    .load()
df.show()

# Transformation
df = df.withColumn("YearStarted", F.lit("2020")) \
    .na.replace("null", "Agriculture", "Industry") \
    .na.replace("null", "Default", "Type")
df.show()

# Write back to the Account object via the Bulk API -- this is the step that fails
df.write.format("com.springml.spark.salesforce") \
    .option("username", "dinesh123@force.com") \
    .option("password", "password-securityToken") \
    .option("sfObject", "Account") \
    .option("login", "https://sect-d3-dev-ed.develop.my.salesforce.com/") \
    .option("bulk", "true") \
    .option("upsert", "true") \
    .save()


Below is the Error:

py4j.protocol.Py4JJavaError: An error occurred while calling o57.save. :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 120) (KL-ICF564fDSR.SECT.CORP.SECT.IN executor driver): java.lang.IllegalArgumentException: Can not instantiate Stax reader for XML source type class org.codehaus.stax2.io.Stax2ByteArraySource (unrecognized type)
    at org.apache.hadoop.shaded.com.ctc.wstx.stax.WstxInputFactory.createSR(WstxInputFactory.java:802)
    at org.apache.hadoop.shaded.com.ctc.wstx.stax.WstxInputFactory.createXMLStreamReader(WstxInputFactory.java:358)
    at com.fasterxml.jackson.dataformat.xml.XmlFactory._createParser(XmlFactory.java:580)
    at com.fasterxml.jackson.dataformat.xml.XmlFactory._createParser(XmlFactory.java:28)
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1122)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3690)
    at com.springml.salesforce.wave.impl.BulkAPIImpl.addBatch(BulkAPIImpl.java:76)
    at com.springml.spark.salesforce.SFObjectWriter.$anonfun$writeData$3(SFObjectWriter.scala:45)
    at com.springml.spark.salesforce.SFObjectWriter.$anonfun$writeData$3$adapted(SFObjectWriter.scala:39)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

SKinserLoanpal commented 1 year ago

This is a Spark 3.3.0 compatibility issue; if possible, run this on Spark 3.0.0. I resolved the issue by building a custom version of the jar in which com.fasterxml.jackson.dataformat was shaded and relocated.
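For anyone rebuilding the jar themselves, a Maven Shade relocation along these lines is the usual pattern for this kind of conflict. This is only a sketch: the plugin version and the `shaded.` prefix are illustrative choices, not taken from the project's actual build, and it assumes you are adding the plugin to a fork's `pom.xml`:

```xml
<!-- Sketch: relocate jackson-dataformat inside the fat jar so it no longer
     clashes with the Woodstox classes shaded into Hadoop on Spark 3.3.x.
     The shadedPattern prefix is arbitrary; pick anything not on the classpath. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.fasterxml.jackson.dataformat</pattern>
            <shadedPattern>shaded.com.fasterxml.jackson.dataformat</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After rebuilding, the relocated jar is passed to Spark as usual (e.g. via `--jars`), replacing the stock spark-salesforce jar.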

dinesh1393 commented 1 year ago

@SKinserLoanpal can you provide the customized jar that was used to resolve the error?

wkwshihaoren commented 1 year ago

@SKinserLoanpal I would also appreciate it if you could provide the customized jar.