Created
July 4, 2012 00:50
-
-
Save choffstein/3044426 to your computer and use it in GitHub Desktop.
A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise | |
# Martin Ester, Hans-Peter Kriegel, Jörg Sander, Xiaowei Xu | |
# dbscan: density based spatial clustering of applications with noise | |
import numpy as np | |
import math | |
UNCLASSIFIED = False | |
NOISE = None | |
#manhattan distance | |
def dist(p,q): | |
return math.sqrt(np.power(p-q,2).sum()) | |
def eps_neighborhood(p,q,eps): | |
return dist(p,q) < eps | |
def region_query(m, point_id, eps): | |
n_points = m.shape[1] | |
seeds = [] | |
for i in range(0, n_points): | |
if not i == point_id: | |
if eps_neighborhood(m[:,point_id], m[:,i], eps): | |
seeds.append(i) | |
return seeds | |
def _expand_cluster(m, classifications, point_id, cluster_id, eps, min_points): | |
seeds = region_query(m, point_id, eps) | |
if len(seeds) < min_points: | |
classifications[point_id] = NOISE | |
return False | |
else: | |
classifications[point_id] = cluster_id | |
for seed_id in seeds: | |
classifications[seed_id] = cluster_id | |
while len(seeds) > 0: | |
current_point = seeds[0] | |
results = region_query(m, current_point, eps) | |
if len(results) >= min_points: | |
for i in range(0, len(results)): | |
result_point = results[i] | |
if classifications[result_point] == UNCLASSIFIED or \ | |
classifications[result_point] == NOISE: | |
if classifications[result_point] == UNCLASSIFIED: | |
seeds.append(result_point) | |
classifications[result_point] = cluster_id | |
seeds = seeds[1:] | |
return True | |
def main_dbscan(m, eps, min_points): | |
cluster_id = 1 | |
n_points = m.shape[1] | |
classifications = [UNCLASSIFIED] * n_points | |
for point_id in range(0, n_points): | |
point = m[:,point_id] | |
if classifications[point_id] == UNCLASSIFIED: | |
if _expand_cluster(m, classifications, point_id, cluster_id, eps, min_points): | |
cluster_id = cluster_id + 1 | |
return classifications | |
if __name__ == "__main__": | |
pass | |
def test_main_dbscan(): | |
m = np.matrix('1 1.2 0.8 3.7 3.9 3.6 10; 1.1 0.8 1 4 3.9 4.1 10') | |
eps = 0.5 | |
min_points = 2 | |
assert main_dbscan(m, eps, min_points) == [1, 1, 1, 2, 2, 2, None] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment