apache/sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/
Apache License 2.0

Question: handling skewed data during join #249

Closed: georgThesis closed this issue 1 year ago

georgThesis commented 6 years ago

Data skewness is very large for this spatial join: partition sizes range from a couple of KB to several MB. Is there something I can do to get more even partitions? I am using an R-tree for indexing and a KDB-tree for partitioning. (Screenshot attached.)

jiayuasu commented 6 years ago

@georgThesis Assume you have two datasets A and B. If A is large and heavily skewed, you should first do spatial partitioning on A, then apply the grids from A to B.

In other words, the leading dataset/column in a spatial join should be the skewed one.

This applies to both the RDD and SQL APIs.

In http://datasystemslab.github.io/GeoSpark/api/sql/GeoSparkSQL-Parameter/, the "spatial partition side" parameter refers to the leading side.
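
With the RDD API, that ordering looks roughly like the sketch below. This is a minimal sketch, not a drop-in recipe: skewAwareJoin and the variable names are placeholders, and you should check the exact JoinQuery.SpatialJoinQuery signature for your GeoSpark version.

```scala
import com.vividsolutions.jts.geom.Geometry
import org.datasyslab.geospark.enums.{GridType, IndexType}
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geospark.spatialRDD.SpatialRDD

// "skewed" is the large, skewed dataset A; "other" is dataset B.
def skewAwareJoin(skewed: SpatialRDD[Geometry], other: SpatialRDD[Geometry]) = {
  skewed.analyze()                                  // compute the dataset boundary
  skewed.spatialPartitioning(GridType.KDBTREE)      // derive the grids from the skewed side
  other.spatialPartitioning(skewed.getPartitioner)  // apply A's grids to B
  skewed.buildIndex(IndexType.RTREE, true)          // index the spatially partitioned RDD
  JoinQuery.SpatialJoinQuery(skewed, other, true, false)
}
```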

jhamiltonpro commented 5 years ago

Can you be more specific about which parameters to set from this page: http://datasystemslab.github.io/GeoSpark/api/sql/GeoSparkSQL-Parameter/

I am also experiencing skew, likely during a join: something like 498 of 500 tasks complete pretty quickly and the last 2 take forever (sometimes failing due to memory issues). I'm using GeoSpark SQL with all default GeoSpark SQL parameters.

Here's the list of tables I'm joining and the geometries that are within each table (I'm using a very big cluster and each table is partitioned to about 4000 partitions before/after joining):

Big Table A (simple point geometry) (base table)

Big Table B (simple polygons, 4 points each)
Small Table C (complex polygons, ranging from 20 to 100+ points each)
Small Table D (polygons, 20 points each)
Small Table E (polygons, fewer than 10 points each)

Here is the gist of the join plan (I join them all together in a loop, with BigTableA as the 'base' table; I'm basically filtering for point geometries that are contained in the polygon geometries). A GeoSparkSQL sketch of one step follows the list:

  1. TempResult = BigTableA join SmallTableD where ST_CONTAINS(points in polygons)
  2. TempResult = TempResult join SmallTableC where ST_CONTAINS(points in polygons)
  3. TempResult = TempResult join BigTableB where ST_CONTAINS(points in polygons)
  4. FinalResult = TempResult join SmallTableE where ST_CONTAINS(points in polygons)
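
For concreteness, here is a sketch of what one step (step 2) looks like in GeoSparkSQL. The column names geom and polygon_id are made up for illustration, and this assumes GeoSparkSQLRegistrator.registerAll(sparkSession) has been called.

```scala
// Step 2 as a range join: keep points in TempResult that fall inside
// SmallTableC's polygons. ST_Contains(a, b) is true when a contains b.
val tempResult = sparkSession.sql(
  """SELECT t.*, c.polygon_id
    |FROM TempResult t, SmallTableC c
    |WHERE ST_Contains(c.geom, t.geom)""".stripMargin)
tempResult.createOrReplaceTempView("TempResult")
```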

I think the significant skew happens in step 2, where I'm joining a big table (points) with a small table whose polygons range from fairly simple to really complex. And in general, I notice that the last 1 or 2 tasks out of many take hours to complete compared to the rest, as if the bulk of the join is assigned to a single executor.

I am using GeoSpark version 1.1.3, and again, I have all default GeoSpark SQL settings. Please let me know which parameter changes would provide the most benefit.

Thanks!

georgThesis commented 5 years ago

Switch the spatial partitioning to kdbtree and increase the parallelism.

Both of these might help you solve the issue.
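
If I read the parameter page correctly, that translates to something like the sketch below. The parameter names come from the GeoSparkSQL-Parameter page linked above; I'm assuming they are read from the Spark session conf, and the values are illustrative.

```scala
// Sketch: set before running the join query.
sparkSession.conf.set("geospark.join.gridtype", "kdbtree")   // grid type for spatial partitioning
sparkSession.conf.set("geospark.join.numpartition", "4000")  // number of spatial partitions (illustrative)
```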

jhamiltonpro commented 5 years ago

Using quadtree and kdbtree definitely helped - thanks for the suggestions!

By increasing the parallelism, do you mean setting spark.default.parallelism, repartitioning the dataframes to have even more partitions, or something else? I already repartition to around 4000 after each join, which seems like it should spread the work out evenly, but even with quadtree and kdbtree specified there is still significant skew in execution time.
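
For context, my understanding of the distinct levers here, as a sketch with illustrative values (I'm assuming geospark.join.numpartition is honored as documented):

```scala
sparkSession.conf.set("spark.sql.shuffle.partitions", "4000") // SQL shuffle parallelism
sparkSession.conf.set("geospark.join.numpartition", "4000")   // spatial grid count used by the join
// df.repartition(4000) only rebalances rows by hash; the spatial join still
// regroups rows by grid cell, so one hot cell can still create a straggler task.
```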

pedromorfeu commented 4 years ago

@jhamiltonpro, have you managed to improve your join query? How?