# -*- coding: utf-8 -*-
#Created on Tue May 28 10:27:15 2013
"""
Interpolation routines.
.. rubric:: References
.. [#IDW] http://en.wikipedia.org/wiki/Inverse_distance_weighting
.. [#BL] Lüthi, Beat. Some Aspects of Strain, Vorticity and Material Element \
Dynamics as Measured with 3D Particle Tracking Velocimetry in a \
Turbulent Flow. PhD Thesis, ETH-Zürich (2002).
.. [#RBF] http://en.wikipedia.org/wiki/Radial_basis_function
.. rubric:: Documentation
"""
import numpy as np, warnings
from ConfigParser import SafeConfigParser
[docs]def select_neighbs(tracer_pos, interp_points, radius=None, num_neighbs=None):
"""
For each of m interpolation points, find its distance to all tracers. Use
result to decide which tracers are the neighbours of each interpolation
point, based on either a fixed radius or the closest num_neighbs.
Arguments:
tracer_pos - (n,3) array, the x,y,z coordinates of one tracer per row, [m]
interp_points - (m,3) array, coordinates of points where interpolation will
be done.
radius - of the search area for neighbours, [m]. If None, select closest
num_neighbs.
num_neighbs - number of closest neighbours to interpolate from. If None.
uses all neighbours in a given radius. ``radius`` has precedence.
Returns:
dists - (m,n) array, the distance from each interpolation point to each
tracer.
use_parts - (m,n) boolean array, True where tracer :math:`j=1...n` is a
neighbour of interpolation point :math:`i=1...m`.
"""
dists = np.linalg.norm(tracer_pos[None,:,:] - interp_points[:,None,:],
axis=2)
dists[dists <= 0] = np.inf # Only for selection phase,later changed back.
if radius is None:
if num_neighbs is None:
raise ValueError("Either radius or num_neighbs must be given.")
dist_sort = np.argsort(dists, axis=1)
use_parts = np.zeros(dists.shape, dtype=np.bool)
eff_num_neighbs = min(num_neighbs, tracer_pos.shape[0])
use_parts[
np.repeat(np.arange(interp_points.shape[0]), eff_num_neighbs),
dist_sort[:,:num_neighbs].flatten()] = True
else:
use_parts = dists < radius
dists[np.isinf(dists)] = 0.
return dists, use_parts
[docs]def inv_dist_interp(dists, use_parts, velocity, p=1):
"""
For each of n particle, generate the velocity interpolated to its
position from all neighbours as selected by caller. Interpolation method is
inverse-distance weighting, [#IDW]_
Arguments:
dists - (m,n) array, the distance of interpolation_point i=1...m from
tracer j=1...n, for (row,col) (i,j) [m]
use_parts - (m,n) boolean array, whether tracer j is a neighbour of
particle i, same indexing as ``dists``.
velocity - (n,3) array, the u,v,w velocity components for each of n
tracers, [m/s]
p - the power of inverse distance weight, w = r^(-p). default 1. Use 0 for
simple averaging.
Returns:
vel_avg - an (m,3) array with the interpolated velocity at each
interpolation point, [m/s].
"""
weights = np.zeros_like(dists)
weights[use_parts] = 1./dists[use_parts]**p
vel_avg = (weights[...,None] * velocity[None,...]).sum(axis=1) / \
weights.sum(axis=1)[:,None]
return vel_avg
[docs]def corrfun_interp(dists, use_parts, data, corrs_hist, corrs_bins):
"""
For each of n particle, generate the velocity interpolated to its
position from all neighbours as selected by caller. The weighting of
neighbours is by the correlation function, e.g. if the distance at
neighbor i is :math:`r_i`, then it adds :math:`\\rho(r_i)*v_i` to the
interpolated velocity. This is done for each component separately.
Arguemnts:
dists - (m,n) array, the distance of interpolation_point :math:`i=1...m`
from tracer :math:`j=1...n`, for (row,col) (i,j) [m]
use_parts - (m,n) boolean array, whether tracer j is a neighbour of
particle i, same indexing as ``dists``.
data - (n,d) array, the d components of the data that is interpolated from,
for each of n tracers.
corrs_hist - the correlation function histogram, an array of b bins.
corrs_bins - same size array, the bin start point for each bin.
Returns:
vel_avg - an (m,3) array with the interpolated velocity at each
interpolation point, [units of ``data``].
"""
weights = np.zeros(dists.shape + (data.shape[-1],))
weights[use_parts] = corrs_hist[
np.digitize(dists[use_parts].flatten(), corrs_bins) - 1]
vel_avg = (weights * data[None,...]).sum(axis=1) / \
weights.sum(axis=1)
return vel_avg
[docs]def rbf_interp(tracer_dists, dists, use_parts, data, epsilon=1e-2):
"""
Radial-basis interpolation [3] for each particle, from all neighbours
selected by caller. The difference from inv_dist_interp is that the
weights are independent of interpolation point, among other differences.
Arguments:
tracer_dists - (n,n) array, the distance of tracer :math:`i=1...n` from
tracer :math:`j=1...n`, for (row,col) (i,j) [m]
dists - (m,n) array, the distance from interpolation point
:math:`i=1...m` to tracer j. [m]
use_parts - (m,n) boolean array, True where tracer :math:`j=1...n` is a
neighbour of interpolation point :math:`i=1...m`.
data - (n,d) array, the d components of the data for each of n tracers.
Returns:
vel_interp - an (m,3) array with the interpolated velocity at the position
of each particle, [m/s].
"""
kernel = np.exp(-tracer_dists**2 * epsilon)
# Determine the set of coefficients for each particle:
coeffs = np.zeros(dists.shape + (data.shape[-1],))
for pix in xrange(dists.shape[0]):
neighbs = np.nonzero(use_parts[pix])[0]
K = kernel[np.ix_(neighbs, neighbs)]
coeffs[pix, neighbs] = np.linalg.solve(K, data[neighbs])
rbf = np.exp(-dists**2 * epsilon)
vel_interp = np.sum(rbf[...,None] * coeffs, axis=1)
return vel_interp
[docs]class Interpolant(object):
"""
Holds all parameters necessary for performing an interpolation. Use is as
a callable object after initialization, see :meth:`__call__`.
"""
def __init__(self, method, num_neighbs=None, radius=None, param=None):
"""
Arguments:
method - interpolation method. Either 'inv' for inverse-distance
weighting, 'rbf' for gaussian-kernel Radial Basis Function
method, or 'corrfun' for using a correlation function.
radius - of the search area for neighbours, [m]. If None, select
closest ``neighbs``.
neighbs - number of closest neighbours to interpolate from. If None.
uses 4 neighbours for 'inv' method, and 7 for 'rbf', unless
``radius`` is not None, then ``neighbs`` is ignored.
param - the parameter adjusting the interpolation method. For IDW it is
the inverse power (default 1), for rbf it is epsilon (default 1e5).
"""
if method == 'inv':
if num_neighbs is None:
num_neighbs = 4
if param is None:
param = 1
elif method == 'rbf':
if num_neighbs is None:
num_neighbs = 7
if param is None:
param = 1e5
elif method == 'corrfun':
if num_neighbs is None:
num_neighbs = 4
if param is None:
raise ValueError("'corrfun' method requires param to be "\
"an NPZ file name containing the corrs and bins arrays.")
c = np.load(param)
self._corrs = c['corrs']
self._bins = c['bins']
else:
raise NotImplementedError("Interpolation method %s not supported" \
% method)
self._method = method
self._neighbs = num_neighbs
self._radius = radius
self._par = param
def num_neighbs(self):
return self._neighbs
def radius(self):
return self._radius
[docs] def set_scene(self, tracer_pos, interp_points, data):
"""
Records scene data for future interpolation using the same scene.
Arguments:
tracer_pos - (n,3) array, the x,y,z coordinates of one tracer per row,
in [m]
interp_points - (m,3) array, coordinates of points where interpolation
will be done.
data - (n,d) array, the for the d-dimensional data for tracer n. For
example, in velocity interpolation this would be (n,3), each tracer
having 3 components of velocity.
"""
self.__tracers = tracer_pos
self.__interp_pts = interp_points
self.__data = data
# empty the neighbours cache:
self.__dists = None
self.__active_neighbs = None
[docs] def trim_points(self, which):
"""
Remove interpolation points from the scene.
Arguments:
which - a boolean array, length is number of current particle list
(as given in set_scene), True to trim a point, False to keep.
"""
keep = ~which
self.__interp_pts = self.__interp_pts[keep]
if self.__dists is not None:
self.__dists = self.__dists[keep]
self.__active_neighbs = self.__active_neighbs[keep]
def _forego_laziness(self):
"""
Populate the neighbours cache.
"""
self.__dists, self.__active_neighbs = select_neighbs(
self.__tracers, self.__interp_pts, self._radius, self._neighbs)
if self._method == 'rbf':
self.__tracer_dists, _ = select_neighbs(
self.__tracers, self.__tracers, self._radius, self._neighbs)
[docs] def which_neighbours(self):
"""
Finds the neighbours that would be selected for use at each
interpolation point, given the current scene as set by set_scene().
Returns:
(m,n) boolean array, True where tracer :math:`j=1...n` is a neighbour
of interpolation point :math:`i=1...m` under the reigning selection
criteria.
"""
if self.__active_neighbs is None:
self._forego_laziness()
return self.__active_neighbs
def current_dists(self):
if self.__active_neighbs is None:
self._forego_laziness()
return self.__dists
[docs] def interpolate(self, subset=None):
"""
Performs an interpolation over the recorded scene.
Arguments:
subset - a neighbours selection array, such as returned from
:meth:`which_neighbours`, to replace the recorded selection. Default
value (None) uses the recorded selection. The recorded selection
is not changed, so ``subset`` is forgotten after the call.
Returns:
an (m,3) array with the interpolated value at the position of each
of m particles.
"""
# Check that the cache is populated:
if self.__active_neighbs is None:
self._forego_laziness()
act_neighbs = self.__active_neighbs if subset is None else subset
# If for some reason tracking failed for a whole frame,
# interpolation is impossible at that frame. This checks for frame
# tracking failure.
if len(self.__tracers) == 0:
# Temporary measure until I can safely discard frames.
warnings.warn("No tracers im frame, interpolation returned zeros.")
ret_shape = self.__data.shape[-1] if self.__data.ndim > 1 else 1
return np.zeros((self.__interp_pts.shape[0], ret_shape))
if self._method == 'inv':
return inv_dist_interp(self.__dists, act_neighbs, self.__data,
self._par)
if self._method == 'rbf':
return rbf_interp(self.__tracer_dists, self.__dists, act_neighbs,
self.__data, self._par)
if self._method == 'corrfun':
return corrfun_interp(self.__dists, act_neighbs, self.__data,
self._corrs, self._bins)
# This isn't supposed to ever happen. The constructor should fail.
raise NotImplementedError("Interpolation method %s not supported" \
% self._method)
[docs] def __call__(self, tracer_pos, interp_points, data):
"""
Sets up the necessary parameters, and performs the interpolation.
Does not change the scene set by set_scene if any, so may be used
for any off-scene interpolation.
Arguments:
tracer_pos - (n,3) array, the x,y,z coordinates of one tracer per row,
in [m]
interp_points - (m,3) array, coordinates of points where interpolation
will be done.
data - (n,d) array, the for the d-dimensional data for tracer n. For
example, in velocity interpolation this would be (n,3), each tracer
having 3 components of velocity.
Returns:
vel_interp - an (m,3) array with the interpolated value at the position
of each particle, [m/s].
"""
# If for some reason tracking failed for a whole frame, interpolation
# is impossible at that frame. This checks for frame tracking failure.
if len(tracer_pos) == 0:
# Temporary measure until I can safely discard frames.
warnings.warn("No tracers im frame, interpolation returned zeros.")
ret_shape = data.shape[-1] if data.ndim > 1 else 1
return np.zeros((interp_points.shape[0], ret_shape))
dists, use_parts = select_neighbs(tracer_pos, interp_points,
self._radius, self._neighbs)
if self._method == 'inv':
return inv_dist_interp(dists, use_parts, data, self._par)
elif self._method == 'rbf':
tracer_dists = select_neighbs(tracer_pos, tracer_pos,
self._radius, self._neighbs)[0]
return rbf_interp(tracer_dists, dists, use_parts, data, self._par)
elif self._method == 'corrfun':
return corrfun_interp(dists, use_parts, data,
self._corrs, self._bins)
else:
# This isn't supposed to ever happen. The constructor should fail.
raise NotImplementedError("Interpolation method %s not supported" \
% self._method)
[docs] def neighb_dists(self, tracer_pos, interp_points):
"""
The distance from each interpolation point to each data point of those
used for interpolation. Assumes, for now, a constant number of
neighbours.
Arguments:
tracer_pos - (n,3) array, the x,y,z coordinates of one tracer per row,
in [m]
interp_points - (m,3) array, coordinates of points where interpolation
will be done.
Returns:
ndists - an (m,c) array, for c closest neighbours as defined during
object construction.
"""
dists, use_parts = select_neighbs(tracer_pos, interp_points,
None, self._neighbs)
ndists = np.zeros((interp_points.shape[0], self._neighbs))
for pt in xrange(interp_points.shape[0]):
# allow assignment of less than the desired number of neighbours.
ndists[pt] = dists[pt, use_parts[pt]]
return ndists
[docs] def save_config(self, cfg):
"""
Adds the keys necessary for recreating this interpolant into a
configuration object. It is the caller's responsibility to do a
writeback to file.
Arguments:
cfg - a ConfigParser object.
"""
if not cfg.has_section("Interpolant"):
cfg.add_section("Interpolant")
cfg.set('Interpolant', 'radius', str(self.radius()))
cfg.set('Interpolant', 'num_neighbs', str(self.num_neighbs()))
cfg.set('Interpolant', 'param', str(self._par))
cfg.set('Interpolant', 'method', self._method)
[docs]def read_interpolant(conf_fname):
"""
Builds an Interpolant object based on values in an INI-formatted file.
Arguments:
conf_fname - path to configuration file.
Returns:
an Interpolant object constructed from values in the configuration file.
"""
parser = SafeConfigParser()
parser.read(conf_fname)
# Optional arguments:
kwds = {}
if parser.has_option('Interpolant', 'num_neighbs'):
kwds['num_neighbs'] = parser.getint('Interpolant', 'num_neighbs')
if parser.has_option('Interpolant', 'radius'):
kwds['radius'] = parser.getfloat('Interpolant', 'radius')
if parser.has_option('Interpolant', 'param'):
kwds['param'] = parser.getfloat('Interpolant', 'param')
return Interpolant(parser.get('Interpolant', 'method'), **kwds)