derrickoswald / CIMSpark

Spark access to Common Information Model (CIM) files
MIT License

Elements not found #5

Closed derrickoswald closed 7 years ago

derrickoswald commented 7 years ago

In some cases, perhaps because of the distributed nature of the system, the GridLAB-D application does not find the Elements RDD when it looks it up as a named RDD:

setup : 4.212088595 seconds
root
 |-- sup: element (nullable = true)

== Physical Plan ==
*Scan ch.ninecode.cim.CIMRelation@24755a46 [sup#0]
read : 802.132211919 seconds
Exception in thread "main" java.lang.NullPointerException
        at ch.ninecode.gl.GridLABD.prepare(GridLABD.scala:782)
        at ch.ninecode.gl.Main$.main(Main.scala:237)
        at ch.ninecode.gl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Simply re-running the same code often gets past this point without error.

The offending client code looks like this:

        // get all elements
        val elements = get ("Elements").asInstanceOf[RDD[Element]]

        // join with WireInfo to get ratedCurrent (only for ACLineSegments)
        val cableMaxCurrent = getCableMaxCurrent()
        // GridLABD.scala line 782, where the NullPointerException is thrown
        val joined_elements = elements.keyBy(_.id).leftOuterJoin(cableMaxCurrent).map(e =>
          {
            val ele = e._2._1
            val wire = e._2._2
            val wireinfo = wire match
            {
              case Some(maxCurrent) => maxCurrent
              case None => Double.PositiveInfinity
            }
            (ele.id, (ele, wireinfo))
          })
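
Since re-running often succeeds, one stopgap on the client side would be to retry the lookup a few times and fail with a clear message rather than a bare NullPointerException. The following is only a sketch under assumptions: `retryLookup` is a hypothetical helper, not part of CIMSpark or GridLAB-D, and it assumes that the lookup returns null when the name is not found.

```scala
import scala.annotation.tailrec

object LookupSketch {
  // Hypothetical helper: call `lookup` up to `attempts` times, sleeping
  // `delayMs` milliseconds between tries. A null result counts as "not found".
  // Returns None if every attempt came back null.
  @tailrec
  def retryLookup[T <: AnyRef](attempts: Int, delayMs: Long)(lookup: () => T): Option[T] =
    Option(lookup()) match {
      case found @ Some(_) => found          // non-null result: done
      case None if attempts > 1 =>
        Thread.sleep(delayMs)                // give the swap a chance to finish
        retryLookup(attempts - 1, delayMs)(lookup)
      case None => None                      // out of attempts
    }
}
```

In the client code this might be used as `retryLookup(5, 200L)(() => get ("Elements").asInstanceOf[RDD[Element]])`, followed by `getOrElse` throwing a descriptive exception. The attempt count and delay are arbitrary illustration values. This only hides the race; it does not remove it.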

So it seems that the replacement of the Elements RDD is not happening atomically in the CIMNetworkTopologyProcessor:

        // swap the old Elements RDD for the new one
        old_elements.name = null
        new_elements.name = "Elements"
        val bogus = new_elements.count + 1L // needed for some reason
        new_elements.persist (storage)
        session.sparkContext.getCheckpointDir match
        {
            case Some (dir) => new_elements.checkpoint ()
            case None =>
        }

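Somewhere between the old name being set to null and the new name being assigned, another process attempts to look up the name. The lookup presumably returns null, the cast with asInstanceOf lets the null through, and the NullPointerException follows at the first use of elements on line 782.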

Not sure how to fix this. Note the kludgey call to count() that should maybe precede the name reassignment.
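
One possible reordering is to give the new RDD its name before clearing the old one, so that a lookup by name always matches at least one of them. The sketch below models only the naming, without Spark: `Named`, `find` and `swap` are hypothetical stand-ins, and the real lookup mechanism may behave differently. In Spark terms, the materialization (persist, then count) would presumably happen on new_elements before either name is touched.

```scala
object SwapSketch {
  // Minimal stand-in for an RDD: only the mutable name matters here.
  final class Named(val id: Int) { @volatile var name: String = null }

  // Stand-in for a lookup that scans the registered datasets by name.
  def find(all: Seq[Named], name: String): Option[Named] =
    all.find(_.name == name)

  // Name the replacement first, then clear the old name.
  def swap(oldOne: Named, newOne: Named, name: String): Unit = {
    newOne.name = name   // from here on, at least one dataset carries the name
    oldOne.name = null   // the old one stops matching only after the new one does
  }
}
```

The trade-off is that for a brief moment both datasets carry the same name, so a concurrent lookup may return either one. That is still better than returning nothing, provided the old RDD remains usable until the swap completes.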

derrickoswald commented 7 years ago

This issue has not recurred since the attempted fix was integrated. Considering it fixed.