# Copyright (c) 2020 CN Group, TU Wien
# Released under the GNU Lesser General Public License version 3,
# see accompanying file LICENSE or <https://www.gnu.org/licenses/>.
"""
Indexing structures for fast stream data processing.
"""
import numpy as np
import math
import multiprocessing as mp
from dSalmon import swig as dSalmon_cpp
from dSalmon.util import sanitizeData, sanitizeTimes, lookupDistance
[docs]class StatisticsTree(object):
"""
Indexing structure for computing per-dimension statistics in a sliding
window.
This implementation relies on an order statistic tree provided by Boost
for achieving O(log(window)) time complexity for quantile computation.
Parameters
----------
window: float
Window length after which samples will be pruned.
what: list of strings, optional
Which statistics to compute. Elements of `what` can be one of
'sum', 'average', 'squares_sum', 'variance', 'min', 'max'
or 'median'.
quantiles: list of floats, optional
Quantile values to compute in addition to statistics in `what`.
Elements should be floats in [0,1].
float_type: np.float32 or np.float64
The floating point type to use for internal processing.
"""
def __init__(self, window, what=[], quantiles=[], float_type=np.float64):
self.window = window
self.float_type = float_type
cpp_obj = {np.float32: dSalmon_cpp.StatisticsTree32, np.float64: dSalmon_cpp.StatisticsTree64}[float_type]
self.cpp_tree = cpp_obj(window)
if isinstance(what, str):
what = [ what ]
self.stats = what
stat_mapping = {
'sum': self.cpp_tree.STAT_SUM,
'average': self.cpp_tree.STAT_AVERAGE,
'squares_sum': self.cpp_tree.STAT_SQUARES_SUM,
'variance': self.cpp_tree.STAT_VARIANCE,
'min': self.cpp_tree.STAT_MIN,
'max': self.cpp_tree.STAT_MAX,
'median': self.cpp_tree.STAT_MEDIAN,
}
self.mapped_stats = [ stat_mapping[x] for x in what ]
self.quantiles = np.array(quantiles, dtype=self.float_type)
assert (0 <= self.quantiles).all() and (self.quantiles <= 1).all()
self.last_time = 0
[docs] def fit_query(self, X, times=None):
"""
Process next chunk of data.
Parameters
----------
X: ndarray, shape (n_samples, n_features)
The input data.
times: ndarray, shape (n_samples,), optional
Timestamps for input data. If None, timestamps are linearly
increased for each sample.
Returns
-------
statistics: ndarray, shape (n_samples, n_statistics, n_features)
The computed statistics. Statistics for row `i` are evaluated
after adding row `i` to the sliding window.
Here, `n_statistics` = `len(what)` + `len(quantiles)`.
counts: ndarray, shape (n_samples)
The lengths of the sliding window after processing each row
of `X`.
"""
X = sanitizeData(X, self.float_type)
times = sanitizeTimes(times, X.shape[0], self.last_time, self.float_type)
result = np.empty((X.shape[0], len(self.mapped_stats) + len(self.quantiles), X.shape[1]), dtype=self.float_type)
counts = np.empty(X.shape[0], dtype=np.int64)
self.cpp_tree.fit_query(X, times, self.mapped_stats, self.quantiles, result, counts)
self.last_time = times[-1]
return result, counts
class _MTreeByIndexLookup(object):
def __init__(self, tree_obj):
self.tree_obj = tree_obj
def __len__(self):
return len(self.tree_obj)
def __getitem__(self, indices):
sel, result_length = self.tree_obj._byIndexSelector(indices)
# TODO: should do something if both sel and tree are empty
points = np.empty((result_length, self.tree_obj.dimension), dtype=self.tree_obj.float_type)
self.tree_obj.cpp_tree.getPoints(sel, points)
return points
def __setitem__(self, indices, data):
data = sanitizeData(data, self.tree_obj.float_type)
assert self.tree_obj.dimension == data.shape[1]
if isinstance(indices, slice):
start,stop,stride = indices.indices(len(self))
assert (stop-start)//stride == data.shape[0], "Slice doesn't match assigned size"
self.tree_obj.cpp_tree.updateByIndexSlice(start, stop, stride, data)
else:
indices = np.array(indices, dtype=np.int64)
if len(indices.shape) == 0:
indices = indices[None]
assert len(indices.shape) == 1
indices[indices<0] += len(self)
assert (0<=indices).all() and (indices<len(self)).all()
assert indices.shape[0] == data.shape[0], "Slice doesn't match assigned size"
self.tree_obj.cpp_tree.updateByIndex(indices, data)
def __delitem__(self, indices):
sel,_ = self.tree_obj._byIndexSelector(indices)
self.tree_obj.cpp_tree.remove(sel)
[docs]class MTree(object):
"""
M-Tree efficient nearest-neighbor search in metric spaces.
A point within a tree can be accessed either via tree[k] using the
point's key k, or via tree.ix[i] using the point's index i.
Keys can be arbitrary integers and are returned by the insert, knn and
neighbors functions. Indices are integers in the range 0...len(tree), sorted
according to the points' keys in ascending order.
Parameters
----------
metric: string
Which distance metric to use. Currently supported metrics
include 'chebyshev', 'cityblock', 'euclidean' and
'minkowsi'.
metric_params: dict
Parameters passed to the metric. Minkowsi distance requires
setting an integer `p` parameter.
float_type: np.float32 or np.float64
Which floating point type to use for internal processing.
min_node_size: int
The minimum number of children in tree nodes. Different
parametrizations for min_node_size are guaranteed to
return identical results.
max_node_size: int
The maximum number of children in tree nodes. Different
parametrizations for max_node_size are guaranteed to
return identical results.
split_sampling: int
The number of combinations to try when splitting a node.
Different parametrizations for split_sampling are guaranteed
to return identical results.
insert_jobs: int
The number of additional threads to spawn for tree insertions.
Since insertions can only partially be parallelized, using
too many threads can harm performance.
query_jobs: int
The number of threads to use for range- and knn-queries.
"""
def __init__(self, metric='euclidean', metric_params=None, float_type=np.float64, min_node_size=5, max_node_size=100, split_sampling=20, insert_jobs=2, query_jobs=-1, **kwargs):
assert float_type in [np.float32, np.float64]
assert min_node_size * 2 < max_node_size
self.float_type = float_type
distance_function = lookupDistance(metric, float_type, **(metric_params or {}))
insert_jobs = mp.cpu_count() if insert_jobs==-1 else insert_jobs
query_jobs = mp.cpu_count() if query_jobs==-1 else query_jobs
cpp_obj = {np.float32: dSalmon_cpp.MTree32, np.float64: dSalmon_cpp.MTree64}[float_type]
self.cpp_tree = cpp_obj(distance_function, int(min_node_size), int(max_node_size), int(split_sampling), int(insert_jobs), int(query_jobs))
self.ix = _MTreeByIndexLookup(self)
self.float_type = float_type
if float_type == np.float32:
self.selector_factory = dSalmon_cpp.MTreeSelector32
self.range_query_factory = dSalmon_cpp.MTreeRangeQuery32
self.knn_query_factory = dSalmon_cpp.MTreeKnnQuery32
else:
self.selector_factory = dSalmon_cpp.MTreeSelector64
self.range_query_factory = dSalmon_cpp.MTreeRangeQuery64
self.knn_query_factory = dSalmon_cpp.MTreeKnnQuery64
def _byKeySelector(self, keys, ignore_missing=False):
assert not isinstance(keys, slice), 'Slice access is not supported for this object. Use .ix[] instead.'
keys = np.array(keys, dtype=np.int64)
if len(keys.shape) == 0:
keys = keys[None]
assert len(keys.shape) == 1
sel = self.selector_factory(self.cpp_tree, keys, False)
if not ignore_missing and not sel.allOk():
found = np.empty(keys.shape[0], dtype=np.uint8)
sel.getFoundMask(found)
raise IndexError('Keys ' + str(keys[found==0].tolist()) + ' invalid')
return sel, len(keys)
def _byIndexSelector(self, indices, ignore_missing=False):
if isinstance(indices, slice):
start,stop,stride = indices.indices(len(self))
result_length = (stop-start)//stride
sel = self.selector_factory(self.cpp_tree, start, stop, stride)
assert sel.allOk()
else:
indices = np.array(indices, dtype=np.int64)
if len(indices.shape) == 0:
indices = indices[None]
assert len(indices.shape) == 1
indices[indices<0] += len(self)
assert (0<=indices).all() and (indices<len(self)).all()
result_length = len(indices)
sel = self.selector_factory(self.cpp_tree, indices, True)
if not ignore_missing and not sel.allOk():
found = np.empty(len(indices), dtype=np.uint8)
sel.getFoundMask(found)
raise IndexError('Indices ' + str(indices[found==0].tolist()) + ' invalid')
return sel, result_length
[docs] def insert(self, data):
"""
Insert points and return indices.
Parameters
----------
data: ndarray, shape (n_samples, n_features)
The data to be inserted.
Returns
-------
indices: ndarray, shape (n_samples,)
The indices assigned to the newly inserted data points.
"""
data = sanitizeData(data, self.float_type)
assert self.dimension in [-1, data.shape[1]]
indices = np.empty(data.shape[0], dtype=np.int64)
self.cpp_tree.insert(data, indices)
return indices
[docs] def remove(self, keys):
"""Remove points identified by keys, skipping non-existent entries.
Parameters
---------------
keys: ndarray, shape (n_samples,)
Indices of the data points to be removed.
Returns
---------------
found: ndarray, shape (n_samples,)
Boolean array indicating whether the removal was successful.
"""
sel, result_length = self._byKeySelector(keys, ignore_missing=True)
if self.dimension == -1:
return np.zeros(len(keys), dtype=bool)
found = np.empty(len(keys), dtype=np.uint8)
self.cpp_tree.remove(sel)
sel.getFoundMask(found)
return found != 0
[docs] def neighbors(self, data, radius):
"""
Return all points within a given radius.
Parameters
---------------
data: ndarray, shape (n_samples, n_features)
Points for which the range query should be performed.
radius: double
Radius for the search.
Returns
---------------
keys: ndarray, shape (n_total_neighbors,)
Concatenation of keys of neighbors within radius.
distances: ndarray, shape (n_total_neighbors,)
Concatenation of distances of neighbors to the respective
query points.
lengths: ndarray, shape (n_samples,)
The number of neighbors returned for each point, so that
sum(length) == n_total_neighbors.
"""
data = sanitizeData(data, self.float_type)
assert self.dimension == data.shape[1]
query = self.range_query_factory(self.cpp_tree, data, radius)
total_length = query.resultTotalLength()
keys = np.empty(total_length, dtype=np.int64)
distances = np.empty(total_length, dtype=self.float_type)
lengths = np.empty(data.shape[0], dtype=np.int32)
query.result(keys, distances)
query.resultLengths(lengths)
return keys, distances, lengths
[docs] def knn(self, data, k=1, sort=True, min_radius=0, max_radius=math.inf, reverse=False, extend_for_ties=False):
"""
Find the k nearest neighbors of points.
Parameters
---------------
data: ndarray, shape (n_samples, n_features)
Points for which to perform a knn query.
k: int
Number of nearest neighbors to consider.
sort: bool
Whether the returned points should be sorted by distance.
min_radius: double
Minimum distance for returned neighbor points.
max_radius: double
Maximum distance for returned neighbor points.
reverse: bool
If reverse == True, return the k most distant points instead
of the k nearest neighbors.
extend_for_ties: bool
Whether in the case of ties more than k points should
be returned.
Returns
---------------
keys: ndarray, shape (n_total_neighbors,)
Concatenation of keys of found neighbors.
distances: ndarray, shape (n_total_neighbors,)
Concatenation of distances of neighbors to the respective
query points.
lengths: ndarray, shape (n_samples,)
The number of neighbors returned for each point, so that
sum(length) == n_total_neighbors.
"""
data = sanitizeData(data, self.float_type)
assert self.dimension == data.shape[1]
query = self.knn_query_factory(self.cpp_tree, data, int(k), sort,
min_radius, max_radius, reverse,
extend_for_ties)
total_length = query.resultTotalLength()
keys = np.empty(total_length, dtype=np.int64)
distances = np.empty(total_length, dtype=self.float_type)
lengths = np.empty(data.shape[0], dtype=np.int32)
query.result(keys, distances)
query.resultLengths(lengths)
return keys, distances, lengths
[docs] def clear(self):
"""Remove all points from the tree."""
self.cpp_tree.clear()
def __len__(self):
return self.cpp_tree.size()
@property
def dimension(self):
return self.cpp_tree.dataDimension()
[docs] def get_points(self, keys):
"""
Retrieve points by key, skipping non-existent entries.
Parameters
---------------
keys: ndarray, shape (n_samples,)
Keys for points to query as returned by insert().
Returns
---------------
points: ndarray, shape (n_samples, n_features)
Coordinates of queried points or all-zero vectors if
points were not found in the tree.
found: ndarray, shape (n_samples,)
Whether the respective keys were found in the tree.
"""
sel, result_length = self._byKeySelector(keys, ignore_missing=True)
if self.dimension == -1:
return np.zeros((0,0), dtype=self.float_type), np.zeros(len(keys), dtype=bool)
points = np.empty((len(keys), self.dimension), dtype=self.float_type)
found = np.empty(len(keys), dtype=np.uint8)
self.cpp_tree.getPoints(sel, points)
sel.getFoundMask(found)
return points, found!=0
def __getitem__(self, keys):
sel, result_length = self._byKeySelector(keys)
# TODO: should handle dimension if both key and tree are empty
points = np.empty((result_length, self.dimension), dtype=self.float_type)
self.cpp_tree.getPoints(sel, points)
return points
def __setitem__(self, keys, data):
assert not isinstance(keys, slice), 'Slice access is not supported for this object. Use .ix[] instead.'
data = sanitizeData(data, self.float_type)
keys = np.array(keys, dtype=np.int64)
if len(keys.shape) == 0:
keys = keys[None]
assert len(keys.shape) == 1
assert self.dimension in [-1, data.shape[1]]
assert len(keys) == data.shape[0]
self.cpp_tree.update(keys, data)
def __delitem__(self, keys):
sel,_ = self._byKeySelector(keys)
self.cpp_tree.remove(sel)
[docs] def itok(self, indices=None):
"""
Map indices to keys.
Parameters
----------
indices: ndarray or slice, optional
Indices or slice for which to return keys. If None,
all keys are returned.
Returns
-------
keys: ndarray
The requested keys as numpy array.
"""
if indices is None:
indices = slice(None)
sel, result_length = self._byIndexSelector(indices)
keys = np.empty(result_length, dtype=np.int64)
self.cpp_tree.keys(sel, keys)
return keys
[docs] def ktoi(self, keys):
"""
Map keys to indices.
Parameters
----------
keys: ndarray
Keys for which to return indices.
Returns
-------
indices: ndarray
The requested indices as numpy array.
"""
sel, result_length = self._byKeySelector(keys)
indices = np.empty(result_length, dtype=np.int64)
self.cpp_tree.indices(sel, indices)
return indices
[docs] def copy(self):
"""Return a copy of the tree."""
new_tree = MTree(float_type=self.float_type)
new_tree.cpp_tree.unserialize(self.cpp_tree.serialize())
return new_tree
def __getstate__(self):
return {
'version': 0,
'float_type': self.float_type,
'data': self.cpp_tree.serialize()
}
def __setstate__(self, state):
assert state['version'] == 0, 'Unknown serialization version'
self.__init__(float_type=state['float_type'])
# TODO: is distance function properly serialized?
if not self.cpp_tree.unserialize(state['data']):
raise ValueError('Unsupported serialization version')