from abc import ABC, abstractmethod
from ..node import Node, CubeConsumer
from ..utils.numpy import flatten_spatial, flatten_labels, unflatten_spatial
import numpy as np
import typing
import uuid
import yaml
import warnings
from copy import deepcopy
[docs]
class AbstractDistance(Node, CubeConsumer):
    """
    Abstract base class for spectral distance nodes.

    There are two ways a spectral distance can be used:

    1) Distance relative to a single reference spectrum, which yields a single
       scored image.
    2) Distance relative to multiple reference spectra, which yields one scored
       image per reference (this can then be passed into a decider algorithm).

    Reference spectra may be stored on the node at construction time, or
    supplied on each call to :meth:`forward`.

    # TODO: Should this behavior be allowed? This might break the notion of input/output dimensions
    """

    def __init__(self, ref_spectra: np.ndarray | list | None = None):
        """Initialize distance metric

        Parameters
        ----------
        ref_spectra : np.ndarray | list, optional
            Reference spectra to compare against. ``None`` or an empty list
            means no reference spectra are stored on the node.
        """
        super().__init__()
        # Assign any reference spectra, if they exist
        self.ref_spectra = self.spectra_to_array(ref_spectra)

    @staticmethod
    def spectra_to_array(ref_spectra: np.ndarray | list) -> np.ndarray:
        """Convert reference spectra to a 2-D numpy array

        Parameters
        ----------
        ref_spectra : np.ndarray | list
            Reference spectra; ``None`` is treated like an empty list.

        Returns
        -------
        np.ndarray
            A 1-D, non-empty input is returned as a single row ``(1, n)``.
            An empty 1-D input is returned with shape ``(0, 0)`` so that
            ``len(result) == 0`` reliably signals "no reference spectra".
            Inputs of any other dimensionality are returned unchanged.
        """
        if ref_spectra is None:
            ref_spectra = []
        # asarray leaves existing arrays untouched and converts lists/tuples
        ref_spectra = np.asarray(ref_spectra)
        if ref_spectra.ndim == 1:
            if ref_spectra.size == 0:
                # Reshaping an empty vector to (1, 0) would look like one
                # zero-channel spectrum and defeat the emptiness checks.
                return ref_spectra.reshape(0, 0)
            return ref_spectra.reshape(1, -1)
        return ref_spectra

    def fit(self, X):
        """Mark the node as initialized; the data ``X`` itself is not used."""
        self.initialized = True

    def forward(self, X: np.ndarray, ref_spectra: np.ndarray | list | None = None) -> np.ndarray:
        """Pass the data through comparative function

        Parameters
        ----------
        X : np.ndarray
            Input data; its last axis is compared against the reference spectra.
        ref_spectra : np.ndarray | list, optional
            Spectra to compare against. When ``None``, the spectra stored on
            the node are used instead.

        Returns
        -------
        np.ndarray
            Distance maps of shape ``(*X.shape[:-1], n_references)``.

        Raises
        ------
        ValueError
            No reference spectra provided in init or on forward function pass.
        ValueError
            Mismatch between the last axis of the input data and of the
            reference spectra.
        """
        # Spectra passed to the call take precedence over the stored ones;
        # either way they are normalized so lists are accepted as documented.
        ref = self.spectra_to_array(
            self.ref_spectra if ref_spectra is None else ref_spectra)
        if len(ref) == 0:
            raise ValueError('No reference spectra provided!')
        if X.shape[-1] != ref.shape[-1]:
            raise ValueError(
                'Mismatch in input data and reference spectra!')
        # Process and return
        res = self.score(flatten_spatial(X), ref)
        return res.reshape(*X.shape[:-1], ref.shape[0])

    def serialize(self, working_dir: str) -> str:
        """Convert distance node to serializable format

        Parameters
        ----------
        working_dir : str
            Directory where node metadata should be saved (not used here).

        Returns
        -------
        str
            YAML parameterization of node: a copy of the instance attributes
            plus a ``type`` entry holding the class name.
        """
        data = deepcopy(self.__dict__)
        data['type'] = type(self).__name__
        # Arrays are stored as nested lists
        data['ref_spectra'] = np.asarray(data['ref_spectra']).tolist()
        # Dump to a string
        return yaml.dump(data, default_flow_style=False)

    def load(self, params: dict, filepath: str = None):
        """Load dumped parameters to recreate the distance object

        Parameters
        ----------
        params : dict
            Dictionary containing node values. It is copied, so the caller's
            dictionary is neither modified nor shared with the node.
        filepath : str, optional
            Directory containing node metadata, by default None (not used here).
        """
        state = dict(params)
        # The type entry only identifies the class; it is not an attribute
        state.pop('type', None)
        self.__dict__ = state
        # Cast reference spectra back to numpy type
        self.ref_spectra = self.spectra_to_array(state.get('ref_spectra'))

    @abstractmethod
    def score(self, data: np.ndarray, X: np.ndarray):
        """Abstract distance method implemented by every distance type

        Parameters
        ----------
        data : np.ndarray
            Current data to compare.
        X : np.ndarray
            Reference to compare data against.
        """
        pass

    @Node.input_dim.getter
    def input_dim(self) -> list:
        """Get required input dimension

        Returns
        -------
        list
            ``[-1, -1, n_channels]`` when reference spectra are stored,
            otherwise ``[-1, -1, -1]``.
        """
        # Note: This denotes by default we don't care about the input image shape
        # so long as the wavelengths match
        if len(self.ref_spectra) > 0:
            return [-1, -1, self.ref_spectra.shape[1]]
        return [-1, -1, -1]

    @Node.output_dim.getter
    def output_dim(self):
        """Get required output dimension

        Returns
        -------
        list
            Always ``[-1, -1, 1]``.
        """
        # NOTE(review): forward() returns one channel per reference spectrum,
        # so the trailing 1 only matches the single-reference case - confirm
        # how the graph uses this value.
        return [-1, -1, 1]
