Source code for dSalmon.outlier

# Copyright (c) 2020 CN Group, TU Wien
# Released under the GNU Lesser General Public License version 3,
# see accompanying file LICENSE or <https://www.gnu.org/licenses/>.

"""
Streaming outlier detection models.
"""

import numpy as np
import multiprocessing as mp

from dSalmon import swig as dSalmon_cpp
from dSalmon import projection
from dSalmon.util import sanitizeData, sanitizeTimes, lookupDistance

class OutlierDetector(object):
    """
    Base class for outlier detectors.

    Subclasses keep their constructor arguments in ``self.params`` and
    build the underlying model in :meth:`_init_model`.
    """

    def _init_model(self, p):
        """Build the model from parameter dictionary `p`. No-op in the base class."""
        pass

    def get_params(self, deep=True):
        """
        Return the used algorithm parameters as dictionary.

        Parameters
        ----------
        deep: bool, default=True
            Ignored. Only for compatibility with scikit-learn.

        Returns
        -------
        params: dict
            Dictionary of parameters.
        """
        return self.params

    def set_params(self, **params):
        """
        Reset the model and set the parameters in accordance to the
        supplied dictionary.

        Parameters
        ----------
        **params: dict
            Dictionary of parameters. Every key must be an already
            known parameter name.

        Returns
        -------
        self: OutlierDetector
            The detector itself, following the scikit-learn convention.
        """
        p = self.params.copy()
        for key in params:
            assert key in p, 'Unknown parameter: %s' % key
            p[key] = params[key]
        self._init_model(p)
        # Not every _init_model() implementation stores the dictionary it
        # receives. Without this assignment the model would be rebuilt with
        # the new values while self.params (read by fit_predict() and
        # get_params()) still held the old ones. Assigning only after
        # _init_model() succeeded keeps the old state if validation fails.
        self.params = p
        return self

    def fit(self, X, times=None):
        """
        Process next chunk of data without returning outlier scores.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.
        """
        # In most cases, fitting isn't any faster than additionally
        # performing outlier scoring. We override this method only
        # when it yields faster processing.
        self.fit_predict(X, times)

    def _process_data(self, data):
        """
        Sanitize a chunk of input data and track its feature count.

        The chunk is converted with sanitizeData() using the configured
        float type. The number of columns must match the one seen in
        earlier chunks (``self.dimension``), which is -1 until the
        first chunk has been processed.
        """
        data = sanitizeData(data, self.params['float_type'])
        assert self.dimension == -1 or data.shape[1] == self.dimension, \
            'Expected %d features, but got %d' % (self.dimension, data.shape[1])
        self.dimension = data.shape[1]
        return data

    def _process_times(self, data, times):
        """
        Sanitize the timestamps belonging to `data`.

        Timestamps are passed through sanitizeTimes() together with the
        sample count, the previously seen last timestamp and the
        configured float type. The last timestamp of the chunk is
        remembered in ``self.last_time`` for the next call.
        """
        times = sanitizeTimes(times, data.shape[0], self.last_time, self.params['float_type'])
        self.last_time = times[-1]
        return times
