Source code for flowtracks.trajectory

# -*- coding: utf-8 -*-

import types, numpy as np
import scipy.interpolate as interp

[docs]class Frame(object): """ This is basically a structure with no fancy behaviour. When it is returned from a Flowtracks function, it has two attributes, ``particles`` and ``tracers`` - each pointing to a :class:`ParticleSnapshot` object holding data for particles of the respective type. """ pass
[docs]class ParticleSet(object): """ A base class for manipulting particle data. Knows how many particles it has, and holds a varying number of particle properties, each given for the entire set. Properties may be created at construction time or later. When a property is created, it gets a setter method of the same name and a getter method prefixed with ``set_``. This applies also for mandatory properties. """ def __init__(self, pos, velocity, **kwds): """ Arguments: pos - a (t,3) array, the position of one particle in t time-points, [m]. velocity - (t,3) array, corresponding velocity, [m/s]. kwds - keyword arguments should be arrays whose first dimension == t. these are treated as extra attributes to be sliced when creating segments. """ base_vals = { 'pos': pos, 'velocity': velocity, } base_vals.update(kwds) self._check_attr = [] # Attrs to look for when concatenating bundles for n, v in base_vals.iteritems(): self.create_property(n, v)
[docs] def create_property(self, propname, init_val): """ Add a property of the set, expected to be an array whose shape[0] == len(self). Creates the method <propname>(self, selector=None). If selector is given, it will return only the selected time-points. Also creates set_<propname>(self, value, selector=None) which sets either the value over the entire trajectory or just for the selected time points (this requires the property to already exist for the full trajectory). Arguments: propname - a string, should be a valid Python identifier. init_val - the initial value for the property. """ attr = '_' + propname self._check_attr.append(propname) def getter(self, selector=None): if selector is None: return self.__dict__[attr] else: return self.__dict__[attr][selector] def setter(self, new_val, selector=None): if selector is None: self.__dict__[attr] = new_val else: self.__dict__[attr][selector] = new_val self.__dict__[propname] = \ types.MethodType(getter, self, self.__class__) self.__dict__['set_' + propname] = \ types.MethodType(setter, self, self.__class__) if init_val is not None: self.__dict__['set_' + propname](init_val)
[docs] def has_property(self, propname): """ Checks whether the looked-after property ``propname`` exists for this particle set. """ return (propname in self._check_attr)
[docs] def schema(self): """ Creates a dictionary keyed by property name whose values are the shape of one particle's value for that property. Example: {'pos': (3,), 'velocity': (3,)} """ return dict((propname, self.__dict__['_' + propname].shape[1:]) \ for propname in self._check_attr)
[docs] def ext_schema(self): """ Extended schema. Like :meth:`schema` but the values of the returned dictionary are a tuple (type, shape). The shape is scalar, so it only supports 1D or 0D items. """ schm = {} for propname in self._check_attr: prop = self.__dict__['_' + propname] shape = prop.shape[-1] if prop.ndim > 1 else 1 schm[propname] = (prop.dtype.type, shape) return schm
[docs] def as_dict(self): """ Returns a dictionary with the "business" properties only, without all the Python bookkeeping and other stuff in the __dict__. """ return dict((propname, self.__dict__['_' + propname]) \ for propname in self._check_attr)
[docs] def __len__(self): """Return the number of particles in the set.""" return self._pos.shape[0]
[docs]class Trajectory(ParticleSet): """ This is one of the two main classes used for iteration over a scene. It inherits from :class:`ParticleSet` with the added demand that a scalar trajectory ID (an integer unique amond the scene's trajectories) and a ``time`` property. """ def __init__(self, pos, velocity, time, trajid, **kwds): """ Arguments: pos - a (t,3) array, the position of one particle in t time-points, [m]. velocity - (t,3) array, corresponding velocity, [m/s]. time - (t,) array, the clock ticks. No specific units needed. trajid - the unique identifier of this trajectory in the set of trajectories that belong to the same sequence. kwds - keyword arguments should be arrays whose first dimension == t. these are treated as extra attributes to be sliced when creating segments. """ self._id = trajid kwds['time'] = time ParticleSet.__init__(self, pos, velocity, **kwds) def trajid(self): return self._id
[docs] def __getitem__(self, selector): """ Gets the data for time points selected as a table of shape (n,8), concatenating position, velocity, time, broadcasted trajid. Arguments: selector - any 1d indexing expression known to numpy. """ return np.hstack((self._pos[selector], self._velocity[selector], self._time[selector][...,None], np.ones(self._pos[selector].shape[:-1] + (1,))*self._id))
[docs] def smoothed(self, err_bound, order): """ Creates a trajectory generated from this trajectory using cubic B-spline interpolation. Arguments: err_bound - amount of deviation a particle is expeted to have around its observed place. Determines strength of smoothing. order - of the spline (odd, up to 5). Returns: a new :class:`Trajectory` object with the interpolated positions and velocities. If the length of the trajectory < 4, returns self. """ if len(self.time()) < order + 1: return self s = (len(self.time()) * err_bound)**2 spline, eval_prms = interp.splprep(list(self.pos().T), k=order, s=s) new_pos = np.array(interp.splev(eval_prms, spline)).T new_vel = np.array(interp.splev(eval_prms, spline, der=1)).T new_accel = np.array(interp.splev(eval_prms, spline, der=2)).T return Trajectory(new_pos, new_vel, self.time(), self.trajid(), accel=new_accel)
[docs]class ParticleSnapshot(ParticleSet): """ This is one of the two main classes used for iteration over a scene. It inherits from :class:`ParticleSet` with the added demand for a scalar time and a ``trajid`` property for trajectory ID (an integer unique among the scene's trajectories). """ def __init__(self, pos, velocity, time, trajid, **kwds): """ Arguments: pos - a (p,3) array, the position of one particle of p, [m]. velocity - (p,3) array, corresponding velocity, [m/s]. trajid - (p,3) array, for each particle in the snapshot, the unique identifier of the trajectory it belongs to. time - scalar, the identifier of the frame from which this snapshot is taken. kwds - keyword arguments should be arrays whose first dimension == p. these are treated as extra attributes to be sliced when creating segments. """ self._t = time kwds['trajid'] = trajid ParticleSet.__init__(self, pos, velocity, **kwds) def time(self): return self._t
[docs]def mark_unique_rows(all_rows): """ Filter out rows whose position columns represent a particle that already appears, so that each particle position appears only once. Arguments: all_rows - an array with n rows and at least 3 columns for position. Returns: an array with the indices of rows to take from the input such that in the result, the first 3 columns form a unique combination. """ # Remove duplicates (particles occupying same position): srt = np.lexsort(all_rows[:,:3].T) diff = np.diff(all_rows[srt,:3], axis=0).any(axis=1) uniq = np.r_[srt[0], srt[1:][diff]] uniq.sort() return uniq
[docs]def trajectories_in_frame(trajects, frame_num, start_times=None, end_times=None, segs=False): """ Notes the indices of trajectories participating in the frame for later extraction. Arguments: trajects - a list of :class:Trajectory objects to filter. frame_num - the time value (as found in trajectory.time()) at which the trajectory should be active. start_times, end_times - each a ``len(trajects)`` array containing the corresponding start/end frame number of each trajectory, respectively. segs - true if the trajectory should be active also in the following frame. Returns: traj_nums = the indices of active trajectories in ``trajects``. """ if start_times is None or end_times is None: start_end = [(tr.time()[0], tr.time()[-1]) for tr in trajects] start_times, end_times = map(np.array, zip(*start_end)) end_frm = (frame_num + 1) if segs else frame_num cands = (frame_num >= start_times) & (end_frm <= end_times) cand_nums = np.nonzero(cands)[0] if len(cand_nums) > 0: # Filter candidates with overlapping particles. frm_ixs = frame_num - start_times[cands] pos = np.array([trajects[trix].pos()[frm] \ for trix, frm in zip(cand_nums, frm_ixs)]) update_cands = np.zeros(pos.shape[0], dtype=np.bool) update_cands[mark_unique_rows(pos)] = True cands[cands] = update_cands cand_nums = np.nonzero(cands)[0] return cand_nums
[docs]def take_snapshot(trajects, frame, schema): """ Goes over a list of trajectories and extracts the particle data at a given time point. If the trajectory list is empty, creates an empty snapshot. Arguments: trajects - a list of :class:Trajectory objects to query. frame - the frame number to which snapshot data belongs. schema - a dict, ``{propname: shape tuple}``, as given by the trajectory's :meth:`~.ParticleSet.schema`. This is only needed for consistency in the case of an empty trajectory list resulting in an empty snapshot. Returns: a :class:`ParticleSnapshot` object with all the particles in the given frame. """ if len(trajects) == 0: kwds = dict((k, np.empty((0,) + v)) for k, v in schema.iteritems()) kwds['time'] = frame return ParticleSnapshot(trajid=np.empty(0), **kwds) kwds = dict((k, np.empty( (len(trajects),) + v, dtype=trajects[0].__dict__['_' + k].dtype)) \ for k, v in schema.iteritems()) copy_keys = kwds.keys() kwds['trajid'] = np.empty(len(trajects), dtype=np.int_) for trix, traj in enumerate(trajects): first_frame = traj.time()[0] for prop in copy_keys: kwds[prop][trix] = traj.__dict__[prop](frame - first_frame) kwds['trajid'][trix] = traj.trajid() kwds['time'] = frame return ParticleSnapshot(**kwds)