Undefined self.nodata
self.nodata is used, but self is not defined in the function.
Solution:
arr[np.isnan(arr)] = nodata
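For context, a minimal before/after sketch (assuming the line was copied from a method into a plain function, where there is no self):

import numpy as np

arr = np.array([[1.0, np.nan], [3.0, 4.0]])
nodata = -99999.0
# arr[np.isnan(arr)] = self.nodata   # NameError in a plain function: 'self' is not defined
arr[np.isnan(arr)] = nodata          # use the function's nodata parameter instead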
KeyError in Algorithm Filtering
The keys used in the dictionary (dic) do not match the algorithm names used in the deletion loop.
Solution: Adjust the keys in dic to be the algorithm names.
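I don't have the original construction of dic in front of me, so the following is only a sketch of the shape of the fix (the inertia lists and the algo tuple are placeholders); the point is that the dict keys and the strings iterated in the deletion loop must be the same:

# Hypothetical sketch, not the original code: key dic by the algorithm names
# so that `del dic[algorithm]` in the filtering loop finds the keys it expects.
algo = ("lloyd",)                      # algorithms requested by the caller
dic = {
    "lloyd": [1.0, 2.0],               # placeholder inertia lists
    "elkan": [1.5, 2.5],
}
for algorithm in list(dic):
    if algorithm not in algo:
        del dic[algorithm]             # no KeyError: keys match the loop's strings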
Inconsistency in Metric Computation
kmeanModel.score(arr) is used for 'auto' while kmeanModel.inertia_ is used for 'elkan'.
Solution: Use inertia in both cases: inertias_inter_lloyd.append(kmeanModel.inertia_)
Alternative: If you prefer to use the score, warn that it is the negative of the inertia.
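For reference, a small demonstration on synthetic data (not from the package) of why mixing the two metrics is inconsistent: on the training data, KMeans.score() is the negative of inertia_:

import numpy as np
from sklearn.cluster import KMeans

X = np.random.rand(200, 3)
km = KMeans(n_clusters=4, n_init=10, random_state=0).fit(X)
print(km.inertia_, km.score(X))   # score(X) on the training data is -inertia_ (up to float error)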
Deprecated Algorithm Names
The algorithm parameter in KMeans has deprecated options 'auto' and 'full'.
Solution: Adjust the algo parameter default to ('lloyd', 'elkan').
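In recent scikit-learn releases the old aliases warn or are rejected outright, so the defaults should use the new names, e.g.:

from sklearn.cluster import KMeans

# 'auto' and 'full' are deprecated aliases; 'lloyd' and 'elkan' are the supported names.
kmeans = KMeans(n_clusters=5, algorithm="lloyd", n_init=10)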
Handling NoData Values with Fuzzy Matching
This occurs in a few places; since type hints are not used, callers may pass either an int or a float for nodata. Consider instead:
if np.isnan(nodata):
    nodata_mask = np.isnan(arr)
else:
    nodata_mask = np.isclose(arr, nodata)
arr[nodata_mask] = np.nan  # Or replace with a suitable value
Proposed change
Incorporating type hints and other best practices:
from typing import Optional, Tuple, Dict, List
from sklearn.cluster import KMeans
import rasterio
import numpy as np
def calkmeans(
    image: rasterio.io.DatasetReader,
    k: Optional[int] = None,
    algo: Tuple[str, ...] = ("lloyd", "elkan"),
    max_iter: int = 300,
    n_iter: int = 10,
    nodata: float = -99999.0,
    **kwargs
) -> Dict[str, List[float]]:
    '''
    Calibrating KMeans Clustering Algorithm

    This function calibrates the KMeans algorithm for satellite image classification.
    It can either find the optimal number of clusters (k) by evaluating the inertia
    over a range of cluster numbers or determine the best algorithm ('lloyd' or 'elkan')
    for a given k.

    Parameters:
        image (rasterio.io.DatasetReader): Optical image with 3D data.
        k (Optional[int]): The number of clusters. If None, the function finds the optimal k.
        algo (Tuple[str, ...]): Algorithms to evaluate ('lloyd', 'elkan').
        max_iter (int): Maximum iterations for KMeans (default 300).
        n_iter (int): Number of iterations or clusters to evaluate (default 10).
        nodata (float): The NoData value to identify and handle in the data.
        **kwargs: Additional arguments passed to sklearn.cluster.KMeans.

    Returns:
        Dict[str, List[float]]: A dictionary with algorithm names as keys and lists of
        inertia values as values.

    Notes:
        - If k is None, the function evaluates inertia for cluster numbers from 1 to n_iter.
        - If k is provided, the function runs KMeans n_iter times for each algorithm to
          evaluate their performance.
        - The function handles NoData values using fuzzy matching to account for
          floating-point precision.
    '''
    if not isinstance(image, rasterio.io.DatasetReader):
        raise TypeError('"image" must be a raster read by rasterio.open().')

    bands: int = image.count
    rows: int = image.height
    cols: int = image.width

    # Read and reshape the image data
    st: np.ndarray = image.read()  # Shape: [bands, rows, cols]
    st_reorder: np.ndarray = np.moveaxis(st, 0, -1)  # Shape: [rows, cols, bands]
    arr: np.ndarray = st_reorder.reshape((rows * cols, bands))  # Shape: [rows*cols, bands]

    # Handle nodata values with fuzzy matching
    if np.isnan(nodata):
        nodata_mask = np.isnan(arr).any(axis=1)
    else:
        nodata_mask = np.isclose(arr, nodata).any(axis=1)

    # Extract valid data (rows without nodata)
    valid_data: np.ndarray = arr[~nodata_mask]

    # Check if there is any valid data
    if valid_data.size == 0:
        raise ValueError("No valid data found to perform clustering.")

    # Validate algorithms
    valid_algorithms = ("lloyd", "elkan")
    for algorithm in algo:
        if algorithm not in valid_algorithms:
            raise ValueError(f"Invalid algorithm '{algorithm}'. Must be one of {valid_algorithms}.")

    results: Dict[str, List[float]] = {}

    if k is None:
        # Finding the optimal number of clusters
        for algorithm in algo:
            inertias: List[float] = []
            for n_clusters in range(1, n_iter + 1):
                kmeans = KMeans(n_clusters=n_clusters, max_iter=max_iter, algorithm=algorithm, **kwargs)
                kmeans.fit(valid_data)
                inertias.append(kmeans.inertia_)
            results[algorithm] = inertias
        return results
    elif isinstance(k, int) and k > 0:
        # Evaluating algorithms for a given k
        for algorithm in algo:
            inertias = []
            for _ in range(n_iter):
                kmeans = KMeans(n_clusters=k, max_iter=max_iter, algorithm=algorithm, **kwargs)
                kmeans.fit(valid_data)
                inertias.append(kmeans.inertia_)
            results[algorithm] = inertias
        return results
    else:
        raise TypeError(f"'k' must be None or a positive integer. Got {k} of type {type(k)}.")
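For illustration, a hypothetical usage sketch of the proposed signature (the file name and the matplotlib plotting are assumptions, not part of the proposal): run the k=None mode and pick k with the elbow method from the returned inertia curves.

import matplotlib.pyplot as plt
import rasterio

with rasterio.open("image.tif") as src:           # hypothetical input raster
    curves = calkmeans(src, k=None, n_iter=10)

for name, inertias in curves.items():
    plt.plot(range(1, len(inertias) + 1), inertias, marker="o", label=name)
plt.xlabel("Number of clusters (k)")
plt.ylabel("Inertia")
plt.legend()
plt.show()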
And a test
import pytest
import numpy as np
import rasterio
from rasterio.io import MemoryFile
# The test assumes the calkmeans function from the proposed change above is in scope (or imported).
def test_calkmeans():
    # Create synthetic image data
    width, height, bands = 100, 100, 3  # Image dimensions and number of bands
    np.random.seed(42)  # For reproducibility

    # Generate synthetic image with three clusters
    cluster_centers = np.array([
        [50, 50, 50],
        [150, 150, 150],
        [250, 250, 250]
    ])
    total_pixels = height * width  # 10000 pixels
    num_clusters = len(cluster_centers)  # 3 clusters

    # Calculate base size and remainder
    base_size = total_pixels // num_clusters  # 3333 pixels per cluster
    remainder = total_pixels % num_clusters  # 1 pixel remaining

    # Initialize cluster sizes
    cluster_sizes = [base_size] * num_clusters  # [3333, 3333, 3333]

    # Distribute the remainder
    for i in range(remainder):
        cluster_sizes[i] += 1  # Now cluster_sizes = [3334, 3333, 3333]

    cluster_data = []
    for center, size in zip(cluster_centers, cluster_sizes):
        data = np.random.normal(loc=center, scale=10, size=(size, bands))
        cluster_data.append(data)

    # Combine and shuffle the data
    image_data = np.vstack(cluster_data)
    np.random.shuffle(image_data)
    image_data = image_data.reshape((height, width, bands)).astype(np.float32)

    # Introduce nodata values
    nodata_value = -99999.0
    image_data[0, 0, :] = nodata_value  # Set the first pixel to nodata

    # Create an in-memory raster dataset
    transform = rasterio.transform.from_origin(0, 0, 1, 1)  # Arbitrary transform
    with MemoryFile() as memfile:
        # Open the in-memory file in write mode
        with memfile.open(
            driver='GTiff',
            height=height,
            width=width,
            count=bands,
            dtype='float32',
            transform=transform,
            nodata=nodata_value
        ) as dataset_writer:
            # Write data to the dataset
            for i in range(bands):
                dataset_writer.write(image_data[:, :, i], i + 1)
            # Note: no need to call dataset_writer.close() explicitly within the 'with' block

        # Now reopen the dataset in read mode
        with memfile.open() as dataset:
            # dataset is a rasterio DatasetReader
            # Run the calkmeans function

            # Case 1: Find the optimal number of clusters
            results_optimal_k = calkmeans(
                image=dataset,
                k=None,
                algo=('lloyd', 'elkan'),
                n_iter=5,
                nodata=nodata_value
            )

            # Case 2: Evaluate algorithms for a given k
            k_value = 3
            results_given_k = calkmeans(
                image=dataset,
                k=k_value,
                algo=('lloyd', 'elkan'),
                n_iter=5,
                nodata=nodata_value
            )
    # Assertions to check the outputs
    # Case 1: Check if results contain expected keys and correct number of inertia values
    assert isinstance(results_optimal_k, dict), "Results should be a dictionary."
    for algorithm in ('lloyd', 'elkan'):
        assert algorithm in results_optimal_k, f"Algorithm '{algorithm}' should be in results."
        inertias = results_optimal_k[algorithm]
        assert len(inertias) == 5, "Inertia list should have length equal to n_iter."
        assert all(isinstance(val, float) for val in inertias), "Inertias should be floats."

    # Case 2: Check if results contain expected keys and correct number of inertia values
    assert isinstance(results_given_k, dict), "Results should be a dictionary."
    for algorithm in ('lloyd', 'elkan'):
        assert algorithm in results_given_k, f"Algorithm '{algorithm}' should be in results."
        inertias = results_given_k[algorithm]
        assert len(inertias) == 5, "Inertia list should have length equal to n_iter."
        assert all(isinstance(val, float) for val in inertias), "Inertias should be floats."

    print("All tests passed.")

test_calkmeans()
Bonus - make scikit-eo multispectral capable
This function can actually accept images with a number of channels other than 3. Consider either documenting which functions do not support != 3 bands or making all of them more flexible; a sketch of the latter follows.
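As a rough illustration of the flexible option, and assuming the other functions follow the same read/reshape flow as calkmeans, a band-agnostic helper might look like this (the helper name is made up for the sketch):

import numpy as np
import rasterio

def image_to_table(image: rasterio.io.DatasetReader) -> np.ndarray:
    """Flatten a raster with any number of bands to a (pixels, bands) table."""
    st = image.read()                        # shape: [bands, rows, cols]
    return np.moveaxis(st, 0, -1).reshape(-1, image.count)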