memsql / singlestore-spark-connector

A connector for SingleStore and Spark
Apache License 2.0

NullPointerException while getting next window partition #31

Closed by Azuaron 4 years ago

Azuaron commented 7 years ago

Seen in both Spark 2.0.0 and Spark 2.1.0. Stack trace for 2.1.0:

java.lang.NullPointerException
    at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextPartition(WindowExec.scala:341)
    at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:391)
    at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:290)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.memsql.spark.connector.LoadDataStrategy$$anon$2.run(LoadDataStrategy.scala:52)
    at java.lang.Thread.run(Thread.java:745)

Using the memsql-spark-connector, Spark is unable to fetch the next window partition because of a NullPointerException at this line in WindowExec: TaskContext.get().taskMemoryManager(). I can write this full dataset out as a CSV file to S3 or HDFS, so it seems like something in the memsql-spark-connector is causing the error.

The input is a single large (208 GB) parquet file, and the following Spark SQL query shapes the data to be output:

   SELECT lessonSessionId AS id,
          activityId AS activity_id,
          lessonId AS activity_item_id,
          componentId AS activity_component_id,
          studentId AS student_id,
          studentAcademicYear AS student_academic_year_id,
          academicYearId AS academic_year,
          subjectId AS subject,
          to_date(eventStart) AS start_date_id,
          (unix_timestamp(eventStart) - unix_timestamp(to_date(eventStart))) AS start_time_id,
          to_date(eventEnd) AS end_date_id,
          (unix_timestamp(eventEnd) - unix_timestamp(to_date(eventEnd))) AS end_time_id,
          durationSecs AS duration_seconds,
          durationSecsRaw AS duration_seconds_raw,
          loginSessionId AS session_id,
          sittingType AS sitting_type
   FROM (
       SELECT *, ROW_NUMBER() OVER (PARTITION BY lessonSessionId ORDER BY eventEnd DESC) col
       FROM prod_tot_prod_tot
       WHERE eventType != 'LESSON_SKIP'
   ) x
   WHERE x.col = 1
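For reference, the ROW_NUMBER() window in the inner query is what shows up as WindowExec in the stack trace; a rough DataFrame-API equivalent of that deduplication step (assuming df is the DataFrame read from the parquet file) would be:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// Keep only the most recent event per lessonSessionId, dropping LESSON_SKIP rows.
val w = Window.partitionBy("lessonSessionId").orderBy(desc("eventEnd"))
val deduped = df
  .filter(col("eventType").notEqual("LESSON_SKIP"))
  .withColumn("col", row_number().over(w))
  .filter(col("col") === 1)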

And this code outputting the data to MemSQL:

val conf = new SaveToMemSQLConf(SaveMode.Ignore, CreateMode.Skip, Option.empty, 10000, CompressionType.GZip, true, Seq.empty, Seq.empty, false)
sqlDf.saveToMemSQL(TableIdentifier.apply(db, table), conf)

About half of the data is loaded into MemSQL, but some of the tasks persistently fail with the above NPE.

choochootrain commented 7 years ago

@Azuaron when you say "I can output this full dataset by writing a CSV file to S3 or HDFS", do you mean sqlDf or the untransformed parquet file?

the NPE is thrown when our code is iterating through the rows in each partition: https://github.com/memsql/memsql-spark-connector/blob/master/src/main/scala/com/memsql/spark/connector/LoadDataStrategy.scala#L52

so I'm curious if you can repro this with a stripped-down version of saveToMemSQL:

sqlDf.foreachPartition(partition => {
  new Thread(new Runnable {
    override def run(): Unit = {
      for (row <- partition) {
        // do nothing here, just force the partition to be fully iterated over
      }
    }
  }).start()
})
Azuaron commented 7 years ago

@choochootrain I mean sqlDf:

sqlDf.write.format("csv").option("compression", "none").save(s3Location)

I was able to reproduce the bug with the stripped-down version. Similar (but not exactly the same) stack trace:

java.lang.NullPointerException
        at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:287)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:86)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextRow(WindowExec.scala:301)
        at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextPartition(WindowExec.scala:361)
        at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:391)
        at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:290)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at com.cainc.data.etl.lake.TestMartOutput$$anonfun$main$3$$anon$1.run(TestMartOutput.scala:42)
        at java.lang.Thread.run(Thread.java:745)

The line in UnsafeInMemorySorter that throws is marked with -> below:

    @Override
    public void loadNext() {
      // This pointer points to a 4-byte record length, followed by the record's bytes
->    final long recordPointer = array.get(offset + position);
      currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer);
      int uaoSize = UnsafeAlignedOffset.getUaoSize();
      baseObject = memoryManager.getPage(recordPointer);
      // Skip over record length
      baseOffset = memoryManager.getOffsetInPage(recordPointer) + uaoSize;
      recordLength = UnsafeAlignedOffset.getSize(baseObject, baseOffset - uaoSize);
      keyPrefix = array.get(offset + position + 1);
      position += 2;
    }

That array value is used for record pointers and as a buffer. My bet is that Spark is not happy about new threads being started inside foreachPartition, leaving several objects without the thread-local context they expect. I'm not sure whether that's a problem within Spark that needs to be addressed by them, or whether the MemSQL connector needs to work without creating a new thread.
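
To make the thread-local angle concrete, here is a minimal sketch (assuming Spark 2.x behavior, where TaskContext.get() reads a per-thread variable) of why the spawned thread sees nothing; this is illustration only, not the connector's code:

import org.apache.spark.TaskContext

// TaskContext is stored per thread, so a thread started inside
// foreachPartition never has it set.
sqlDf.foreachPartition(partition => {
  val taskCtx = TaskContext.get()           // non-null: we are on the task thread
  new Thread(new Runnable {
    override def run(): Unit = {
      val innerCtx = TaskContext.get()      // null: the thread-local was never set here
      // Iterating the partition from this thread forces WindowExec to evaluate,
      // and WindowExec calls TaskContext.get().taskMemoryManager() -> NPE.
      for (row <- partition) {}
    }
  }).start()
})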

lucyyu commented 7 years ago

It seems like someone has encountered a very similar issue when running threads in foreachPartition() and filed a bug in the Spark JIRA (https://issues.apache.org/jira/browse/SPARK-19476)

I've added a comment describing your case as well.

lucyyu commented 7 years ago

Hi again! So I tried rewriting LoadDataStrategy to swap the work between threads, so that the partition is iterated on the main task thread, which has access to the thread-local variables it expects. I was having some trouble reproducing your bug, but if you'd be willing to try my change on your query, it is in the branch "thread".

https://github.com/memsql/memsql-spark-connector/tree/thread
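
The general shape of the swap, as a rough sketch (this is not the branch code verbatim, and runLoadData is just a placeholder for the LOAD DATA call): the task thread iterates the rows, and the background thread only reads from the pipe and talks to the database.

import java.io.{PipedInputStream, PipedOutputStream}
import org.apache.spark.sql.Row

def writePartition(partition: Iterator[Row], runLoadData: PipedInputStream => Unit): Unit = {
  val input  = new PipedInputStream
  val output = new PipedOutputStream(input)

  // The background thread never touches the partition iterator, so it does not
  // depend on Spark's thread-local task state.
  val loader = new Thread(new Runnable {
    override def run(): Unit = runLoadData(input)
  })
  loader.start()

  // The task thread does the iteration, so TaskContext.get() stays non-null
  // when WindowExec (or any other operator) needs it.
  for (row <- partition) {
    output.write((row.mkString("\t") + "\n").getBytes("UTF-8"))
  }
  output.close()
  loader.join()
}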

Also, as a workaround you can try forcing the connector to use InsertDataStrategy to save rows to MemSQL instead, which does not create a new thread. The easiest way would probably be to modify the source and set useInsert to true here: https://github.com/memsql/memsql-spark-connector/blob/master/src/main/scala/com/memsql/spark/connector/DataFrameFunctions.scala#L36. We will consider making the saveToMemSQL function take a parameter to use InsertDataStrategy.
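
If patching the source is not convenient, the same no-extra-thread idea can be approximated outside the connector with plain JDBC batch inserts done directly in foreachPartition; everything below (connection string, credentials, table and column names) is a placeholder, not the connector's API:

import java.sql.DriverManager
import org.apache.spark.sql.Row

val jdbcUrl = "jdbc:mysql://memsql-host:3306/mydb"   // placeholder connection string
sqlDf.foreachPartition { partition: Iterator[Row] =>
  // All work happens on the task thread; no new Thread is created.
  val conn = DriverManager.getConnection(jdbcUrl, "user", "password")
  val stmt = conn.prepareStatement(
    "INSERT IGNORE INTO fact_lesson_session (id, activity_id) VALUES (?, ?)")  // column list truncated
  try {
    for (row <- partition) {
      stmt.setString(1, row.getAs[String]("id"))
      stmt.setString(2, row.getAs[String]("activity_id"))
      stmt.addBatch()
    }
    stmt.executeBatch()
  } finally {
    conn.close()
  }
}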

carlsverre commented 4 years ago

I believe this has been resolved. If not in master, then it should be working in the new 3.0.0-beta version (coming out soon): https://github.com/memsql/memsql-spark-connector/tree/3.0.0-beta