[docs]
class SpectralAngle(AbstractDistance):
    """Angle between spectra according to the Spectral Angle Mapper (SAM) formula.

    Nota Bene: inputs are expected to be normalized; ``score`` emits a warning
    when the 90th percentile of the data exceeds 2.0.
    """

    def __init__(self, ref_spectra: np.ndarray | list | None = None):
        """Construct SAM

        Parameters
        ----------
        ref_spectra : np.ndarray | list, optional
            Reference spectra to compare against, by default none.
        """
        super().__init__([] if ref_spectra is None else ref_spectra)

    @staticmethod
    def score(data: np.ndarray, ref_spectra: np.ndarray) -> np.ndarray:
        """Score new datacubes against reference spectra.

        Parameters
        ----------
        data : np.ndarray
            Input data; spectra lie along the last axis.
        ref_spectra : np.ndarray
            Reference spectra to compare against, one per row.

        Returns
        -------
        np.ndarray
            Spectral angles in radians, with one trailing entry per reference.
        """
        # Throw a warning for a large number of unnormalized values
        if np.percentile(data, 90) > 2.0:
            # 10% of the data exceeds 200%
            warnings.warn(
                "Spectral angle mapper is being used without properly normalized data. Unexpected behavior may occur!")
        ref_spectra = np.asarray(ref_spectra)
        if ref_spectra.dtype.kind in "biu":
            # Integer dot products would silently overflow in the integer dtype
            ref_spectra = ref_spectra.astype(np.float64)
        # The data norm does not depend on the reference, so compute it once
        data_norm = np.linalg.norm(data, axis=-1)
        output_scores = []
        for idx in range(ref_spectra.shape[0]):
            ref = ref_spectra[idx, :]
            cosine = np.dot(data, ref) / (data_norm * np.linalg.norm(ref))
            # Rounding can push the cosine marginally outside [-1, 1], which
            # would make arccos return NaN for (near-)identical spectra.
            output_scores.append(np.arccos(np.clip(cosine, -1.0, 1.0)))
        return np.stack(output_scores, axis=-1)
[docs]
class Euclidean(AbstractDistance):
    """Calculate L2 (Euclidean) Distance."""

    def __init__(self, ref_spectra: np.ndarray | list | None = None):
        """Construct Euclidean distance node.

        Parameters
        ----------
        ref_spectra : np.ndarray | list, optional
            Reference spectra to compare against, by default none.
        """
        super().__init__([] if ref_spectra is None else ref_spectra)

    @staticmethod
    def score(data: np.ndarray, ref_spectra: np.ndarray) -> np.ndarray:
        """Score new datacubes against reference spectra.

        Parameters
        ----------
        data : np.ndarray
            Input data; spectra lie along the last axis.
        ref_spectra : np.ndarray
            Reference spectra to compare against, one per row.

        Returns
        -------
        np.ndarray
            L2 distances, with one trailing entry per reference.
        """
        ref_spectra = np.asarray(ref_spectra)
        if ref_spectra.dtype.kind in "biu":
            # With integer data and integer references the subtraction and the
            # square wrap around; a float reference promotes the arithmetic.
            ref_spectra = ref_spectra.astype(np.float64)
        output_scores = [
            np.sqrt(np.sum(np.square(data - ref_spectra[idx, :]), axis=-1))
            for idx in range(ref_spectra.shape[0])
        ]
        return np.stack(output_scores, axis=-1)
