Open eyankovsky opened 3 years ago
One of the potential root causes of the issues above is a breaking change in the "org.camunda.bpm.extension.feel.scala:feel-engine-factory" library. Version 1.10.1 is available in the Databricks Maven repository, while version 1.8.0 is the one required by your pom.xml. See the error message below. I then downloaded "org.scala-lang.modules:scala-parser-combinators_2.13:1.2.0-M1" (the only version available in the Maven repository), but the installation of "org.camunda.bpm.extension.feel.scala:feel-engine-factory:1.10.1" failed anyway.
Please suggest if there is a way around this. ![Uploading Screen Shot 2021-01-29 at 11.32.13 AM.png…]()
Library resolution failed. Cause: java.lang.RuntimeException: org.scala-lang.modules:scala-parser-combinators_2.12 download failed. at com.databricks.libraries.server.MavenInstaller.$anonfun$resolveDependencyPaths$5(MavenLibraryResolver.scala:253) at scala.collection.MapLike.getOrElse(MapLike.scala:131) at scala.collection.MapLike.getOrElse$(MapLike.scala:129) at scala.collection.AbstractMap.getOrElse(Map.scala:63) at com.databricks.libraries.server.MavenInstaller.$anonfun$resolveDependencyPaths$4(MavenLibraryResolver.scala:253) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at com.databricks.libraries.server.MavenInstaller.resolveDependencyPaths(MavenLibraryResolver.scala:249) at com.databricks.libraries.server.MavenInstaller.doDownloadMavenPackages(MavenLibraryResolver.scala:454) at com.databricks.libraries.server.MavenInstaller.$anonfun$downloadMavenPackages$2(MavenLibraryResolver.scala:381) at com.databricks.backend.common.util.FileUtils$.withTemporaryDirectory(FileUtils.scala:464) at com.databricks.libraries.server.MavenInstaller.$anonfun$downloadMavenPackages$1(MavenLibraryResolver.scala:380) at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:432) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:240) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:235) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:232) at com.databricks.libraries.server.MavenInstaller.withAttributionContext(MavenLibraryResolver.scala:57) at 
com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:277) at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:270) at com.databricks.libraries.server.MavenInstaller.withAttributionTags(MavenLibraryResolver.scala:57) at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:413) at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:339) at com.databricks.libraries.server.MavenInstaller.recordOperation(MavenLibraryResolver.scala:57) at com.databricks.libraries.server.MavenInstaller.downloadMavenPackages(MavenLibraryResolver.scala:379) at com.databricks.libraries.server.MavenInstaller.downloadMavenPackagesWithRetry(MavenLibraryResolver.scala:137) at com.databricks.libraries.server.MavenInstaller.resolveMavenPackages(MavenLibraryResolver.scala:113) at com.databricks.libraries.server.MavenLibraryResolver.resolve(MavenLibraryResolver.scala:44) at com.databricks.libraries.server.ManagedLibraryManager$GenericManagedLibraryResolver.resolve(ManagedLibraryManager.scala:246) at com.databricks.libraries.server.ManagedLibraryManagerImpl.$anonfun$resolvePrimitives$1(ManagedLibraryManagerImpl.scala:186) at com.databricks.libraries.server.ManagedLibraryManagerImpl.$anonfun$resolvePrimitives$1$adapted(ManagedLibraryManagerImpl.scala:181) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at com.databricks.libraries.server.ManagedLibraryManagerImpl.resolvePrimitives(ManagedLibraryManagerImpl.scala:181) at com.databricks.libraries.server.ManagedLibraryManagerImpl$ClusterStatus.installLibsWithResolution(ManagedLibraryManagerImpl.scala:833) at 
com.databricks.libraries.server.ManagedLibraryManagerImpl$ClusterStatus.installLibs(ManagedLibraryManagerImpl.scala:819) at com.databricks.libraries.server.ManagedLibraryManagerImpl$InstallLibTask$1.run(ManagedLibraryManagerImpl.scala:479) at com.databricks.threading.NamedExecutor$$anon$2.$anonfun$run$1(NamedExecutor.scala:345) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:240) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:235) at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:232) at com.databricks.threading.NamedExecutor.withAttributionContext(NamedExecutor.scala:275) at com.databricks.threading.NamedExecutor$$anon$2.run(NamedExecutor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
Could you provide a list of dependencies that work?
After I tried to install latest libraries: org.camunda.feel:feel-engine:1.12.4 org.camunda.bpm.extension.feel.scala:feel-engine-factory:1.10.1 org.camunda.bpm.dmn:camunda-engine-dmn:7.15.0-alpha2
When I tried to install org.camunda.bpm.extension.feel.scala:feel-engine-factory:1.10.1, I got errors related to the parsers (see the screenshot).
I tried to add Scala parser libraries, trying every option from org.uaparser:uap-scala_2.10:0.11.0 to org.uaparser:uap-scala_2.13:0.11.0
After that I had an error below that points to fundamental Scala package class: (class java.lang.RuntimeException/error reading Scala signature of package.class: Scala signature package has wrong version expected: 5.0 found: 5.2 in package.class)
Error message: error: error while loading package, class file '/local_disk0/tmp/addedFile4991803772813106063feel_engine_1_10_1-c3688.jar(org/camunda/feel/package.class)' is broken (class java.lang.RuntimeException/error reading Scala signature of package.class: Scala signature package has wrong version expected: 5.0 found: 5.2 in package.class)
Hi, @eyankovsky
If you are executing this code in a scenario with more than one node, I think the NullPointerException
is due to the fact that you are reading the DMN table from a local path. The method which you are employing to read the DMN is loadFromLocalPath
. When Spark distributes the tasks among the nodes, each one of them will look for the DMN on its own filesystem. Hence, if the file you specify in loadFromLocalPath
does not exist in each node (in your case, /dbfs/FileStore/tables/CTBRules_New.dmn
), you might get a NullPointerException, since the object dmnModelInstance
inside the DMNExecutor
will not be created properly.
You might try the following:
1) In order to make sure that this is the root cause of the error, you might try to run your Spark job in standalone (just one node which acts as driver and worker)
2) Instead of using the loadFromLocalPath
method to load your DMN file, consider using some of these alternatives:
loadFromHDFS
: it allows you to load a DMN file stored in HDFS.loadFromURL
: it allows you to load a DMN file stored on an HTTP server. loadFromInputStream
: if you store your DMN elsewhere and it is not supported by our library, you can manually load it from there, get the InputStream
object and pass it to the method. Anyways, you are welcome to suggest any other input source we might include in our library.Regarding the issue with feel-engine-factory
, I'm not sure whether this library could work with the 1.10.1 version. I can try to update that dependency, but I guess that it won't work.
Finally, regarding the list of dependencies that work, I use the ones within the pom.xml
file. These are:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm.extension.feel.scala/feel-engine -->
<dependency>
<groupId>org.camunda.bpm.extension.feel.scala</groupId>
<artifactId>feel-engine</artifactId>
<version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm.extension.feel.scala/feel-engine-factory -->
<dependency>
<groupId>org.camunda.bpm.extension.feel.scala</groupId>
<artifactId>feel-engine-factory</artifactId>
<version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm.dmn/camunda-engine-dmn -->
<dependency>
<groupId>org.camunda.bpm.dmn</groupId>
<artifactId>camunda-engine-dmn</artifactId>
<version>7.14.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.camunda.bpm/camunda-engine-plugin-spin -->
<dependency>
<groupId>org.camunda.bpm</groupId>
<artifactId>camunda-engine-plugin-spin</artifactId>
<version>7.14.0</version>
</dependency>
Hope this helps you.
Alvaro, thank you for your reply. I followed up on your suggestions and found the following:
These incompatibilities between the older and newer packages resulted in the error below during the first package-import step:
Hi, We are trying to implement dmn4spark in our Azure Databricks workspaces and have run into challenges even with a simple decision rule applied to one string field. We put the engine and all of your code inside the Databricks notebook and made it run without compilation errors. However, the execution error "NullPointerException" with Stream(Decision_Reporting, ?) below suggests that the code can't read the input data properly and generates null. Could you share a simple end-to-end example, or a hint on how to solve the errors with streaming the input data? Thank you in advance, Eugene
##################################################################### CTBRules_New.dmn
<?xml version="1.0" encoding="UTF-8"?>
############################################################################
databricks-logoDMN_Test_KS_EY(Scala) Import Notebook package es.us.idea.dmn4spark.dmn.engine
import org.camunda.bpm.dmn.feel.impl.FeelException import org.camunda.bpm.engine.variable.context.VariableContext import org.camunda.feel.integration.CamundaValueMapper import org.camunda.feel.interpreter.{RootContext, ValueMapper, VariableProvider} import org.camunda.feel.spi.SpiServiceLoader
/**
*/ class SafeCamundaFeelEngine extends org.camunda.bpm.dmn.feel.impl.FeelEngine {
private lazy val engine = new org.camunda.feel.FeelEngine( valueMapper = new CamundaValueMapper, functionProvider = SpiServiceLoader.loadFunctionProvider )
private def asVariableProvider(ctx: VariableContext, valueMapper: ValueMapper): VariableProvider = (name: String) => { if (ctx.containsVariable(name)) { Some(valueMapper.toVal(ctx.resolve(name).getValue)) } else { None } }
override def evaluateSimpleExpression[T](expression: String, ctx: VariableContext): T = { val context = new RootContext( variableProvider = asVariableProvider(ctx, engine.valueMapper)) engine.evalExpression(expression, context) match { case Right(value) => value.asInstanceOf[T] case Left(failure) => throw new FeelException(failure.message) } }
override def evaluateSimpleUnaryTests(expression: String, inputVariable: String, ctx: VariableContext): Boolean = { val context = new RootContext( Map(RootContext.inputVariableKey -> inputVariable), variableProvider = asVariableProvider(ctx, engine.valueMapper))
try { engine.evalUnaryTests(expression, context) match { case Right(value) => value case Left(failure) => System.err.println(s"SAFE FeelEngine: Failure in evalUnaryTests: ${failure.message}." + s"Expression: $expression, " + s"inputVariable: $inputVariable, value: ${context.variable(inputVariable)}"); false //throw new FeelException(failure.message) } } catch { case e: Exception => System.err.println(s"SAFE FeelEngine: Exception caught in evalUnaryTests: ${e}. Expression: $expression, " + s"inputVariable: $inputVariable, value: ${context.variable(inputVariable)}"); false } }
} Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful. package es.us.idea.dmn4spark.dmn.engine
import org.camunda.bpm.dmn.feel.impl.FeelEngine import org.camunda.feel.integration.CamundaFeelEngineFactory
class SafeCamundaFeelEngineFactory extends CamundaFeelEngineFactory{ override def createInstance(): FeelEngine = new SafeCamundaFeelEngine
} Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful. package es.us.idea.dmn4spark.dmn
import java.io.ByteArrayInputStream
import es.us.idea.dmn4spark.dmn.engine.SafeCamundaFeelEngineFactory import org.camunda.bpm.dmn.engine.impl.DefaultDmnEngineConfiguration import org.camunda.bpm.dmn.engine.{DmnDecision, DmnEngine, DmnEngineConfiguration} import org.camunda.bpm.model.dmn.{Dmn, DmnModelInstance}
import scala.collection.JavaConverters
class DMNExecutor(input: Array[Byte], selectedDecisions: Seq[String]) extends Serializable{
/*
object instantiation. */
/**
*/ @transient lazy val dmnEngine: DmnEngine = { val dmnEngineConfig: DefaultDmnEngineConfiguration = DmnEngineConfiguration.createDefaultDmnEngineConfiguration.asInstanceOf[DefaultDmnEngineConfiguration] dmnEngineConfig.setFeelEngineFactory(new SafeCamundaFeelEngineFactory) dmnEngineConfig.setDefaultOutputEntryExpressionLanguage("feel") dmnEngineConfig.buildEngine }
/**
*/ @transient lazy val dmnModelInstance: DmnModelInstance = Dmn.readModelFromStream(new ByteArrayInputStream(input))
@transient lazy val decisions: Seq[DmnDecision] = JavaConverters.collectionAsScalaIterableConverter(dmnEngine.parseDecisions(dmnModelInstance)).asScala.toSeq
@transient lazy val decisionKeys: Seq[String] = decisions.map(_.getKey)
def getDecisionsResults(map: java.util.HashMap[String, AnyRef]): Seq[String] = { decisions.map(d => dmnEngine.evaluateDecisionTable(d, map).getFirstResult.getEntry(d.getKey).toString) }
} Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful. package es.us.idea.dmn4spark.spark
import org.apache.spark.sql.types.{DataType, DataTypes}
object Utils {
case class ColumnInfo(name: String, dataTypeWithAssignedValue: DataTypeWithAssignedValue) case class DataTypeWithAssignedValue(dataType: DataType, establishedValue: Option[Boolean] = None) extends Serializable
def createStructType(fieldNames: Seq[String]) = { DataTypes.createStructType(fieldNames.map(DataTypes.createStructField(_, DataTypes.StringType, true)).toArray) }
/*
value / def inferSecondArgumentType(str: String): DataTypeWithAssignedValue = { str.split(')').headOption match { case Some(x) => if(x.contains("\"")) DataTypeWithAssignedValue(DataTypes.StringType) else { val candidate = x.split("\s+").mkString//.asInstanceOf[Any] if(candidate == "true") DataTypeWithAssignedValue(DataTypes.BooleanType, Some(true)) else if(candidate == "false") DataTypeWithAssignedValue(DataTypes.BooleanType, Some(false)) else if(candidate.matches("^(\d)$")) DataTypeWithAssignedValue(DataTypes.IntegerType) else if(candidate.matches("^(\d)L$")) DataTypeWithAssignedValue(DataTypes.LongType) else if(candidate.matches("^(\d+\.\d|\.?\d+)$")) DataTypeWithAssignedValue(DataTypes.DoubleType) else if(candidate.matches("^(\d+\.\d*|\.?\d+)f$")) DataTypeWithAssignedValue(DataTypes.FloatType) else DataTypeWithAssignedValue(DataTypes.StringType) }
case _ => DataTypeWithAssignedValue(DataTypes.StringType) }
}
def unwrap(a: Any): Any = { a match { case Some(x) => unwrap(x) case null => None case _ => a } }
} Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful. package es.us.idea.dmn4spark.spark
import java.util
import es.us.idea.dmn4spark.spark.Utils.ColumnInfo import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import scala.collection.JavaConverters._ import scala.collection.mutable
object SparkDataConversor {
def spark2javamap(row: Row, dsSchema: Option[org.apache.spark.sql.types.StructType] = None): java.util.HashMap[String, AnyRef] = {
}
def javamap2Spark(map: java.util.Map[String, AnyRef], columnsInfo: List[ColumnInfo]) = { columnsInfo.map(ci => { val value = Utils.unwrap(map.getOrDefault(ci.name, None)) value match { case None => None case x => Some(x.toString) }
} } Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful. package es.us.idea.dmn4spark.dmn
import java.io.{File, FileInputStream, IOException, InputStream} import java.net.{MalformedURLException, URI}
import es.us.idea.dmn4spark.spark.{SparkDataConversor, Utils} import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, Row} import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.FSDataInputStream import org.apache.spark.sql.api.java.{UDF0, UDF1}
import scala.collection.mutable.ArrayBuffer import scala.io.Source
class DMNSparkEngine(df: DataFrame, selectedDecisions: Seq[String] = Seq()) {
def setDecisions(decisions: String*): DMNSparkEngine = new DMNSparkEngine(df, decisions)
def loadFromLocalPath(path:String) = execute(IOUtils.toByteArray(new FileInputStream(path)))
def loadFromHDFS(uri: String, configuration: Configuration = new Configuration()) = {
}
def loadFromURL(url: String) = { val content = Source.fromURL(url) val bytes = content.mkString.getBytes execute(bytes) }
def loadFromInputStream(is: InputStream) = execute(IOUtils.toByteArray(is))
private def execute(input: Array[Byte]) = { val tempColumn = s"__${System.currentTimeMillis().toHexString}"
} } Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful. package es.us.idea.dmn4spark
import es.us.idea.dmn4spark.dmn.DMNSparkEngine import org.apache.spark.sql.DataFrame
object implicits {
implicit class DMN4Spark(df: DataFrame) { def dmn: DMNSparkEngine = new DMNSparkEngine(df)
}
} Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful. import es.us.idea.dmn4spark.implicits. import org.apache.spark.sql.functions.
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .option("header", "true") .load("/FileStore/tables/TrialBalance2.csv") .limit(1) .select("ReportingUnit") / val df1 = df.withColumn("Decision", lit(null: String)) / import es.us.idea.dmn4spark.implicits. import org.apache.spark.sql.functions. df: org.apache.spark.sql.DataFrame = [ReportingUnit: string] df.show() +-------------+ |ReportingUnit| +-------------+ | 1| +-------------+
val dmn_output = df.dmn .setDecisions("Decision_ReportingUnit") .loadFromLocalPath("/dbfs/FileStore/tables/CTBRules_New.dmn")
/ .loadFromLocalPath("/dbfs/FileStore/tables/CTBRules_New-1.dmn") / dmn_output: org.apache.spark.sql.DataFrame = [ReportingUnit: string, Decision_ReportingUnit: string] display(dmn_output) /dmn_output.show()/ SparkException: Failed to execute user defined function(functions$$$Lambda$5872/1957788276: (struct) => struct)
Caused by: NullPointerException:
dmn_output.write.mode("overwrite").parquet("dbfs:/user/hive/warehouse/dmn_output_pqt")
Command skipped
val data = sqlContext.read.parquet("dbfs:/user/hive/warehouse/dmn_output_pqt")
display(data)
Command skipped