databricks / spark-xml

XML data source for Spark SQL and DataFrames
Apache License 2.0

processing xml files on hdfs in pyspark #515

Closed. Sutyke closed this issue 3 years ago.

Sutyke commented 3 years ago

Thanks a lot for the excellent package.

I'm downloading a large number of XML files. I believe this question will also help other spark-xml users.

What is the best practice for storing and reading XMLs for efficient spark-xml processing:

1) On HDFS, for large XMLs

I'm trying to save XMLs to HDFS, but without success.

Question asked in stack overflow: https://stackoverflow.com/questions/65728216/how-to-write-bytes-string-to-hdfs-hadoop-in-pyspark-for-spark-xml-databricks-p

2) In memory, for smaller XMLs

I'm trying to read a Spark DataFrame with an XML string in each row, instead of reading files from disk. How do I apply spark-xml to a DataFrame in PySpark?

srowen commented 3 years ago

The data has to be stored somewhere. You can't save it in memory, and it isn't necessary to cache the dataset in memory either. How to copy data to your storage is specific to the storage and not related to this library. But yes, you can copy files to HDFS with hdfs dfs. Your example on SO is wrong because you put the file on some HDFS path but expect it at the root.

Sutyke commented 3 years ago

Thanks a lot for your quick reply. I think the question was misunderstood. I tried the example on SO and it is working: I changed the pseudo path "SomeHdfsPath" to the real path "/", i.e. subprocess.call('hdfs dfs -copyFromLocal /home/user/file.xml /', shell=True), and the script saves file.xml locally first and then copies it to the HDFS root. Please let me know if this is wrong.

I will clarify in more detail:

1) For large XMLs that do not fit in memory: the example on SO uses Python to first copy the data locally and then copy it to HDFS. Any direction on how to save XML directly to HDFS, rather than saving it locally first and then copying it to HDFS as in the SO example, would be appreciated. I understand this is not directly related to this package, but it could help your users if you shared how to save an XML string directly to HDFS in PySpark without first saving it to local storage.

2) IMHO the second question is related to the package. I don't want to save XML to memory; I want to process small XML batches in memory using spark-xml to speed up the process and avoid writing XMLs to disk and reading them back with spark-xml, saving only the transformed data.

Pipeline: download multiple XML files into a list in memory -> load the list into a Spark DataFrame in memory -> transform the Spark DataFrame with spark-xml in memory -> save the Spark DataFrame to disk (only the transformed data). With these steps the data is processed rapidly in memory and only the transformed data is saved.
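
For illustration, a minimal sketch of the first two steps, assuming spark is an active SparkSession and xml_strings is a hypothetical Python list of downloaded XML documents:

# Hypothetical: xml_strings holds XML documents already downloaded into memory
xml_strings = ['<Value>1</Value>', '<Value>2</Value>']
# One row per XML document, in a string column named 'payload'
df = spark.createDataFrame([(s,) for s in xml_strings], ['payload'])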

I noticed that in the Parsing Nested XML section of https://github.com/databricks/spark-xml there is a way to read XML from a Spark DataFrame.

The code is below:

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    # Wraps com.databricks.spark.xml.functions.from_xml so it can be applied
    # to a string column from PySpark.
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    # Infers an XML schema from a single-column DataFrame of XML strings by
    # calling schema_of_xml_df on the Scala side.
    assert len(df.columns) == 1

    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

What would be the PySpark alternative to:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload' 
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

srowen commented 3 years ago

That looks right; I don't know what you're seeing that's unexpected, or why. It's specific to your cluster and not really a question about this library.

You don't have to copy a file locally then to HDFS. You can presumably copy it to HDFS directly, but I don't know where it's coming from, so can't really tell you what to do.

Nothing is 'saved to memory' unless you mean caching a dataset, but you do not need to do that. Nothing here requires loading the whole document into memory. That's the point of Spark and this library.

Sutyke commented 3 years ago

The string is coming from the web API that I'm downloading from. As a minimal example, let's say I have the string '1'; how do I save it to hdfs://hostname:9000/file.xml in PySpark?

QUESTION 1: The code below saves file.xml to '/home/user/file.xml' in the local file system. How do I write this code in PySpark so that file.xml, containing the string '1', is saved in HDFS?


with open('/home/user/file.xml' ,'wb') as f:
    f.write(b'<Value>1</Value>') 

Sutyke commented 3 years ago

QUESTION 2: This one is related to the spark-xml docs. The Scala code below is mentioned there; how do I write this code in PySpark?

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload' 
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

srowen commented 3 years ago

If you're parsing individual XML messages, you probably don't need this library. Just use an XML parser directly. I wouldn't upload a file to HDFS and run a whole Spark job to do something to one file - way too much overhead.

You're asking questions that really aren't related to this library, so this isn't the best place. You do not write files directly with Spark. You can use HDFS APIs to write strings to files from anywhere.
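
For example, a minimal sketch with the HdfsCLI Python package (pip install hdfs), which writes over WebHDFS; the namenode URL, user, and path below are placeholders and not anything specific to spark-xml:

from hdfs import InsecureClient

# Placeholder WebHDFS endpoint and user; adjust for the actual cluster
client = InsecureClient('http://namenode:9870', user='hdfs_user')
# Write the XML string straight to HDFS, no local temporary file needed
client.write('/file.xml', data=b'<Value>1</Value>', overwrite=True)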

There is no direct Python equivalent to that Scala code, though the docs show a workaround: https://github.com/databricks/spark-xml#pyspark-notes
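
For reference, a sketch of that workaround using the ext_* helpers quoted earlier, assuming df holds its XML in a string column named 'payload':

# Infer the schema from the XML strings, then parse them into a struct column
payload_schema = ext_schema_of_xml_df(df.select('payload'))
parsed = df.withColumn('parsed', ext_from_xml(df['payload'], payload_schema))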

Sutyke commented 3 years ago

Thanks a lot for your time. I tried to explain with a minimal example. I have millions of small XML files with the same schema, downloaded asynchronously, and I need to extract data from them. I found that with spark-xml I can read these files and save the data to Delta Lake. At the moment I save the XMLs to local storage, move them to HDFS, and use spark-xml to extract data from them and save it to Delta Lake. Are you saying that spark-xml is not for that purpose, or that I should not be using it this way?

srowen commented 3 years ago

That's a fine use case if you have lots of files and you're processing in a batch or stream.

Sutyke commented 3 years ago

Could you please advise on the correct way to use spark-xml when saving data to Delta Lake? My XML files are updated daily; I download them with asyncio in batches and extract data from the XMLs to Delta Lake. I do not need the XMLs to be saved at all. What is the correct way to use spark-xml? Should I save all XML files to HDFS first and transform them with spark-xml, should I apply spark-xml to each batch in memory (I mean the code you mentioned, https://github.com/databricks/spark-xml#pyspark-notes), or should I just save each XML batch to local storage, apply spark-xml, and save to Delta Lake on HDFS?

srowen commented 3 years ago

They must be in storage somewhere for Spark to read them. You should try to copy directly into that storage; there are many ways to do it. You can delete them after they have been consumed.

I'm still not sure what you mean about processing batches in memory. That's how Spark works. You don't deal with batches or memory at all.

Sutyke commented 3 years ago

Thanks a lot for your time. In summary, I tried to solve two problems. I tried to use spark-xml for both solutions to have as little code as possible for easy maintenance.

1) For large XML files: I tried to save XML files directly to HDFS in PySpark, but it seems that is not possible and I need to use the Python hdfs or aiohdfs library for asyncio. When XML files are saved on disk, this is a good use case for spark-xml.

2) For a large number of small XML files: What I mean by "in memory" is processing the small XML files without persisting them. What I tried to do is download a large number of files in batches, group them, and transform the XMLs in memory to save I/O resources. I tried to avoid saving and deleting millions of files on disk every time. Instead, I tried to do the transformation in memory (benefiting from Spark, which speeds up the transformation by using the processing power of the nodes) and save the transformed XMLs to Delta Lake in small batches. As you have mentioned, for this part spark-xml should not be used, since the data must be saved on disk for spark-xml.

Did I get it correctly?

srowen commented 3 years ago

Spark can execute arbitrary code, so you can write the files from a Spark function. However, you'd more usually have some external system landing files in your distributed storage system. To be clear, again: there is no alternative to "XML files saved to disk". There is no such thing as XML only in memory.
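
As one possibility (a sketch only, assuming an active SparkSession named spark; the path and content are placeholders), the driver can reach the Hadoop FileSystem API through the JVM gateway and write a string directly to the cluster's default filesystem:

jvm = spark._jvm
# Hadoop configuration of the running Spark application
conf = spark._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(conf)
# Create the file on the default filesystem (e.g. HDFS) and write the bytes
out = fs.create(jvm.org.apache.hadoop.fs.Path('/file.xml'))
try:
    out.write(bytearray(b'<Value>1</Value>'))
finally:
    out.close()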

I'm not sure what you mean about batching XML files, because Spark and spark-xml do all that for you. It doesn't matter. You may be talking about your own internal process for handling XML, but I don't know anything about it and it is not related to spark-xml, so this is not the right forum.

Sutyke commented 3 years ago

Thanks a lot for the very useful information. I did more with spark-xml and it is an excellent package; however, there is still room for improvement: adding to PySpark the same functions as in Scala instead of this workaround: https://github.com/databricks/spark-xml#pyspark-notes. If this were in place and there were a clear example in PySpark, users could save a few days of searching for a solution and we wouldn't need to have this conversation at all.

I'm providing a user's point of view to make spark-xml more user friendly and to help other users. All questions are related to spark-xml. Why would I otherwise need to save uncompressed XML to disk? That is the least efficient way to work with HDFS. By batching XML files, I tried to download the XML files only into RAM and group them into one XML file to avoid saving a huge number of text files to disk.

With your help, I found a more efficient way. After download (without spilling uncompressed text files to local or HDFS disk), I save the XML strings to a Spark DataFrame. Then I apply the PySpark spark-xml workaround you suggested to the DataFrame. This approach is more efficient than saving and deleting uncompressed XML files. The discussion helped me and, I believe, other spark-xml users.
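
A rough end-to-end sketch of that approach, reusing the ext_* helpers quoted earlier; xml_batch is a hypothetical list of XML strings from one download batch, and the Delta path is a placeholder (assumes Delta Lake is available on the cluster):

df = spark.createDataFrame([(x,) for x in xml_batch], ['payload'])
payload_schema = ext_schema_of_xml_df(df.select('payload'))
parsed = df.withColumn('parsed', ext_from_xml(df['payload'], payload_schema))
# Persist only the transformed data
parsed.select('parsed.*').write.format('delta').mode('append').save('/delta/xml_data')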

Could you please advise on the correct forum for asking about best practices and data pre-processing for spark-xml, so we can finish this discussion there? Do you have a Slack channel, as the Delta Lake team has?

I'm happy to contribute and improve the docs, or try to add the workaround directly to PySpark, if you need a hand.

Thanks in advance for all your help.