teragrep / pth_10

Data Processing Language (DPL) translator for Apache Spark
GNU Affero General Public License v3.0
0 stars 6 forks source link

hdfs save performance slow #197

Open kortemik opened 8 months ago

kortemik commented 8 months ago

Describe the bug hdfs save performance regression reported since 3b211fee33aa6257f106395097cdf8c9cb14f198

Merge PR#566 'Circumvent avro format's naming restrictions by encoding the column names and retrieve them when loading; Additional HDFS Sequential Save fix.'

Expected behavior hdfs save performance should be better or equal as it was before.

How to reproduce use hdfs save (possibly without aggregates)

Screenshots

+
+        LOGGER.info("Dataset in HDFS save was streaming={}", dataset.isStreaming());
         if (!dataset.isStreaming()) {
             // Non-streaming dataset, e.g. inside forEachBatch (sequential stack mode)
-            List<String> dsAsJsonList = dataset.toJSON().collectAsList();
-            final String json = Arrays.toString(dsAsJsonList.toArray());
-
-            Dataset<Row> jsonifiedDs = catCtx.getSparkSession().createDataFrame(Collections.singletonList(RowFactory.create(json)),jsonifiedSchema);
-
             final String cpPath = pathStr + "/checkpoint";
             DataFrameWriter<Row> hdfsSaveWriter =
-                    jsonifiedDs
+                    convertedDataset
                             .write()
                             .format("avro")
-                            .option("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
-                            .option("path", pathStr.concat("/data"))
-                            .option("checkpointLocation", cpPath);
+                            .mode(aggsUsedBefore ? SaveMode.Overwrite : SaveMode.Append)
+                            .option("checkpointLocation", cpPath)
+                            .option("path", pathStr.concat("/data"));
+
+            hdfsSaveWriter.save();

-            catCtx.getObjectStore().add(id, hdfsSaveWriter);
+            //catCtx.getObjectStore().add(id, hdfsSaveWriter);

mode is possibly the culprit.

Software version 4.16.0

Desktop (please complete the following information if relevant):

Additional context

kortemik commented 8 months ago

see pth_10-issue-197 in confluence @StrongestNumber9

StrongestNumber9 commented 8 months ago

Any reference performance numbers or environment specs? Is it 10% slower, 50%, 90%, 99%? And how much data (event count and roughly event size) just so I can test theoretical speeds of raw hdfs copy as well

kortemik commented 8 months ago

at least 50? also the data was not forwarded to the second destination specified on the query as it used to.

kortemik commented 8 months ago

copy is always faster than avro appends so it does not make sense to test the copy speed compared to file structured write.

eemhu commented 8 months ago

internal pr request for kafka fix submitted