TresAmigosSD / SMV

Spark Modularized View
Apache License 2.0
42 stars 22 forks source link

SmvMultiCSVInputFiles lacks parallelization #1619

Open CharlesNaylor opened 4 years ago

CharlesNaylor commented 4 years ago

Hi,

I have a large CSV file exported from Google BigQuery onto Google Cloud Storage. It gets broken into thousands of subfiles within the bucket.

If I attempt to read the file in pyspark with SmvCSVInputFile, I get an FQN error, presumably related to being unable to parse the CSV path ('gs://...My_CSV.csv', which is a bucket containing some 8000 individual files).

If I read it using SmvMultiCSVInputFiles, it discovers underlying files, then reads them in a loop:

def _get_input_data(self):
        dir_path = os.path.join(self.get_connection().path, self.dirName())
        smv_schema = self.smvSchema()

        flist = self.smvApp._jvm.SmvHDFS.dirList(dir_path).array()
        # ignore all hidden files in the data dir
        filesInDir = [os.path.join(dir_path, n) for n in flist if not n.startswith(".")]

        if (not filesInDir):
            raise SmvRuntimeError("There are no data files in {}".format(dir_path))

        combinedDf = None
        reader_logger = self._readerLogger()
        for filePath in filesInDir:
            df = SmvCsvOnHdfsIoStrategy(
                self.smvApp,
                filePath,
                smv_schema,
                reader_logger
            ).read()
            combinedDf = df if (combinedDf is None) else combinedDf.unionAll(df)

        return combinedDf

This uses < 1% of my cluster for 11 hours as it is reading each file sequentially. If I read using SparkSession it takes minutes. I don't seem to be able to control BigQuery's partitioning strategy to reduce the number of files.

Is there any way to parallelize this operation?

laneb commented 4 years ago

What error is raised when you use SmvCSVInputFile?

CharlesNaylor commented 4 years ago

It fails to find the connection I've defined. Not sure what the underlying problem is. The connection as defined works ok with Multi.

  File "/opt/conda/default/lib/python3.6/site-packages/smv/iomod/base.py", line 57, in get_connection
    conn = self.smvApp.get_connection_by_name(name)
  File "/opt/conda/default/lib/python3.6/site-packages/smv/smvapp.py", line 303, in get_connection_by_name
    return ConnClass(name, props)
TypeError: 'NoneType' object is not callable

Full log below: stdout.txt