typelevel / frameless

Expressive types for Spark.
Apache License 2.0

Columns of same type mixed up during grouping, select and as #411

Open mfelsche opened 4 years ago

mfelsche commented 4 years ago

I took a TypedDataset of a case class A, grouped it while mixing up the order of two columns of the same type, and got a tupled dataset back. (I had to do it this way, don't ask.) To put things right, I selected the columns in the correct order and finally used .as[A] to get back a nice TypedDataset of my type A.

Expected behaviour: everything is as it was before, with the right columns ending up in the right places.

Actual behaviour: the mixed-up columns were not put back into the right order by the select I issued at the end.

I suspect the quirk is somewhere within .as[A], but I cannot pinpoint it, to be honest.

Here is a small reproducer:

  import frameless._
  import frameless.syntax._
  import frameless.functions.aggregate.{first, min}
  import org.apache.spark.sql.SparkSession

  // create spark session ...

  implicit val sparkSession: SparkSession = session
  case class ConfusingColumns(name: String, company: String, created: Long)
  val data = ConfusingColumns("Joe", "snakeoil Inc.", 123L) ::
    ConfusingColumns("Barb", "ACME", 42L) ::
    ConfusingColumns("Joe", "snakeoil Inc.", 0L) :: Nil
  val ds: TypedDataset[ConfusingColumns] = TypedDataset.create(data)
  val grouped = ds
    .groupBy(
      ds('company)
    )
    .agg(
      first(ds('name)),
      min(ds('created))
    )
  val confused = grouped
    .select(
      grouped('_2),
      grouped('_1),
      grouped('_3)
    )
    .as[ConfusingColumns]
  confused.dataset.show()

Output (compare the case classes in data above):

+-------------+-------+-------+
|         name|company|created|
+-------------+-------+-------+
|snakeoil Inc.|    Joe|      0|
|         ACME|   Barb|     42|
+-------------+-------+-------+
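The swap in the output is consistent with the case class being decoded purely by position rather than by column name. Below is a minimal plain-Scala sketch (no Spark or frameless involved; `decodePositionally` is a hypothetical helper, not a frameless API) that shows how positional decoding of the grouped row `(company, name, created)` produces exactly the mislabeled values seen above:

```scala
// Illustrative only: mimics an encoder that maps column i to field i,
// ignoring column names entirely.
case class ConfusingColumns(name: String, company: String, created: Long)

// A "row" as produced by the groupBy/agg: (company, first(name), min(created))
val row: (String, String, Long) = ("snakeoil Inc.", "Joe", 0L)

// Hypothetical positional decoding: field i of the case class gets
// column i of the row, regardless of what the columns are called.
def decodePositionally(r: (String, String, Long)): ConfusingColumns =
  ConfusingColumns(r._1, r._2, r._3)

val decoded = decodePositionally(row)
// decoded.name is "snakeoil Inc." and decoded.company is "Joe" --
// the same swap shown in the issue's output table.
```

If `.as[A]` resolves fields this way, a select that only reorders columns of the same type is invisible to the decoder, which would explain the behavior reported here.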
mfelsche commented 4 years ago

Fiddling around further with the example, it seems the select doesn't have any effect: collecting the results before or after the select gives the same output:

grouped.collect()
// WrappedArray(("snakeoil Inc.", "Foo", 0L), ("ACME", "Bar", 42L))

grouped.select(
      grouped('_2),
      grouped('_1),
      grouped('_3)
    ).collect()
// WrappedArray(("snakeoil Inc.", "Foo", 0L), ("ACME", "Bar", 42L))
dsabanin commented 3 years ago

@mfelsche did you find out what was causing it by any chance?

AlexisBRENON commented 2 years ago

Hi. I can reproduce a similar bug with an even simpler case: just load partitioned data where the partitioning column is the first field of the case class. The column is appended at the end of the schema, and collecting the data yields inverted fields. https://github.com/typelevel/frameless/compare/master...AlexisBRENON:case_class_support#diff-dd83f3b1d1a249804b5620473177ce6034efbc5f36b45a9b1ef01283cafd50f9R540

Do you think this could be related?

oroundtree commented 1 year ago

I've been seeing this issue as well when using scalapb-sparksql in a flow that uses encoders to create Datasets of both protobuf-derived types and ordinary Scala case classes.

When I define the following case class:

case class CustomerClientServiceDailyCounts(
  customer: String,
  client: String,
  service: String,
  count: Long,
  service_count: Long,
  date: String
)

and then cast a DataFrame with identical column names and types and map it with a function that takes the above type as input:

dailyTransactionsTbl
  .as[CustomerClientServiceDailyCounts]
  .map(customerClientServiceFunction)

It fails inside customerClientServiceFunction with the following error: `A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import`

I now realize this is happening because one of the Long columns is being shuffled with one of the String columns, and there's an attempt to cast it to a String when it's used in the function.

I believe this is related; it just seems that sometimes even columns of different types can get shuffled, which leads to runtime errors.
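For what it's worth, a blind positional cast is exactly the kind of thing that blows up at runtime once the shuffled columns have different types. This plain-Scala sketch (illustrative only; `decode` and the `Counts` class are hypothetical, not Spark or frameless APIs) shows the failure mode:

```scala
// Illustrative only: a row decoder that casts column i to the type of
// field i, the way a purely positional encoder would.
case class Counts(service: String, count: Long)

// Columns arrive in the wrong order: (count, service) instead of (service, count).
val shuffled: Array[Any] = Array(7L, "billing")

def decode(cols: Array[Any]): Counts =
  Counts(cols(0).asInstanceOf[String], cols(1).asInstanceOf[Long])

// Casting the boxed Long to String throws a ClassCastException --
// the same class of runtime error as the codegen failure quoted above.
val failed =
  try { decode(shuffled); false }
  catch { case _: ClassCastException => true }
```

When the shuffled columns happen to share a type (as in the original report), the cast succeeds silently and you get wrong data instead of an error, which is arguably worse.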