The relevant Redmine ticket is 6369.
As discussed offline, we're going to create a Hive table with parsed Elsevier XML contents. This will allow us to easily query the data and tune the mining scripts.
This task turned out to be plagued with various problems, so I'm going to describe the workarounds here. This can come in handy if a similar task is requested in the future.
The original data is archived in ZIP format. This format does not play well with Spark, so the first step was to extract the archives and put the contents on HDFS. The data is contained within a nested hierarchy of directories with the XML files at the bottom level. Random sampling showed that each file probably contains a single document. The problem is that our Spark version does not support recursive reading of text files, so the next step was to create a list of paths to the files:
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.{immutable, mutable}

/**
 * recursively lists files in an HDFS dir, may return MANY files
 */
def listFilesInPath(inputPath: String): immutable.Seq[String] = {
  val it = FileSystem.get(spark.sparkContext.hadoopConfiguration).listFiles(new Path(inputPath), true)
  val pathBuffer = mutable.ListBuffer.empty[String]
  while (it.hasNext) {
    pathBuffer += it.next().getPath.toString
  }
  pathBuffer.toList
}
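For example, the helper can be called on the HDFS directory holding the extracted XML files (the path below is only a placeholder):
// hypothetical HDFS location of the extracted Elsevier XML files
val pathList = listFilesInPath("/data/elsevier/extracted")
// sanity check: the size should match the number of extracted XML files
println(pathList.size)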
The second problem was the way Spark handles reading text files. Spark creates a partition for each file it reads, so in the case of the Elsevier data we would end up with 74515 partitions. Spark breaks down with that many partitions, so the workaround is to read the files in batches, creating a dataframe for each batch, coalescing it to a single partition and unioning it with the accumulated dataframe:
// spark.implicits._ (needed for toDF on local collections) is imported automatically in spark-shell
val df = pathList.grouped(1000).foldLeft(Seq.empty[String].toDF("value")) { case (acc, pathListGroup) =>
  val sub = pathListGroup.grouped(50).foldLeft(Seq.empty[String].toDF("value")) { case (subacc, paths) =>
    subacc.union(spark.read.option("wholetext", value = true).text(paths: _*).coalesce(1))
  }
  acc.union(sub)
}
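As a quick sanity check (not part of the original run), one can confirm that the batching kept the partition count manageable:
// roughly one partition per 50-file batch instead of one partition per file
println(df.rdd.getNumPartitions)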
The batch sizes used above are experimental and definitely not generic. They are the values for which reading the current data succeeded, and they are probably not optimal either. Moreover, Spark throws very puzzling errors when things go wrong:
java.lang.ClassNotFoundException: Failed to find data source: text. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:711)
at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:753)
at $anonfun$1$$anonfun$apply$1.apply(<console>:33)
at $anonfun$1$$anonfun$apply$1.apply(<console>:32)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
at $anonfun$1.apply(<console>:32)
at $anonfun$1.apply(<console>:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
... 47 elided
Caused by: java.lang.ClassNotFoundException: text.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 62 more
Reading in the full data dump took a lot of time, so another workaround was needed: a temporary storage location where the full datastore would be written in a format that allows quicker processing. This was easily done using Parquet.
df.write.parquet(outputPath)
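Assuming outputPath points at the Parquet location used above, a simple sanity check is to compare the record count with the number of listed files:
// hypothetical check: one record per XML file is expected
println(spark.read.parquet(outputPath).count())
println(pathList.size)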
The parsing is based on code samples from JATS data analysis #1196.
import com.elsevier.spark_xml_utils.xpath.XPathProcessor
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.DataTypes
import java.util
import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.{Success, Try}
def get_xml_object_string(path: String, namespaceMappings: Map[String, String] = null): UserDefinedFunction = udf((xml: String) => {
  Try({
    val proc = Option(namespaceMappings)
      .map(x => XPathProcessor.getInstance(path, new util.HashMap[String, String](x.asJava)))
      .getOrElse(XPathProcessor.getInstance(path))
    proc.evaluate(xml)
  }) match {
    case Success(value) if value.nonEmpty => value
    case _ => null
  }
})
def uuid: UserDefinedFunction = udf(() => UUID.randomUUID().toString, DataTypes.StringType)
val sourceDf = spark.read.parquet(outputPath)
sourceDf.cache()
sourceDf.count()
/**
* adding columns with parsed data
*/
val df = sourceDf
  .withColumnRenamed("value", "document")
  .withColumn("id", uuid())
  .withColumn("acknowledgment_elem", get_xml_object_string("/*:document/*:article/*:body/*:acknowledgment")($"document"))
  .withColumn("acknowledgment", get_xml_object_string("/*:document/*:article/*:body/*:acknowledgment/string()")($"document"))
Spark dataframes can be saved as Hive tables using the hive format and the saveAsTable method. It turned out that this is not possible from our Zeppelin instance because the Spark interpreter is not properly configured to access the Hive metastore. It is, however, possible from the Spark shell. But another problem emerged: saving a dataframe that contains no nulls also creates a record with all columns set to NULL for each record in the dataframe. Searching the Hive and Spark docs did not help to fix this, so another workaround was needed: after saving the dataframe, we explicitly drop the records with nulls:
val dbName = "openaire_6396"
spark.sql(s"create database $dbName")
df
  .write
  .format("hive")
  .saveAsTable(s"$dbName.elsevier_xml_parsed_tmp")
/**
* to remove records with NULL columns create a second table with NULLs removed
*/
spark.sql(s"create table $dbName.elsevier_xml_parsed as select * from $dbName.elsevier_xml_parsed_tmp where id is not null")
/**
* drop bad table
*/
spark.sql(s"drop table $dbName.elsevier_xml_parsed_tmp")
The analysis is already done, so I am closing this task.
For reference: the mining results are in a Hive table awaiting curation:
hive> use openaire_6396;
OK
Time taken: 0.028 seconds
hive> show tables;
OK
elsevier_xml_parsed
Time taken: 0.059 seconds, Fetched: 1 row(s)
hive> describe elsevier_xml_parsed;
OK
document string
id string
acknowledgment_elem string
acknowledgment string
Time taken: 0.116 seconds, Fetched: 4 row(s)
Further integration will be done in dedicated tickets, mainly to come up with possible ways of extracting acknowledgement statements, which will later be streamed down to projects mining within the IIS. This will not become part of the master branch anytime soon; we should consider it experimental for now.