[docs]
class Manhattan(AbstractDistance):
    """Calculate L1 (Manhattan) Distance."""

    def __init__(self, ref_spectra: np.ndarray | list | None = None):
        """Construct Manhattan distance node.

        Parameters
        ----------
        ref_spectra : np.ndarray | list, optional
            Reference spectra to compare against, by default none.
        """
        super().__init__([] if ref_spectra is None else ref_spectra)

    @staticmethod
    def score(data: np.ndarray, ref_spectra: np.ndarray) -> np.ndarray:
        """Score new datacubes against reference spectra.

        Parameters
        ----------
        data : np.ndarray
            Input data; spectra lie along the last axis.
        ref_spectra : np.ndarray
            Reference spectra to compare against, one per row.

        Returns
        -------
        np.ndarray
            L1 distances, with one trailing entry per reference.
        """
        ref_spectra = np.asarray(ref_spectra)
        if ref_spectra.dtype.kind in "biu":
            # Unsigned integer subtraction wraps around instead of going
            # negative; a float reference promotes the arithmetic.
            ref_spectra = ref_spectra.astype(np.float64)
        output_scores = [
            np.sum(np.abs(data - ref_spectra[idx, :]), axis=-1)
            for idx in range(ref_spectra.shape[0])
        ]
        return np.stack(output_scores, axis=-1)
[docs]
class Canberra(AbstractDistance):
    """Calculate Weighted L1 (Canberra) Distance."""

    def __init__(self, ref_spectra: np.ndarray | list | None = None):
        """Construct Canberra distance node.

        Parameters
        ----------
        ref_spectra : np.ndarray | list, optional
            Reference spectra to compare against, by default none.
        """
        super().__init__([] if ref_spectra is None else ref_spectra)

    @staticmethod
    def score(data: np.ndarray, ref_spectra: np.ndarray) -> np.ndarray:
        """Score new datacubes against reference spectra.

        Parameters
        ----------
        data : np.ndarray
            Input data; spectra lie along the last axis.
        ref_spectra : np.ndarray
            Reference spectra to compare against, one per row.

        Returns
        -------
        np.ndarray
            Canberra distances, with one trailing entry per reference.
            Channels where both the data and the reference are zero
            contribute 0 to the sum.
        """
        ref_spectra = np.asarray(ref_spectra)
        if ref_spectra.dtype.kind in "biu":
            # Unsigned integer subtraction wraps around instead of going
            # negative; a float reference promotes the arithmetic.
            ref_spectra = ref_spectra.astype(np.float64)
        abs_data = np.abs(data)
        output_scores = []
        for idx in range(ref_spectra.shape[0]):
            ref = ref_spectra[idx, :]
            numer = np.abs(data - ref)
            denom = abs_data + np.abs(ref)
            # 0/0 terms are defined as 0; silence the warning for them and
            # overwrite the resulting NaN so it does not poison the sum.
            with np.errstate(divide='ignore', invalid='ignore'):
                ratio = numer / denom
            ratio = np.where(denom == 0, 0.0, ratio)
            output_scores.append(np.sum(ratio, axis=-1))
        return np.stack(output_scores, axis=-1)
[docs]
class Minkowski(AbstractDistance):
    """N-th degree normed vector space (Minkowski) Distance."""

    def __init__(self, degree: int, ref_spectra: np.ndarray | list | None = None):
        """Construct Minkowski distance node.

        Parameters
        ----------
        degree : int
            Order of Minkowski distance
        ref_spectra : np.ndarray | list, optional
            Reference spectra to compare against, by default none.
        """
        super().__init__([] if ref_spectra is None else ref_spectra)
        self.degree = degree

    def score(self, data: np.ndarray, ref_spectra: np.ndarray) -> np.ndarray:
        """Score new datacubes against reference spectra.

        Parameters
        ----------
        data : np.ndarray
            Input data; spectra lie along the last axis.
        ref_spectra : np.ndarray
            Reference spectra to compare against, one per row.

        Returns
        -------
        np.ndarray
            Minkowski distances of order ``self.degree``, with one trailing
            entry per reference.
        """
        ref_spectra = np.asarray(ref_spectra)
        if ref_spectra.dtype.kind in "biu":
            # Integer subtraction/powers wrap around; a float reference
            # promotes the arithmetic.
            ref_spectra = ref_spectra.astype(np.float64)
        root = 1.0 / float(self.degree)
        output_scores = []
        for idx in range(ref_spectra.shape[0]):
            # The absolute value is required: without it, odd degrees let
            # positive and negative differences cancel and can yield a
            # negative sum whose fractional root is NaN.
            powered = np.abs(data - ref_spectra[idx, :]) ** self.degree
            output_scores.append(np.sum(powered, axis=-1) ** root)
        return np.stack(output_scores, axis=-1)
