databrickslabs / geoscan

Geospatial clustering at massive scale

geoscan fit stuck on save at GeoscanPersonalizedModel.scala:139 #88

Open ivandiaz-tomtom opened 2 years ago

ivandiaz-tomtom commented 2 years ago

Hi,

I was wondering if you could help with a problem I am getting when running geoscan on a DBR 10.4 LTS cluster. After creating a dataframe with latitude and longitude columns and trying to run a personalized geoscan, the cluster gets stuck on a pending stage (62 tasks in my case; description below). Are there any dependencies that could cause this? Unfortunately, there is no logging on the cluster that can help me track down the root cause.

Thanks, Ivan

Code for creating the model

from geoscan import GeoscanPersonalized
import mlflow

with mlflow.start_run(run_name='GEOSCAN') as run:
    geoscan = GeoscanPersonalized() \
        .setLatitudeCol('lat') \
        .setLongitudeCol('lon') \
        .setPredictionCol('cluster') \
        .setGroupedCol("type") \
        .setEpsilon(20) \
        .setMinPts(3)

    mlflow.log_param('epsilon', 20)
    mlflow.log_param('minPts', 3)

    # Fit the personalized model (one clustering per 'type' group)
    model = geoscan.fit(df)
    # Logging the fitted model triggers the save shown in the stack trace below
    mlflow.spark.log_model(model, "geoscan")
    run_id = run.info.run_id

Task pending execution

org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:250)
com.databricks.labs.gis.ml.GeoscanPersonalizedModel$GeoscanPersonalizedModelWriter.saveData(GeoscanPersonalizedModel.scala:139)
com.databricks.labs.gis.ml.GeoscanPersonalizedModel$GeoscanPersonalizedModelWriter.saveImpl(GeoscanPersonalizedModel.scala:129)
org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$5(Pipeline.scala:257)
org.apache.spark.ml.MLEvents.withSaveInstanceEvent(events.scala:175)
org.apache.spark.ml.MLEvents.withSaveInstanceEvent$(events.scala:170)
org.apache.spark.ml.util.Instrumentation.withSaveInstanceEvent(Instrumentation.scala:43)
org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$4(Pipeline.scala:257)
org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$4$adapted(Pipeline.scala:254)
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$1(Pipeline.scala:254)
org.apache.spark.ml.Pipeline$SharedReadWrite$.$anonfun$saveImpl$1$adapted(Pipeline.scala:247)
org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:284)
scala.util.Try$.apply(Try.scala:213)
org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:284)
org.apache.spark.ml.Pipeline$SharedReadWrite$.saveImpl(Pipeline.scala:247)
org.apache.spark.ml.PipelineModel$PipelineModelWriter.saveImpl(Pipeline.scala:346)
ivandiaz-tomtom commented 2 years ago

For reference, it works fine when using the standard Geoscan model instead of the personalized one:

from geoscan import Geoscan
import mlflow

with mlflow.start_run(run_name='GEOSCAN') as run:
    geoscan = Geoscan() \
        .setLatitudeCol('lat') \
        .setLongitudeCol('lon') \
        .setPredictionCol('cluster') \
        .setEpsilon(20) \
        .setMinPts(3)

    mlflow.log_param('epsilon', 20)
    mlflow.log_param('minPts', 3)

    model = geoscan.fit(df)
    mlflow.spark.log_model(model, "geoscan")
    run_id = run.info.run_id
simenojensen commented 1 year ago

I also have this problem

aamend commented 1 year ago

The personalized model runs clustering for each group in memory.

I suspect some of your groups (e.g. all records for a single user) may contain too much data to be processed in memory. You could run the same grouping and compute simple statistics to see whether specific groups are over-represented, possibly treating those as a separate process. A minimal sketch of that check is below.
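
This sketch assumes the same df and the 'type' column passed to setGroupedCol in the snippet above; the 99th-percentile cutoff is an arbitrary illustration, not part of the geoscan API:

from pyspark.sql import functions as F

# Count records per group to see how skewed the grouping is
group_sizes = df.groupBy('type').agg(F.count('*').alias('n'))

# Inspect the largest groups first
group_sizes.orderBy(F.desc('n')).show(20)

# Hypothetical cutoff: flag groups far above the 99th percentile;
# these could be clustered in a separate run
p99 = group_sizes.approxQuantile('n', [0.99], 0.01)[0]
group_sizes.filter(F.col('n') > p99).show()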