class SWDBOR(OutlierDetector):
    """
    Distance based outlier detection by radius.

    When setting a threshold for the returned outlier scores to
    transform outlier scores into binary labels, results coincide
    with ExactStorm :cite:p:`Angiulli2007`, AbstractC :cite:p:`Yang2009`
    or the COD family :cite:p:`Kontaki2011`.

    Parameters
    ----------
    window: float
        Window length after which samples will be pruned.

    radius: float
        Radius for classification as neighbor.

    metric: string
        Which distance metric to use. Currently supported metrics
        include 'chebyshev', 'cityblock', 'euclidean' and
        'minkowsi'.

    metric_params: dict
        Parameters passed to the metric. The Minkowski distance
        requires setting an integer `p` parameter.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    min_node_size: int, optional (default=5)
        Smallest possible size for M-Tree nodes. min_node_size
        is guaranteed to leave results unaffected.

    max_node_size: int, optional (default=100)
        Largest possible size for M-Tree nodes. max_node_size
        is guaranteed to leave results unaffected.

    split_sampling: int, optional (default=20)
        The number of key combinations to try when splitting M-Tree
        routing nodes. split_sampling is guaranteed to leave results
        unaffected.
    """

    def __init__(self, window, radius, metric='euclidean', metric_params=None,
                 float_type=np.float64, min_node_size=5, max_node_size=100,
                 split_sampling=20):
        # Keep the constructor arguments, in signature order, so that
        # get_params()/set_params() can report and rebuild the model.
        self.params = dict(
            window=window,
            radius=radius,
            metric=metric,
            metric_params=metric_params,
            float_type=float_type,
            min_node_size=min_node_size,
            max_node_size=max_node_size,
            split_sampling=split_sampling,
        )
        self._init_model(self.params)

    def _init_model(self, p):
        """Validate `p`, build a fresh model from it and reset the stream state."""
        float_type = p['float_type']
        assert float_type in [np.float32, np.float64]
        assert p['max_node_size'] > 2 * p['min_node_size'], 'max_node_size must be > 2 * min_node_size'
        assert p['min_node_size'] > 0
        assert p['window'] > 0
        assert p['radius'] > 0

        metric_kwargs = p['metric_params'] or {}
        distance_function = lookupDistance(p['metric'], float_type, **metric_kwargs)
        backends = {np.float32: dSalmon_cpp.DBOR32, np.float64: dSalmon_cpp.DBOR64}
        backend = backends[float_type]
        self.model = backend(
            p['window'],
            p['radius'],
            distance_function,
            p['min_node_size'],
            p['max_node_size'],
            p['split_sampling'],
        )
        # A new model has seen neither samples nor timestamps.
        self.last_time = 0
        self.dimension = -1
        self.params = p

    def fit_predict(self, X, times=None):
        """
        Process next chunk of data.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.

        Returns
        -------
        y: ndarray, shape (n_samples,)
            Outlier scores for provided input data.
        """
        samples = self._process_data(X)
        timestamps = self._process_times(samples, times)
        n_samples = samples.shape[0]
        # The model writes the scores into this preallocated buffer.
        scores = np.empty(n_samples, dtype=self.params['float_type'])
        self.model.fit_predict(samples, scores, timestamps)
        return scores

    def window_size(self):
        """Return the number of samples in the sliding window."""
        return self.model.window_size()

    def get_window(self):
        """
        Return samples in the current window.

        Returns
        -------
        data: ndarray, shape (n_samples, n_features)
            Samples in the current window.

        times: ndarray, shape (n_samples,)
            Expiry times of samples in the current window.

        neighbors: ndarray, shape (n_samples)
            Number of neighbors of samples in the current window.
        """
        dtype = self.params['float_type']
        if self.dimension == -1:
            # No chunk has been processed yet, so the feature count is
            # unknown; return empty arrays.
            return (np.zeros([0], dtype=dtype),
                    np.zeros([0], dtype=dtype),
                    np.zeros([0], dtype=np.int32))

        n_stored = self.model.window_size()
        data = np.empty([n_stored, self.dimension], dtype=dtype)
        times = np.empty(n_stored, dtype=dtype)
        neighbors = np.empty(n_stored, dtype=np.int32)
        # The output buffers are filled in place by the model.
        self.model.get_window(data, times, neighbors)
        return data, times, neighbors
class SWKNN(OutlierDetector):
    """
    Distance based outlier detection by k nearest neighbors.

    When setting a threshold for the returned outlier scores to
    transform outlier scores into binary labels, results coincide
    with ExactStorm :cite:p:`Angiulli2007`, AbstractC :cite:p:`Yang2009`
    or the COD family :cite:p:`Kontaki2011`.

    Parameters
    ----------
    window: float
        Window length after which samples will be pruned.

    k: int
        Number of nearest neighbors to consider for outlier
        scoring.

    k_is_max: bool (default=False)
        Whether scores should be returned for all neighbor
        values up to the provided k. Grid search for the
        optimal k can be performed by setting k_is_max=True.

    metric: string
        Which distance metric to use. Currently supported metrics
        include 'chebyshev', 'cityblock', 'euclidean' and
        'minkowsi'.

    metric_params: dict
        Parameters passed to the metric. The Minkowski distance
        requires setting an integer `p` parameter.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    min_node_size: int, optional (default=5)
        Smallest possible size for M-Tree nodes. min_node_size
        is guaranteed to leave results unaffected.

    max_node_size: int, optional (default=100)
        Largest possible size for M-Tree nodes. max_node_size
        is guaranteed to leave results unaffected.

    split_sampling: int, optional (default=20)
        The number of key combinations to try when splitting M-Tree
        routing nodes. split_sampling is guaranteed to leave results
        unaffected.
    """

    def __init__(self, window, k, k_is_max=False, metric='euclidean',
                 metric_params=None, float_type=np.float64, min_node_size=5,
                 max_node_size=100, split_sampling=20):
        # Constructor arguments in signature order; used by
        # get_params()/set_params().
        self.params = dict(
            window=window,
            k=k,
            k_is_max=k_is_max,
            metric=metric,
            metric_params=metric_params,
            float_type=float_type,
            min_node_size=min_node_size,
            max_node_size=max_node_size,
            split_sampling=split_sampling,
        )
        self._init_model(self.params)

    def _init_model(self, p):
        """Validate `p`, build a fresh model from it and reset the stream state."""
        assert p['float_type'] in [np.float32, np.float64]
        assert p['max_node_size'] > 2 * p['min_node_size'], 'max_node_size must be > 2 * min_node_size'
        assert p['min_node_size'] > 0
        assert p['window'] > 0
        assert p['k'] > 0
        distance_function = lookupDistance(p['metric'], p['float_type'], **(p['metric_params'] or {}))
        cpp_obj = {np.float32: dSalmon_cpp.SWKNN32, np.float64: dSalmon_cpp.SWKNN64}[p['float_type']]
        self.model = cpp_obj(p['window'], p['k'], distance_function,
                             p['min_node_size'], p['max_node_size'], p['split_sampling'])
        self.last_time = 0
        self.dimension = -1
        # Keep self.params in sync with the model that was just built.
        # fit_predict() sizes its score buffer from self.params['k'], so a
        # stale dictionary after set_params() would mismatch the model.
        self.params = p

    def fit(self, X, times=None):
        """
        Process next chunk of data without returning outlier scores.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.
        """
        X = self._process_data(X)
        times = self._process_times(X, times)
        self.model.fit(X, times)

    def fit_predict(self, X, times=None):
        """
        Process next chunk of data.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.

        Returns
        -------
        y: ndarray, shape (n_samples,) or (n_samples,k)
            Outlier scores for provided input data. The two-dimensional
            form is returned when k_is_max is set.
        """
        X = self._process_data(X)
        times = self._process_times(X, times)
        if self.params['k_is_max']:
            # One score column per neighbor count up to k.
            scores = np.empty([X.shape[0], self.params['k']], dtype=self.params['float_type'])
            self.model.fit_predict_with_neighbors(X, scores, times)
        else:
            scores = np.empty(X.shape[0], dtype=self.params['float_type'])
            self.model.fit_predict(X, scores, times)
        return scores

    def window_size(self):
        """Return the number of samples in the sliding window."""
        return self.model.window_size()

    def get_window(self):
        """
        Return samples in the current window.

        Returns
        -------
        data: ndarray, shape (n_samples, n_features)
            Samples in the current window.

        times: ndarray, shape (n_samples,)
            Expiry times of samples in the current window.
        """
        float_type = self.params['float_type']
        if self.dimension == -1:
            # No data processed yet. Return exactly two arrays, as in the
            # regular path and as documented, so that
            # `data, times = detector.get_window()` always works.
            return np.zeros([0], dtype=float_type), np.zeros([0], dtype=float_type)
        window_size = self.model.window_size()
        data = np.empty([window_size, self.dimension], dtype=float_type)
        times = np.empty(window_size, dtype=float_type)
        # The output buffers are filled in place by the model.
        self.model.get_window(data, times)
        return data, times
