MI-DPLA / combine

Combine /kämˌbīn/ - Metadata Aggregator Platform

record input filters, numerical valve results in OOM #226

Closed ghukill closed 6 years ago

ghukill commented 6 years ago

When using the new record_numerical_valve against a very large job, e.g. 1M records, getting the following PySpark error:

ERROR repl.PythonInterpreter: Process has died with 137

Tracing this down in /var/log/syslog indicates it's an OOM issue (exit code 137 is 128 + SIGKILL, which is what the kernel's OOM killer sends).

Preliminary testing shows this only in Merge Jobs; an equivalent Transform completed successfully. Wondering if the partitions end up different from the DB bounds in Merge Jobs?
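
For context, a rough sketch (not Combine's code) of how Spark's JDBC reader turns lowerBound, upperBound, and numPartitions into one WHERE predicate per partition. If a Merge passes bounds spanning the id ranges of several jobs, the job_id filter can leave a few partitions holding nearly all of one job's records while the rest are nearly empty:

    # Illustrative only: approximates how spark.read.jdbc derives
    # per-partition predicates from lowerBound / upperBound / numPartitions.
    def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
        stride = max((upper_bound - lower_bound) // num_partitions, 1)
        predicates = []
        for i in range(num_partitions):
            start = lower_bound + i * stride
            if i == 0:
                predicates.append("%s < %s" % (column, start + stride))
            elif i == num_partitions - 1:
                predicates.append("%s >= %s" % (column, start))
            else:
                predicates.append("%s >= %s AND %s < %s" % (column, start, column, start + stride))
        return predicates

    # e.g. bounds spanning three ~1M-record jobs, split into 10 partitions:
    # each stride covers ~300k ids, but after .filter(job_id == ...) only the
    # strides overlapping that job's id range hold any rows.
    print(jdbc_partition_predicates('id', 1, 3000000, 10))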

Fired here for Transforms:

        # read output from input job, filtering by job_id, grabbing Combine Record schema fields
        input_job = Job.objects.get(pk=int(self.kwargs['input_job_id']))
        bounds = self.get_job_db_bounds(input_job)
        sqldf = self.spark.read.jdbc(
                settings.COMBINE_DATABASE['jdbc_url'],
                'core_record',
                properties=settings.COMBINE_DATABASE,
                column='id',
                lowerBound=bounds['lowerBound'],
                upperBound=bounds['upperBound'],
                numPartitions=settings.JDBC_NUMPARTITIONS
            )
        records = sqldf.filter(sqldf.job_id == int(self.kwargs['input_job_id']))

        # fork as input_records
        input_records = records

        # filter based on record validity
--------> records = self.record_input_filters(records)
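
For reference, a minimal hypothetical sketch of what get_job_db_bounds presumably returns for the single input job, assuming it mirrors the first()/last() id logic the Merge path does inline below (the actual helper may differ):

    # Hypothetical sketch of the single-job bounds helper used above;
    # assumes it returns the min and max record id for that one job, so
    # the jdbc partitions only span that job's id range.
    def get_job_db_bounds(self, job):
        records = job.get_records().order_by('id')
        return {
            'lowerBound': records.first().id,
            'upperBound': records.last().id
        }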

Fired here for Merge Jobs:

        # get total range of ids from input jobs to help partition the jdbc reader
        records_ids = []
        for input_job_id in input_jobs_ids:
            input_job_temp = Job.objects.get(pk=int(input_job_id))

            # if job has records, continue
            if len(input_job_temp.get_records()) > 0:               
                records = input_job_temp.get_records().order_by('id')               
                start_id = records.first().id
                end_id = records.last().id
                records_ids += [start_id, end_id]
            else:
                print("Job %s had no records, skipping" % input_job_temp.name)

        records_ids.sort()      

        # get list of RDDs from input jobs
        sqldf = self.spark.read.jdbc(
                settings.COMBINE_DATABASE['jdbc_url'],
                'core_record',
                properties=settings.COMBINE_DATABASE,
                column='id',
                lowerBound=records_ids[0],
                upperBound=records_ids[-1],
                numPartitions=settings.JDBC_NUMPARTITIONS
            )
        input_jobs_dfs = []     
        for input_job_id in input_jobs_ids:

            # get dataframe of input job
            job_df = sqldf.filter(sqldf.job_id == int(input_job_id))

            # apply record input filters
------------>   job_df = self.record_input_filters(job_df)

            # append to input jobs dataframes
            input_jobs_dfs.append(job_df)

        # create aggregate rdd of frames
        agg_rdd = self.spark.sparkContext.union([ df.rdd for df in input_jobs_dfs ])
        agg_df = self.spark.createDataFrame(agg_rdd, schema=input_jobs_dfs[0].schema)
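
One way to test the partition hypothesis (a diagnostic sketch, assuming a filtered DataFrame like the records or job_df above) is to count rows per partition after the job_id filter and compare a Transform input against a Merge input:

    # Diagnostic sketch: rows per partition of a filtered DataFrame.
    # Heavily skewed counts on the Merge path would support the theory that
    # its wider id bounds concentrate one job's records in a few partitions,
    # which is where an executor could run out of memory.
    def rows_per_partition(df):
        return df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

    # e.g.
    # rows_per_partition(records)   # Transform path
    # rows_per_partition(job_df)    # Merge path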

ghukill commented 6 years ago

Fixed by https://github.com/WSULib/combine/issues/227