databricks / spark-xml

XML data source for Spark SQL and DataFrames
Apache License 2.0

XML Data Source for Apache Spark 3.x

Linking

You can link against this library in your program at the following coordinates:

groupId: com.databricks
artifactId: spark-xml_2.12
version: 0.18.0

Using with Spark shell

This package can be added to Spark using the --packages command line option. For example, to include it when starting the spark shell:

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.12:0.18.0
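
When a session is created from Python rather than from the shell, the same coordinates can usually be passed through Spark's spark.jars.packages setting. The sketch below is only an illustration: the helper names maven_coordinate and build_session and the application name are hypothetical, and the setting generally takes effect only if no Spark session is already running.

```python
def maven_coordinate(group_id: str, artifact_id: str, version: str) -> str:
    """Join Maven coordinates into the group:artifact:version form used by --packages."""
    return f"{group_id}:{artifact_id}:{version}"


# Coordinates taken from the Linking section above.
SPARK_XML = maven_coordinate("com.databricks", "spark-xml_2.12", "0.18.0")


def build_session(app_name: str = "spark-xml-example"):
    """Create a SparkSession that asks Spark to resolve spark-xml at startup.

    Hypothetical helper; requires pyspark to be installed.
    """
    # Imported lazily so that maven_coordinate stays usable without Spark.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars.packages", SPARK_XML)
        .getOrCreate()
    )
```

Calling build_session() then plays the role of the --packages flag shown above.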

Features

This package allows reading XML files from a local or distributed filesystem as Spark DataFrames.

When reading files the API accepts several options:

When writing files the API accepts several options:

The short format name is supported: you can use just xml instead of com.databricks.spark.xml.

NOTE: created files have no .xml extension.
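
If downstream tools expect a .xml suffix, the part files can be copied after the write completes. The following is a minimal sketch, assuming the output was written to a local directory in Spark's usual layout (part-* data files alongside marker files such as _SUCCESS). The function name copy_parts_with_extension is hypothetical and not part of spark-xml.

```python
import shutil
from pathlib import Path


def copy_parts_with_extension(output_dir, dest_dir, extension=".xml"):
    """Copy each part-* file from a Spark output directory into dest_dir,
    appending the given extension to the file name."""
    src, dest = Path(output_dir), Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    copied = []
    # "part-*" skips marker files (_SUCCESS) and hidden checksum files (.part-*.crc).
    for part in sorted(src.glob("part-*")):
        if part.is_file():
            target = dest / (part.name + extension)
            shutil.copyfile(part, target)
            copied.append(target)
    return copied
```

For example, copy_parts_with_extension("newbooks.xml", "newbooks_files") would leave the original output directory untouched and produce files such as part-00000.xml in the destination directory.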

XSD Support

Per above, the XML for individual rows can be validated against an XSD using rowValidationXSDPath.

The utility com.databricks.spark.xml.util.XSDToSchema can be used to extract a Spark DataFrame schema from some XSD files. It supports only simple, complex and sequence types, and only basic XSD functionality.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Parsing Nested XML

Although primarily used to convert (portions of) large XML documents into a DataFrame, spark-xml can also parse XML in a string-valued column in an existing DataFrame with from_xml, in order to add it as a new column with parsed results as a struct.

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... // DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

PySpark notes

The functions above are currently exposed in the Scala API only, because there is no separate Python package for spark-xml. They can be accessed from PySpark by manually declaring helper functions that call into the JVM-based API from Python. Example:

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options=None):
    # Assumes an active SparkSession bound to the name `spark`.
    # Wraps the Scala from_xml function: parses an XML string column into a struct.
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options or {})
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options=None):
    # Infers the schema of the XML strings held in a single-column DataFrame.
    assert len(df.columns) == 1

    scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options or {})
    # The Scala package object is exposed to the JVM as the class `package$`.
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

Structure Conversion

Because DataFrames and XML are structured differently, conversion rules apply in both directions: from XML data to a DataFrame and from a DataFrame to XML data. Note that attribute handling can be disabled with the excludeAttribute option.

Conversion from XML to DataFrame
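
To give a feel for how attributes and text content map to fields, here is a small standalone illustration written with Python's standard library. It is not the library's implementation. It assumes an attribute prefix of _ (consistent with the _id column in the examples in this document) and a value field named _VALUE for text inside an element that also has attributes. The function name element_to_value and the unit attribute in the sample are made up for the illustration, and every value stays a string because no type inference is attempted.

```python
import xml.etree.ElementTree as ET


def element_to_value(elem, exclude_attribute=False,
                     attribute_prefix="_", value_tag="_VALUE"):
    """Convert an element into a str, None, or dict that mirrors a row-like structure."""
    attrs = {} if exclude_attribute else {
        attribute_prefix + name: value for name, value in elem.attrib.items()
    }
    children = list(elem)
    text = (elem.text or "").strip()

    if not children:
        if not attrs:
            return text or None  # plain leaf element: just its text
        if text:
            attrs[value_tag] = text  # text next to attributes goes under the value tag
        return attrs

    record = dict(attrs)
    for child in children:
        value = element_to_value(child, exclude_attribute, attribute_prefix, value_tag)
        if child.tag in record:
            # Repeated child tags are collected into a list.
            existing = record[child.tag]
            if isinstance(existing, list):
                existing.append(value)
            else:
                record[child.tag] = [existing, value]
        else:
            record[child.tag] = value
    return record


# Made-up sample element for illustration.
book = ET.fromstring(
    '<book id="bk101"><author>Gambardella, Matthew</author>'
    '<price unit="USD">44.95</price></book>'
)
row = element_to_value(book)
# row == {'_id': 'bk101', 'author': 'Gambardella, Matthew',
#         'price': {'_unit': 'USD', '_VALUE': '44.95'}}
```

Passing exclude_attribute=True drops the attribute fields, so price collapses to the plain string '44.95'.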

Conversion from DataFrame to XML

Examples

These examples use an XML file available for download here:

$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml

SQL API

The XML data source for Spark can infer data types:

CREATE TABLE books
USING com.databricks.spark.xml
OPTIONS (path "books.xml", rowTag "book")

You can also specify column names and types in DDL. In this case, the schema is not inferred.

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING com.databricks.spark.xml
OPTIONS (path "books.xml", rowTag "book")

Scala API

Import com.databricks.spark.xml._ to get implicits that add the .xml(...) method to DataFrameReader and DataFrameWriter. Alternatively, use .format("xml") with .load(...) or .save(...).

import org.apache.spark.sql.SparkSession
import com.databricks.spark.xml._

val spark = SparkSession.builder().getOrCreate()
val df = spark.read
  .option("rowTag", "book")
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

You can manually specify the schema when reading data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
import com.databricks.spark.xml._

val spark = SparkSession.builder().getOrCreate()
val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

Java API

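import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();
// The Java API has no DataFrame type in Spark 3.x; use Dataset<Row>.
Dataset<Row> df = spark.read()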
  .format("xml")
  .option("rowTag", "book")
  .load("books.xml");

df.select("author", "_id").write()
  .format("xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .save("newbooks.xml");

You can manually specify the schema:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

SparkSession spark = SparkSession.builder().getOrCreate();
StructType customSchema = new StructType(new StructField[] {
  new StructField("_id", DataTypes.StringType, true, Metadata.empty()),
  new StructField("author", DataTypes.StringType, true, Metadata.empty()),
  new StructField("description", DataTypes.StringType, true, Metadata.empty()),
  new StructField("genre", DataTypes.StringType, true, Metadata.empty()),
  new StructField("price", DataTypes.DoubleType, true, Metadata.empty()),
  new StructField("publish_date", DataTypes.StringType, true, Metadata.empty()),
  new StructField("title", DataTypes.StringType, true, Metadata.empty())
});

Dataset<Row> df = spark.read()
  .format("xml")
  .option("rowTag", "book")
  .schema(customSchema)
  .load("books.xml");

df.select("author", "_id").write()
  .format("xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .save("newbooks.xml");

Python API

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.format('xml').options(rowTag='book').load('books.xml')
df.select("author", "_id").write \
    .format('xml') \
    .options(rowTag='book', rootTag='books') \
    .save('newbooks.xml')

You can manually specify the schema:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()
customSchema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)])

df = spark.read \
    .format('xml') \
    .options(rowTag='book') \
    .load('books.xml', schema=customSchema)

df.select("author", "_id").write \
    .format('xml') \
    .options(rowTag='book', rootTag='books') \
    .save('newbooks.xml')

R API

Automatically infer the schema (data types):

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:0.18.0"))

df <- read.df("books.xml", source = "xml", rowTag = "book")

# No write options are given here, so `rootTag` defaults to "ROWS" and `rowTag` to "ROW".
write.df(df, "newbooks.xml", "xml", "overwrite")

You can manually specify the schema:

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:0.18.0"))
customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("books.xml", source = "xml", schema = customSchema, rowTag = "book")

# No write options are given here, so `rootTag` defaults to "ROWS" and `rowTag` to "ROW".
write.df(df, "newbooks.xml", "xml", "overwrite")

Hadoop InputFormat

The library contains a Hadoop input format for reading XML files by a start tag and an end tag. It is similar to XmlInputFormat.java in Mahout, but it also supports compressed files, different encodings, and elements whose start tags carry attributes. You can use it directly as follows:

import com.databricks.spark.xml.XmlInputFormat
import org.apache.spark.SparkContext
import org.apache.hadoop.io.{LongWritable, Text}

// Reuse the running context (spark-shell already provides one as `sc`).
val sc: SparkContext = SparkContext.getOrCreate()

// This will detect the tags including attributes
sc.hadoopConfiguration.set(XmlInputFormat.START_TAG_KEY, "<book>")
sc.hadoopConfiguration.set(XmlInputFormat.END_TAG_KEY, "</book>")

val records = sc.newAPIHadoopFile(
  "path",
  classOf[XmlInputFormat],
  classOf[LongWritable],
  classOf[Text])
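
The idea of cutting records out of a document by start tag and end tag can be sketched in a few lines of plain Python. This is only a simplified illustration of the technique, not how XmlInputFormat is implemented: it works on an in-memory string rather than on Hadoop splits, and it does not handle self-closing elements, nested elements with the same name, compression, or encodings. The function name iter_records is hypothetical.

```python
def iter_records(text, tag):
    """Yield each <tag ...>...</tag> fragment found in text, in document order."""
    open_prefix = f"<{tag}"
    end_tag = f"</{tag}>"
    pos = 0
    while True:
        start = text.find(open_prefix, pos)
        if start == -1:
            return
        # The character after "<tag" decides whether this is really our tag:
        # ">" closes a bare start tag, whitespace introduces attributes.
        after = text[start + len(open_prefix): start + len(open_prefix) + 1]
        if after not in (">", " ", "\t", "\n", "\r"):
            pos = start + len(open_prefix)  # e.g. <bookstore> when tag is "book"
            continue
        end = text.find(end_tag, start)
        if end == -1:
            return  # unterminated record: stop
        yield text[start:end + len(end_tag)]
        pos = end + len(end_tag)


doc = ('<bookstore><book id="1"><title>A</title></book>'
       '<book><title>B</title></book></bookstore>')
records = list(iter_records(doc, "book"))
# records holds two fragments: the first keeps its id attribute, the second has none.
```

As in the Scala snippet above, start tags with attributes are matched as well as bare ones.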

Building From Source

This library is built with SBT. To build a JAR file simply run sbt package from the project root.

Acknowledgements

This project was initially created by HyukjinKwon and donated to Databricks.