memsql / singlestore-spark-connector

A connector for SingleStore and Spark
Apache License 2.0

saveToMemSQL error when running SQL with GROUP BY #17

Closed giaosudau closed 4 years ago

giaosudau commented 8 years ago

It works with:

val sql =
    s"""
      | SELECT gender AS genderId, -1 AS ageId
      | FROM $tableName
    """.stripMargin

But when I add more fields to calculate and a GROUP BY:

val sql =
    s"""
      | SELECT gender AS genderId, -1 AS ageId,
      | CAST(0 AS LONG) AS impression, CAST(0 AS LONG) AS trueImpression, COUNT(*) AS click
      | FROM $tableName
      | GROUP BY gender
    """.stripMargin
  val result = sqlContext.sql(sql)
  result.saveToMemSQL("dev_output", "aggregate")

it throws this error:

INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
Exception in thread "main" com.memsql.spark.SaveToMemSQLException: SaveToMemSQLException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 172 in stage 1.0 failed 1 times, most recent failure: Lost task 172.0 in stage 1.0 (TID 174, localhost): org.apache.spark.SparkException: Internal error: release called on 33554432 bytes but task only has 0
    at org.apache.spark.shuffle.ShuffleMemoryManager.release(ShuffleMemoryManager.scala:117)
    at org.apache.spark.unsafe.map.BytesToBytesMap.free(BytesToBytesMap.java:708)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.free(UnsafeFixedWidthAggregationMap.java:234)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:678)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:76)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.memsql.LoadDataStrategy$$anon$2.run(LoadDataStrategy.scala:53)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.sql.memsql.SparkImplicits$DataFrameFunctions.saveToMemSQL(SparkImplicits.scala:85)
    at org.apache.spark.sql.memsql.SparkImplicits$DataFrameFunctions.saveToMemSQL(SparkImplicits.scala:40)
    at job.WriteToMemSQL$delayedInit$body.apply(WriteToMemSQL.scala:105)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at job.WriteToMemSQL$.main(WriteToMemSQL.scala:17)
    at job.WriteToMemSQL.main(WriteToMemSQL.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

I am using Spark 1.5.2 and MemSQL 1.3.2.

choochootrain commented 8 years ago

Is `sqlContext` a `MemSQLContext` here or a plain `SQLContext`? Also, what is the reported schema of `result`, and the schema of the table `dev_output.aggregate` (if it already exists)?
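For reference, the DataFrame side of that can be dumped directly (a minimal sketch, reusing the `result` variable from the snippet above); the table DDL can be pulled with `SHOW CREATE TABLE aggregate` from a MemSQL client:

// Print the schema Spark inferred for the aggregation result
result.printSchema()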

giaosudau commented 8 years ago

`sqlContext` here is a plain `SQLContext`. I import

import org.apache.spark.sql.memsql.SparkImplicits._

to get `saveToMemSQL`.

The schema of the table `aggregate` is:

CREATE TABLE `aggregate` (
  `date` bigint(20) DEFAULT NULL,
  `networkId` bigint(20) DEFAULT NULL,
  `creativeId` bigint(20) DEFAULT NULL,
  `sectionId` bigint(20) DEFAULT NULL,
  `zoneId` bigint(20) DEFAULT NULL,
  `formatId` int(11) DEFAULT NULL,
  `templateId` bigint(20) DEFAULT NULL,
  `advertiserId` bigint(20) DEFAULT NULL,
  `campaignId` bigint(20) DEFAULT NULL,
  `paymentModel` int(11) DEFAULT NULL,
  `adDefault` int(11) NOT NULL DEFAULT '0',
  `websiteId` bigint(20) NOT NULL DEFAULT '0',
  `placementId` int(11) NOT NULL DEFAULT '0',
  `topicId` int(11) NOT NULL DEFAULT '0',
  `interestId` int(11) NOT NULL DEFAULT '0',
  `inMarket` int(11) NOT NULL DEFAULT '0',
  `locationId` bigint(20) DEFAULT NULL,
  `osId` int(11) DEFAULT NULL,
  `browserId` int(11) DEFAULT NULL,
  `deviceTypeId` int(11) DEFAULT NULL,
  `deviceModelId` int(11) DEFAULT NULL,
  `genderId` int(11) DEFAULT NULL,
  `ageId` int(11) NOT NULL DEFAULT '0',
  `impression` bigint(20) NOT NULL DEFAULT '0',
  `trueImpression` bigint(20) NOT NULL DEFAULT '0',
  `click` bigint(20) NOT NULL DEFAULT '0',
  `revenue` double DEFAULT NULL,
  `proceeds` double DEFAULT NULL,
  `spent` double DEFAULT NULL,
  `memsql_insert_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  KEY `memsql_insert_time` (`memsql_insert_time`)
  /*!90618 , SHARD KEY () */ 
)

It works if I save to a Parquet file, then read it back and call saveToMemSQL.
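In other words, roughly this (a sketch of the workaround; the staging path is hypothetical):

// Stage the aggregation result on disk, breaking the direct pipeline
result.write.parquet("/tmp/aggregate_staging")
// Re-read the staged data and load it into MemSQL as before
val staged = sqlContext.read.parquet("/tmp/aggregate_staging")
staged.saveToMemSQL("dev_output", "aggregate")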

choochootrain commented 8 years ago

What happens if you call `result.collect()` instead of `result.saveToMemSQL`? The stack is coming from here, which is using Spark's partition iterator.
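That is, as a sanity check with no MemSQL code involved at all:

// Materialize the aggregation on the driver and print it
result.collect().foreach(println)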

giaosudau commented 8 years ago

It's fine if I call collect or save it as a Parquet file. I think the error is caused by the way memory is allocated for the task; the error occurs when the task has no memory left to release. You can try it yourself with a simple GROUP BY query.
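For example, something like this minimal sketch (assuming any source table with a gender column, as in the snippets above) should trigger it:

val repro = sqlContext.sql(s"SELECT gender, COUNT(*) AS cnt FROM $tableName GROUP BY gender")
repro.saveToMemSQL("dev_output", "aggregate")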

choochootrain commented 8 years ago

Memory allocation is handled by Spark; saveToMemSQL doesn't change that.

saveToMemSQL retrieves rows from the DataFrame at the partition level. The implementation can be distilled into something like this:

result.foreachPartition(partition => {
  for (row <- partition) {
    println(row) // normally each row is inserted into MemSQL here
  }
})

I'll try to repro this, but in the meantime could you try the above as well?

shashankgowdal commented 8 years ago

Any update on this issue, @choochootrain? We are hitting this issue as well, and we are using `SQLContext`, not `MemSQLContext`.

@giaosudau How did you overcome this issue?

giaosudau commented 8 years ago

@shashankgowdal Like I said, it's a bug. You should avoid the GROUP BY clause, or save to another storage format first before storing to MemSQL.

choochootrain commented 8 years ago

@giaosudau Did you try the above as well? The stack is entirely in Spark land, so I'm curious whether it manifests without any MemSQL code.

rendybjunior commented 7 years ago

Hi all, I am using it differently but end up with the same issue:

grouped_df = df.groupBy(['city', 'month']).agg({'*': 'count', 'cost': 'sum'})

Is there any solution for this yet?

choochootrain commented 7 years ago

@rendybjunior I'm curious what happens when you do

grouped_df.foreachPartition(partition => {
  for (row <- partition) {
    println(row) // normally each row is inserted into MemSQL here
  }
})

This is the same DataFrame operation that saveToMemSQL performs, but printing rows instead of inserting them with JDBC.

carlsverre commented 4 years ago

Please test with the new beta version of our connector. https://github.com/memsql/memsql-spark-connector/tree/3.0.0-beta
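With the 3.0 connector the write path goes through the Spark data source API instead of saveToMemSQL; a hedged sketch (the option name and endpoint below are assumptions, see the linked README for the exact configuration):

df.write
  .format("memsql") // the 3.0 connector registers a "memsql" data source
  .option("ddlEndpoint", "memsql-master:3306") // assumed option name and host
  .save("dev_output.aggregate")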