Exception in thread "main" java.lang.NoSuchMethodError: breeze.linalg.DenseVector$.tabulate$mDc$sp(ILscala/Function1;Lscala/reflect/ClassTag;)Lbreeze/linalg/DenseVector;
at viterbiAlgorithm.User$$anonfun$eval$2.apply(viterbiAlgo.scala:84)
at viterbiAlgorithm.User$$anonfun$eval$2.apply(viterbiAlgo.scala:80)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at viterbiAlgorithm.User.eval(viterbiAlgo.scala:80)
at viterbiAlgorithm.viterbiAlgo$.main(viterbiAlgo.scala:28)
at viterbiAlgorithm.viterbiAlgo.main(viterbiAlgo.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I can successfully run the code locally though with sbt run, so I don't this there is anything wrong with my code. Also, the compile and run-time version of scala and spark are the same.
The code for viterbiAlgo.scala is:
package viterbiAlgorithm
import breeze.linalg._
// import org.apache.spark.sql.SparkSession
object viterbiAlgo {
def main(arg: Array[String]) {
val A = DenseMatrix((0.5,0.2,0.3),
(0.3,0.5,0.2),
(0.2,0.3,0.5))
val B = DenseMatrix((0.5,0.5),
(0.4,0.6),
(0.7,0.3))
val pi = DenseVector(0.2,0.4,0.4)
val o = DenseVector[Int](0,1,0)
val model = new Model(A,B,pi)
val user = new User("Jack", model, o)
user.eval() // run algorithm
user.printResult()
//spark sql
// val warehouseLocation = "spark-warehouse"
// val spark = SparkSession.builder().appName("Spark.sql.warehouse.dir").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
// import spark.implicits._
// import spark.sql
// val usr = "1"
// val model = new Model(A,B,pi)
// val get_statement = "SELECT * FROM viterbi.observation"
// val df = sql(get_statement)
// val o = DenseVector(df.filter(df("usr")===usr).select(df("obs")).collect().map(_.getInt(0)))
// val user = new User(usr, model, o)
// user.eval()
// user.printResult()
}
}
class Model (val A: DenseMatrix[Double], val B:DenseMatrix[Double], val pi: DenseVector[Double]) {
def info():Unit = {
println("The model is:")
println("A:")
println(A)
println("B:")
println(B)
println("Pi:")
println(pi)
}
}
class User (val usr_name: String, val model: Model, val o:DenseVector[Int]) {
val N = model.A.rows // state number
val M = model.B.cols // observation state
val T = o.length // time
val delta = DenseMatrix.zeros[Double](N,T)
val psi = DenseMatrix.zeros[Int](N,T)
val best_route = DenseVector.zeros[Int](T)
def eval():Unit = {
//1. Initialization
delta(::,0) := model.pi * model.B(::, o(0))
psi(::,0) := DenseVector.zeros[Int](N)
/*2. Induction
*/
val tempDelta = DenseMatrix.zeros[Double](N,N)// Initialization
val tempB = DenseMatrix.zeros[Double](N,N)// Initialization
for (t <- 1 to T-1) {
// Delta
tempDelta := DenseMatrix.tabulate(N, N){case (i, j) => delta(i,t-1)}
tempB := DenseMatrix.tabulate(N, N){case (i, j) => model.B(j, o(t))}
delta(::, t) := DenseVector.tabulate(N){i => max((tempDelta *:* model.A *:* tempB).t.t(::,i))}
}
//3. Maximum
val P_star = max(delta(::, T-1))
val i_star_T = argmax(delta(::, T-1))
best_route(T-1) = i_star_T
//4. Backward
for (t <- T-2 to 0 by -1) {
best_route(t) = psi(best_route(t+1),t+1)
}
}
def printResult():Unit = {
println("User: " + usr_name)
model.info()
println
println("Observed: ")
printRoute(o)
println("Best_route is: ")
printRoute(best_route)
println("delta is")
println(delta)
println("psi is: ")
println(psi)
}
def printRoute(v: DenseVector[Int]):Unit = {
for (i <- v(0 to -2)){
print(i + "->")
}
println(v(-1))
}
}
I also tried --jars argument and passed the location of breeze library, but got the same error.
I need to mention that the I tested the code "locally" on the server and also tested all the method on spark-shell (I can import breeze library on spark-shell on the server).
The server scala version matches the one in sbt build file. Although the spark version is 2.4.0-cdh6.2.1 for which the sbt would not compile if I added "cdh6.2.1" after "2.4.0".
After several days, when I changed the breeze version in sbt build file to 0.13.2 from 1.0, everything worked. But I have no idea what went wrong.
After compiling my code using
sbt package
and submitting them in spark:I got this error:
The sbt build file is as follows:
I can successfully run the code locally though with
sbt run
, so I don't this there is anything wrong with my code. Also, the compile and run-time version of scala and spark are the same.The code for
viterbiAlgo.scala
is:I also tried
--jars
argument and passed the location of breeze library, but got the same error.I need to mention that the I tested the code "locally" on the server and also tested all the method on spark-shell (I can import breeze library on spark-shell on the server).
The server scala version matches the one in sbt build file. Although the spark version is 2.4.0-cdh6.2.1 for which the sbt would not compile if I added "cdh6.2.1" after "2.4.0".
After several days, when I changed the breeze version in sbt build file to
0.13.2
from1.0
, everything worked. But I have no idea what went wrong.