Source code for datafusiontools.interpolation.interpolation

import numpy as np
from scipy.spatial import cKDTree, Delaunay
from scipy import interpolate
import pykrige
from dataclasses import dataclass, asdict
from typing import List, Union, Dict, Optional
from abc import abstractmethod
import numpy as np
from enum import Enum
from datafusiontools._core.base_class import BaseClass


[docs]class VariogramModel(Enum): linear = "linear" power = "power" gaussian = "gaussian" spherical = "spherical" exponential = "exponential" hole_effect = "hole-effect"
[docs]@dataclass class VariogramParameters: slope: Optional[float] = None nugget: Optional[float] = None scale: Optional[float] = None exponent: Optional[float] = None nugget: Optional[float] = None sill: Optional[float] = None psill: Optional[float] = None range: Optional[float] = None
[docs]@dataclass class BaseClassInterpolation(BaseClass): tree: Union[List, None, np.array] = None zn: Union[List, None, np.array] = None training_data: Union[List, None, np.array] = None training_points: Union[List, None, np.array] = None
[docs] @abstractmethod def interpolate(self): raise NotImplementedError( "The method should be implemented in concrete classes." )
[docs] @abstractmethod def predict(self): raise NotImplementedError( "The method should be implemented in concrete classes." )
[docs]@dataclass class Nearest(BaseClassInterpolation):
[docs] def interpolate(self, training_points: np.array, training_data: np.array): """ Define the KDtree This interpolation is done with `SciPy interpolate.NearestNDInterpolator <https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.NearestNDInterpolator.html>`_. :param training_points: array with the training points :param training_data: data at the training points :return: """ # assign to variables self.training_points = training_points # training points self.training_data = training_data # data at the training points # define interpolation function self.interpolating_function = interpolate.NearestNDInterpolator( self.training_points, self.training_data ) # create KDtree self.tree = cKDTree(training_points) return
[docs] def predict(self, prediction_points): """ Perform interpolation with nearest neighbors method :param prediction_points: prediction points :return: """ # compute closest distance and index of the closest index dist, idx = self.tree.query(prediction_points) self.zn = [] # create interpolation for every point for i in range(len(prediction_points)): # interpolate self.zn.append(self.training_data[idx[i]]) self.zn = np.array(self.zn) return
[docs]@dataclass class InverseDistance(BaseClassInterpolation): """ Inverse distance interpolator """ nb_near_points: int = 6 power: float = 1.0 tol: float = 1e-9 var: Union[List, None, np.array] = None
[docs] def interpolate(self, training_points, training_data): """ Define the KDtree :param training_points: array with the training points :param training_data: data at the training points :return: """ # assign to variables self.training_points = np.array(training_points) # training points self.training_data = np.array(training_data) # data at the training points # compute Euclidean distance from grid to training self.tree = cKDTree(self.training_points) return
[docs] def predict(self, prediction_points): """ Perform interpolation with inverse distance method :param prediction_points: prediction points :return: """ # get distances and indexes of the closest nb_points dist, idx = self.tree.query(prediction_points, self.nb_near_points) dist += self.tol # to overcome division by zero self.zn = [] # create interpolation for every point for i in range(len(prediction_points)): # compute weights data = self.training_data[idx[i]] # interpolate self.zn.append( np.sum(data.T / dist[i] ** self.power) / np.sum(1.0 / dist[i] ** self.power) ) self.zn = np.array(self.zn) return
[docs]@dataclass class NaturalNeighbor(BaseClassInterpolation): interp: Union[List, None, np.array] = None
[docs] def interpolate(self, training_points, training_data): """ Define the interpolator This interpolation is done with `SciPy interpolate.NearestNDInterpolator <https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.NearestNDInterpolator.html>`_. :param training_points: array with the training points :param training_data: data at the training points :return: """ # assign to variables self.training_points = training_points # training points self.training_data = training_data # data at the training points return
[docs] def predict(self, prediction_points): """ Perform interpolation with natural neighbors method :param prediction_points: prediction points :return: """ zn = [] prediction_points = np.array(prediction_points) for i in range(len(prediction_points)): new_points = np.vstack([self.training_points, prediction_points[i].T]) tri = Delaunay(new_points) # Find index of prediction point pindex = np.where(np.all(tri.points == prediction_points[i].T, axis=1))[0][ 0 ] # find neighbours neig_idx = tri.vertex_neighbor_vertices[1][ tri.vertex_neighbor_vertices[0][pindex] : tri.vertex_neighbor_vertices[ 0 ][pindex + 1] ] # get the coordinates of the neighbours coords_neig = [tri.points[j] for j in neig_idx] # compute Euclidean distance dist = [np.linalg.norm(prediction_points[i] - j) for j in coords_neig] # find data of the neighbours idx_coords_neig = [] for j in coords_neig: idx_coords_neig.append( np.where(np.all(self.training_points == j, axis=1))[0][0] ) # get data of the neighbours data_neig = [np.array(self.training_data[j]) for j in idx_coords_neig] # compute weights zn_aux = [] for ii in range(len(data_neig)): aux = data_neig[ii] * dist[ii] / np.sum(dist) zn_aux.append(aux) zn.append(np.sum(np.array(zn_aux), axis=0)) self.zn = zn return
[docs]@dataclass class CustomKriging(BaseClassInterpolation): two_d: bool = True variogram_model: VariogramModel = VariogramModel.linear variogram_parameters: VariogramParameters = VariogramParameters()
[docs] def interpolate(self, training_points, training_data): # assign to variables if self.two_d: self.training_points = training_points # training points self.training_data = training_data # data at the training points self.interpolating_function = pykrige.ok.OrdinaryKriging( self.training_points.T[0], self.training_points.T[1], training_data, variogram_model=self.variogram_model.value, variogram_parameters={ k: v for k, v in asdict(self.variogram_parameters).items() if v is not None }, verbose=False, enable_plotting=False, nlags=20, ) else: self.training_points = training_points # training points self.training_data = training_data # data at the training points self.interpolating_function = pykrige.ok3d.OrdinaryKriging3D( self.training_points.T[0], self.training_points.T[1], self.training_points.T[2], training_data, variogram_model=self.variogram_model.value, variogram_parameters={ k: v for k, v in asdict(self.variogram_parameters).items() if v is not None }, verbose=False, enable_plotting=False, nlags=20, )
[docs] def predict(self, prediction_points): """ Perform interpolation with Kriging method :param prediction_points: prediction points :return: """ if self.two_d: self.zn, ss = self.interpolating_function.execute( "points", prediction_points.T[0], prediction_points.T[1] ) else: self.zn, ss = self.interpolating_function.execute( "points", prediction_points.T[0], prediction_points.T[1], prediction_points.T[2], )