class SWLOF(OutlierDetector):
    """
    Local Outlier Factor :cite:p:`Breunig2000` within a sliding window.

    Parameters
    ----------
    window: float
        Window length after which samples will be pruned.

    k: int
        Number of nearest neighbors to consider for outlier
        scoring.

    simplified: bool (default=False)
        Whether to use simplified LOF.

    k_is_max: bool (default=False)
        Whether scores should be returned for all neighbor
        values up to the provided k. Grid search for the
        optimal k can be performed by setting k_is_max=True.

    metric: string
        Which distance metric to use. Currently supported metrics
        include 'chebyshev', 'cityblock', 'euclidean' and
        'minkowsi'.

    metric_params: dict
        Parameters passed to the metric. The Minkowski distance
        requires setting an integer `p` parameter.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    min_node_size: int, optional (default=5)
        Smallest possible size for M-Tree nodes. min_node_size
        is guaranteed to leave results unaffected.

    max_node_size: int, optional (default=100)
        Largest possible size for M-Tree nodes. max_node_size
        is guaranteed to leave results unaffected.

    split_sampling: int, optional (default=20)
        The number of key combinations to try when splitting M-Tree
        routing nodes. split_sampling is guaranteed to leave results
        unaffected.
    """

    def __init__(self, window, k, simplified=False, k_is_max=False,
                 metric='euclidean', metric_params=None, float_type=np.float64,
                 min_node_size=5, max_node_size=100, split_sampling=20):
        # Constructor arguments in signature order; used by
        # get_params()/set_params().
        self.params = dict(
            window=window,
            k=k,
            simplified=simplified,
            k_is_max=k_is_max,
            metric=metric,
            metric_params=metric_params,
            float_type=float_type,
            min_node_size=min_node_size,
            max_node_size=max_node_size,
            split_sampling=split_sampling,
        )
        self._init_model(self.params)

    def _init_model(self, p):
        """Validate `p`, build a fresh model from it and reset the stream state."""
        assert p['float_type'] in [np.float32, np.float64]
        assert p['max_node_size'] > 2 * p['min_node_size'], 'max_node_size must be > 2 * min_node_size'
        assert p['min_node_size'] > 0
        assert p['window'] > 0
        assert p['k'] > 0
        distance_function = lookupDistance(p['metric'], p['float_type'], **(p['metric_params'] or {}))
        cpp_obj = {np.float32: dSalmon_cpp.SWLOF32, np.float64: dSalmon_cpp.SWLOF64}[p['float_type']]
        self.model = cpp_obj(p['window'], p['k'], p['simplified'], distance_function,
                             p['min_node_size'], p['max_node_size'], p['split_sampling'])
        self.last_time = 0
        self.dimension = -1
        # Keep self.params in sync with the model that was just built.
        # fit_predict() sizes its score buffer from self.params['k'], so a
        # stale dictionary after set_params() would mismatch the model.
        self.params = p

    def fit(self, data, times=None):
        """
        Process next chunk of data without returning outlier scores.

        Parameters
        ----------
        data: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.
        """
        data = self._process_data(data)
        times = self._process_times(data, times)
        self.model.fit(data, times)

    def fit_predict(self, X, times=None):
        """
        Process next chunk of data.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.

        Returns
        -------
        y: ndarray, shape (n_samples,) or (n_samples,k)
            Outlier scores for provided input data. The two-dimensional
            form is returned when k_is_max is set.
        """
        X = self._process_data(X)
        times = self._process_times(X, times)
        # The model always fills one column per neighbor count up to k;
        # without k_is_max only the last column is handed out.
        scores = np.empty([X.shape[0], self.params['k']], dtype=self.params['float_type'])
        self.model.fit_predict(X, scores, times)
        return scores if self.params['k_is_max'] else scores[:, -1]

    def window_size(self):
        """Return the number of samples in the sliding window."""
        return self.model.window_size()

    def get_window(self):
        """
        Return samples in the current window.

        Returns
        -------
        data: ndarray, shape (n_samples, n_features)
            Samples in the current window.

        times: ndarray, shape (n_samples,)
            Expiry times of samples in the current window.
        """
        float_type = self.params['float_type']
        if self.dimension == -1:
            # No data processed yet. Return exactly two arrays, as in the
            # regular path and as documented, so that
            # `data, times = detector.get_window()` always works.
            return np.zeros([0], dtype=float_type), np.zeros([0], dtype=float_type)
        window_size = self.model.window_size()
        data = np.empty([window_size, self.dimension], dtype=float_type)
        times = np.empty(window_size, dtype=float_type)
        # The output buffers are filled in place by the model.
        self.model.get_window(data, times)
        return data, times
