"""This module contains general grid readers and writers for programs like Surfer."""
__all__ = [
'SurferGridReader',
'WriteImageDataToSurfer',
'EsriGridReader',
'LandsatReader',
'WriteCellCenterData',
]
__displayname__ = 'File I/O'
# NOTE: Surfer no data value: 1.70141E+38
from struct import unpack
import warnings
import espatools
import numpy as np
import pandas as pd
import properties
import pyvista as pv
import vtk
from vtk.numpy_interface import dataset_adapter as dsa
from .. import _helpers, interface
from ..base import ReaderBase, ReaderBaseBase, WriterBase
from ..readers import DelimitedTextReader
###############################################################################
class GridInfo(properties.HasProperties):
"""Internal helper class to store Surfer grid properties and create
``vtkImageData`` objects from them.
"""
ny = properties.Integer('number of columns', min=2)
nx = properties.Integer('number of rows', min=2)
xll = properties.Float('x-value of lower-left corner')
yll = properties.Float('y-value of lower-left corner')
dx = properties.Float('x-axis spacing')
dy = properties.Float('y-axis spacing')
dmin = properties.Float('minimum data value', required=False)
dmax = properties.Float('maximum data value', required=False)
data = properties.Array('grid of data values', shape=('*',))
def mask(self):
"""Mask the no data value"""
data = self.data
nans = data >= 1.701410009187828e38
if np.any(nans):
data = np.ma.masked_where(nans, data)
err_msg = "{} of data ({}) doesn't match that set by file ({})."
if not np.allclose(self.dmin, np.nanmin(data)):
raise _helpers.PVGeoError(err_msg.format('Min', np.nanmin(data), self.dmin))
if not np.allclose(self.dmax, np.nanmax(data)):
raise _helpers.PVGeoError(err_msg.format('Max', np.nanmax(data), self.dmax))
self.data = data
return
def to_vtk(self, output=None, z=0.0, dz=1.0, data_name='Data'):
"""Convert to a ``vtkImageData`` object"""
self.mask()
self.validate()
if output is None:
output = vtk.vtkImageData()
# Build the data object
output.SetOrigin(self.xll, self.yll, z)
output.SetSpacing(self.dx, self.dy, dz)
output.SetDimensions(self.nx, self.ny, 1)
vtkarr = interface.convert_array(self.data, name=data_name)
output.GetPointData().AddArray(vtkarr)
return output
[docs]
class SurferGridReader(ReaderBase):
"""Read 2D ASCII/Binary Surfer grid files. The IO code was adopted from
`Seequent's steno3d_surfer`_
.. _Seequent's steno3d_surfer: https://github.com/seequent/steno3d-surfer/blob/master/steno3d_surfer/parser.py
Note:
MIT License
Copyright (c) 2018 Seequent
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
__displayname__ = 'Surfer Grid Reader'
__category__ = 'reader'
extensions = 'grd GRD'
description = 'PVGeo: Surfer Grid'
def __init__(self, outputType='vtkImageData', **kwargs):
ReaderBase.__init__(self, outputType=outputType, **kwargs)
self.__grids = None
self.__data_name = kwargs.get('data_name', 'Data')
[docs]
@staticmethod
def _surfer7bin(filename):
"""See class notes."""
with open(filename, 'rb') as f:
if unpack('4s', f.read(4))[0] != b'DSRB':
raise _helpers.PVGeoError(
'''Invalid file identifier for Surfer 7 binary .grd
file. First 4 characters must be DSRB.'''
)
f.read(8) # Size & Version
section = unpack('4s', f.read(4))[0]
if section != b'GRID':
raise _helpers.PVGeoError(
'''Unsupported Surfer 7 file structure. GRID keyword
must follow immediately after header but {}
encountered.'''.format(
section
)
)
size = unpack('<i', f.read(4))[0]
if size != 72:
raise _helpers.PVGeoError(
'''Surfer 7 GRID section is unrecognized size. Expected
72 but encountered {}'''.format(
size
)
)
nrow = unpack('<i', f.read(4))[0]
ncol = unpack('<i', f.read(4))[0]
x0 = unpack('<d', f.read(8))[0]
y0 = unpack('<d', f.read(8))[0]
deltax = unpack('<d', f.read(8))[0]
deltay = unpack('<d', f.read(8))[0]
zmin = unpack('<d', f.read(8))[0]
zmax = unpack('<d', f.read(8))[0]
rot = unpack('<d', f.read(8))[0]
if rot != 0:
warnings.warn('Unsupported feature: Rotation != 0')
blankval = unpack('<d', f.read(8))[0]
section = unpack('4s', f.read(4))[0]
if section != b'DATA':
raise _helpers.PVGeoError(
'''Unsupported Surfer 7 file structure. DATA keyword
must follow immediately after GRID section but {}
encountered.'''.format(
section
)
)
datalen = unpack('<i', f.read(4))[0]
if datalen != ncol * nrow * 8:
raise _helpers.PVGeoError(
'''Surfer 7 DATA size does not match expected size from
columns and rows. Expected {} but encountered
{}'''.format(
ncol * nrow * 8, datalen
)
)
data = np.zeros(ncol * nrow)
for i in range(ncol * nrow):
data[i] = unpack('<d', f.read(8))[0]
data = np.where(data >= blankval, np.nan, data)
try:
section = unpack('4s', f.read(4))[0]
if section == b'FLTI':
warnings.warn('Unsupported feature: Fault Info')
else:
warnings.warn('Unrecognized keyword: {}'.format(section))
warnings.warn('Remainder of file ignored')
except:
pass
grd = GridInfo(
nx=ncol,
ny=nrow,
xll=x0,
yll=y0,
dx=deltax,
dy=deltay,
dmin=zmin,
dmax=zmax,
data=data,
)
return grd
[docs]
@staticmethod
def _surfer6bin(filename):
"""See class notes."""
with open(filename, 'rb') as f:
if unpack('4s', f.read(4))[0] != b'DSBB':
raise _helpers.PVGeoError(
'''Invalid file identifier for Surfer 6 binary .grd
file. First 4 characters must be DSBB.'''
)
nx = unpack('<h', f.read(2))[0]
ny = unpack('<h', f.read(2))[0]
xlo = unpack('<d', f.read(8))[0]
xhi = unpack('<d', f.read(8))[0]
ylo = unpack('<d', f.read(8))[0]
yhi = unpack('<d', f.read(8))[0]
dmin = unpack('<d', f.read(8))[0]
dmax = unpack('<d', f.read(8))[0]
data = np.ones(nx * ny)
for i in range(nx * ny):
zdata = unpack('<f', f.read(4))[0]
if zdata >= 1.701410009187828e38:
data[i] = np.nan
else:
data[i] = zdata
grd = GridInfo(
nx=nx,
ny=ny,
xll=xlo,
yll=ylo,
dx=(xhi - xlo) / (nx - 1),
dy=(yhi - ylo) / (ny - 1),
dmin=dmin,
dmax=dmax,
data=data,
)
return grd
[docs]
@staticmethod
def _surfer6ascii(filename):
"""See class notes."""
with open(filename, 'r') as f:
if f.readline().strip() != 'DSAA':
raise _helpers.PVGeoError(
'''Invalid file identifier for Surfer 6 ASCII .grd file. First line must be DSAA'''
)
[ncol, nrow] = [int(n) for n in f.readline().split()]
[xmin, xmax] = [float(n) for n in f.readline().split()]
[ymin, ymax] = [float(n) for n in f.readline().split()]
[dmin, dmax] = [float(n) for n in f.readline().split()]
# Read in the rest of the file as a 1D array
data = np.fromiter(
(float(s) for line in f for s in line.split()), dtype=float
)
grd = GridInfo(
nx=ncol,
ny=nrow,
xll=xmin,
yll=ymin,
dx=(xmax - xmin) / (ncol - 1),
dy=(ymax - ymin) / (nrow - 1),
dmin=dmin,
dmax=dmax,
data=data,
)
return grd
[docs]
def _read_grids(self, idx=None):
"""This parses the first file to determine grid file type then reads
all files set."""
if idx is not None:
filenames = [self.get_file_names(idx=idx)]
else:
filenames = self.get_file_names()
contents = []
f = open(filenames[0], 'rb')
key = unpack('4s', f.read(4))[0]
f.close()
if key == b'DSRB':
reader = self._surfer7bin
elif key == b'DSBB':
reader = self._surfer6bin
elif key == b'DSAA':
reader = self._surfer6ascii
else:
raise _helpers.PVGeoError(
'''Invalid file identifier for Surfer .grd file.
First 4 characters must be DSRB, DSBB, or DSAA. This file contains: %s'''
% key
)
for f in filenames:
try:
contents.append(reader(f))
except (IOError, OSError) as fe:
raise _helpers.PVGeoError(str(fe))
if idx is not None:
return contents[0]
return contents
[docs]
def _read_up_front(self):
"""Should not need to be overridden."""
# Perform Read
self.__grids = self._read_grids()
self.need_to_read(flag=False)
return 1
########################
[docs]
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to get data for current timestep and populate the
output data object.
"""
# Get output:
output = self.GetOutputData(outInfo, 0)
if self.need_to_read():
self._read_up_front()
# Get requested time index
i = _helpers.get_requested_time(self, outInfo)
# Build the output
grid = self.__grids[i]
grid.to_vtk(output=output, data_name=self.__data_name)
return 1
[docs]
def set_data_name(self, data_name):
"""Set the name of the data array"""
if self.__data_name != data_name:
self.__data_name = data_name
self.Modified(read_again=False)
[docs]
def get_data_name(self):
"""Get the name of the data array"""
return self.__data_name
###############################################################################
[docs]
class WriteImageDataToSurfer(WriterBase):
"""Write a 2D ``vtkImageData`` object to the Surfer grid format"""
__displayname__ = 'Write ``vtkImageData`` to Surfer Format'
__category__ = 'writer'
def __init__(self):
WriterBase.__init__(self, inputType='vtkImageData', ext='grd')
self.__input_array = [None, None]
[docs]
def apply(self, input_data_object, array_name):
"""Run the algorithm on an input data object, specifying one data array
to save out.
"""
self.SetInputDataObject(input_data_object)
arr, field = _helpers.search_for_array(input_data_object, array_name)
self.SetInputArrayToProcess(0, 0, 0, field, array_name)
self.Update()
return pv.wrap(self.GetOutput())
[docs]
def Write(self, input_data_object=None, array_name=None):
"""Perform the write out."""
if input_data_object:
self.SetInputDataObject(input_data_object)
if array_name:
arr, field = _helpers.search_for_array(input_data_object, array_name)
self.SetInputArrayToProcess(0, 0, 0, field, array_name)
self.Modified()
self.Update()
[docs]
def write(self, input_data_object=None, array_name=None):
"""Perform the write out."""
return self.Write(input_data_object=input_data_object, array_name=array_name)
###############################################################################
[docs]
class EsriGridReader(DelimitedTextReader):
"""See details: https://en.wikipedia.org/wiki/Esri_grid"""
__displayname__ = 'Esri Grid Reader'
__type__ = 'reader'
description = 'PVGeo: Esri Grid'
extensions = 'asc dem txt'
def __init__(self, outputType='vtkImageData', **kwargs):
DelimitedTextReader.__init__(self, outputType=outputType, **kwargs)
# These are attributes the derived from file contents:
self.set_delimiter(' ')
self.__nx = None
self.__ny = None
self.__xo = None
self.__yo = None
self.__cellsize = None
self.__data_name = kwargs.get('data_name', 'Data')
self.NODATA_VALUE = -9999
[docs]
def _file_contents_to_data_frame(self, contents):
"""Creates a dataframe with a single array for the file data."""
data = []
for content in contents:
arr = np.fromiter(
(float(s) for line in content for s in line.split()), dtype=float
)
df = pd.DataFrame(data=arr, columns=[self.get_data_name()])
data.append(df)
return data
[docs]
def _get_raw_data(self, idx=0):
"""This will return the proper data for the given timestep.
This method handles Surfer's NaN data values and checks the value range
"""
data = self._data[idx].values.astype(float).ravel()
nans = np.argwhere(data == self.NODATA_VALUE)
# if np.any(nans):
# data = np.ma.masked_where(nans, data)
data[nans] = np.nan
return np.flip(np.reshape(data, self.get_shape(), order='F'), 1).flatten(
order='F'
)
[docs]
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to get data for current timestep and populate the
output data object.
"""
# Get output:
output = self.GetOutputData(outInfo, 0)
if self.need_to_read():
self._read_up_front()
# Get requested time index
i = _helpers.get_requested_time(self, outInfo)
# Build the data object
output.SetOrigin(self.__xo, self.__yo, 0.0)
output.SetSpacing(self.__cellsize, self.__cellsize, self.__cellsize)
output.SetDimensions(*self.get_shape())
# Now add data values as point data
data = self._get_raw_data(idx=i)
vtkarr = interface.convert_array(data, name=self.__data_name)
output.GetPointData().AddArray(vtkarr)
return 1
[docs]
def set_data_name(self, data_name):
"""Set the name of the data array"""
if self.__data_name != data_name:
self.__data_name = data_name
self.Modified(read_again=False)
[docs]
def get_data_name(self):
"""Get the name of the data array"""
return self.__data_name
[docs]
def get_shape(self):
"""Get the shape of the grid."""
return (self.__nx, self.__ny, 1)
###############################################################################
[docs]
class LandsatReader(ReaderBaseBase):
"""A reader that will handle ESPA XML files for Landsat Imagery. This reader
uses the ``espatools`` package to read Landsat rasters (band sets) and
creates vtkImageData with each band as point data
"""
__displayname__ = 'Landsat XML Reader'
__category__ = 'reader'
extensions = "xml"
description = 'PVGeo: Landsat ESPA XML Metadata'
def __init__(self, **kwargs):
ReaderBaseBase.__init__(self, outputType='vtkImageData', **kwargs)
self.__reader = espatools.RasterSetReader(yflip=True)
self.__raster = None
self.__cast = True
self.__scheme = 'infrared'
# Properties:
self._dataselection = vtk.vtkDataArraySelection()
self._dataselection.AddObserver(
"ModifiedEvent", _helpers.create_modified_callback(self)
)
[docs]
def Modified(self, read_again=False):
"""Ensure default is overridden to be false so array selector can call."""
return ReaderBaseBase.Modified(self, read_again=read_again)
[docs]
def modified(self, read_again=False):
return self.Modified(read_again=read_again)
[docs]
def get_file_name(self):
"""Super class has file names as a list but we will only handle a single
project file. This provides a conveinant way of making sure we only
access that single file.
A user could still access the list of file names using ``get_file_names()``.
"""
return ReaderBaseBase.get_file_names(self, idx=0)
#### Methods for performing the read ####
[docs]
def _get_file_contents(self, idx=None):
"""Reads XML meta data, no data read."""
self.__reader.SetFileName(self.get_file_name())
self.__raster = self.__reader.Read(meta_only=True)
for n in self.__raster.bands.keys():
self._dataselection.AddArray(n)
self.need_to_read(flag=False) # Only meta data has been read
return
[docs]
def _read_up_front(self):
"""Internal helper to read all data at start"""
return self._get_file_contents()
[docs]
def _get_raw_data(self, idx=0):
"""Performs the read for the selected bands"""
allowed = []
for i in range(self._dataselection.GetNumberOfArrays()):
name = self._dataselection.GetArrayName(i)
if self._dataselection.ArrayIsEnabled(name):
allowed.append(name)
self.__raster = self.__reader.Read(
meta_only=False, allowed=allowed, cast=self.__cast
)
return self.__raster
[docs]
def _build_image_data(self, output):
"""Properly builds the output ``vtkImageData`` object"""
if self.__raster is None:
raise _helpers.PVGeoError('Raster invalid.')
raster = self.__raster
output.SetDimensions(raster.nsamps, raster.nlines, 1)
output.SetSpacing(raster.pixel_size.x, raster.pixel_size.y, 1)
corner = raster.global_metadata.projection_information.corner_point[0]
output.SetOrigin(corner.x, corner.y, 0)
return output
#### Pipeline Methods ####
[docs]
def RequestData(self, request, inInfo, outInfo):
"""Used by pipeline to generate output"""
# Get output:
output = vtk.vtkImageData.GetData(outInfo, 0)
# Perform Read if needed
if self.__raster is None:
self._read_up_front()
self._get_raw_data()
self._build_image_data(output)
# Now add the data based on what the user has selected
for name, band in self.__raster.bands.items():
data = band.data
output.GetPointData().AddArray(
interface.convert_array(data.flatten(), name=name)
)
if self.__scheme is not None:
colors = self.__raster.GetRGB(scheme=self.__scheme).reshape((-1, 3))
output.GetPointData().SetScalars(
interface.convert_array(colors, name=self.__scheme)
)
output.GetPointData().SetActiveScalars(self.__scheme)
return 1
#### Seters and Geters for the GUI ####
[docs]
def GetDataSelection(self):
"""Used by ParaView GUI"""
if self.need_to_read():
try:
self._read_up_front()
except:
pass
return self._dataselection
[docs]
def set_cast_data_type(self, flag):
"""A flag to cast all data arrays as floats/doubles.
This will fill invalid values with nans instead of a fill value"""
if self.__cast != flag:
self.__cast = flag
self.Modified()
[docs]
def set_color_scheme(self, scheme):
"""Get an RGB scheme from the raster set. If no scheme is desired, pass
any string that is not a defined scheme as the scheme argument."""
if isinstance(scheme, int):
scheme = self.get_color_scheme_names()[scheme]
if (
scheme in list(espatools.RasterSet.RGB_SCHEMES.keys())
and self.__scheme != scheme
):
self.__scheme = scheme
self.Modified()
elif self.__scheme is not None:
self.__scheme = None
self.Modified()
[docs]
@staticmethod
def get_color_scheme_names():
"""Get a list of the available color schemes"""
schemes = list(espatools.RasterSet.RGB_SCHEMES.keys())
schemes.insert(0, 'No Selection')
return schemes
###############################################################################
[docs]
class WriteCellCenterData(WriterBase):
"""This writer will save a file of the XYZ points for an input dataset's
cell centers and its cell data. Use in tandom with ParaView's native CSV
writer which saves the PointData.
"""
__displayname__ = 'Write Cell Centers To CSV'
__category__ = 'writer'
def __init__(self):
WriterBase.__init__(self, inputType='vtkDataSet')
self.__delimiter = ','
[docs]
def set_delimiter(self, deli):
"""The string delimiter to use"""
if self.__delimiter != deli:
self.__delimiter = deli
self.Modified()
###############################################################################