ytsaurus / ytsaurus-spyt

YTsaurus SPYT provides an integration with Apache Spark
Apache License 2.0

df.write.sortedBy does not work with repartition #30

Closed paulpaul1076 closed 1 month ago

paulpaul1076 commented 1 month ago
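    import org.apache.spark.sql.SaveMode
    import tech.ytsaurus.spyt._ // SPYT implicits that provide .sortedBy and .yt
    import spark.implicits._

    val df = List(
      ("2024-01-01", "red", "test2 русский текст", "testov", "1"),
      ("2024-01-01", "asf русский текст english text", "русский текст test1", "testov", "2"),
      ("2024-01-01", "blue русский текст", "test", "testov", "3")
    ).toDF("date", "color", "name", "lastname", "age")

    // Key columns the target table is declared as sorted by
    val sortCols = List("name", "color")
    val path = "//tmp/test_table"

    val repartitionedDF = df.repartition(1)

    // Fails: the rows are written in their current order, which is not sorted by sortCols
    repartitionedDF.write
      .sortedBy(sortCols: _*)
      .mode(SaveMode.Append)
      .yt(path)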

This test code throws an exception:

tech.ytsaurus.core.common.YTsaurusError: 'Sort order violation: [0#"русский текст test1", 1#"asf русский текст english text"] > [0#"test", 1#"blue русский текст"]'; full error: {"code"=301;"message"="Sort order violation: [0#\"\xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8\xd0\xb9 \xd1\x82\xd0\xb5\xd0\xba\xd1\x81\xd1\x82 test1\", 1#\"asf \xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8\xd0\xb9 \xd1\x82\xd0\xb5\xd0\xba\xd1\x81\xd1\x82 english text\"] > [0#\"test\", 1#\"blue \xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8\xd0\xb9 \xd1\x82\xd0\xb5\xd0\xba\xd1\x81\xd1\x82\"]";"attributes"={"comparator"=["ascending";"ascending";];"host"="localhost";"pid"=42;"tid"=13823990072292087251u;"thread"="Worker:0";"fid"=18446442718934160106u;"datetime"="2024-09-25T13:35:12.198927Z";"trace_id"="9b633ffb-6338c456-95250970-8c1e6a64";"span_id"=10744044366355496369u;};}

I cannot imagine using df.write without a prior repartition somewhere in the Spark DAG. Is this a bug?
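
To see why the write is rejected, here is a minimal plain-Scala sketch that checks the (name, color) keys of the three rows in the order they appear in the DataFrame. It assumes YTsaurus compares string keys as unsigned UTF-8 byte sequences, which is consistent with the escaped bytes in the error above but is not confirmed in this thread. SortOrderCheck, compareBytes, compareKeys and firstViolation are hypothetical names used only for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object SortOrderCheck {
  // Compare two strings as unsigned UTF-8 byte sequences
  // (assumption: this mirrors how YTsaurus orders string keys).
  def compareBytes(a: String, b: String): Int = {
    val x = a.getBytes(UTF_8).map(_ & 0xff)
    val y = b.getBytes(UTF_8).map(_ & 0xff)
    x.zip(y).collectFirst { case (l, r) if l != r => l - r }
      .getOrElse(x.length - y.length)
  }

  // Compare composite keys column by column, first difference wins.
  def compareKeys(a: Seq[String], b: Seq[String]): Int =
    a.zip(b).iterator.map { case (l, r) => compareBytes(l, r) }
      .find(_ != 0).getOrElse(0)

  // Index of the first row whose key is smaller than the previous row's key, if any.
  def firstViolation(keys: Seq[Seq[String]]): Option[Int] =
    keys.sliding(2).zipWithIndex.collectFirst {
      case (Seq(prev, next), i) if compareKeys(prev, next) > 0 => i + 1
    }

  def main(args: Array[String]): Unit = {
    // (name, color) keys in the order the rows appear in the DataFrame
    val keys = Seq(
      Seq("test2 русский текст", "red"),
      Seq("русский текст test1", "asf русский текст english text"),
      Seq("test", "blue русский текст")
    )
    println(firstViolation(keys))
    println(firstViolation(keys.sortWith(compareKeys(_, _) < 0)))
  }
}
```

Under that assumption the third row is the first one out of order, because the Cyrillic "р" starts with byte 0xd1 while "t" is 0x74. This is the same pair of keys the error message reports.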

paulpaul1076 commented 1 month ago

It looks like I need to call df.sort(cols).write.sortedBy(cols), because sortedBy doesn't sort anything by itself. I wish this were mentioned in the docs.
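
For anyone who finds this later, here is a sketch of that workaround applied to the snippet from the first comment. It assumes the tech.ytsaurus.spyt._ import is what brings .sortedBy and .yt into scope and that the job runs against a SPYT-enabled cluster. SortedWriteExample is just an illustrative name.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
import tech.ytsaurus.spyt._ // assumption: SPYT implicits for .sortedBy and .yt

object SortedWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sorted-write-example").getOrCreate()
    import spark.implicits._

    val df = List(
      ("2024-01-01", "red", "test2 русский текст", "testov", "1"),
      ("2024-01-01", "asf русский текст english text", "русский текст test1", "testov", "2"),
      ("2024-01-01", "blue русский текст", "test", "testov", "3")
    ).toDF("date", "color", "name", "lastname", "age")

    val sortCols = List("name", "color")
    val path = "//tmp/test_table"

    df.repartition(1)
      // Sort explicitly as the last step before the write:
      // sortedBy is only passed to the writer and does not reorder rows.
      .sort(sortCols.map(col): _*)
      .write
      .sortedBy(sortCols: _*)
      .mode(SaveMode.Append)
      .yt(path)

    spark.stop()
  }
}
```

The sort has to come after any repartition, since a repartition shuffles rows and discards an earlier ordering.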