[docs]
class GFC(AbstractDistance):
    """Goodness-of-fit Coefficient (GFC)

    The returned score is ``1 - GFC``, so identical spectral shapes score 0.

    Citation:
    Hernández-Andrés, J., Romero, J., García-Beltrán, A., & Nieves, J. L. (1998). Testing linear models on spectral daylight measurements. Applied Optics, 37(6), 971-977.
    """

    def __init__(self, ref_spectra: np.ndarray | list | None = None):
        """Construct GFC distance node.

        Parameters
        ----------
        ref_spectra : np.ndarray | list, optional
            Reference spectra to compare against, by default none.
        """
        super().__init__([] if ref_spectra is None else ref_spectra)

    def score(self, data: np.ndarray, ref_spectra: np.ndarray) -> np.ndarray:
        """Score new datacubes against reference spectra.

        Parameters
        ----------
        data : np.ndarray
            Input data; spectra lie along the last axis.
        ref_spectra : np.ndarray
            Reference spectra to compare against, one per row.

        Returns
        -------
        np.ndarray
            ``1 - GFC`` scores, with one trailing entry per reference.
        """
        ref_spectra = np.asarray(ref_spectra)
        if ref_spectra.dtype.kind in "biu":
            # Integer dot products would silently overflow in the integer dtype
            ref_spectra = ref_spectra.astype(np.float64)
        # The data norm does not depend on the reference, so compute it once
        data_norm = np.linalg.norm(data, axis=-1)
        output_scores = []
        for idx in range(ref_spectra.shape[0]):
            ref = ref_spectra[idx, :]
            # GFC is defined with the absolute value of the inner product,
            # which keeps the coefficient within [0, 1] for signed data.
            gfc = np.abs(np.dot(data, ref)) / (data_norm * np.linalg.norm(ref))
            output_scores.append(1.0 - gfc)
        return np.stack(output_scores, axis=-1)
[docs]
class ECS(AbstractDistance):
    """Euclidean Distance of Cumulative Spectrum (ECS)

    As implemented, the score is the absolute difference between the
    trapezoidal integrals of the data spectrum and the reference spectrum
    over ``wavelengths``.
    """

    def __init__(self, wavelengths: np.ndarray | list, ref_spectra: np.ndarray | list | None = None):
        """Initialize an ECS distance node.

        Parameters
        ----------
        wavelengths : np.ndarray | list
            Array defining positioning of wavelength channels (typically given in nm).
            The length of this vector must equal the number of channels in inputted datacubes.
        ref_spectra : np.ndarray | list, optional
            Reference spectra to compare against, by default none.
        """
        super().__init__([] if ref_spectra is None else ref_spectra)
        # Store plain Python numbers: list(ndarray) would keep NumPy scalar
        # objects, which do not serialize to plain YAML numbers.
        self.wavelengths = np.asarray(list(wavelengths)).tolist()

    @Node.input_dim.getter
    def input_dim(self) -> list:
        """Return the required input dimension

        Returns
        -------
        list
            ``[-1, -1, len(self.wavelengths)]``: height and width are free,
            the channel count must match the wavelength vector.
        """
        # Note: this function depends on the wavelengths, so we need to match the dimension
        return [-1, -1, len(self.wavelengths)]

    def score(self, data: np.ndarray, ref_spectra: np.ndarray) -> np.ndarray:
        """Score new datacubes against reference spectra.

        Parameters
        ----------
        data : np.ndarray
            Input data; spectra lie along the last axis.
        ref_spectra : np.ndarray
            Reference spectra to compare against, one per row.

        Returns
        -------
        np.ndarray
            Absolute differences of integrated spectra, with one trailing
            entry per reference.
        """
        # np.trapz was renamed to np.trapezoid in NumPy 2.0; prefer the new
        # name and fall back for older releases.
        integrate = getattr(np, "trapezoid", None)
        if integrate is None:
            integrate = np.trapz
        # NOTE(review): the published ECS integrates the squared difference of
        # the *cumulative* spectra; this compares total areas only - confirm
        # which is intended before changing the numbers.
        # The data integral does not depend on the reference: compute it once.
        data_area = integrate(data, self.wavelengths, axis=-1)
        output_scores = [
            np.abs(data_area - integrate(ref_spectra[idx, :], self.wavelengths))
            for idx in range(ref_spectra.shape[0])
        ]
        return np.stack(output_scores, axis=-1)