Open wenmin-wu opened 4 years ago
HADOOP_VERSION=2.7 APACHE_SPARK_VERSION=2.4.3 PYTHON_VERSION=3
Code:
```python
spark = None
try:
    print('creating spark cluster')
    spark = SparkSession.builder \
        .appName(spark_app_name) \
        .config('spark.kubernetes.namespace', spark_k8s_namespace) \
        .config('spark.jars.packages', 'org.tensorflow:spark-tensorflow-connector_2.11:1.14.0') \
        .config("mapreduce.fileoutputcommitter.algorithm.version", "2") \
        .config('spark.executor.instances', str(spark_n_executors)) \
        .config('spark.sql.windowExec.buffer.spill.threshold', '2097152') \
        .config('spark.sql.windowExec.buffer.in.memory.threshold', '2097152') \
        .enableHiveSupport() \
        .getOrCreate()

    # --- logic ---
    df = spark.read.parquet(datalake_location)
    ...
    df.repartition(num_partition) \
        .write \
        .format("tfrecords") \
        .mode("overwrite") \
        .save(dataset_location)
finally:
    # terminate spark context
    if spark:
        print('stopping spark cluster')
        spark.stop()
```
After all jobs have finished, the write first goes to `_temporary/` under the target directory, and the files are then moved into the target directory sequentially, which costs a lot of time:

```
_temporary/
```
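For context, the slow phase described above is the job-commit step of Hadoop's `FileOutputCommitter` algorithm v1: tasks write under `_temporary/`, and at job commit the driver moves each file into the target directory one by one. The sketch below is a toy simulation in plain Python (not Spark; the directory layout and names are simplified for illustration) of that sequential rename phase:

```python
import os
import shutil
import tempfile

def simulate_v1_commit(target, n_tasks):
    """Toy model of FileOutputCommitter algorithm v1: each task writes
    its output under _temporary/, then the driver renames every file
    into the target directory sequentially at job commit."""
    tmp = os.path.join(target, "_temporary")
    # task side: each task writes its part file into its own temp dir
    for t in range(n_tasks):
        task_dir = os.path.join(tmp, f"task_{t}")
        os.makedirs(task_dir)
        with open(os.path.join(task_dir, f"part-{t:05d}"), "w") as f:
            f.write("data")
    # job commit: sequential renames -- the slow phase reported above
    for t in range(n_tasks):
        src = os.path.join(tmp, f"task_{t}", f"part-{t:05d}")
        os.rename(src, os.path.join(target, f"part-{t:05d}"))
    shutil.rmtree(tmp)

target = tempfile.mkdtemp()
simulate_v1_commit(target, 4)
print(sorted(os.listdir(target)))
# prints ['part-00000', 'part-00001', 'part-00002', 'part-00003']
```

Algorithm v2 instead renames files into the target directory at task commit, skipping the sequential job-commit phase, which is presumably why the code above sets `mapreduce.fileoutputcommitter.algorithm.version` to `2`. One thing worth checking (an assumption, not confirmed by this report) is whether the setting actually reaches the Hadoop configuration for this write path; Spark forwards Hadoop options set with the `spark.hadoop.` prefix, e.g. `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version`.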