To clarify @MichaelChirico's concerns: the first run on Spark does not include the start-up time of the cluster (in this case a single-node cluster). The cluster is already started and the data were read into it and cached in memory.
I already had a brief discussion about this with @st-pasha, and the problem is not trivial to resolve.
The non-trivial parts are:

- data.table has a minor overhead, as described in [speed up first `[.data.table` call](https://github.com/Rdatatable/data.table/issues/2912).

For now I am not seeing reasons good enough, or a strategy fair enough, to include warm-up solutions for the "groupby" task. What looks to be the proper way to address this is to add a new task for grouping on warmed-up/analyzed/sorted/indexed data (a sketch of what that could look like follows).
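To illustrate, here is a minimal, hypothetical sketch (not the benchmark's actual harness; the file and column names are made up, loosely following the benchmark's data naming) of what such a warmed-up grouping task could look like in PySpark: one untimed run absorbs any one-off overhead, and only the repeated run is measured.

```python
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical input file and columns
df = spark.read.csv("G1_1e7_1e2.csv", header=True, inferSchema=True).cache()

query = lambda: df.groupBy("id1").agg({"v1": "sum"}).collect()

query()                 # untimed warm-up run: reads the file and fills the cache
t0 = time.time()
query()                 # measured run on warmed-up, cached data
print(f"warmed-up groupby: {time.time() - t0:.3f}s")
```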
The dataset is being loaded from file, I think. Could it be that Spark is very fast at file load but isn't materializing the data? Then when the first group-by comes along, that's when it actually does the load from file. (Adding load times to the report was on the todo list regardless.) If lazy data ingest doesn't explain it, can an issue be raised under the Spark tag on SO or a code-review site to see if they know?
@mattdowle I'm not sure whether this is what's going on, but yes, operations are generally lazy in Spark.
This code will be almost instant:

```python
spark.read.parquet('s3://path/to/folder')
```

Even adding some filtering & other basic transformations will still do nothing.
You can force-overcome lazy evaluation by doing something inexpensive like:

```python
SDF = spark.read.parquet('s3://path/to/folder')
SDF.count()
```
It's open to debate whether something like `SDF.cache()` is legitimate for the comparison.
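To make this concrete, here is a hedged sketch (reusing the placeholder S3 path from above; the grouping column is hypothetical) contrasting the near-instant lazy read with the cost of actually materializing the data:

```python
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

t0 = time.time()
SDF = spark.read.parquet('s3://path/to/folder')   # lazy: returns almost instantly
print(f"read (lazy): {time.time() - t0:.3f}s")

t0 = time.time()
SDF.cache()          # marks the DataFrame for caching (itself lazy)
n = SDF.count()      # action: actually scans the files and populates the cache
print(f"count (materialize): {time.time() - t0:.3f}s, {n} rows")

t0 = time.time()
SDF.groupBy('id1').count().collect()   # hypothetical column; now served from cache
print(f"groupby (warm): {time.time() - t0:.3f}s")
```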
- `.count()` before grouping, as suggested by Michael, could work, but it would have to be added for all tools, as this is already "collecting statistics" about the data.
- `.cache` is required, otherwise Spark would be re-reading the csv on each query (?, according to its design). It is even more desirable to use `.cache` for the results of queries: other tools do cache the answer on the side and it can be accessed later on, unlike (AFAIR) Impala and Presto, where you needed to use `CREATE TABLE AS SELECT` to actually keep query results.
- Instead of the `.cache` method we use `.persist(pyspark.StorageLevel.MEMORY_ONLY)`, as in recent versions of Spark `.cache` only wraps `.persist` but does not let you choose MEMORY_ONLY. This has to be adjusted to `.persist` to memory and disk when we go for the 1e10 grouping benchmark (500GB); see the sketch below.
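A minimal sketch of that storage-level choice (the input file is hypothetical). In recent Spark, `DataFrame.cache()` takes no arguments and uses a default level chosen by Spark (memory-and-disk for DataFrames), so pinning MEMORY_ONLY requires `persist()`:

```python
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.read.csv("data.csv", header=True, inferSchema=True)  # hypothetical source

# 1e9-scale runs: pin partitions to RAM only (evicted partitions get recomputed)
sdf = sdf.persist(StorageLevel.MEMORY_ONLY)

# 1e10-scale (~500GB) runs would instead allow spilling to disk:
# sdf = sdf.persist(StorageLevel.MEMORY_AND_DISK)

sdf.count()  # an action is still required to actually populate the cache
```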
Solved in a26b8afffb9ab1051c4c2ffc190cb60f4a424cee
Comment from Michael on Twitter: https://twitter.com/michael_chirico/status/1039356873760112641
It seems proportional to the data size, though. What's happening there, and is there a way to isolate it and report it separately, perhaps?
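One hedged way to isolate it, sketched below (reusing the hypothetical `SDF` and column from the earlier examples, not an agreed benchmark change): time the same query twice on the same cached data and report the difference as a separate first-run-overhead figure.

```python
import time

def timed(action):
    """Time a zero-argument callable in wall-clock seconds."""
    t0 = time.time()
    action()
    return time.time() - t0

# `SDF` and the column "id1" are assumed from the earlier examples
query = lambda: SDF.groupBy('id1').count().collect()

first = timed(query)    # includes the one-off, size-proportional cost
second = timed(query)   # steady-state timing
print(f"groupby: {second:.3f}s; first-run overhead: {first - second:.3f}s")
```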