__all__ = [
__displayname__ = 'Point/Line Sets'
from datetime import datetime
import numpy as np
import pyvista
import vtk
from vtk.numpy_interface import dataset_adapter as dsa
from .. import _helpers, interface
from ..base import FilterBase, FilterPreserveTypeBase
# improt CreateTensorMesh for its cell string parsing
from ..model_build import CreateTensorMesh
# NOTE: internal import of pyproj in LonLatToUTM
# ---- Cell Connectivity ----#
class AddCellConnToPoints(FilterBase):
"""This filter will add linear cell connectivity between scattered points.
You have the option to add ``VTK_Line`` or ``VTK_PolyLine`` connectivity.
``VTK_Line`` connectivity makes a straight line between the points in order
(either in the order by index or using a nearest neighbor calculation).
The ``VTK_PolyLine`` adds a poly line connectivity between all points as
one spline (either in the order by index or using a nearest neighbor
calculation). Type map is specified in `vtkCellType.h`.
**Cell Connectivity Types:**
- 4: Poly Line
- 3: Line
__displayname__ = 'Add Cell Connectivity to Points'
__category__ = 'filter'
def __init__(self, **kwargs):
# Parameters
self.__cell_type = kwargs.get('cell_type', vtk.VTK_POLY_LINE)
self.__usenbr = kwargs.get('nearest_nbr', False)
self.__close_loop = kwargs.get('close_loop', False)
self.__keep_vertices = kwargs.get('keep_vertices', False)
self.__unique = kwargs.get('unique', False)
def _connect_cells(self, pdi, pdo, log_time=False):
"""Internal helper to perform the connection"""
# NOTE: Type map is specified in vtkCellType.h
cell_type = self.__cell_type
if log_time:
start_time = datetime.now()
# Get the Points over the NumPy interface
pdi = pyvista.wrap(pdi)
points = np.copy(
) # New NumPy array of points so we dont destroy input
if self.__unique:
# Remove repeated points
indexes = np.unique(points, return_index=True, axis=0)[1]
points = np.array(points[sorted(indexes)])
def _find_min_path(points):
# sklearn's KDTree is faster: use it if available
from sklearn.neighbors import KDTree as Tree
except ImportError:
from scipy.spatial import cKDTree as Tree
_compute_dist = lambda pt0, pt1: np.linalg.norm(pt0 - pt1)
ind, min_dist = None, np.inf
tree = Tree(points)
for pt in points:
cur_ind = tree.query([pt], k=len(points))[1].ravel()
dist = 0.0
for i in range(len(cur_ind) - 1):
dist += _compute_dist(points[cur_ind[i]], points[cur_ind[i + 1]])
if dist < min_dist:
ind = cur_ind
min_dist = dist
return ind.ravel()
if self.__usenbr:
ind = _find_min_path(points)
ind = np.arange(len(points), dtype=int)
if self.__keep_vertices:
poly = pyvista.PolyData(np.copy(points))
poly = pyvista.PolyData()
poly.points = np.copy(points)
if cell_type == vtk.VTK_LINE:
lines = np.c_[np.full(len(ind) - 1, 2), ind[0:-1], ind[1:]]
if self.__close_loop:
app = np.append(
[2, ind[-1], ind[0]],
lines = app
poly.lines = lines
elif cell_type == vtk.VTK_POLY_LINE:
cells = vtk.vtkCellArray()
cell = vtk.vtkPolyLine()
if self.__close_loop:
cell.GetPointIds().SetNumberOfIds(len(ind) + 1)
for i in ind:
cell.GetPointIds().SetId(i, ind[i])
if self.__close_loop:
cell.GetPointIds().SetId(i + 1, ind[0])
raise _helpers.PVGeoError('Cell type ({}) not supported'.format(cell_type))
for key, val in pdi.point_data.items():
poly.point_data[key] = val
if log_time:
print("exectuted in {}".format(datetime.now() - start_time))
return pdo
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to generate output data object"""
# Get input/output of Proxy
pdi = self.GetInputData(inInfo, 0, 0)
pdo = self.GetOutputData(outInfo, 0)
# Perform task
self._connect_cells(pdi, pdo)
return 1
#### Setters and Getters ####
def set_cell_type(self, cell_type):
"""Set the cell typ by the integer id as specified in `vtkCellType.h`"""
if cell_type != self.__cell_type:
self.__cell_type = cell_type
def set_use_nearest_nbr(self, flag):
"""Set a flag on whether to a KDTree nearest neighbor
algorithm to sort the points to before adding linear connectivity.
if flag != self.__usenbr:
self.__usenbr = flag
def set_use_unique_points(self, flag):
"""Set a flag on whether to only use unique points"""
if flag != self.__unique:
self.__unique = flag
class PointsToTube(AddCellConnToPoints):
"""Takes points from a vtkPolyData object and constructs a line of those
points then builds a polygonal tube around that line with some specified
radius and number of sides.
__displayname__ = 'Points to Tube'
__category__ = 'filter'
def __init__(self, num_sides=20, radius=10.0, capping=False, **kwargs):
AddCellConnToPoints.__init__(self, **kwargs)
# Additional Parameters
# NOTE: CellType should remain vtk.VTK_POLY_LINE (4) connection
self.__numSides = num_sides
self.__radius = radius
self.__capping = capping
def _connect_cells(self, pdi, pdo, log_time=False):
"""This uses the parent's ``_connect_cells()`` to build a tube around"""
AddCellConnToPoints._connect_cells(self, pdi, pdo, log_time=log_time)
tube = vtk.vtkTubeFilter()
# User Defined Parameters
# apply the filter
return pdo
#### Setters and Getters ####
def set_radius(self, radius):
"""Set the radius of the tube"""
if self.__radius != radius:
self.__radius = radius
def set_number_of_sides(self, num):
"""Set the number of sides (resolution) for the tube"""
if self.__numSides != num:
self.__numSides = num
def set_capping(self, flag):
"""Set a boolean flag on whether or not to cap the ends of the tube"""
if self.__capping != flag:
self.__capping = flag
# ---- LonLat to Cartesian ----#
class LonLatToUTM(FilterPreserveTypeBase):
"""Converts Points from Lon Lat to UTM"""
__displayname__ = 'Lat Lon To UTM'
__category__ = 'filter'
def __init__(self, **kwargs):
FilterPreserveTypeBase.__init__(self, inputType='vtkDataSet', **kwargs)
self.__zone = (11,)
self.__ellps = 'WGS84'
self.set_zone(kwargs.get('zone', 11)) # User defined
self.set_ellps(kwargs.get('ellps', 'WGS84')) # User defined
def get_available_ellps(idx=None):
"""Returns the available ellps"""
import pyproj
ellps = pyproj.pj_ellps.keys()
# Now migrate WGSXX to front so that 84 is always default
wgs = ['WGS60', 'WGS66', 'WGS72', 'WGS84']
for i, name in enumerate(wgs):
oldindex = ellps.index(name)
ellps.insert(0, ellps.pop(oldindex))
if idx is not None:
return ellps[idx]
return ellps
def __convert_2d(self, lon, lat, elev):
"""Converts 2D Lon Lat coords to 2D XY UTM points"""
import pyproj
p = pyproj.Proj(proj='utm', zone=self.__zone, ellps=self.__ellps)
utm_x, utm_y = p(lon, lat)
return np.c_[utm_x, utm_y, elev]
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to generate output"""
# Get input/output of Proxy
pdi = pyvista.wrap(self.GetInputData(inInfo, 0, 0))
pdo = self.GetOutputData(outInfo, 0)
#### Perform task ####
if not hasattr(pdi, 'points'):
raise _helpers.PVGeoError(
'Input data object does not have points to convert.'
coords = pdi.points.copy() # New NumPy array of points so we dont destroy input
# Now Convert the points
points = self.__convert_2d(coords[:, 0], coords[:, 1], coords[:, 2])
output = pdi.copy()
output.points = points
return 1
def set_zone(self, zone):
"""Set the UTM zone number"""
if zone < 1 or zone > 60:
raise _helpers.PVGeoError('Zone (%d) is invalid.' % zone)
if self.__zone != zone:
self.__zone = int(zone)
def set_ellps(self, ellps):
"""Set the ellipsoid type"""
if isinstance(ellps, int):
ellps = self.get_available_ellps(idx=ellps)
if not isinstance(ellps, str):
raise _helpers.PVGeoError('Ellipse must be a string.')
if self.__ellps != ellps:
self.__ellps = ellps
# ---- Coordinate Rotations ----#
class RotatePoints(FilterBase):
"""Rotates XYZ coordinates in `vtkPolyData` around an origin at a given
angle on the XY plane.
__displayname__ = 'Rotate Points'
__category__ = 'filter'
def __init__(self, angle=45.0, origin=None, use_corner=True):
# Parameters
self.__angle = angle
if origin is None:
origin = [0.0, 0.0]
self.__origin = origin
self.__use_corner = use_corner
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to generate output."""
# Get input/output of Proxy
pdi = self.GetInputData(inInfo, 0, 0)
pdo = self.GetOutputData(outInfo, 0)
#### Perform task ####
# Get the Points over the NumPy interface
wpdi = dsa.WrapDataObject(pdi) # NumPy wrapped input
points = np.array(
) # New NumPy array of points so we dont destroy input
origin = self.__origin
if self.__use_corner:
idx = np.argmin(points[:, 0])
origin = [points[idx, 0], points[idx, 1]]
points[:, 0:2] = RotationTool.rotate_around(
points[:, 0:2], self.__angle, origin
pts = pdo.GetPoints()
for i, pt in enumerate(points):
pts.SetPoint(i, pt)
return 1
def set_rotation_degrees(self, theta):
"""Sets the rotational angle in degrees."""
theta = np.deg2rad(theta)
if self.__angle != theta:
self.__angle = theta
def set_origin(self, xo, yo):
"""Sets the origin to perform the rotate around."""
if self.__origin != [xo, yo]:
self.__origin = [xo, yo]
def set_use_corner(self, flag):
"""A flag to use a corner of the input data set as the rotational
if self.__use_corner != flag:
self.__use_corner = flag
class AppendCellCenters(FilterPreserveTypeBase):
__displayname__ = 'Append Cell Centers'
__category__ = 'filter'
def __init__(self, **kwargs):
FilterPreserveTypeBase.__init__(self, **kwargs)
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to generate output"""
pdi = self.GetInputData(inInfo, 0, 0)
pdo = self.GetOutputData(outInfo, 0)
# Find cell centers
filt = vtk.vtkCellCenters()
# I use the dataset adapter/numpy interface because it is easy
centers = dsa.WrapDataObject(filt.GetOutput()).Points
centers = interface.convert_array(centers)
centers.SetName('Cell Centers')
# Copy input data and add cell centers as tuple array
return 1
class IterateOverPoints(FilterBase):
"""Iterate over points in a time varying manner."""
__displayname__ = 'Iterate Over Points'
__category__ = 'filter'
def __init__(self, dt=1.0):
# Parameters
self.__dt = dt
self.__timesteps = None
self.__original = 2
self.__tindex = None
self.__n = 2
self.__decimate = 100
# The point/normal that gets updated on every iteration
self.__point = (0.0, 0.0, 0.0)
self.__normal = (1.0, 0.0, 0.0)
def _update_time_steps(self):
"""For internal use only"""
self.__timesteps = _helpers.update_time_steps(self, self.__n, self.__dt)
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to generate output"""
# Get input/output of Proxy
pdi = self.GetInputData(inInfo, 0, 0)
# Get number of points
pdo = self.GetOutputData(outInfo, 0)
#### Perform task ####
# Get the Points over the NumPy interface
# wpdi = dsa.WrapDataObject(pdi) # NumPy wrapped input
# Get requested time index
i = _helpers.get_requested_time(self, outInfo)
# Now grab point at this timestep
pt = pdi.GetPoints().GetPoint(self.__tindex[i])
# Calculate normal
pts1 = self.__point
pts2 = pt
x1, y1, z1 = pts1[0], pts1[1], pts1[2]
x2, y2, z2 = pts2[0], pts2[1], pts2[2]
normal = [x2 - x1, y2 - y1, z2 - z1]
self.__point = pt
self.__normal = normal
poly = interface.points_to_poly_data(np.array(pt))
return 1
#### Public Setters / Getters ####
def set_decimate(self, percent):
"""Set the percent (1 to 100) to decimate"""
if percent > 100 or percent < 1:
self.__decimate = percent
self.__n = int(self.__original * (percent / 100.0))
self.__tindex = np.linspace(0, self.__original - 1, self.__n, dtype=int)
def set_time_delta(self, dt):
Set the time step interval in seconds
if self.__dt != dt:
self.__dt = dt
def get_time_step_values(self):
"""Use this in ParaView decorator to register timesteps"""
return self.__timesteps.tolist() if self.__timesteps is not None else None
def get_point(self):
"""Get the current point"""
return list(self.__point)
def get_normal(self):
"""Get the current normal vector"""
return list(self.__normal)
class ConvertUnits(FilterPreserveTypeBase):
"""Convert points in an input data object from meters to feet or vice versa.
This simply uses a ``vtkTransformFilter`` and scales input data object with
common conversions.
__displayname__ = 'Convert XYZ Units'
__category__ = 'filter'
def __init__(self, conversion='meter_to_feet', **kwargs):
FilterPreserveTypeBase.__init__(self, **kwargs)
self.__conversion = conversion
def lookup_conversions(get_keys=False):
"""All Available conversions
dict: dictionary of conversion units
convs = dict(
feet_to_meter=1 / 3.2808399,
if get_keys:
return convs.keys()
return convs
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to generate output"""
# Get input/output of Proxy
pdi = self.GetInputData(inInfo, 0, 0)
# Get number of points
pdo = self.GetOutputData(outInfo, 0)
#### Perform task ####
filt = vtk.vtkTransformFilter()
trans = vtk.vtkTransform()
trans.Scale(self.get_conversion(), self.get_conversion(), self.get_conversion())
scaled = filt.GetOutputDataObject(0)
return 1
def set_conversion(self, key):
"""Set the conversion via a lookup table"""
convs = self.lookup_conversions()
if isinstance(key, str):
if key.lower() not in convs.keys():
raise _helpers.PVGeoError('Converion `%s` not available.' % key)
elif isinstance(key, int):
key = convs.keys()[key]
if self.__conversion != key:
self.__conversion = key
return 1
def get_conversion(self):
"""Get the conversion value"""
convs = self.lookup_conversions()
return convs[self.__conversion]
class BuildSurfaceFromPoints(FilterBase):
"""From the sorted x, y, and z station locations in the input PolyData,
create a surface to project down from the line of those points. Use the
Z cells to control the size of the mesh surface
__displayname__ = 'Build Surface From Points'
__category__ = 'filter'
def __init__(self, **kwargs):
self, inputType='vtkPolyData', outputType='vtkStructuredGrid', **kwargs
self.__zcoords = CreateTensorMesh._read_cell_line('0. 50.')
zcoords = kwargs.get('zcoords', self.__zcoords)
if not isinstance(zcoords, (str, list, tuple, np.ndarray)):
raise TypeError('zcoords of bad type.')
if isinstance(zcoords, str):
def create_surface(points, z_range):
"""From the sorted x, y, and z station locations, create a surface
to display a seismic recording/migration on in space. The result is
defined in the X,Y,Z-z_range 3D space.
The z_range should be treated as relative coordinates to the values
given on the third column of the points array. If you want the values
in the z_range to be treated as the absolute coordinates, simply
do not pass any Z values in the points array - if points is N by 2,
then the values in z_range will be inferred as absolute.
points (np.ndarray): array-like of the station x and y locations
(npts by 2-3) z_range (np.ndarray): The linear space of the z
dimension. This will be filled out for every station location.
if hasattr(points, 'values'):
# This will extract data from pandas dataframes if those are given
points = points.values
points = np.array(points)
z_range = np.array(z_range)
xloc = points[:, 0]
yloc = points[:, 1]
if points.shape[1] > 2:
zloc = points[:, 2]
val = np.nanmax(z_range)
z_range = val - np.flip(z_range)
zloc = np.full(xloc.shape, val)
if not len(xloc) == len(yloc) == len(zloc):
raise AssertionError('Coordinate shapes do not match.')
nt = len(xloc)
ns = len(z_range)
# Extrapolate points to a 2D surface
# repeat the XY locations across
points = np.repeat(np.c_[xloc, yloc, zloc], ns, axis=0)
# repeat the Z locations across
tp = np.repeat(z_range.reshape((-1, len(z_range))), nt, axis=0)
tp = zloc[:, None] - tp
points[:, -1] = tp.ravel()
# Produce the output
output = pyvista.StructuredGrid()
output.points = points
output.dimensions = [ns, nt, 1]
return output
def RequestData(self, request, inInfo, outInfo):
"""Execute on pipeline"""
# Get input/output of Proxy
pdi = self.GetInputData(inInfo, 0, 0)
# Get number of points
pdo = self.GetOutputData(outInfo, 0)
#### Perform task ####
data = pyvista.wrap(pdi)
output = BuildSurfaceFromPoints.create_surface(
data.points, np.array(self.__zcoords)
return 1
def set_z_coords(self, zcoords):
"""Set the spacings for the cells in the Z direction
zcoords (list or np.array(floats)): the spacings along the Z-axis"""
if len(zcoords) != len(self.__zcoords) or not np.allclose(
self.__zcoords, zcoords
self.__zcoords = zcoords
def set_z_coords_str(self, zcoordstr):
"""Set the spacings for the cells in the Z direction
zcoordstr (str) : the spacings along the Z-axis in the UBC style"""
zcoords = CreateTensorMesh._read_cell_line(zcoordstr)