Source code for gempy.core.data

import re
import sys
import warnings
from typing import Union, List

import numpy as np
import pandas as pn
import seaborn as sns

try:
    import ipywidgets as widgets

    ipywidgets_import = True
except ModuleNotFoundError:
    ipywidgets_import = False

# This is for sphinx to find the packages
from gempy.core.grid_modules import grid_types
from gempy.core.grid_modules import topography
from gempy.utils.meta import _setdoc, _setdoc_pro
import gempy.utils.docstring as ds
from IPython.display import display

pn.options.mode.chained_assignment = None


class MetaData(object):
    """Class containing the metadata of the project.

    Set of attributes and methods that are not related directly to the geological model but
    rather to the project.

    Args:
        project_name (str): Name of the project. This is used as the default value for some
            I/O actions.

    Attributes:
        date (str): Time of creation of the project.
        project_name (str): Name of the project. This is used as the default value for some
            I/O actions.
    """

    def __init__(self, project_name='default_project'):
        import datetime
        now = datetime.datetime.now()
        self.date = now.strftime(" %Y-%m-%d %H:%M")

        if project_name == 'default_project':
            project_name += self.date

        self.project_name = project_name
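
# Usage sketch (illustrative only, not part of the module): a MetaData instance simply stamps
# the project with a name and a creation date. The project name below is made up.
#
#   meta = MetaData(project_name='alps_model')
#   meta.project_name, meta.date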

@_setdoc_pro([grid_types.RegularGrid.__doc__, grid_types.CustomGrid.__doc__])
class Grid(object):
    """Class to generate grids.

    This class is used to create the points at which the geological model is evaluated. It serves
    as a container which transmits the XYZ coordinates to the interpolator. There are several
    types of grid objects that feed into the Grid class.

    Args:
        **kwargs: See below

    Keyword Args:
        regular (:class:`gempy.core.grid_modules.grid_types.RegularGrid`): [s0]
        custom (:class:`gempy.core.grid_modules.grid_types.CustomGrid`): [s1]
        topography (:class:`gempy.core.grid_modules.grid_types.Topography`): [s2]
        sections (:class:`gempy.core.grid_modules.grid_types.Sections`): [s3]
        gravity (:class:`gempy.core.grid_modules.grid_types.Gravity`):

    Attributes:
        values (np.ndarray): Coordinates where the model is going to be evaluated. These are the
            concatenated coordinates of all active grids.
        values_r (np.ndarray): Rescaled coordinates where the model is going to be evaluated.
        length (np.ndarray): Array containing the slicing index for each grid type, in order. The
            first element is 0, the second the length of the regular grid, the third the custom
            grid, and so on. This can be used to slice the solutions corresponding to each grid.
        grid_types (np.ndarray[str]): Names of the current grids of GemPy.
        active_grids (np.ndarray[bool]): Boolean array controlling which types of grid are going
            to be computed and hence included in the property `values`.
        regular_grid (:class:`gempy.core.grid_modules.grid_types.RegularGrid`)
        custom_grid (:class:`gempy.core.grid_modules.grid_types.CustomGrid`)
        topography (:class:`gempy.core.grid_modules.grid_types.Topography`)
        sections (:class:`gempy.core.grid_modules.grid_types.Sections`)
        gravity_grid (:class:`gempy.core.grid_modules.grid_types.Gravity`)
    """

    def __init__(self, **kwargs):
        self.values = np.empty((0, 3))
        self.values_r = np.empty((0, 3))
        self.length = np.empty(0)
        self.grid_types = np.array(['regular', 'custom', 'topography', 'sections', 'centered'])
        self.active_grids = np.zeros(5, dtype=bool)
        # All grid types must have values

        # Init optional grids
        self.custom_grid = None
        self.custom_grid_grid_active = False
        self.topography = None
        self.topography_grid_active = False
        self.sections_grid_active = False
        self.centered_grid = None
        self.centered_grid_active = False

        # Init basic grid empty
        self.regular_grid = self.create_regular_grid(set_active=False, **kwargs)
        self.regular_grid_active = False

        # Init optional sections
        self.sections = grid_types.Sections(regular_grid=self.regular_grid)

        self.update_grid_values()

    def __str__(self):
        return 'Grid Object. Values: \n' + np.array2string(self.values)

    def __repr__(self):
        return 'Grid Object. Values: \n' + np.array_repr(self.values)

    @_setdoc(grid_types.RegularGrid.__doc__)
    def create_regular_grid(self, extent=None, resolution=None, set_active=True, *args, **kwargs):
        """Set a new regular grid and activate it.

        Args:
            extent (np.ndarray): [x_min, x_max, y_min, y_max, z_min, z_max]
            resolution (np.ndarray): [nx, ny, nz]

        RegularGrid Docs
        """
        self.regular_grid = grid_types.RegularGrid(extent, resolution, **kwargs)
        if set_active is True:
            self.set_active('regular')
        return self.regular_grid

    @_setdoc_pro(ds.coord)
    def create_custom_grid(self, custom_grid: np.ndarray):
        """Set a new custom grid and activate it.

        Args:
            custom_grid (np.array): [s0]
        """
        self.custom_grid = grid_types.CustomGrid(custom_grid)
        self.set_active('custom')

    def create_topography(self, source='random', **kwargs):
        """Create a topography grid and activate it.

        Args:
            source:
                * 'gdal': Load topography from a raster file.
                * 'random': Generate random topography (based on a fractal grid).
                * 'saved': Load topography that was saved with the save() function. This is useful
                  after loading and saving a heavy raster file with gdal once, or after saving a
                  random topography with the save() function. This .npy file can then be set as
                  topography.

        Keyword Args:
            source = 'gdal':
                * filepath: path to a raster file, e.g. '.tif' (for supported file formats see
                  the GDAL documentation).
            source = 'random':
                * fd: fractal dimension, defaults to 2.0
                * d_z: maximum height difference. If None, the last 20% of the model in z direction.
                * extent: extent in xy direction. If None, geo_model.grid.extent.
                * resolution: desired resolution of the topography array. If None,
                  geo_model.grid.resolution.
            source = 'saved':
                * filepath: path to the .npy file that was created using the save() function.

        Returns:

        """
        self.topography = topography.Topography(self.regular_grid)

        if source == 'random':
            self.topography.load_random_hills(**kwargs)
        elif source == 'gdal':
            filepath = kwargs.get('filepath', None)
            if filepath is not None:
                self.topography.load_from_gdal(filepath)
            else:
                print('To load a raster file, a path to the file must be provided.')
        elif source == 'saved':
            filepath = kwargs.get('filepath', None)
            if filepath is not None:
                self.topography.load_from_saved(filepath)
            else:
                print('A path to the .npy file must be provided.')
        elif source == 'numpy':
            array = kwargs.get('array', None)
            self.topography.set_values(array)
        else:
            raise AttributeError('source must be random, gdal, saved or numpy')

        self.set_active('topography')
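
    # Illustrative calls (a sketch; the kwargs mirror the docstring above and the file names are
    # made up, they are not shipped with the library):
    #
    #   grid.create_topography(source='random', fd=2.0, resolution=[50, 50])
    #   grid.create_topography(source='gdal', filepath='dem.tif')
    #   grid.create_topography(source='saved', filepath='topo.npy')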

    @_setdoc(grid_types.Sections.__doc__)
    def create_section_grid(self, section_dict):
        self.sections = grid_types.Sections(regular_grid=self.regular_grid, section_dict=section_dict)
        self.set_active('sections')
        return self.sections

    @_setdoc(grid_types.CenteredGrid.set_centered_grid.__doc__)
    def create_centered_grid(self, centers, radius, resolution=None):
        """Initialize gravity grid. Deactivate the rest of the grids."""
        self.centered_grid = grid_types.CenteredGrid(centers, radius, resolution)
        # self.active_grids = np.zeros(4, dtype=bool)
        self.set_active('centered')

    def deactivate_all_grids(self):
        """Deactivate all grids in the active grids array.

        Returns:
            np.ndarray[bool]
        """
        self.active_grids = np.zeros(5, dtype=bool)
        self.update_grid_values()
        return self.active_grids

    def set_active(self, grid_name: Union[str, np.ndarray]):
        """Set one or several grids active.

        Args:
            grid_name (str, list):
        """
        where = self.grid_types == grid_name
        self.active_grids[where] = True
        self.update_grid_values()
        return self.active_grids

    def set_inactive(self, grid_name: str):
        where = self.grid_types == grid_name
        self.active_grids *= ~where
        self.update_grid_values()
        return self.active_grids

    def update_grid_values(self):
        """Copy the XYZ coordinates from each active grid to Grid.values.

        Returns:
            values
        """
        self.length = np.empty(0)
        self.values = np.empty((0, 3))
        lengths = [0]
        try:
            for e, grid in enumerate([self.regular_grid, self.custom_grid, self.topography,
                                      self.sections, self.centered_grid]):
                if self.active_grids[e]:
                    self.values = np.vstack((self.values, grid.values))
                    lengths.append(grid.values.shape[0])
                else:
                    lengths.append(0)
        except AttributeError:
            raise AttributeError('Grid type does not exist yet. Set the grid before activating it.')

        self.length = np.array(lengths).cumsum()
        return self.values

    def get_grid_args(self, grid_name: str):
        assert type(grid_name) is str, 'Only one grid type can be retrieved'
        assert grid_name in self.grid_types, 'possible grid types are ' + str(self.grid_types)
        where = np.where(self.grid_types == grid_name)[0][0]
        return self.length[where], self.length[where + 1]

    def get_grid(self, grid_name: str):
        assert type(grid_name) is str, 'Only one grid type can be retrieved'
        l_0, l_1 = self.get_grid_args(grid_name)
        return self.values[l_0:l_1]

    def get_section_args(self, section_name: str):
        # assert type(section_name) is str, 'Only one section type can be retrieved'
        l0, l1 = self.get_grid_args('sections')
        where = np.where(self.sections.names == section_name)[0][0]
        return l0 + self.sections.length[where], l0 + self.sections.length[where + 1]
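
# Usage sketch for the Grid container (illustrative; assumes the grid_types module behaves as
# documented above, and the extent/resolution values are made up):
#
#   grid = Grid(extent=[0, 1000, 0, 1000, 0, 500], resolution=[50, 50, 50])
#   grid.set_active('regular')
#   grid.create_custom_grid(np.array([[500., 500., 250.]]))
#   l0, l1 = grid.get_grid_args('custom')   # slice of Grid.values owned by the custom grid
#   grid.values[l0:l1]                      # same result as grid.get_grid('custom')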

class Colors:
    """Object that handles the color management in the model."""

    def __init__(self, surfaces):
        self.surfaces = surfaces
        self.colordict = None
        self._hexcolors_soft = [
            '#015482', '#9f0052', '#ffbe00', '#728f02', '#443988', '#ff3f20', '#5DA629', '#b271d0',
            '#72e54a', '#583bd1', '#d0e63d', '#b949e2', '#95ce4b', '#6d2b9f', '#60eb91', '#d746be',
            '#52a22e', '#5e63d8', '#e5c339', '#371970', '#d3dc76', '#4d478e', '#43b665', '#d14897',
            '#59e5b8', '#e5421d', '#62dedb', '#df344e', '#9ce4a9', '#d94077', '#99c573', '#842f74',
            '#578131', '#708de7', '#df872f', '#5a73b1', '#ab912b', '#321f4d', '#e4bd7c', '#142932',
            '#cd4f30', '#69aedd', '#892a23', '#aad6de', '#5c1a34', '#cfddb4', '#381d29', '#5da37c',
            '#d8676e', '#52a2a3', '#9b405c', '#346542', '#de91c9', '#555719', '#bbaed6', '#945624',
            '#517c91', '#de8a68', '#3c4b64', '#9d8a4d', '#825f7e', '#2c3821', '#ddadaa', '#5e3524',
            '#a3a68e', '#a2706b', '#686d56'
        ]  # source:

    def generate_colordict(
            self,
            hex_colors: Union[List[str], str] = 'palettes',
            palettes: List[str] = 'default',
    ):
        """Generate and set the color dictionary.

        Args:
            hex_colors (list[str], str): List of hex color values. In the future this could
                accommodate actual geological palettes. For example, striplog has a quite good
                set of palettes.
                * palettes: If hex_colors='palettes', the colors will be chosen from the palettes arg.
                * soft: use the hand-picked soft palette stored in ``_hexcolors_soft``.
            palettes (list[str], optional): list with names of seaborn palettes. Defaults to 'default'.
        """
        if hex_colors == 'palettes':
            hex_colors = []
            if palettes == 'default':
                # we predefine some 7 colors manually
                hex_colors = ['#015482', '#9f0052', '#ffbe00', '#728f02', '#443988', '#ff3f20', '#5DA629']
                # then we create a list of seaborn color palette names, as the user didn't provide any
                palettes = ['muted', 'pastel', 'deep', 'bright', 'dark', 'colorblind']
            for palette in palettes:  # for each palette
                hex_colors += sns.color_palette(palette).as_hex()  # get all colors in palette and add to list
                if len(hex_colors) >= len(self.surfaces.df):
                    break
        elif hex_colors == 'soft':
            hex_colors = self._hexcolors_soft

        surface_names = self.surfaces.df['surface'].values
        n_surfaces = len(surface_names)

        while n_surfaces > len(hex_colors):
            hex_colors.append(self._random_hexcolor())

        self.colordict = dict(zip(surface_names, hex_colors[:n_surfaces]))

    @staticmethod
    def _random_hexcolor() -> str:
        """Generate a random hex color string."""
        # Zero-padded to six hex digits so the result is always a valid color code.
        return "#%06x" % np.random.randint(0, 16777215)

    def change_colors(self, colordict: dict = None):
        """Change the model colors either by providing a color dictionary or, if not, by using a
        color pick widget.

        Args:
            colordict (dict, optional): dict with surface names mapped to hex color codes,
                e.g. {'layer1': '#6b0318'}. If None: opens a Jupyter widget to change colors
                interactively. Defaults to None.
        """
        assert ipywidgets_import, 'ipywidgets not imported. Make sure the library is installed.'

        if colordict:
            self.update_colors(colordict)
        else:
            items = [
                widgets.ColorPicker(description=surface, value=color)
                for surface, color in self.colordict.items()
            ]
            colbox = widgets.VBox(items)
            print('Click to select new colors.')
            display(colbox)

            def on_change(v):
                self.colordict[v['owner'].description] = v['new']  # update colordict
                self._set_colors()

            for cols in colbox.children:
                cols.observe(on_change, 'value')

    def update_colors(self, colordict: dict = None):
        """Update the colors in self.colordict and in surfaces_df.

        Args:
            colordict (dict, optional): dict with surface names mapped to hex color codes,
                e.g. {'layer1': '#6b0318'}. Defaults to None.
        """
        if colordict is None:
            self.generate_colordict()
        else:
            for surf, color in colordict.items():  # map new colors to surfaces
                # assert this because the user can set it manually
                assert surf in list(self.surfaces.df['surface']), str(surf) + ' is not a model surface'
                assert re.match(r'^#(?:[0-9a-fA-F]{3}){1,2}$', color), str(color) + ' is not a HEX color code'
                self.colordict[surf] = color
        self._set_colors()

    def _add_colors(self):
        """Assign a color to the last entry of surfaces df or check isnull and assign color there."""
        self.generate_colordict()

    def _set_colors(self):
        """Set colordict in the surfaces dataframe."""
        for surf, color in self.colordict.items():
            self.surfaces.df.loc[self.surfaces.df['surface'] == surf, 'color'] = color

    def set_default_colors(self, surfaces=None):
        if surfaces is not None:
            self.colordict[surfaces] = self.colordict[surfaces]
        self._set_colors()

    def delete_colors(self, surfaces):
        for surface in surfaces:
            self.colordict.pop(surface, None)
        self._set_colors()

    def make_faults_black(self, series_fault):
        faults_list = list(self.surfaces.df[self.surfaces.df.series.isin(series_fault)]['surface'])
        for fault in faults_list:
            if self.colordict[fault] == '#527682':
                self.set_default_colors(fault)
            else:
                self.colordict[fault] = '#527682'
                self._set_colors()

    def reset_default_colors(self):
        self.generate_colordict()
        self._set_colors()
        return self.surfaces


# @_setdoc_pro(Series.__doc__)

class Surfaces(object):
    """Class that contains the surfaces of the model and the values of each of them.

    Args:
        surface_names (list or np.ndarray): list containing the names of the surfaces
        series (:class:`Series`): [s0]
        values_array (np.ndarray): 2D array with the values of each surface
        properties_names (list or np.ndarray): list containing the names of each property

    Attributes:
        df (:class:`pn.core.frame.DataFrames`): Pandas data frame containing the surfaces names
            mapped to series and the value used for each voxel in the final model.
        series (:class:`Series`)
        colors (:class:`Colors`)
    """

    def __init__(self, series, surface_names=None, values_array=None, properties_names=None):

        self._columns = ['surface', 'series', 'order_surfaces', 'isBasement', 'isFault', 'isActive',
                         'hasData', 'color', 'vertices', 'edges', 'sfai', 'id']

        self._columns_vis_drop = ['vertices', 'edges', 'sfai', 'isBasement', 'isFault', 'isActive',
                                  'hasData']
        self._n_properties = len(self._columns) - 1
        self.series = series
        self.colors = Colors(self)

        df_ = pn.DataFrame(columns=self._columns)
        self.df = df_.astype({'surface': str, 'series': 'category',
                              'order_surfaces': int, 'isBasement': bool, 'isFault': bool,
                              'isActive': bool, 'hasData': bool, 'color': bool, 'id': int,
                              'vertices': object, 'edges': object})

        if (np.array(sys.version_info[:2]) <= np.array([3, 6])).all():
            self.df: pn.DataFrame

        self.df['series'].cat.add_categories(['Default series'], inplace=True)
        if surface_names is not None:
            self.set_surfaces_names(surface_names)

        if values_array is not None:
            self.set_surfaces_values(values_array=values_array, properties_names=properties_names)

    def __repr__(self):
        c_ = self.df.columns[~(self.df.columns.isin(self._columns_vis_drop))]
        return self.df[c_].to_string()

    def _repr_html_(self):
        c_ = self.df.columns[~(self.df.columns.isin(self._columns_vis_drop))]
        return self.df[c_].style.applymap(self.background_color, subset=['color']).render()

    @property
    def properties_val(self):
        all_col = self.df.columns
        prop_cols = all_col.drop(self._columns)
        return prop_cols.insert(0, 'id')

    @property
    def basement(self):
        return self.df['surface'][self.df['isBasement']]

    def update_id(self, id_list: list = None):
        """Set the id of the layers (1-based).

        Args:
            id_list (list):

        Returns:
            :class:`Surfaces`:
        """
        self.map_faults()
        if id_list is None:
            # This id is necessary for the faults
            id_unique = self.df.reset_index().index + 1

        self.df['id'] = id_unique

        return self

    def map_faults(self):
        self.df['isFault'] = self.df['series'].map(self.series.faults.df['isFault'])

    @staticmethod
    def background_color(value):
        if isinstance(value, str):
            return "background-color: %s" % value

    # region set formation names

    def set_surfaces_names(self, surfaces_list: list, update_df=True):
        """Method to set the names of the surfaces in order. This applies to the surface column
        of the df.

        Args:
            surfaces_list (list[str]): list of names of surfaces. They are ordered.
            update_df (bool): Update Surfaces.df columns with the default values

        Returns:
            :class:`Surfaces`:
        """
        if isinstance(surfaces_list, (list, np.ndarray)):
            surfaces_list = np.asarray(surfaces_list)
        else:
            raise AttributeError('list_names must be either array_like type')

        # Deleting all columns if they exist
        # TODO check if some of the names are in the df and not deleting them?
        self.df.drop(self.df.index, inplace=True)
        self.df['surface'] = surfaces_list

        # Changing the name of the series is the only way to mutate the series object from surfaces
        if update_df is True:
            self.map_series()
            self.update_id()
            self.set_basement()
            self.reset_order_surfaces()
            self.colors.update_colors()
        return self

    def set_default_surface_name(self):
        """Set the minimum number of surfaces to compute a model, i.e. surfaces_names: surface1
        and basement.

        Returns:
            :class:`Surfaces`:
        """
        if self.df.shape[0] == 0:
            # TODO DEBUG: I am not sure that surfaces always has at least one entry. Check it
            self.set_surfaces_names(['surface1', 'basement'])

        return self

    def set_surfaces_names_from_surface_points(self, surface_points):
        """Set surfaces names from a :class:`Surface_points` object. This can be useful if the
        surface points are imported from a table.

        Args:
            surface_points (:class:`Surface_points`):

        Returns:

        """
        self.set_surfaces_names(surface_points.df['surface'].unique())
        return self

    def add_surface(self, surface_list: Union[str, list], update_df=True):
        """Add surfaces to the df.

        Args:
            surface_list (str, list): name or list of names of the surfaces to apply the functionality
            update_df (bool): Update Surfaces.df columns with the default values

        Returns:
            :class:`Surfaces`
        """
        surface_list = np.atleast_1d(surface_list)

        # Remove from the list categories that already exist
        surface_list = surface_list[~np.in1d(surface_list, self.df['surface'].values)]

        for c in surface_list:
            idx = self.df.index.max()
            if idx is np.nan:
                idx = -1
            self.df.loc[idx + 1, 'surface'] = c

        if update_df is True:
            self.map_series()
            self.update_id()
            self.set_basement()
            self.reset_order_surfaces()
            self.colors.update_colors()
        return self

    @_setdoc_pro([update_id.__doc__, pn.DataFrame.drop.__doc__])
    def delete_surface(self, indices: Union[int, str, list, np.ndarray], update_id=True):
        """[s1]

        Args:
            indices (str, list): name or list of names of the series to apply the functionality
            update_id (bool): if true [s0]

        Returns:
            :class:`Surfaces`:
        """
        indices = np.atleast_1d(indices)
        if indices.dtype == int:
            self.df.drop(indices, inplace=True)
        else:
            self.df.drop(self.df.index[self.df['surface'].isin(indices)], inplace=True)

        if update_id is True:
            self.update_id()
            self.set_basement()
            self.reset_order_surfaces()
        return self

    def rename_surfaces(self, to_replace: Union[str, list, dict], **kwargs):
        """Replace values given in to_replace with value.

        Args:
            to_replace (str, regex, list, dict, Series, int, float, or None): How to find the
                values that will be replaced.
            **kwargs:

        Returns:
            :class:`Surfaces`

        See Also:
            :any:`pandas.Series.replace`
        """
        if np.isin(to_replace, self.df['surface']).any():
            print('Two surfaces cannot have the same name.')
        else:
            self.df['surface'].replace(to_replace, inplace=True, **kwargs)
        return self

    def reset_order_surfaces(self):
        self.df['order_surfaces'] = self.df.groupby('series').cumcount() + 1

    def modify_order_surfaces(self, new_value: int, idx: int, series_name: str = None):
        """Move a surface to a new location within the order of its series.

        Args:
            new_value (int): New location
            idx (int): Index of the surface to be moved
            series_name (str): name of the series to be moved

        Returns:
            :class:`Surfaces`
        """
        if series_name is None:
            series_name = self.df.loc[idx, 'series']

        group = self.df.groupby('series').get_group(series_name)['order_surfaces']
        assert np.isin(new_value, group), 'new_value must exist already in the order_surfaces group.'
        old_value = group[idx]
        self.df.loc[group.index.astype('int'), 'order_surfaces'] = group.replace(
            [new_value, old_value], [old_value, new_value])
        self.sort_surfaces()
        self.set_basement()
        return self

    def sort_surfaces(self):
        """Sort surfaces by series and order_surfaces."""
        self.df.sort_values(by=['series', 'order_surfaces'], inplace=True)
        self.update_id()
        return self.df

    def set_basement(self):
        """Set the isBasement property to True for the last surface of the df.

        Returns:
            :class:`Surfaces`:
        """
        self.df['isBasement'] = False
        idx = self.df.last_valid_index()
        if idx is not None:
            self.df.loc[idx, 'isBasement'] = True

        # TODO add functionality of passing the basement and calling reorder to push the basement
        #  surface to the bottom of the data frame
        assert self.df['isBasement'].values.astype(bool).sum() <= 1, 'Only one surface can be basement'
        return self
    # endregion

    # set_series

    def map_series(self, mapping_object: Union[dict, pn.DataFrame] = None):
        """Method to map to which series every surface belongs. This step is necessary to assign
        different tectonic features such as unconformities or faults.

        Args:
            mapping_object (dict, :class:`pn.DataFrame`):
                * dict: keys are the series and values the surfaces belonging to that series
                * pn.DataFrame: Dataframe with surfaces as index and a column 'series' with the
                  corresponding series name of each surface

        Returns:
            :class:`Surfaces`
        """
        # Updating surfaces['series'] categories
        self.df['series'].cat.set_categories(self.series.df.index, inplace=True)

        # TODO Fixing this. It is overriding the formations already mapped
        if mapping_object is not None:
            # If none is passed and series exist we will take the name of the first series as a default

            if type(mapping_object) is dict:
                s = []
                f = []
                for k, v in mapping_object.items():
                    for form in np.atleast_1d(v):
                        s.append(k)
                        f.append(form)

                new_series_mapping = pn.DataFrame([pn.Categorical(s, self.series.df.index)],
                                                  f, columns=['series'])

            elif isinstance(mapping_object, pn.Categorical):
                # This condition is for the case we have surface on the index and in 'series' the category
                # TODO Test this
                new_series_mapping = mapping_object

            else:
                raise AttributeError(str(type(mapping_object)) + ' is not the right attribute type.')

            # Checking which surfaces are on the list to be mapped
            b = self.df['surface'].isin(new_series_mapping.index)
            idx = self.df.index[b]

            # Mapping
            self.df.loc[idx, 'series'] = self.df.loc[idx, 'surface'].map(new_series_mapping['series'])

        # Fill nans
        self.df['series'].fillna(self.series.df.index.values[-1], inplace=True)

        # Reorganize the pile
        self.reset_order_surfaces()
        self.sort_surfaces()
        self.set_basement()
        return self
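
    # Mapping sketch (illustrative; series and surface names are made up and must already exist
    # in the linked Series object). Keys of the dict are series, values the surfaces they contain:
    #
    #   surfaces.map_series({'Fault_series': 'fault1',
    #                        'Strat_series': ('rock1', 'rock2', 'basement')})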
    # endregion

    # region update_id
    # endregion

    def add_surfaces_values(self, values_array: Union[np.ndarray, list],
                            properties_names: list = np.empty(0)):
        """Add values to be interpolated for each surface.

        Args:
            values_array (np.ndarray, list): array-like of the same length as the number of
                surfaces. This functionality can be used to assign different geophysical
                properties to each layer.
            properties_names (list): list of names for each values_array column. This must be of
                the same size as values_array axis 1. By default properties will take the column
                name: 'value_X'.

        Returns:
            :class:`Surfaces`
        """
        values_array = np.atleast_2d(values_array)
        properties_names = np.atleast_1d(properties_names)
        if properties_names.shape[0] != values_array.shape[0]:
            for i in range(values_array.shape[0]):
                properties_names = np.append(properties_names, 'value_' + str(i))

        for e, p_name in enumerate(properties_names):
            try:
                self.df.loc[:, p_name] = values_array[e]
            except ValueError:
                raise ValueError('value_array must have the same length in axis 0 as the number of surfaces')
        return self

    def delete_surface_values(self, properties_names: Union[str, list]):
        """Delete one or several property columns.

        Args:
            properties_names (str, list[str]): Name of the property to delete

        Returns:
            bool: True
        """
        properties_names = np.asarray(properties_names)
        self.df.drop(properties_names, axis=1, inplace=True)
        return True

    def set_surfaces_values(self, values_array: Union[np.ndarray, list],
                            properties_names: list = np.empty(0)):
        """Set values to be interpolated for each surface. This method deletes the previous values.

        Args:
            values_array (np.ndarray, list): array-like of the same length as the number of
                surfaces. This functionality can be used to assign different geophysical
                properties to each layer.
            properties_names (list): list of names for each values_array column. This must be of
                the same size as values_array axis 1. By default properties will take the column
                name: 'value_X'.

        Returns:
            :class:`Surfaces`
        """
        # Check if there are values columns already
        old_prop_names = self.df.columns[~self.df.columns.isin(['surface', 'series', 'order_surfaces',
                                                                'id', 'isBasement', 'color'])]
        # Delete old
        self.delete_surface_values(old_prop_names)

        # Create new
        self.add_surfaces_values(values_array, properties_names)
        return self

    def modify_surface_values(self, idx, properties_names, values):
        """Method to modify values using loc of pandas.

        Args:
            idx (int, list[int]):
            properties_names (str, list[str]):
            values (float, np.ndarray):

        Returns:
            :class:`Surfaces`
        """
        properties_names = np.atleast_1d(properties_names)
        assert ~np.isin(properties_names, ['surface', 'series', 'order_surfaces', 'id', 'isBasement',
                                           'color']).any(), \
            'only property names can be modified with this method'

        self.df.loc[idx, properties_names] = values
        return self
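
# Usage sketch for Surfaces values (illustrative; names and densities are made up, and the
# Surfaces instance is assumed to be linked to a valid Series object). values_array rows are
# properties and columns are surfaces, matching add_surfaces_values above:
#
#   surfaces.set_surfaces_names(['rock1', 'rock2', 'basement'])
#   surfaces.add_surfaces_values([[2.6, 2.7, 3.0]], properties_names=['density'])
#   surfaces.modify_surface_values(0, 'density', 2.65)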
# @_setdoc_pro([SurfacePoints.__doc__, Orientations.__doc__, Surfaces.__doc__, Faults.__doc__])

class Structure(object):
    """The structure_data class analyses the different lengths of the subsets in the interface
    and orientations categories_df to pass them to the theano function.

    Attributes:
        surface_points (:class:`SurfacePoints`): [s0]
        orientations (:class:`Orientations`): [s1]
        surfaces (:class:`Surfaces`): [s2]
        faults (:class:`Faults`): [s3]
        df (:class:`pn.DataFrame`):
            * len surfaces surface_points (list): length of each surface/fault in surface_points
            * len series surface_points (list): length of each series in surface_points
            * len series orientations (list): length of each series in orientations
            * number surfaces per series (list): number of surfaces per series
            * ...

    Args:
        surface_points (:class:`SurfacePoints`): [s0]
        orientations (:class:`Orientations`): [s1]
        surfaces (:class:`Surfaces`): [s2]
        faults (:class:`Faults`): [s3]
    """

    def __init__(self, surface_points, orientations, surfaces: Surfaces, faults):

        self.surface_points = surface_points
        self.orientations = orientations
        self.surfaces = surfaces
        self.faults = faults

        df_ = pn.DataFrame(np.array(['False', 'False', -1, -1, -1, -1, -1, -1, -1], ).reshape(1, -1),
                           index=['values'],
                           columns=['isLith', 'isFault',
                                    'number faults', 'number surfaces', 'number series',
                                    'number surfaces per series',
                                    'len surfaces surface_points', 'len series surface_points',
                                    'len series orientations'])

        self.df = df_.astype({'isLith': bool, 'isFault': bool, 'number faults': int,
                              'number surfaces': int, 'number series': int})

        self.update_structure_from_input()

    def __repr__(self):
        return self.df.T.to_string()

    def _repr_html_(self):
        return self.df.T.to_html()

    def update_structure_from_input(self):
        """Update all fields dependent on the linked Data objects.

        Returns:
            bool: True
        """
        self.set_length_surfaces_i()
        self.set_series_and_length_series_i()
        self.set_length_series_o()
        self.set_number_of_surfaces_per_series()
        self.set_number_of_faults()
        self.set_number_of_surfaces()
        self.set_is_lith_is_fault()
        return True

    def set_length_surfaces_i(self):
        """Set the length of each **surface** on `SurfacePoints`, i.e. how many data points there
        are for each surface.

        Returns:
            :class:`pn.DataFrame`: df where the structural data is stored
        """
        # ==================
        # Extracting lengths
        # ==================
        # Array containing the size of every surface. SurfacePoints
        lssp = self.surface_points.df.groupby('id')['order_series'].count().values
        lssp_nonzero = lssp[np.nonzero(lssp)]

        self.df.at['values', 'len surfaces surface_points'] = lssp_nonzero

        return self.df

    def set_series_and_length_series_i(self):
        """Set the length of each **series** on `SurfacePoints`, i.e. how many data points there
        are for each series. Also sets the number of series itself.

        Returns:
            :class:`pn.DataFrame`: df where the structural data is stored
        """
        len_series = self.surfaces.series.df.shape[0]

        # Array containing the size of every series. SurfacePoints.
        points_count = self.surface_points.df['order_series'].value_counts(sort=False)
        len_series_i = np.zeros(len_series, dtype=int)
        len_series_i[points_count.index.astype('int') - 1] = points_count.values

        if len_series_i.shape[0] == 0:
            len_series_i = np.insert(len_series_i, 0, 0)

        self.df.at['values', 'len series surface_points'] = len_series_i
        self.df['number series'] = len(len_series_i)
        return self.df

    def set_length_series_o(self):
        """Set the length of each **series** on `Orientations`, i.e. how many orientations there
        are for each series.

        Returns:
            :class:`pn.DataFrame`: df where the structural data is stored
        """
        # Array containing the size of every series. Orientations.
        len_series_o = np.zeros(self.surfaces.series.df.shape[0], dtype=int)
        ori_count = self.orientations.df['order_series'].value_counts(sort=False)
        len_series_o[ori_count.index.astype('int') - 1] = ori_count.values

        self.df.at['values', 'len series orientations'] = len_series_o

        return self.df

    def set_number_of_surfaces_per_series(self):
        """Set the number of surfaces for each series.

        Returns:
            :class:`pn.DataFrame`: df where the structural data is stored
        """
        len_sps = np.zeros(self.surfaces.series.df.shape[0], dtype=int)
        surf_count = self.surface_points.df.groupby('order_series').surface.nunique()

        len_sps[surf_count.index.astype('int') - 1] = surf_count.values

        self.df.at['values', 'number surfaces per series'] = len_sps
        return self.df

    def set_number_of_faults(self):
        """Set the number of fault series. This method in gempy v2 is simply informative.

        Returns:
            :class:`pn.DataFrame`: df where the structural data is stored
        """
        # Number of faults existing in the surface_points df
        self.df.at['values', 'number faults'] = self.faults.df['isFault'].sum()

        return self.df

    def set_number_of_surfaces(self):
        """Set the total number of surfaces.

        Returns:
            :class:`pn.DataFrame`: df where the structural data is stored
        """
        # Number of surfaces existing in the surface_points df
        self.df.at['values', 'number surfaces'] = self.surface_points.df['surface'].nunique()

        return self.df

    def set_is_lith_is_fault(self):
        """Check if there are lithologies and/or faults in the data. This method in gempy v2 is
        simply informative.

        Returns:
            :class:`pn.DataFrame`: df where the structural data is stored
        """
        self.df['isLith'] = True if self.df.loc['values', 'number series'] >= \
                                    self.df.loc['values', 'number faults'] else False
        self.df['isFault'] = True if self.df.loc['values', 'number faults'] > 0 else False

        return self.df
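
# Example of the bookkeeping Structure keeps (illustrative values): for two series with 10 and 25
# surface points and 2 and 4 orientations respectively, the 'values' row of df would hold
#   'len series surface_points' -> array([10, 25])
#   'len series orientations'   -> array([2, 4])
#   'number series'             -> 2
# These per-series lengths are what the theano function uses to split the concatenated input
# arrays back into series.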

class Options(object):
    """The class Options contains the auxiliary, user-editable flags that are mostly independent
    of the model.

    Attributes:
        df (:class:`pn.DataFrame`): df containing the flags. All fields are pandas categories,
            allowing the user to change among those categories.
    """

    def __init__(self):
        df_ = pn.DataFrame(np.array(['float32', 'geology', 'fast_compile', 'cpu', None]).reshape(1, -1),
                           index=['values'],
                           columns=['dtype', 'output', 'theano_optimizer', 'device', 'verbosity'])
        self.df = df_.astype({'dtype': 'category', 'output': 'category',
                              'theano_optimizer': 'category', 'device': 'category',
                              'verbosity': object})

        self.df['dtype'].cat.set_categories(['float32', 'float64'], inplace=True)
        self.df['theano_optimizer'].cat.set_categories(['fast_run', 'fast_compile'], inplace=True)
        self.df['device'].cat.set_categories(['cpu', 'cuda'], inplace=True)
        self.default_options()

    def __repr__(self):
        return self.df.T.to_string()

    def _repr_html_(self):
        return self.df.T.to_html()

    def modify_options(self, attribute, value):
        """Method to modify a given field.

        Args:
            attribute (str): Name of the field to modify
            value: new value of the field. It will have to exist in the category in order for
                pandas to modify it.

        Returns:
            :class:`pandas.DataFrame`: df where the options data is stored
        """
        assert np.isin(attribute, self.df.columns).all(), 'Valid properties are: ' + np.array2string(self.df.columns)
        self.df.loc['values', attribute] = value
        return self.df

    def default_options(self):
        """Set default options.

        Returns:
            bool: True
        """
        import theano
        self.df.loc['values', 'device'] = theano.config.device

        if self.df.loc['values', 'device'] == 'cpu':
            self.df.loc['values', 'dtype'] = 'float64'
        else:
            self.df.loc['values', 'dtype'] = 'float32'

        self.df.loc['values', 'theano_optimizer'] = 'fast_compile'
        return True
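
# Usage sketch for Options (illustrative; assumes theano is installed, since default_options
# imports it). Values must belong to the pandas categories defined in __init__:
#
#   options = Options()
#   options.modify_options('theano_optimizer', 'fast_run')
#   options.modify_options('dtype', 'float64')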

@_setdoc_pro([Grid.__doc__, Structure.__doc__])
class KrigingParameters(object):
    """Class that stores and computes the default values for the kriging parameters used during
    the interpolation. The default values will be computed from the linked :class:`Grid` and
    :class:`Structure` objects.

    Attributes:
        grid (:class:`Grid`): [s0]
        structure (:class:`Structure`): [s1]
        df (:class:`pn.DataFrame`): df containing the kriging parameters.

    Args:
        grid (:class:`Grid`): [s0]
        structure (:class:`Structure`): [s1]
    """

    def __init__(self, grid: Grid, structure: Structure):
        self.structure = structure
        self.grid = grid

        df_ = pn.DataFrame(np.array([np.nan, np.nan, 3]).reshape(1, -1),
                           index=['values'],
                           columns=['range', '$C_o$', 'drift equations'])
        self.df = df_.astype({'drift equations': object, 'range': object, '$C_o$': object})
        self.set_default_range()
        self.set_default_c_o()
        self.set_u_grade()

    def __repr__(self):
        return self.df.T.to_string()

    def _repr_html_(self):
        return self.df.T.to_html()

    def modify_kriging_parameters(self, attribute: str, value, **kwargs):
        """Method to modify a given field.

        Args:
            attribute (str): Name of the field to modify
            value: new value of the field. It will have to exist in the category in order for
                pandas to modify it.
            **kwargs:
                * u_grade_sep (str): If drift equations values are `str`, symbol that separates the values.

        Returns:
            :class:`pandas.DataFrame`: df where the options data is stored
        """
        u_grade_sep = kwargs.get('u_grade_sep', ',')

        assert np.isin(attribute, self.df.columns).all(), 'Valid properties are: ' + np.array2string(self.df.columns)

        if attribute == 'drift equations':
            value = np.asarray(value)
            print(value)

            if type(value) is str:
                value = np.fromstring(value[1:-1], sep=u_grade_sep, dtype=int)
            try:
                assert value.shape[0] == self.structure.df.loc['values', 'len series surface_points'].shape[0]
                print(value, attribute)
                self.df.at['values', attribute] = value
                print(self.df)

            except AssertionError:
                print('u_grade length must be the same as the number of series')

        else:
            self.df = self.df.astype({'drift equations': object, 'range': object, '$C_o$': object})
            self.df.at['values', attribute] = value

    def str2int_u_grade(self, **kwargs):
        """Convert u_grade to ints.

        Args:
            **kwargs:
                * u_grade_sep (str): If drift equations values are `str`, symbol that separates the values.

        Returns:

        """
        u_grade_sep = kwargs.get('u_grade_sep', ',')
        value = self.df.loc['values', 'drift equations']
        if type(value) is str:
            value = np.fromstring(value[1:-1], sep=u_grade_sep, dtype=int)
        try:
            assert value.shape[0] == self.structure.df.loc['values', 'len series surface_points'].shape[0]
            self.df.at['values', 'drift equations'] = value
        except AssertionError:
            print('u_grade length must be the same as the number of series')

        return self.df

    def set_default_range(self, extent=None):
        """Set the default kriging range.

        Args:
            extent (Optional[float, np.array]): extent used to compute the default range, i.e. the
                largest diagonal. If None, the extent of the linked :class:`Grid` will be used.

        Returns:

        """
        if extent is None:
            extent = self.grid.regular_grid.extent
            if np.sum(extent) == 0 and self.grid.values.shape[0] > 1:
                extent = np.concatenate((np.min(self.grid.values, axis=0),
                                         np.max(self.grid.values, axis=0)))[[0, 3, 1, 4, 2, 5]]

        try:
            range_var = np.sqrt(
                (extent[0] - extent[1]) ** 2 +
                (extent[2] - extent[3]) ** 2 +
                (extent[4] - extent[5]) ** 2)
        except TypeError:
            warnings.warn('The extent passed or, if None, the extent of the grid object has some '
                          'type of problem', TypeError)
            range_var = np.array(np.nan)

        self.df['range'] = np.atleast_1d(range_var)

        return range_var

    def set_default_c_o(self, range_var=None):
        """Set the default covariance at 0.

        Args:
            range_var (Optional[float, np.array]): range used to compute the default $C_o$.
                If None, the already computed range will be used.

        Returns:

        """
        if range_var is None:
            range_var = self.df.loc['values', 'range']

        if type(range_var) is list:
            range_var = np.atleast_1d(range_var)

        self.df.at['values', '$C_o$'] = range_var ** 2 / 14 / 3

        return self.df['$C_o$']

    def set_u_grade(self, u_grade: list = None):
        """Set the default universal grade. Transform polynomial grades into the number of
        drift equations.

        Args:
            u_grade (list):

        Returns:

        """
        # =========================
        # Choosing Universal drifts
        # =========================
        if u_grade is None:
            len_series_i = self.structure.df.loc['values', 'len series surface_points']
            u_grade = np.ones_like(len_series_i)
            # u_grade[(len_series_i > 1)] = 1
        else:
            u_grade = np.array(u_grade)

        # Transforming grade to number of equations
        n_universal_eq = np.zeros_like(u_grade)
        n_universal_eq[u_grade == 0] = 0
        n_universal_eq[u_grade == 1] = 3
        n_universal_eq[u_grade == 2] = 9

        self.df.at['values', 'drift equations'] = n_universal_eq
        return self.df['drift equations']
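
# Worked example of the defaults computed above (illustrative extent):
# for extent = [0, 1000, 0, 1000, 0, 500]
#   range  = sqrt(1000**2 + 1000**2 + 500**2) = 1500
#   $C_o$  = range**2 / 14 / 3 = 2_250_000 / 42 ≈ 53571.4
# and with the default u_grade of 1 per series, 'drift equations' becomes 3 for every series.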

class AdditionalData(object):
    """Container class that encapsulates :class:`Structure`, :class:`KrigingParameters`,
    :class:`Options` and the rescaling parameters.

    Args:
        surface_points (:class:`SurfacePoints`): [s0]
        orientations (:class:`Orientations`): [s1]
        grid (:class:`Grid`): [s2]
        faults (:class:`Faults`): [s4]
        surfaces (:class:`Surfaces`): [s3]
        rescaling (:class:`RescaledData`): [s5]

    Attributes:
        structure_data (:class:`Structure`): [s6]
        options (:class:`Options`): [s8]
        kriging_data (:class:`KrigingParameters`): [s7]
        rescaling_data (:class:`RescaledData`):
    """

    def __init__(self, surface_points, orientations, grid: Grid, faults, surfaces: Surfaces, rescaling):
        self.structure_data = Structure(surface_points, orientations, surfaces, faults)
        self.options = Options()
        self.kriging_data = KrigingParameters(grid, self.structure_data)
        self.rescaling_data = rescaling

    def __repr__(self):
        concat_ = self.get_additional_data()
        return concat_.to_string()

    def _repr_html_(self):
        concat_ = self.get_additional_data()
        return concat_.to_html()

    def get_additional_data(self):
        """Concatenate all linked data frames and transpose them for a nice visualization.

        Returns:
            pn.DataFrame: concatenated and transposed dataframe
        """
        concat_ = pn.concat([self.structure_data.df, self.options.df, self.kriging_data.df,
                             self.rescaling_data.df],
                            axis=1, keys=['Structure', 'Options', 'Kriging', 'Rescaling'])
        return concat_.T

    def update_default_kriging(self):
        """Update the default kriging values."""
        self.kriging_data.set_default_range()
        self.kriging_data.set_default_c_o()
        self.kriging_data.set_u_grade()

    def update_structure(self):
        """Update fields dependent on the input data, such as the structure and the universal
        kriging grade."""
        self.structure_data.update_structure_from_input()

        if len(self.kriging_data.df.loc['values', 'drift equations']) < \
                self.structure_data.df.loc['values', 'number series']:
            self.kriging_data.set_u_grade()
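
# Usage sketch for AdditionalData (illustrative): an instance is normally created and owned by
# the main model object, which passes in already-built data objects (surface_points, orientations,
# grid, faults, surfaces, rescaling are assumed to exist here).
#
#   additional_data = AdditionalData(surface_points, orientations, grid, faults, surfaces, rescaling)
#   additional_data.get_additional_data()   # transposed summary: Structure / Options / Kriging / Rescaling
#   additional_data.update_structure()      # refresh the counts after adding input data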