charles-river-analytics / figaro

Figaro Programming Language and Core Libraries

Creating and Inferencing Models in Multiple Akka Workers (MultiThreading) #738

Open mvbaffa opened 6 years ago

mvbaffa commented 6 years ago

Hi,

I am new to Figaro and would like to create a model that is used by more than one actor. I have created an Akka router with several workers, which, as you know, run on different threads.

Each worker actor creates a new instance of a class that builds the model and all its variables, then sets evidence and runs inference with VariableElimination. With only one worker, that is, with no multithreading, it works perfectly. But with more than one worker I get this error: key not found: Select(0.01 -> 'visit, 0.99 -> 'novisit)

I have implemented the well-known ChestClinic model like this:

  val universe = Universe.createNew()

  val worldTravel = Select(0.01 -> 'visit, 0.99 -> 'novisit) (s"worldTravel$msgIndex", universe)
  val smoking = Select(0.5 -> 'smoker, 0.5 -> 'nosmoker) (s"smoking$msgIndex", universe)

  val tuberculosis = CPD(worldTravel,
    'visit -> Select(0.05 -> 'present, 0.95 -> 'absent),
    'novisit -> Select(0.01 -> 'present, 0.99 -> 'absent)) (s"tuberculosis$msgIndex", universe)

  val lungCancer = CPD(smoking,
    'smoker -> Select(0.1 -> 'present, 0.9 -> 'absent),
    'nosmoker -> Select(0.01 -> 'present, 0.99 -> 'absent)) (s"lungCancer$msgIndex", universe)

  val bronChitis = CPD(smoking,
    'smoker -> Select(0.6 -> 'present, 0.4 -> 'absent),
    'nosmoker -> Select(0.3 -> 'present, 0.7 -> 'absent)) (s"bronChitis$msgIndex", universe)

  def tbOrCFcn(tuberculosis: Symbol, lungCancer: Symbol): Boolean = {
    (tuberculosis, lungCancer) match {
      case ('present, 'present) => true
      case ('present, 'absent) => true
      case ('absent, 'present) => true
      case ('absent, 'absent) => false
    }
  }

  val tuberculosisOrCancer = Apply(tuberculosis, lungCancer, tbOrCFcn) (s"tuberculosisOrCancer$msgIndex", universe)

  val xRayPrior = Chain(tuberculosisOrCancer, (tb: Boolean) => if(tb) Constant(0.982) else Constant(0.02)) (s"xRayPrior$msgIndex", universe)
  val xRay = CPD(tuberculosisOrCancer,
    true -> Select(0.98 -> 'abnormal, 0.02 -> 'normal),
    false -> Select(0.05 -> 'abnormal, 0.95 -> 'normal)) (s"xRay$msgIndex", universe)

  val dyspnea = CPD(tuberculosisOrCancer, bronChitis,
    (true, 'present) -> Select(0.9 -> 'present, 0.1 -> 'absent),
    (true, 'absent) -> Select(0.7 -> 'present, 0.3 -> 'absent),
    (false, 'present) -> Select(0.8 -> 'present, 0.2 -> 'absent),
    (false, 'absent) -> Select(0.1 -> 'present, 0.9 -> 'absent)) (s"dyspnea$msgIndex", universe)

As you can see, I create a new Universe and create the variables in it. msgIndex is an Int that identifies each worker actor. Before this change, when I used the default universe, it never worked. Now the model is sometimes created successfully, but the error still appears in some executions.
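
To make the difference between the explicit universe and the default one concrete, here is a minimal sketch of a variant of the first two variables. It is a guess at what is worth checking, not a confirmed diagnosis. It rests on two assumptions: that Universe.createNew() also replaces the process-wide default universe shared by all threads, and that elements created without an explicit name and collection (such as the Select elements nested inside each CPD) are placed in that default universe. The class name ChestClinicFragment and the names tbGivenVisit and tbGivenNoVisit are hypothetical.

```scala
import com.cra.figaro.language._
import com.cra.figaro.library.compound.CPD

// Hypothetical class name; msgIndex is the worker index from the post.
class ChestClinicFragment(msgIndex: Int) {
  // Assumption: constructing the Universe directly leaves the shared
  // default universe untouched, unlike Universe.createNew().
  val universe = new Universe

  val worldTravel =
    Select(0.01 -> 'visit, 0.99 -> 'novisit)(s"worldTravel$msgIndex", universe)

  // The nested elements receive the same explicit universe (with an empty
  // name) instead of relying on the implicit default.
  val tbGivenVisit = Select(0.05 -> 'present, 0.95 -> 'absent)("", universe)
  val tbGivenNoVisit = Select(0.01 -> 'present, 0.99 -> 'absent)("", universe)

  val tuberculosis = CPD(worldTravel,
    'visit -> tbGivenVisit,
    'novisit -> tbGivenNoVisit)(s"tuberculosis$msgIndex", universe)
}
```

If the assumptions hold, every element of one worker's model lives in that worker's own universe, so two workers building models at the same time would no longer share elements through the default universe. Whether this alone removes the error is untested.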

The VariableElimination comes just after the model creation:

  val tbPriorAlg = VariableElimination(tuberculosisOrCancer) (universe)
  tbPriorAlg.start()
  val tbPrior = tbPriorAlg.probability(tuberculosisOrCancer, true)
  if(isDebug) println("Prior probability the Tuberculosis Or Cancer on = " + tbPrior.toString)

But even if I remove the VariableElimination, the error occurs.

Can you help me?

Thanks in advance

mvbaffa commented 6 years ago

Has anyone had the same problem?

Thanks