HariSekhon / DevOps-Python-tools

80+ DevOps & Data CLI Tools - AWS, GCP, GCF Python Cloud Functions, Log Anonymizer, Spark, Hadoop, HBase, Hive, Impala, Linux, Docker, Spark Data Converters & Validators (Avro/Parquet/JSON/CSV/INI/XML/YAML), Travis CI, AWS CloudFormation, Elasticsearch, Solr etc.
https://www.linkedin.com/in/HariSekhon
MIT License

json to parquet supported datatypes #1

liornaimalon opened this issue 7 years ago

liornaimalon commented 7 years ago

I am trying to convert JSON files to Parquet in order to load them to AWS S3 and query them with Athena. However, some fields (for example, timestamps in different formats) are automatically converted to strings (stored as "BYTE_ARRAY"s). How can I control which field in the JSON is converted to which type in Parquet? Can I just use a specific format, or is it simply not supported?

thanks

HariSekhon commented 7 years ago

I solved that in the CSV to Parquet conversion program by allowing an explicit schema to override the inferred types. Doing the same thing in the JSON programs should be very straightforward; see https://github.com/HariSekhon/pytools/blob/master/spark_csv_to_parquet.py for how it's done.

I'll update the json programs with the same capability when I get time.
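For reference, the general PySpark pattern for overriding inferred types with an explicit schema looks roughly like this (a minimal sketch only, not the actual code from spark_csv_to_parquet.py; the column names, types, and paths are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

spark = SparkSession.builder.appName("csv_to_parquet_sketch").getOrCreate()

# An explicit schema replaces whatever types Spark would otherwise infer
schema = StructType([
    StructField("id", IntegerType(), True),          # illustrative columns
    StructField("created_at", TimestampType(), True),
    StructField("name", StringType(), True),
])

df = spark.read.csv("input.csv", header=True, schema=schema)
df.write.parquet("output.parquet")
```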

liornaimalon commented 7 years ago

Thanks @HariSekhon for the quick reply. The main issue is actually with timestamp/date fields, and I believe it can be easily solved by adding timestampFormat/dateFormat arguments to be used when the JSON is read, as described in http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.json
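For illustration, reading the JSON with explicit timestamp/date formats would look something like the following (a sketch only; the format strings and paths are assumptions, not values from this repo):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("json_to_parquet_sketch").getOrCreate()

# timestampFormat/dateFormat tell Spark how to parse date-like strings at read time
df = spark.read.json(
    "input.json",
    timestampFormat="yyyy-MM-dd'T'HH:mm:ss",  # assumed format, adjust to the data
    dateFormat="yyyy-MM-dd",
)

df.write.parquet("output.parquet")
```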

I hope you will update the Docker image soon :) Thanks

HariSekhon commented 7 years ago

I was thinking about implementing this right now, and one of the challenges is how to pass the schema types to the program in a generic way on the command line. Either:

  1. a --schema-file, also in JSON format, containing the fields you want to customize, with each value being the type to cast to in the dataframe before writing out to Parquet (see the sketch after this list)
  2. a comma-separated list of fieldname:type,fieldname2:type2..., but then fieldname would need to account for the JSON hierarchical structure, including arrays, eg. field.field2.field3[0].finalfield4:float
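A minimal sketch of how option 1 could work, assuming a hypothetical --schema-file whose keys are flat column names and whose values are Spark SQL type names (nested fields and arrays would need extra handling):

```python
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("schema_override_sketch").getOrCreate()

# hypothetical schema file, e.g. {"created_at": "timestamp", "price": "double"}
with open("schema.json") as f:
    overrides = json.load(f)

df = spark.read.json("input.json")

# cast each listed column to the requested type before writing Parquet
for field, type_name in overrides.items():
    df = df.withColumn(field, col(field).cast(type_name))

df.write.parquet("output.parquet")
```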

Btw, I believe the link you pasted is for writing out to JSON format, i.e. converting a field which is already a date in a dataframe to a specific string format when writing a JSON file, which is the inverse of what we're talking about in this case. However, I appreciate the same principle could apply to inferring a date field from a specific string format found in JSON data; we should incorporate that into one of the two solutions above (or another, if we come up with a better generic method of solving this).

Ultimately the solution should be generic enough that nobody needs to rewrite code to make it work for them, as that's the most stable approach, and I can add suitable tests for it.

Thanks

Hari

liornaimalon commented 7 years ago

@HariSekhon I think at the time I posted this issue I had too little understanding of what my problem was and how I should solve it. After a while I realized that my actual problem with the dates/timestamps was that they were not written in one format, but in different formats, and sometimes as strings that are not actually dates. Eventually what I did was rename the problematic field from "x" to "original_x" and add a new "x" column in the correct datatype, as follows:

```python
from pyspark.sql.functions import to_utc_timestamp

df = df.withColumnRenamed('x', 'original_x')
df = df.withColumn('x', to_utc_timestamp(df['original_x'], 'UTC'))
```

Anyway, to address your suggestion, I think a file is more convenient. Also, if I'm not wrong, a very long schema (which is a plausible use case in general) can fail on the command line with an "argument too long" error.

Lior