class SDOstream(OutlierDetector):
    """
    Streaming outlier detection based on Sparse Data Observers :cite:p:`Hartl2019`.

    Parameters
    ----------
    k: int
        Number of observers to use.

    T: int
        Characteristic time for the model.
        Increasing T makes the model adjust slower,
        decreasing T makes it adjust quicker.

    qv: float, optional (default=0.3)
        Ratio of unused observers due to model cleaning.

    x: int (default=6)
        Number of nearest observers to consider for outlier scoring
        and model cleaning.

    metric: string
        Which distance metric to use. Currently supported metrics
        include 'chebyshev', 'cityblock', 'euclidean' and
        'minkowsi'.

    metric_params: dict
        Parameters passed to the metric. The Minkowski distance
        requires setting an integer `p` parameter.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    seed: int (default=0)
        Random seed to use.

    return_sampling: bool (default=False)
        Also return whether a data point was adopted as observer.
    """

    def __init__(self, k, T, qv=0.3, x=6, metric='euclidean', metric_params=None,
                 float_type=np.float64, seed=0, return_sampling=False):
        # Constructor arguments in signature order; used by
        # get_params()/set_params().
        self.params = dict(
            k=k,
            T=T,
            qv=qv,
            x=x,
            metric=metric,
            metric_params=metric_params,
            float_type=float_type,
            seed=seed,
            return_sampling=return_sampling,
        )
        self._init_model(self.params)

    def _init_model(self, p):
        """Validate `p`, build a fresh model from it and reset the stream state."""
        float_type = p['float_type']
        assert float_type in [np.float32, np.float64]
        assert 0 <= p['qv'] < 1, 'qv must be in [0,1)'
        assert p['x'] > 0, 'x must be > 0'
        assert p['k'] > 0, 'k must be > 0'
        assert p['T'] > 0, 'T must be > 0'

        metric_kwargs = p['metric_params'] or {}
        distance_function = lookupDistance(p['metric'], float_type, **metric_kwargs)
        backends = {np.float32: dSalmon_cpp.SDOstream32, np.float64: dSalmon_cpp.SDOstream64}
        backend = backends[float_type]
        self.model = backend(p['k'], p['T'], p['qv'], p['x'], distance_function, p['seed'])
        # A new model has seen neither samples nor timestamps.
        self.last_time = 0
        self.dimension = -1

    def fit_predict(self, X, times=None):
        """
        Process next chunk of data.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.

        Returns
        -------
        y: ndarray, shape (n_samples,)
            Outlier scores for provided input data.

        sampling: ndarray of np.int32, shape (n_samples,)
            Only returned (as second tuple element) when the detector
            was configured with return_sampling=True.
        """
        samples = self._process_data(X)
        timestamps = self._process_times(samples, times)
        n_samples = samples.shape[0]
        scores = np.empty(n_samples, dtype=self.params['float_type'])

        if not self.params['return_sampling']:
            self.model.fit_predict(samples, scores, timestamps)
            return scores

        # The model additionally fills a per-sample sampling buffer.
        sampling = np.empty(n_samples, dtype=np.int32)
        self.model.fit_predict_with_sampling(samples, scores, timestamps, sampling)
        return scores, sampling

    def observer_count(self):
        """Return the current number of observers."""
        return self.model.observer_count()

    def get_observers(self, time=None):
        """
        Return observer data.

        Parameters
        ----------
        time: float, optional
            Time passed on to the model when querying the observers.
            If None, the timestamp of the most recently processed
            sample is used.

        Returns
        -------
        data: ndarray, shape (n_observers, n_features)
            Sample used as observer.

        observations: ndarray, shape (n_observers,)
            Exponential moving average of observations.

        av_observations: ndarray, shape (n_observers,)
            Exponential moving average of observations
            normalized according to the theoretical maximum.
        """
        query_time = self.last_time if time is None else time
        n_observers = self.model.observer_count()
        dtype = self.params['float_type']

        if n_observers == 0:
            return (np.zeros([0], dtype=dtype),
                    np.zeros([0], dtype=dtype),
                    np.zeros([0], dtype=dtype))

        data = np.empty([n_observers, self.dimension], dtype=dtype)
        observations = np.empty(n_observers, dtype=dtype)
        av_observations = np.empty(n_observers, dtype=dtype)
        # The output buffers are filled in place; the query time is
        # converted to the model's floating point type first.
        self.model.get_observers(data, observations, av_observations, dtype(query_time))
        return data, observations, av_observations
