Yelp / mrjob

Run MapReduce jobs on Hadoop or Amazon Web Services
http://packages.python.org/mrjob/
Other
2.62k stars 586 forks source link

running local mode error #2168

Closed logique233 closed 3 years ago

logique233 commented 4 years ago

Please help me. Why? There is an error in running local mode, but there is no problem in inline mode.

#coding:utf-8
from mrjob.job import MRJob
from mrjob.step import MRStep
import numpy as np
import mrjob

class MRKmeans(MRJob):

    OUTPUT_PROTOCOL = mrjob.protocol.RawProtocol
    ndim=None
    Centroid=None
    nclass=None

    def __init__(self, *args, **kwargs):
        super(MRKmeans, self).__init__(*args, **kwargs)
        global _global_dict
        _global_dict = {}

    def configure_args(self):

        super(MRKmeans, self).configure_args()
        self.add_file_arg('--centroids_input')
        self.add_file_arg('--centroids_output')    
        self.add_passthru_arg('--iterations', help='iterations', default=10, type=int)        

    def load_args(self,args):

        super(MRKmeans,self).load_args(args)

        if self.options.centroids_input is None:
            self.option_parser.error("please type the centroids input file.")
        else:
            self.infile = self.options.centroids_input

        if self.options.centroids_output is None:
            self.outfile = self.infile
        else:
            self.outfile = self.options.centroids_output
        self.iterations = self.options.iterations

    def get_centroids(self):

        Centroid = np.loadtxt(self.infile, delimiter = ',')
        return Centroid

    def write_centroids(self, Centroid):

        np.savetxt("./foo.txt", Centroid[None], fmt = '%.5f',delimiter = ',')

    def relabel_data(self, _, line):
        '''
        Mapper
        '''
        try:
            Coord, Cluster_ID = line.split('|')
        except:
            Coord = line
        Coord_arr = np.array(Coord.split(','), dtype = float)
        global Centroid
        Centroid = self.get_centroids()
        Centroid_arr = np.reshape(Centroid, (-1, len(Coord_arr)))
        global nclass
        global ndim
        nclass = Centroid_arr.shape[0]
        ndim = Centroid_arr.shape[1]

        Distance = ((Centroid_arr - Coord_arr)**2).sum(axis = 1)
        Cluster_ID = str(Distance.argmin() + 1)
        Coord_arr = Coord_arr.tolist()
        yield Cluster_ID, Coord_arr

    def node_combine(self, Cluster_ID, values):
        '''
        Combiner
        '''

        global ndim
        Coord_set = []
        Coord_sum = np.zeros(ndim)
        for Coord_arr in values:
            Coord_set.append(','.join(str(e) for e in Coord_arr))
            Coord_arr = np.array(Coord_arr, dtype = float)
            Coord_sum += Coord_arr
            Coord_sum = Coord_sum.tolist()
        yield Cluster_ID, (Coord_sum, Coord_set)

    def update_centroid(self, Cluster_ID, values):
        '''
        Reducer
        '''
        global ndim
        global Centroid
        global nclass
        final_Coord_set = []
        n = 0
        final_Coord_sum = np.zeros(ndim)
        for Coord_sum, Coord_set in values:
            final_Coord_set += Coord_set
            Coord_sum = np.array(Coord_sum, dtype = float)
            final_Coord_sum += Coord_sum
            n += 1

        new_Centroid = final_Coord_sum / n
        Centroid[ndim * (int(Cluster_ID) - 1) : ndim * int(Cluster_ID)] = new_Centroid
        if int(Cluster_ID) == nclass:
            self.write_centroids(Centroid)

        for final_Coord in final_Coord_set:
            yield None, (final_Coord + '|' + Cluster_ID)

    def steps(self):   
        return [MRStep(mapper=self.relabel_data,
                       combiner=self.node_combine,
                       reducer=self.update_centroid)] * self.iterations 

if __name__ == '__main__':
    MRKmeans.run() 
python ./code/KMeans.py -r local \
--centroids_input ./test_dataset/Centroid.txt \
--centroids_output ./test_dataset/Centroid.txt  \
--iterations 10 \
./test_dataset/e.txt 
Probable cause of failure:

+ /home/logic/anaconda3/bin/python KMeans.py --step-num=0 --combiner --centroids_input Centroid.txt --centroids_output Centroid-1.txt --iterations 10
Traceback (most recent call last):
  File "KMeans.py", line 132, in <module>
    MRKmeans.run() 
  File "/tmp/KMeans.logic.20200423.102930.879303/step/000/combiner/00000/wd/mrjob.zip/mrjob/job.py", line 616, in run
  File "/tmp/KMeans.logic.20200423.102930.879303/step/000/combiner/00000/wd/mrjob.zip/mrjob/job.py", line 678, in execute
  File "/tmp/KMeans.logic.20200423.102930.879303/step/000/combiner/00000/wd/mrjob.zip/mrjob/job.py", line 780, in run_combiner
  File "/tmp/KMeans.logic.20200423.102930.879303/step/000/combiner/00000/wd/mrjob.zip/mrjob/job.py", line 850, in combine_pairs
  File "/tmp/KMeans.logic.20200423.102930.879303/step/000/combiner/00000/wd/mrjob.zip/mrjob/job.py", line 889, in _combine_or_reduce_pairs
  File "KMeans.py", line 92, in node_combine
    Coord_sum = np.zeros(ndim)
NameError: name 'ndim' is not defined

(from lines 7-18 of /tmp/KMeans.logic.20200423.102930.879303/step/000/combiner/00000/stderr)

while reading input from /tmp/KMeans.logic.20200423.102930.879303/step/000/combiner/00000/input