monkey0head opened this issue 3 years ago
Find information online about common Spark performance problems (groupBy, joins, and how to avoid or replace them) and how to solve them. Apply the fixes in base_rec.
Run the base_rec code by hand, calling explain along the way to check whether shuffles or cross joins arise.
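A minimal sketch of that check, assuming placeholder inputs and plain PySpark (not the actual base_rec code):

```python
# Minimal sketch: inspect the physical plan of an intermediate DataFrame.
# File paths and column names are placeholders, not the real base_rec data.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

recs = spark.read.parquet("recs.parquet")  # candidate recommendations
log = spark.read.parquet("log.parquet")    # interaction log

filtered = recs.join(
    log.select("user_idx", "item_idx"),
    on=["user_idx", "item_idx"],
    how="left_anti",
)

# Shuffles show up as "Exchange" nodes, cross joins as "CartesianProduct"
# or "BroadcastNestedLoopJoin" nodes in the printed plan.
filtered.explain()      # physical plan only
filtered.explain(True)  # parsed / analyzed / optimized / physical plans
```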
In base_rec the only suspicious place is filtering seen items. First we keep the top max_seen items per user, then the top seen_count + k, and only then do an anti join with the log. It seems we could just do the anti join and call the get_top_k function (a sketch follows the plan below). Here is the execution plan for recs:
== Physical Plan ==
*(4) Project [coalesce(user_idx#1058, 0) AS user_idx#1080, coalesce(item_idx#1059, 0) AS item_idx#1081, coalesce(nanvl(relevance#1060, null), 0.0) AS relevance#1082]
+- *(4) Filter (cast(coalesce(temp_rank#1065, 0) as bigint) <= (coalesce(seen_count#908L, 0) + 10))
+- *(4) BroadcastHashJoin [user_idx#1058], [user_idx#544], LeftOuter, BuildRight
:- *(4) Filter (isnotnull(temp_rank#1065) AND (temp_rank#1065 <= 2708))
: +- Window [row_number() windowspecdefinition(user_idx#1058, relevance#1060 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS temp_rank#1065], [user_idx#1058], [relevance#1060 DESC NULLS LAST]
: +- *(2) Sort [user_idx#1058 ASC NULLS FIRST, relevance#1060 DESC NULLS LAST], false, 0
: +- Exchange hashpartitioning(user_idx#1058, 24), true, [id=#1360]
: +- *(1) ColumnarToRow
: +- FileScan parquet [user_idx#1058,item_idx#1059,relevance#1060] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/darel/python/sponge-bob-magic/pops], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_idx:int,item_idx:int,relevance:double>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#1371]
+- *(3) Filter isnotnull(user_idx#544)
+- *(3) ColumnarToRow
+- InMemoryTableScan [user_idx#544, seen_count#908L], [isnotnull(user_idx#544)]
+- InMemoryRelation [user_idx#544, seen_count#908L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(2) HashAggregate(keys=[user_idx#544], functions=[count(item_idx#606)])
+- Exchange hashpartitioning(user_idx#544, 24), true, [id=#583]
+- *(1) HashAggregate(keys=[user_idx#544], functions=[partial_count(item_idx#606)])
+- *(1) Project [user_idx#544, cast(item_idx#594 as int) AS item_idx#606]
+- *(1) Project [cast(user_idx#532 as int) AS user_idx#544, UDF(cast(item_id#477 as string)) AS item_idx#594]
+- *(1) Project [item_id#477, UDF(cast(user_id#476 as string)) AS user_idx#532]
+- *(1) Scan ExistingRDD arrow[user_id#476,item_id#477,rating#478,timestamp#479,relevance#480]
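For reference, a rough sketch of the simpler flow suggested above (anti join first, then top-k); the helper names and signatures are assumptions, not the actual base_rec API:

```python
from pyspark.sql import DataFrame, Window, functions as sf


def get_top_k(recs: DataFrame, k: int) -> DataFrame:
    """Keep the k highest-relevance items per user (hypothetical helper)."""
    window = Window.partitionBy("user_idx").orderBy(sf.col("relevance").desc())
    return (
        recs.withColumn("temp_rank", sf.row_number().over(window))
        .filter(sf.col("temp_rank") <= k)
        .drop("temp_rank")
    )


def predict_unseen(recs: DataFrame, log: DataFrame, k: int) -> DataFrame:
    """Drop already-seen (user, item) pairs, then cut to top-k per user."""
    unseen = recs.join(
        log.select("user_idx", "item_idx"),
        on=["user_idx", "item_idx"],
        how="left_anti",
    )
    return get_top_k(unseen, k)
```

The trade-off is the one discussed below: without any pre-filtering, the anti join shuffles the full num-users × num-items candidate table.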
Please add spark.sql.autoBroadcastJoinThreshold = -1 to the config and check the query plan again.
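For reference, one way to set that (a minimal sketch; how the project actually builds its session may differ):

```python
from pyspark.sql import SparkSession

# Disable automatic broadcast joins so Spark falls back to sort-merge joins.
spark = (
    SparkSession.builder
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .getOrCreate()
)

# Or change it on an already running session:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```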
== Physical Plan ==
*(9) Project [coalesce(user_idx#1133, 0) AS user_idx#1853, coalesce(item_idx#1134, 0) AS item_idx#1854, coalesce(nanvl(relevance#1135, null), 0.0) AS relevance#1855]
+- SortMergeJoin [coalesce(user_idx#1133, 0), coalesce(item_idx#1134, 0)], [user#1872, item#1866], LeftAnti
:- *(6) Sort [coalesce(user_idx#1133, 0) ASC NULLS FIRST, coalesce(item_idx#1134, 0) ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(coalesce(user_idx#1133, 0), coalesce(item_idx#1134, 0), 24), true, [id=#1868]
: +- *(5) Project [user_idx#1133, item_idx#1134, relevance#1135]
: +- *(5) Filter (cast(coalesce(temp_rank#1838, 0) as bigint) <= (coalesce(seen_count#1787L, 0) + 10))
: +- *(5) BroadcastHashJoin [user_idx#1133], [user_idx#1479], LeftOuter, BuildRight
: :- *(5) Filter (isnotnull(temp_rank#1838) AND (temp_rank#1838 <= 2708))
: : +- Window [row_number() windowspecdefinition(user_idx#1133, relevance#1135 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS temp_rank#1838], [user_idx#1133], [relevance#1135 DESC NULLS LAST]
: : +- *(2) Sort [user_idx#1133 ASC NULLS FIRST, relevance#1135 DESC NULLS LAST], false, 0
: : +- Exchange hashpartitioning(user_idx#1133, 24), true, [id=#1850]
: : +- *(1) ColumnarToRow
: : +- FileScan parquet [user_idx#1133,item_idx#1134,relevance#1135] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/darel/python/sponge-bob-magic/pops], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_idx:int,item_idx:int,relevance:double>
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1862]
: +- *(4) HashAggregate(keys=[user_idx#1479], functions=[count(item_idx#1541)])
: +- Exchange hashpartitioning(user_idx#1479, 24), true, [id=#1858]
: +- *(3) HashAggregate(keys=[user_idx#1479], functions=[partial_count(item_idx#1541)])
: +- *(3) Project [user_idx#1479, cast(item_idx#1529 as int) AS item_idx#1541]
: +- *(3) Filter isnotnull(user_idx#1479)
: +- *(3) Project [cast(user_idx#1467 as int) AS user_idx#1479, UDF(cast(item_id#1412 as string)) AS item_idx#1529]
: +- *(3) Project [item_id#1412, UDF(cast(user_id#1411 as string)) AS user_idx#1467]
: +- *(3) Scan ExistingRDD arrow[user_id#1411,item_id#1412,rating#1413,timestamp#1414,relevance#1415]
+- *(8) Sort [user#1872 ASC NULLS FIRST, item#1866 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(user#1872, item#1866, 24), true, [id=#1873]
+- *(7) Project [user_idx#1479 AS user#1872, cast(item_idx#1529 as int) AS item#1866]
+- *(7) Filter (isnotnull(user_idx#1479) AND isnotnull(cast(item_idx#1529 as int)))
+- *(7) Project [cast(user_idx#1467 as int) AS user_idx#1479, UDF(cast(item_id#1412 as string)) AS item_idx#1529]
+- *(7) Project [item_id#1412, UDF(cast(user_id#1411 as string)) AS user_idx#1467]
+- *(7) Scan ExistingRDD arrow[user_id#1411,item_id#1412,rating#1413,timestamp#1414,relevance#1415]
Keeping only max_seen items per user before the join was an optimisation I added. It speeds up the shuffle significantly, because without that filter all the data (number of users in predict × number of items in the dataset) gets shuffled for no reason.
Either keeping max_seen items or joining with count_by_user (but only as a broadcast join) significantly speeds up execution. I used both, which may be redundant.
Maybe the better way is to keep max_seen items per user and then perform the anti join (see the sketch below). You can see the results in the screenshot and the details in the notebook.
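A rough sketch of that variant, with hypothetical helper and column names (not the actual base_rec code): keep max_seen + k candidates per user so every user still has at least k items after the anti join, then cut to the final top-k.

```python
from pyspark.sql import DataFrame, Window, functions as sf


def predict_trimmed(recs: DataFrame, log: DataFrame, k: int) -> DataFrame:
    # max_seen = the largest number of logged interactions any single user has.
    max_seen = log.groupBy("user_idx").count().agg(sf.max("count")).collect()[0][0]

    window = Window.partitionBy("user_idx").orderBy(sf.col("relevance").desc())

    # Keep at most max_seen + k candidates per user: even if every seen item
    # of a user is removed by the anti join, k items are still left.
    trimmed = (
        recs.withColumn("temp_rank", sf.row_number().over(window))
        .filter(sf.col("temp_rank") <= max_seen + k)
        .drop("temp_rank")
    )

    # Drop already-seen pairs, then take the final top-k per user.
    unseen = trimmed.join(
        log.select("user_idx", "item_idx"),
        on=["user_idx", "item_idx"],
        how="left_anti",
    )
    return (
        unseen.withColumn("temp_rank", sf.row_number().over(window))
        .filter(sf.col("temp_rank") <= k)
        .drop("temp_rank")
    )
```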
Predict with filtering seen items (screenshot):
Check the base_rec and models code for redundant joins and groupBy calls.