class SWRRCT(OutlierDetector):
    """
    Robust Random Cut Forest :cite:p:`Guha16`.

    Parameters
    ----------
    window: float
        Window length after which samples will be pruned.

    n_estimators: int
        Number of trees in the ensemble.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    seed: int
        Random seed for tree construction.

    n_jobs: int
        Number of threads to use for processing trees.
        Pass -1 to use as many jobs as there are CPU cores.
    """

    def __init__(self, window, n_estimators=10, float_type=np.float64, seed=0, n_jobs=-1):
        # Constructor arguments in signature order; used by
        # get_params()/set_params().
        self.params = dict(
            window=window,
            n_estimators=n_estimators,
            float_type=float_type,
            seed=seed,
            n_jobs=n_jobs,
        )
        self._init_model(self.params)

    def _init_model(self, p):
        """Validate `p`, build a fresh model from it and reset the stream state."""
        float_type = p['float_type']
        assert float_type in [np.float32, np.float64]
        assert p['n_estimators'] > 0
        assert p['window'] > 0

        backends = {np.float32: dSalmon_cpp.RRCT32, np.float64: dSalmon_cpp.RRCT64}
        backend = backends[float_type]
        # -1 selects one job per CPU core.
        if p['n_jobs'] == -1:
            n_jobs = mp.cpu_count()
        else:
            n_jobs = p['n_jobs']
        self.model = backend(p['n_estimators'], p['window'], p['seed'], n_jobs)
        # A new model has seen neither samples nor timestamps.
        self.last_time = 0
        self.dimension = -1

    def fit(self, X, times=None):
        """
        Process next chunk of data without returning outlier scores.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.
        """
        samples = self._process_data(X)
        timestamps = self._process_times(samples, times)
        self.model.fit(samples, timestamps)

    def fit_predict(self, X, times=None):
        """
        Process next chunk of data.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.

        Returns
        -------
        y: ndarray, shape (n_samples,)
            Outlier scores for provided input data.
        """
        samples = self._process_data(X)
        timestamps = self._process_times(samples, times)
        # The model writes the scores into this preallocated buffer.
        scores = np.empty(samples.shape[0], dtype=self.params['float_type'])
        self.model.fit_predict(samples, scores, timestamps)
        return scores

    def window_size(self):
        """Return the number of samples in the sliding window."""
        return self.model.window_size()

    def get_window(self):
        """
        Return samples in the current window.

        Returns
        -------
        data: ndarray, shape (n_samples, n_features)
            Samples in the current window.

        times: ndarray, shape (n_samples,)
            Expiry times of samples in the current window.
        """
        dtype = self.params['float_type']
        if self.dimension == -1:
            # No chunk has been processed yet; return empty arrays.
            return np.zeros([0], dtype=dtype), np.zeros([0], dtype=dtype)

        n_stored = self.model.window_size()
        data = np.empty([n_stored, self.dimension], dtype=dtype)
        times = np.empty(n_stored, dtype=dtype)
        # The output buffers are filled in place by the model.
        self.model.get_window(data, times)
        return data, times
