vepadulano / PyRDF

Python Library for doing ROOT RDataFrame analysis
https://pyrdf.readthedocs.io/en/latest/

[WIP] send headers to executors and declare them at runtime #38

Closed · vepadulano closed this 5 years ago

vepadulano commented 5 years ago

This addresses #2.

vepadulano commented 5 years ago

First proposal: calling the get_distributed_files method of the Spark subclass from the map method of the Dist class forces the pickling of the Spark instance so that it can be sent to the workers. One solution would be to modify how pickle serializes the Spark class via the __getstate__ method. For instance, it could simply return None, so that the extra information sent to the executors would be minimal, concerning only the class structure.
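A minimal sketch of that idea (the attributes here are stand-ins for illustration, not the actual PyRDF implementation):

    import pickle

    class Spark(object):
        def __init__(self):
            # Hypothetical instance state that should not travel to the workers
            self.npartitions = 4
            self.some_large_config = {"key": "value"}

        def __getstate__(self):
            # Returning None makes pickle serialize no instance state:
            # only the class reference is shipped to the executors.
            return None

    restored = pickle.loads(pickle.dumps(Spark()))
    print(restored.__dict__)  # {} -> no instance attributes were sent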

vepadulano commented 5 years ago

Second proposal: given the very Spark-specific nature of this issue, one possibility would be to add a new mapper in the Spark subclass, with the same logic as the one in the parent class plus the fetching and declaration of the headers on the executors. Something along these lines:

    import ntpath
    from pyspark import SparkFiles

    def ProcessAndMerge(self, mapper, reducer, includes):
        ranges = self.build_ranges(self.npartitions)  # Get range pairs

        def mapSpark(current_range):
            # Resolve the headers previously shipped to this executor
            files_on_executor = [
                SparkFiles.get(ntpath.basename(filepath))
                for filepath in includes
            ]
            # Declare the headers here, then run the user-provided mapper
            return mapper(current_range)

        # Build parallel collection
        sc = self.sparkContext
        parallel_collection = sc.parallelize(ranges, self.npartitions)

        # Map-Reduce using Spark
        return parallel_collection.map(mapSpark).treeReduce(reducer)
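For SparkFiles.get to resolve the headers on the executors, the driver would first have to ship them. A minimal sketch of that step (distribute_headers is a hypothetical helper name, not part of the current codebase):

    def distribute_headers(self, includes):
        # Driver side: register each header with the SparkContext so that
        # SparkFiles.get can later locate it inside mapSpark on the executors
        for filepath in includes:
            self.sparkContext.addFile(filepath)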

This would make the map function inside the execute method of the Dist class somewhat redundant, at least until new backends are implemented.