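In your `trainer.py` code you use `repartition`. I suggest using `coalesce` where possible, so as not to trigger a full shuffle, which is expensive. Because `coalesce` can only reduce the number of partitions, `repartition` is still needed when the DataFrame has fewer partitions than the target parallelism:

`trainer.py`: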
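```python
if shuffle:
    dataframe = shuffle(dataframe)
# Indicate the parallelism (number of workers times parallelism factor).
parallelism = self.parallelism_factor * self.num_workers
# Current number of partitions of the DataFrame.
num_partitions = dataframe.rdd.getNumPartitions()
# Check if we need to repartition the dataframe.
if num_partitions >= parallelism:
    # Enough partitions already: merge them without a full shuffle.
    dataframe = dataframe.coalesce(parallelism)
else:
    # Too few partitions: only repartition (a full shuffle) can increase them.
    dataframe = dataframe.repartition(parallelism)
```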
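To make the decision easy to unit-test without a Spark cluster, the logic could be pulled into a small pure function. This is only a sketch: `plan_partitioning` and `adjust_partitions` are hypothetical helper names, not part of `trainer.py`; the parameters just mirror `num_workers` and `parallelism_factor` from the snippet. The PySpark calls used (`rdd.getNumPartitions()`, `coalesce`, `repartition`) are standard DataFrame methods.

```python
from typing import Tuple


def plan_partitioning(num_partitions: int, num_workers: int,
                      parallelism_factor: int) -> Tuple[str, int]:
    """Hypothetical helper: decide how to reach the target parallelism.

    Returns ("coalesce", n) when the DataFrame already has at least n
    partitions, and ("repartition", n) when it has fewer, since only
    repartition can increase the partition count.
    """
    if num_workers <= 0 or parallelism_factor <= 0:
        raise ValueError("num_workers and parallelism_factor must be positive")
    # Target parallelism: number of workers times the parallelism factor.
    parallelism = parallelism_factor * num_workers
    if num_partitions >= parallelism:
        return "coalesce", parallelism
    return "repartition", parallelism


def adjust_partitions(dataframe, num_workers: int, parallelism_factor: int):
    """Hypothetical helper: apply the plan to a PySpark DataFrame."""
    method, parallelism = plan_partitioning(
        dataframe.rdd.getNumPartitions(), num_workers, parallelism_factor)
    if method == "coalesce":
        # Merges existing partitions without a full shuffle.
        return dataframe.coalesce(parallelism)
    # Full shuffle; the only way to increase the number of partitions.
    return dataframe.repartition(parallelism)
```

For example, with 4 workers and a parallelism factor of 2, a DataFrame with 200 partitions is coalesced to 8, while one with 3 partitions is repartitioned to 8. Keep in mind that a very aggressive `coalesce` also reduces the parallelism of the upstream stages, so it is most useful when the reduction is moderate.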