class RSHash(OutlierDetector):
    """
    RS-Hash :cite:p:`Sathe2016`.

    This outlier detector assumes that features are normalized
    to a [0,1] range.

    Parameters
    ----------
    n_estimators: int
        Number of estimators in the ensemble.

    window: float
        Window length after which samples will be pruned.

    cms_w: int
        Number of hash functions per estimator for the
        count-min sketch.

    cms_d: int
        Number of bins for the count-min sketch.

    s_param: int, optional
        The s parameter of RS-Hash, which should be an estimate
        of the number of samples in a sliding window.
        If None, the value of window will be used for s_param,
        assuming that samples arrive with an inter-arrival
        time of 1.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    seed: int
        Random seed to use.

    n_jobs: int
        Number of threads to use for processing trees.
        Pass -1 to use as many jobs as there are CPU cores.
    """

    def __init__(self, n_estimators, window, cms_w, cms_d, s_param=None,
                 float_type=np.float64, seed=0, n_jobs=-1):
        # Constructor arguments in signature order; used by
        # get_params()/set_params().
        self.params = dict(
            n_estimators=n_estimators,
            window=window,
            cms_w=cms_w,
            cms_d=cms_d,
            s_param=s_param,
            float_type=float_type,
            seed=seed,
            n_jobs=n_jobs,
        )
        self._init_model(self.params)

    def _init_model(self, p):
        """Validate `p`, build a fresh model from it and reset the stream state."""
        float_type = p['float_type']
        assert float_type in [np.float32, np.float64]
        assert p['n_estimators'] > 0
        assert p['window'] > 0

        backends = {np.float32: dSalmon_cpp.RSHash32, np.float64: dSalmon_cpp.RSHash64}
        backend = backends[float_type]
        # Any falsy s_param (None, but also 0) falls back to the window length.
        s_param = p['s_param'] or p['window']
        # -1 selects one job per CPU core.
        if p['n_jobs'] == -1:
            n_jobs = mp.cpu_count()
        else:
            n_jobs = p['n_jobs']
        self.model = backend(p['n_estimators'], p['window'], p['cms_w'], p['cms_d'],
                             s_param, p['seed'], n_jobs)
        # A new model has seen neither samples nor timestamps.
        self.last_time = 0
        self.dimension = -1

    def fit_predict(self, X, times=None):
        """
        Process next chunk of data.

        Data in X is assumed to be normalized to [0,1].

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.

        Returns
        -------
        y: ndarray, shape (n_samples,)
            Outlier scores for provided input data.
        """
        samples = self._process_data(X)
        timestamps = self._process_times(samples, times)
        # The model writes the scores into this preallocated buffer.
        scores = np.empty(samples.shape[0], dtype=self.params['float_type'])
        self.model.fit_predict(samples, scores, timestamps)
        return scores

    def window_size(self):
        """Return the number of samples in the sliding window."""
        return self.model.window_size()

    def get_window(self):
        """
        Return samples in the current window.

        Returns
        -------
        data: ndarray, shape (n_samples, n_features)
            Samples in the current window.

        times: ndarray, shape (n_samples,)
            Expiry times of samples in the current window.
        """
        dtype = self.params['float_type']
        if self.dimension == -1:
            # No chunk has been processed yet; return empty arrays.
            return np.zeros([0], dtype=dtype), np.zeros([0], dtype=dtype)

        n_stored = self.model.window_size()
        data = np.empty([n_stored, self.dimension], dtype=dtype)
        times = np.empty(n_stored, dtype=dtype)
        # The output buffers are filled in place by the model.
        self.model.get_window(data, times)
        return data, times
