opensearch-project / opensearch-spark

Spark Accelerator framework ; It enables secondary indices to remote data stores.
Apache License 2.0
14 stars 23 forks source link

[BUG] Rare with group by cause job abort due to result size too large #611

Closed seankao-az closed 1 month ago

seankao-az commented 1 month ago

What is the bug? Running source=myglue_test.default.http_logs | rare request by clientip gets error:

{
  "status": "FAILED",
  "error": "{\"Message\":\"Spark exception. Cause: Job aborted due to stage failure: Total size of serialized results of 118 tasks (1032.9 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)\"}"
}

However, an equivalent SQL query: select count(*) as cnt, request, clientip from myglue_test.default.http_logs group by request, clientip order by cnt asc limit 10 could get the result correctly:

SQL query result ``` { "status": "SUCCESS", "schema": [ { "name": "cnt", "type": "long" }, { "name": "request", "type": "string" }, { "name": "clientip", "type": "string" } ], "datarows": [ [ 1, "GET /images/102321.gif HTTP/1.0", "120.125.16.0" ], [ 1, "GET /images/102327.gif HTTP/1.0", "183.200.6.0" ], [ 1, "GET /images/102321.gif HTTP/1.0", "89.57.14.0" ], [ 1, "GET /images/102321.gif HTTP/1.0", "147.116.16.0" ], [ 1, "GET /images/102321.gif HTTP/1.1", "224.58.14.0" ], [ 1, "GET /images/102321.gif HTTP/1.0", "178.72.4.0" ], [ 1, "GET /english/images/comp_bu_stage1n.gif HTTP/1.0", "195.128.16.0" ], [ 1, "GET /english/images/comp_bu_stage1n.gif HTTP/1.0", "26.52.13.0" ], [ 1, "GET /images/102321.gif HTTP/1.1", "77.76.14.0" ], [ 1, "GET /english/images/team_bu_roster_on.gif HTTP/1.1", "142.58.14.0" ] ], "total": 10, "size": 10 } ```

The LIMIT 10 clause might cause a difference in SQL query, but I add it because rare command defaults to size 10 as well

