bwoneill / pypardis

A parallel distributed implementation of DBSCAN on Spark using Python

some suggestions #8

Open zhchl123 opened 2 years ago

zhchl123 commented 2 years ago

Too many loops make the code slow. I wrote a parallel version of `DBSCAN._create_neighborhoods`; it seems to work well so far. `_create_partitions` could also be rewritten, but it's a little more complicated and I haven't finished it yet.

```python
    def _create_neighborhoods2(self):
        """Replicate each point into every partition whose expanded bounding
        box contains it, using a single flatMap instead of driver-side loops."""
        bounding_boxes = self.sc.broadcast(self.bounding_boxes)
        eps = self.sc.broadcast(self.eps)

        def expand_item(item):
            # item is (point_id, vector); emit one ((point_id, label), vector)
            # copy for every partition whose box, grown by 2 * eps, contains it
            ret = []
            for label, box in bounding_boxes.value.items():
                expanded_box = box.expand(2 * eps.value)
                if expanded_box.contains(item[1]):
                    ret.append(((item[0], label), item[1]))
            return ret

        new_data = self.data.flatMap(expand_item)
        self.data = new_data
```
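To make the replication idea concrete outside the class, here is a minimal self-contained sketch of the same broadcast-and-flatMap pattern. The `Box` class, sample points, and `eps` value below are illustrative stand-ins, not pypardis code:

```python
from pyspark import SparkContext


class Box(object):
    """Toy axis-aligned 2-D box standing in for the repo's bounding-box class."""
    def __init__(self, lower, upper):
        self.lower, self.upper = lower, upper

    def expand(self, amount):
        return Box([l - amount for l in self.lower],
                   [u + amount for u in self.upper])

    def contains(self, point):
        return all(l <= p <= u
                   for l, p, u in zip(self.lower, point, self.upper))


sc = SparkContext('local[2]', 'neighborhood-demo')
eps_b = sc.broadcast(0.5)
boxes_b = sc.broadcast({0: Box([0, 0], [5, 5]), 1: Box([5, 0], [10, 5])})

# RDD of (point_id, vector), the same shape the data has before replication
points = sc.parallelize([(0, (4.8, 1.0)), (1, (7.0, 3.0))])


def expand_item(item):
    # emit ((point_id, partition_label), vector) for every partition whose
    # expanded box contains the point; border points appear in several partitions
    return [((item[0], label), item[1])
            for label, box in boxes_b.value.items()
            if box.expand(2 * eps_b.value).contains(item[1])]


print(points.flatMap(expand_item).collect())
# point 0 lies within 2 * eps of the shared border, so it lands in both partitions
```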

zhchl123 commented 2 years ago

A faster DBSCAN within each partition:

```python
def dbscan_partition2(iterable, params):
    import numpy as np
    from sklearn.neighbors import NearestNeighbors
    from sklearn.cluster import DBSCAN
    cluster_eps = params['eps']
    data = list(iterable)
    # every record in this partition shares the same partition label
    (key, part), vector = data[0]
    feat_mat = np.array([v for (_, __), v in data])
    y = [k for (k, _), __ in data]
    # precompute the sparse radius-neighbor distance graph with a ball tree,
    # then run DBSCAN on the precomputed distances
    neigh = NearestNeighbors(radius=cluster_eps, algorithm='ball_tree')
    neigh.fit(feat_mat)
    dist = neigh.radius_neighbors_graph(feat_mat, mode='distance')

    model = DBSCAN(eps=cluster_eps, min_samples=params['min_samples'], metric='precomputed')
    c = model.fit_predict(dist)
    cores = set(model.core_sample_indices_)
    for i in range(len(c)):
        # '*' marks points that are not core samples in this partition
        flag = '' if i in cores else '*'
        yield (y[i], '%i:%i%s' % (part, c[i], flag))
```
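A rough sketch of how this could be wired up, assuming the replicated RDD from `_create_neighborhoods2` has already been repartitioned so that all records sharing a partition label land in the same Spark partition; the `neighborhoods` name and `params` values are placeholders:

```python
# Hypothetical wiring: `neighborhoods` is assumed to be an RDD of
# ((point_id, partition_label), vector) records, partitioned by label.
params = {'eps': 0.3, 'min_samples': 5}
labels = neighborhoods.mapPartitions(lambda it: dbscan_partition2(it, params))
# labels is an RDD of (point_id, 'partition:cluster[*]') pairs; the '*' suffix
# marks non-core points, which a later cross-partition merge step would reconcile.
print(labels.take(5))
```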