class LODA(OutlierDetector):
    """
    LODA :cite:p:`Pevny2016`.

    This detector performs outlier detection based on equi-depth
    histograms. If random projections are used, this corresponds to
    the LODA algorithm, otherwise behaviour corresponds to a sliding
    window adaptation of the HBOS :cite:p:`Goldstein2012` algorithm.

    Parameters
    ----------
    window: float
        Window length after which samples will be pruned.

    n_projections: int, optional
        The number of random projections to use. If None,
        random projections are skipped.

    n_bins: int
        The number of histogram bins.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    seed: int
        Seed for random projections.

    n_jobs: int
        Number of threads to use for processing trees.
        Pass -1 to use as many jobs as there are CPU cores.
    """

    def __init__(self, window, n_projections=None, n_bins=10,
                 float_type=np.float64, seed=0, n_jobs=-1):
        # Constructor arguments in signature order; used by
        # get_params()/set_params().
        self.params = dict(
            window=window,
            n_projections=n_projections,
            n_bins=n_bins,
            float_type=float_type,
            seed=seed,
            n_jobs=n_jobs,
        )
        self._init_model(self.params)

    def _perform_projections(self, X):
        """
        Apply the random projections to `X`, if any are configured.

        Also records how many columns the model receives, which may
        differ from the number of input features when projecting.
        """
        if self.projector is not None:
            X = self.projector.transform(X)
        self._model_dimension = X.shape[1]
        return X

    def _init_model(self, p):
        """Validate `p`, build a fresh model from it and reset the stream state."""
        assert p['float_type'] in [np.float32, np.float64]
        assert p['n_bins'] > 0
        assert p['window'] > 0
        assert p['n_projections'] is None or p['n_projections'] > 0
        cpp_obj = {np.float32: dSalmon_cpp.SWHBOS32, np.float64: dSalmon_cpp.SWHBOS64}[p['float_type']]
        self.model = cpp_obj(p['window'], p['n_bins'],
                             mp.cpu_count() if p['n_jobs'] == -1 else p['n_jobs'])
        if p['n_projections'] is not None:
            self.projector = projection.LODAProjector(p['n_projections'],
                                                      float_type=p['float_type'],
                                                      seed=p['seed'])
        else:
            self.projector = None
        self.last_time = 0
        # Number of raw input features (checked by _process_data()).
        self.dimension = -1
        # Number of columns handed to the model (after projection).
        self._model_dimension = -1
        # Keep self.params in sync with the model that was just built.
        self.params = p

    def fit(self, X, times=None):
        """
        Process next chunk of data without returning outlier scores.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.
        """
        X = self._perform_projections(self._process_data(X))
        times = self._process_times(X, times)
        self.model.fit(X, times)

    def fit_predict(self, X, times=None):
        """
        Process next chunk of data.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.

        Returns
        -------
        y: ndarray, shape (n_samples,)
            Outlier scores for provided input data.
        """
        X = self._perform_projections(self._process_data(X))
        times = self._process_times(X, times)
        scores = np.empty(X.shape[0], dtype=self.params['float_type'])
        self.model.fit_predict(X, scores, times)
        return scores

    def window_size(self):
        """Return the number of samples in the sliding window."""
        return self.model.window_size()

    def get_window(self):
        """
        Return samples in the current window.

        Returns
        -------
        data: ndarray, shape (n_samples, n_features)
            Samples in the current window. If n_projections is set,
            returns the projected data samples, i.e. the second
            dimension is the number of projected columns.

        times: ndarray, shape (n_samples,)
            Expiry times of samples in the current window.
        """
        float_type = self.params['float_type']
        if self._model_dimension == -1:
            # Nothing has been passed to the model yet.
            return np.zeros([0], dtype=float_type), np.zeros([0], dtype=float_type)
        window_size = self.model.window_size()
        # The model stores what it was fed, i.e. the projected samples, so
        # the buffer must be sized with the projected column count rather
        # than the raw input feature count kept in self.dimension.
        data = np.empty([window_size, self._model_dimension], dtype=float_type)
        times = np.empty(window_size, dtype=float_type)
        self.model.get_window(data, times)
        return data, times
class HSTrees(OutlierDetector):
    """
    Streaming Half-Space Trees :cite:p:`Tan2011`.

    Parameters
    ----------
    window: float
        Window length after which samples will be pruned.

    n_estimators: int
        The number of trees in the ensemble.

    max_depth: int
        The depth of each individual tree.

    size_limit: int, optional
        The maximum size of nodes to consider for outlier scoring.
        If None, defaults to 0.1*window, as described in the
        corresponding paper.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    seed: int
        Random seed for tree construction.

    n_jobs: int
        Number of threads to use for processing trees.
        Pass -1 to use as many jobs as there are CPU cores.
    """

    # TODO: size_limit=None is inconsistent when passing times to fit_predict()

    def __init__(self, window, n_estimators, max_depth, size_limit=None,
                 float_type=np.float64, seed=0, n_jobs=-1):
        # Constructor arguments in signature order; used by
        # get_params()/set_params().
        self.params = dict(
            window=window,
            n_estimators=n_estimators,
            max_depth=max_depth,
            size_limit=size_limit,
            float_type=float_type,
            seed=seed,
            n_jobs=n_jobs,
        )
        self._init_model(self.params)

    def _init_model(self, p):
        """Validate `p`, build a fresh model from it and reset the stream state."""
        assert p['float_type'] in [np.float32, np.float64]
        # Same window check as in the other windowed detectors; the window
        # also feeds the default size_limit below.
        assert p['window'] > 0
        assert p['n_estimators'] > 0
        assert p['max_depth'] > 0
        assert p['size_limit'] is None or p['size_limit'] >= 0
        cpp_obj = {np.float32: dSalmon_cpp.HSTrees32, np.float64: dSalmon_cpp.HSTrees64}[p['float_type']]
        # Default size limit is a tenth of the window (floor division).
        size_limit = p['window'] // 10 if p['size_limit'] is None else p['size_limit']
        n_jobs = mp.cpu_count() if p['n_jobs'] == -1 else p['n_jobs']
        self.model = cpp_obj(p['window'], p['n_estimators'], p['max_depth'],
                             size_limit, p['seed'], n_jobs)
        self.last_time = 0
        self.dimension = -1
        # Keep self.params in sync with the model that was just built.
        self.params = p

    def fit_predict(self, X, times=None):
        """
        Process next chunk of data.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        times: ndarray, shape (n_samples,), optional
            Timestamps for input data. If None,
            timestamps are linearly increased for
            each sample.

        Returns
        -------
        y: ndarray, shape (n_samples,)
            Outlier scores for provided input data.
        """
        X = self._process_data(X)
        times = self._process_times(X, times)
        # The model writes the scores into this preallocated buffer.
        scores = np.empty(X.shape[0], dtype=self.params['float_type'])
        self.model.fit_predict(X, scores, times)
        return scores