Physical plan for PPL query:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(4) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#32]
   +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$3843/0x00007f070d20d1d0@4851585e, obj#31: java.lang.String
      +- DeserializeToObject createexternalrow(staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, count(request)#8L, true, false, true), request#12.toString, clientip#11.toString, StructField(count(request),LongType,false), StructField(request,StringType,true), StructField(clientip,StringType,true)), obj#30: org.apache.spark.sql.Row
         +- *(3) Sort [request#12 DESC NULLS LAST], true, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 1
                  +- Exchange rangepartitioning(request#12 DESC NULLS LAST, 1000), ENSURE_REQUIREMENTS, [plan_id=98]
                     +- *(2) HashAggregate(keys=[request#12, clientip#11], functions=[count(request#12)], output=[count(request)#8L, request#12, clientip#11], schema specialized)
                        +- AQEShuffleRead coalesced
                           +- ShuffleQueryStage 0
                              +- Exchange hashpartitioning(request#12, clientip#11, 1000), ENSURE_REQUIREMENTS, [plan_id=52]
                                 +- *(1) HashAggregate(keys=[request#12, clientip#11], functions=[partial_count(request#12)], output=[request#12, clientip#11, count#34L], schema specialized)
                                    +- *(1) Project [clientip#11, request#12]
                                       +- FileScan json spark_catalog.default.http_logs[clientip#11,request#12,year#15,month#16,day#17] Batched: false, DataFilters: [], Format: JSON, Location: CatalogFileIndex(1 paths)[s3://..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<clientip:string,request:string>

Physical plan for SQL query:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#31]
   +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$3865/0x00007fa68d23a778@509772ad, obj#30: java.lang.String
      +- DeserializeToObject createexternalrow(staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cnt#8L, true, false, true), request#12.toString, clientip#11.toString, StructField(cnt,LongType,false), StructField(request,StringType,true), StructField(clientip,StringType,true)), obj#29: org.apache.spark.sql.Row
         +- TakeOrderedAndProject(limit=10, orderBy=[cnt#8L ASC NULLS FIRST], output=[cnt#8L,request#12,clientip#11])
            +- *(2) HashAggregate(keys=[request#12, clientip#11], functions=[count(1)], output=[cnt#8L, request#12, clientip#11], schema specialized)
               +- AQEShuffleRead coalesced
                  +- ShuffleQueryStage 0
                     +- Exchange hashpartitioning(request#12, clientip#11, 1000), ENSURE_REQUIREMENTS, [plan_id=48]
                        +- *(1) HashAggregate(keys=[request#12, clientip#11], functions=[partial_count(1)], output=[request#12, clientip#11, count#33L], schema specialized)
                           +- *(1) Project [clientip#11, request#12]
                              +- FileScan json spark_catalog.default.http_logs[clientip#11,request#12,year#15,month#16,day#17] Batched: false, DataFilters: [], Format: JSON, Location: CatalogFileIndex(1 paths)[s3:..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<clientip:string,request:string>

What is your host/environment?

penghuo commented 1 month ago

LIMIT 10 is not the same as Rare. To achieve the desired result, we should use a window function to limit the output to 10 records per group.

SELECT 
    request, 
    clientip, 
    cnt
FROM (
    SELECT 
        request, 
        clientip, 
        COUNT(*) AS cnt, 
        ROW_NUMBER() OVER (PARTITION BY request, clientip ORDER BY cnt ASC) AS rn
    FROM 
        myglue_test.default.http_logs
    GROUP BY 
        request, clientip
) AS RankedLogs
WHERE 
    rn <= 10
ORDER BY 
    cnt ASC;
seankao-az commented 1 month ago
Fail to analyze query. Cause: [UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_WINDOW] The feature is not supported: Referencing a lateral column alias `cnt` in window expression "row_number() OVER (PARTITION BY request, clientip ORDER BY lateralAliasReference(cnt) ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"

modifying the query to:

SELECT 
    request, 
    clientip, 
    cnt
FROM (
    SELECT 
        request, 
        clientip, 
        COUNT(*) AS cnt, 
        ROW_NUMBER() OVER (PARTITION BY request, clientip ORDER BY COUNT(*) ASC) AS rn
    FROM 
        myglue_test.default.http_logs
    GROUP BY 
        request, clientip
) AS RankedLogs
WHERE 
    rn <= 10
ORDER BY 
    cnt ASC;

results in:

{
    "status": "FAILED",
    "error": "{\"Message\":\"Spark exception. Cause: Job aborted due to stage failure: Total size of serialized results of 12 tasks (1905.8 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)\"}"
}

The physical plan being:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(5) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false, true) AS value#35]
   +- MapPartitions org.apache.spark.sql.Dataset$$Lambda$3910/0x00007f4501250f10@10b46c72, obj#34: java.lang.String
      +- DeserializeToObject createexternalrow(request#15.toString, clientip#14.toString, staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cnt#8L, true, false, true), StructField(request,StringType,true), StructField(clientip,StringType,true), StructField(cnt,LongType,false)), obj#33: org.apache.spark.sql.Row
         +- *(4) Sort [cnt#8L ASC NULLS FIRST], true, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 1
                  +- Exchange rangepartitioning(cnt#8L ASC NULLS FIRST, 1000), ENSURE_REQUIREMENTS, [plan_id=205]
                     +- *(3) Project [request#15, clientip#14, cnt#8L]
                        +- *(3) Filter (rn#9 <= 10)
                           +- Window [row_number() windowspecdefinition(request#15, clientip#14, _w0#21L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9], [request#15, clientip#14], [_w0#21L ASC NULLS FIRST]
                              +- WindowGroupLimit [request#15, clientip#14], [_w0#21L ASC NULLS FIRST], row_number(), 10, Final
                                 +- *(2) Sort [request#15 ASC NULLS FIRST, clientip#14 ASC NULLS FIRST, _w0#21L ASC NULLS FIRST], false, 0
                                    +- *(2) HashAggregate(keys=[request#15, clientip#14], functions=[count(1)], output=[request#15, clientip#14, cnt#8L, _w0#21L], schema specialized)
                                       +- AQEShuffleRead coalesced
                                          +- ShuffleQueryStage 0
                                             +- Exchange hashpartitioning(request#15, clientip#14, 1000), ENSURE_REQUIREMENTS, [plan_id=97]
                                                +- *(1) HashAggregate(keys=[request#15, clientip#14], functions=[partial_count(1)], output=[request#15, clientip#14, count#37L], schema specialized)
                                                   +- *(1) Project [clientip#14, request#15]
                                                      +- FileScan json spark_catalog.default.http_logs[clientip#14,request#15,year#18,month#19,day#20] Batched: false, DataFilters: [], Format: JSON, Location: CatalogFileIndex(1 paths)[s3:..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<clientip:string,request:string>

Now the plan looks way different with the window function.

seankao-az commented 1 month ago

But regardless, same failure can happen to SQL as well. Not a PPL rare command issue. And it can be solved by scaling up driver node or changing spark.driver.maxResultSize as error message suggests