class xStream(OutlierDetector):
    """
    xStream :cite:p:`Manzoor2018`.

    Parameters
    ----------
    window: int
        Window length after which the current window will be switched
        to the reference window.

    n_estimators: int
        The number of chains in the ensemble.

    n_projections: int
        The number of StreamHash projections to use.

    depth: int
        The length of each half-space chain.

    cms_w: int
        Number of hash functions for the count-min sketches.

    cms_d: int
        Number of bins for the count-min sketches.

    float_type: np.float32 or np.float64
        The floating point type to use for internal processing.

    seed: int
        Random seed for tree construction.

    n_jobs: int
        Number of threads to use for processing trees.
        Pass -1 to use as many jobs as there are CPU cores.
    """

    def __init__(self, window, n_estimators, n_projections, depth, cms_w=5,
                 cms_d=1000, float_type=np.float64, seed=0, n_jobs=-1):
        # Constructor arguments in signature order; used by
        # get_params()/set_params().
        self.params = dict(
            window=window,
            n_estimators=n_estimators,
            n_projections=n_projections,
            depth=depth,
            cms_w=cms_w,
            cms_d=cms_d,
            float_type=float_type,
            seed=seed,
            n_jobs=n_jobs,
        )
        self._init_model(self.params)

    def _init_model(self, p):
        """Validate `p`, build a fresh model and projector, and reset the initial sample."""
        assert p['float_type'] in [np.float32, np.float64]
        assert p['window'] > 0
        assert p['depth'] > 0
        cpp_obj = {np.float32: dSalmon_cpp.HSChains32, np.float64: dSalmon_cpp.HSChains64}[p['float_type']]
        self.model = cpp_obj(p['window'], p['n_estimators'], p['depth'], p['cms_w'], p['cms_d'],
                             p['seed'], mp.cpu_count() if p['n_jobs'] == -1 else p['n_jobs'])
        # The projection count has to come from `p`: the constructor
        # argument of the same name is not in scope in this method.
        self.projector = projection.StreamHash(p['n_projections'],
                                               float_type=p['float_type'],
                                               seed=p['seed'])
        # Buffer of projected samples collected until range estimates exist.
        self.initial_sample = np.empty((0, p['n_projections']), dtype=p['float_type'])
        self.initial_sample_was_set = False
        # Keep self.params in sync with the model that was just built.
        self.params = p

    def set_initial_sample(self, data, features=None):
        """
        Optionally set the initial sample used for estimating the
        range of projected features.

        If no initial sample is provided, ranges will be estimated
        from the first `window` data points. In this case, the first
        `window` data points are stored to construct the reference
        window as soon as range estimates are available.

        Parameters
        ----------
        data: ndarray, shape (n_samples, n_features)
            The initial sample.

        features: list, optional
            Feature names used for StreamHash. The `repr()` of list
            elements is used as basis for hashing, hence elements do
            not necessarily have to be strings. If None,
            `range(n_features)` is used as feature names.
        """
        # The second operand is only evaluated while no sample was set, so
        # self.initial_sample is still an array (not None) at that point.
        assert not self.initial_sample_was_set and not self.initial_sample.size, 'The initial sample must be set before processing any data point.'
        data = sanitizeData(data, self.params['float_type'])
        data_projected = self.projector.transform(data, features)
        self.model.set_initial_minmax(np.min(data_projected, axis=0), np.max(data_projected, axis=0))
        self.initial_sample_was_set = True
        self.initial_sample = None

    def fit_predict(self, X, features=None):
        """
        Process next chunk of data.

        Parameters
        ----------
        X: ndarray, shape (n_samples, n_features)
            The input data.

        features: list, optional
            Feature names used for StreamHash. The `repr()` of list
            elements is used as basis for hashing, hence elements do
            not necessarily have to be strings. If None,
            `range(n_features)` is used as feature names.

        Returns
        -------
        y: ndarray, shape (n_samples,)
            Outlier scores for provided input data. Samples consumed
            for the initial range estimate receive NaN.
        """
        X = sanitizeData(X, self.params['float_type'])
        X_projected = self.projector.transform(X, features)
        returned_scores = np.empty(X.shape[0], dtype=self.params['float_type'])
        if not self.initial_sample_was_set:
            window = self.params['window']
            # Number of samples still missing to fill the initial sample.
            threshold = window - len(self.initial_sample)
            self.initial_sample = np.append(self.initial_sample, X_projected[:threshold], axis=0)
            # np.nan instead of the np.NaN alias, which NumPy 2.0 removed.
            returned_scores[:threshold] = np.nan
            if len(self.initial_sample) < window:
                return returned_scores
            self.initial_sample_was_set = True
            self.model.set_initial_minmax(np.min(self.initial_sample, axis=0),
                                          np.max(self.initial_sample, axis=0))
            self.model.fit(self.initial_sample)
            self.initial_sample = None
            X_projected = X_projected[threshold:]
            # Basic slicing yields a view, so scores written by the model
            # land directly in returned_scores.
            scores = returned_scores[threshold:]
        else:
            scores = returned_scores
        self.model.fit_predict(X_projected, scores)
        return returned_scores