import numpy as np
from quick_pp import logger
from quick_pp.config import Config
from quick_pp.utils import length_a_b, line_intersection
[docs]
def normalize_volumetric(phit, **volumetrics):
"""Normalize lithology given total porosity.
Args:
phit (np.ndarray or float): Total porosity in fraction (v/v).
**volumetrics: Keyword arguments representing volumetric fractions (v/v) relative to matrix.
Returns:
dict: Normalized volumetric fractions relative to bulk volume.
"""
logger.debug("Normalizing volumetrics with total porosity")
# Normalize the volumetrics
vmatrix = 1 - phit
normalized_volumetrics = {
key: value * vmatrix for key, value in volumetrics.items()
}
logger.debug(f"Normalized volumetrics: {list(normalized_volumetrics.keys())}")
return normalized_volumetrics
[docs]
def unnormalize_volumetric(phit, **normalized_volumetrics):
"""Unnormalize lithology given total porosity.
This is the reverse of normalize_volumetric. It calculates the matrix-relative
volumetrics from bulk-relative (normalized) volumetrics.
Args:
phit (np.ndarray or float): Total porosity in fraction (v/v).
**normalized_volumetrics: Keyword arguments representing bulk-relative (normalized)
volumetric fractions of the bulk volume (v/v).
Returns:
dict: Unnormalized volumetric fractions relative to the matrix volume.
"""
logger.debug("Unnormalizing volumetrics with total porosity")
# Unnormalize the volumetrics
vmatrix = 1 - phit
# Use np.divide for safe division by zero, returning np.nan
unnormalized_volumetrics = {
key: np.divide(
value, vmatrix, out=np.full_like(value, np.nan), where=vmatrix != 0
)
for key, value in normalized_volumetrics.items()
}
logger.debug(f"Unnormalized volumetrics: {list(unnormalized_volumetrics.keys())}")
return unnormalized_volumetrics
[docs]
def get_volumetric_dict(df):
"""Given dataframe, return a dictionary of the key and values of lithology metrics in the data.
Args:
df (pd.DataFrame): Dataframe with well log data.
Returns:
dict: Dictionary of the key and values of lithology metrics in the data.
"""
volumetric_dict = {}
for vol_log in Config.MINERALS_LOG_VALUE.keys():
vol_log = Config.MINERALS_NAME_MAPPING.get(vol_log, vol_log)
if vol_log in df.columns:
volumetric_dict[vol_log.lower()] = df[vol_log].values
return volumetric_dict
[docs]
def effective_porosity(phit, phi_shale, vshale):
"""
Computes effective porosity from total porosity, total porosity of shale and shale volume.
Parameters
----------
phit : np.ndarray or float
Total porosity [fraction].
phi_shale : np.ndarray or float
Total porosity of shale [fraction].
vshale : np.ndarray or float
Shale volume [fraction].
Returns
-------
porosity : float
Effective porosity [fraction].
"""
logger.debug(f"Calculating effective porosity with shale porosity: {phi_shale:.3f}")
phie = phit - (vshale * phi_shale)
logger.debug(f"Effective porosity range: {phie.min():.3f} - {phie.max():.3f}")
return phie
[docs]
def estimate_shale_porosity_trend(
rho_clw: np.ndarray, rho_dry_clay: float = 2.72, rho_fluid: float = 1.0
):
"""Calculate clay porosity given bulk density of wet clay line.
Args:
rho_clw (np.ndarray or float): Bulk density of wet clay line.
rho_dry_clay (float, optional): Bulk density of dry clay. Defaults to 2.72.
rho_fluid (float, optional): Bulk density of fluid. Defaults to 1.0.
Returns:
np.ndarray or float: Clay porosity.
"""
logger.debug(
f"Calculating clay porosity with dry clay density: {rho_dry_clay} g/cm³"
)
phi_clay = (rho_dry_clay - rho_clw) / (rho_dry_clay - rho_fluid)
logger.debug(f"Clay porosity range: {phi_clay.min():.3f} - {phi_clay.max():.3f}")
return phi_clay
[docs]
def estimate_shale_porosity(nphi, phit):
"""
Computes shale porosity from neutron porosity and total porosity.
Args:
nphi (np.ndarray or float): Neutron porosity (hydrocarbon corrected) [fraction].
phit (np.ndarray or float): Total porosity [fraction].
Returns:
np.ndarray or float: Shale porosity [fraction].
"""
phit_sh = nphi - phit
return np.where(phit_sh > 0, phit_sh, phit).clip(1e-2, 1.0)
[docs]
def rho_matrix(**volumetrics):
"""Estimate average matrix density based on mineral volumes and their densities from Config.
Args:
**volumetrics: Keyword arguments where keys are mineral volume names
(e.g., vsand, vclay) and values are their volume fractions.
Returns:
np.ndarray or float: Matrix density in g/cc.
"""
logger.debug("Calculating matrix density from mineral volumes")
rho_ma = 0.0
mineral_properties = Config.MINERALS_LOG_VALUE
# Create a reverse mapping from volume name (e.g., 'VSAND') to mineral name (e.g., 'QUARTZ')
name_to_mineral_map = {v: k for k, v in Config.MINERALS_NAME_MAPPING.items()}
for vol_name, vol_value in volumetrics.items():
# Find the mineral name (e.g., 'QUARTZ') from the volume name (e.g., 'VSAND')
mineral_name = name_to_mineral_map.get(vol_name.upper())
if mineral_name and mineral_name in mineral_properties:
rho_mineral = mineral_properties[mineral_name].get("RHOB")
if rho_mineral is not None:
rho_ma += vol_value * rho_mineral
elif (
vol_name.upper() == "VSILT"
): # Handle silt separately if not in main config
rho_ma += vol_value * 2.68
logger.debug(
f"Matrix density range: {np.nanmin(rho_ma):.3f} - {np.nanmax(rho_ma):.3f} g/cm³"
)
return rho_ma
[docs]
def density_porosity(rhob, rho_matrix, rho_fluid: float = 1.0):
"""Computes density porosity from bulk, matrix and fluid densities
Args:
rhob (np.ndarray or float): Bulk density log in g/cc.
rho_matrix (np.ndarray or float): Matrix density in g/cc.
rho_fluid (float, optional): Density of fluid in g/cc. Defaults to 1.0 g/cc.
Returns:
np.ndarray or float: Density porosity [fraction]
"""
logger.debug("Calculating density porosity with fluid density")
phi_d = (rho_matrix - rhob) / (rho_matrix - rho_fluid)
logger.debug(f"Density porosity range: {phi_d.min():.3f} - {phi_d.max():.3f}")
return phi_d
[docs]
def dt_matrix(
vsand=0,
vclay=0,
vcalc=0,
vdolo=0,
vheavy=0,
dt_sand: float = 0,
dt_silt: float = 0,
dt_clay: float = 0,
dt_calc: float = 0,
dt_dolo: float = 0,
dt_heavy: float = 0,
):
"""Estimate average matrix sonic transit time based on dry sand, dry silt dry calcite and
dry dolomite volume and transit time of each.
Args:
vsand (np.ndarray or float, optional): Volume of sand. Defaults to 0.
vclay (np.ndarray or float, optional): Volume of clay. Defaults to 0.
vcalc (np.ndarray or float, optional): Volume of calcite. Defaults to 0.
vdolo (np.ndarray or float, optional): Volume of dolomite. Defaults to 0.
vheavy (np.ndarray or float, optional): Volume of heavy minerals. Defaults to 0.
dt_sand (float, optional): Sonic transit time of sand in us/ft. Defaults to None.
dt_silt (float, optional): Sonic transit time of silt in us/ft. Defaults to None.
dt_clay (float, optional): Sonic transit time of clay in us/ft. Defaults to None.
dt_calc (float, optional): Sonic transit time of calcite in us/ft. Defaults to None.
dt_dolo (float, optional): Sonic transit time of dolomite in us/ft. Defaults to None.
dt_heavy (float, optional): Sonic transit time of heavy minerals in us/ft. Defaults to 0.0.
Returns:
np.ndarray or float: Matrix sonic transit time in us/ft.
"""
logger.debug("Calculating matrix sonic transit time from mineral volumes")
minerals_log_value = Config.MINERALS_LOG_VALUE
dt_sand = dt_sand or minerals_log_value["QUARTZ"]["DTC"]
dt_clay = dt_clay or minerals_log_value["SHALE"]["DTC"]
dt_calc = dt_calc or minerals_log_value["CALCITE"]["DTC"]
dt_dolo = dt_dolo or minerals_log_value["DOLOMITE"]["DTC"]
dt_matrix = (
vsand * dt_sand
+ vclay * dt_clay
+ vcalc * dt_calc
+ vdolo * dt_dolo
+ vheavy * dt_heavy
)
logger.debug(
f"Matrix sonic transit time range: {dt_matrix.min():.1f} - {dt_matrix.max():.1f} us/ft"
)
return dt_matrix
[docs]
def sonic_porosity_wyllie(dt, dt_matrix, dt_fluid):
"""
Computes sonic porosity based on Wyllie's equation from interval, matrix, and fluid transit time.
Parameters
----------
dt : np.ndarray or float
Interval transit time [us/ft].
dt_matrix : np.ndarray or float
Matrix transit time [us/ft]. Sandstone: 51-55, Limestone: 43-48, Dolomite: 43-39, Shale: 60-170.
dt_fluid : np.ndarray or float
Fluid transit time [us/ft]. Water: 190, Oil: 240, Gas: 630.
Returns
-------
porosity : np.ndarray or float
Sonic porosity [fraction].
"""
logger.debug(
f"Calculating Wyllie sonic porosity with fluid transit time: {dt_fluid} us/ft"
)
phi_s = (dt - dt_matrix) / (dt_fluid - dt_matrix)
logger.debug(f"Wyllie sonic porosity range: {phi_s.min():.3f} - {phi_s.max():.3f}")
return phi_s
[docs]
def sonic_porosity_hunt_raymer(dt, dt_matrix, dt_fluid):
"""
Computes sonic porosity based on Hunt-Raymer's equation from interval, matrix and transit time.
Parameters
----------
dt : np.ndarray or float
Interval transit time [us/ft].
dt_matrix : np.ndarray or float
Matrix transit time [us/ft]. Sandstone: 51-55, Limestone: 43-48, Dolomite: 43-39, Shale: 60-170.
dt_fluid : np.ndarray or float
Fluid transit time [us/ft]. Water: 190, Oil: 240, Gas: 630.
Returns
-------
porosity : np.ndarray or float
Sonic porosity [fraction].
"""
logger.debug(
f"Calculating Hunt-Raymer sonic porosity with fluid transit time: {dt_fluid} us/ft"
)
c = (dt_matrix / (2 * dt_fluid)) - 1
phi_s = -c - (c**2 + (dt_matrix / dt) - 1) ** 0.5
logger.debug(
f"Hunt-Raymer sonic porosity range: {phi_s.min():.3f} - {phi_s.max():.3f}"
)
return phi_s
[docs]
def neu_den_xplot_poro_pt(
nphi: float,
rhob: float,
model: str = "ssc",
dry_min1_point: tuple = (),
dry_silt_point: tuple = (),
dry_clay_point: tuple = (),
fluid_point: tuple = (1.0, 1.0),
):
"""Calculate porosity given a pair of neutron porosity and bulk density data point.
This function is designed to process a single data point.
Args:
nphi (float): Neutron porosity log value.
rhob (float): Bulk density log value.
model (str, optional): Lithology model, either 'ssc' (Sand Silt Clay) or 'ss' (Sand Shale). Defaults to 'ssc'.
dry_min1_point (tuple, optional): Neutron porosity and bulk density of mineral 1 point. Defaults to ().
dry_silt_point (tuple, optional): Neutron porosity and bulk density of dry silt point. Defaults to ().
dry_clay_point (tuple, optional): Neutron porosity and bulk density of dry clay point. Defaults to ().
fluid_point (tuple, optional): Neutron porosity and bulk density of fluid point. Defaults to (1.0, 1.0).
Returns:
float: Total porosity for the given data point.
"""
logger.debug(f"Calculating neutron-density crossplot porosity with model: {model}")
assert model in ["ssc", "ss", "carb"], (
"Please specify either 'ssc', 'ss' or 'carb' model."
)
A = dry_min1_point
B = dry_silt_point
C = dry_clay_point
D = fluid_point
phit = []
if model == "ssc":
# Check if the point is in the reservoir or non-reservoir section
thold_pt = line_intersection((A, C), (D, B))
thold_line = length_a_b(thold_pt, A)
proj_pt = line_intersection((A, C), (D, (nphi, rhob)))
proj_line = length_a_b(proj_pt, A)
if proj_line < thold_line:
m = (A[1] - B[1]) / (A[0] - B[0])
else:
m = (C[1] - B[1]) / (C[0] - B[0])
c = rhob - m * nphi
iso_poro_pt = line_intersection(((0, c), (nphi, rhob)), (D, B))
iso_poro_line = length_a_b(iso_poro_pt, B)
poro_line = length_a_b(D, B)
phit = iso_poro_line / poro_line
else:
m = (A[1] - C[1]) / (A[0] - C[0])
c = rhob - m * nphi
iso_poro_pt = line_intersection(((0, c), (nphi, rhob)), (D, A))
iso_poro_line = length_a_b(iso_poro_pt, A)
poro_line = length_a_b(D, A)
phit = iso_poro_line / poro_line
logger.debug(f"Crossplot porosity: {phit:.3f}")
return phit
[docs]
def neu_den_xplot_poro(
nphi,
rhob,
model: str = "ssc",
dry_min1_point: tuple = (),
dry_silt_point: tuple = (),
dry_clay_point: tuple = (),
fluid_point: tuple = (1.0, 1.0),
):
"""Calculate porosity given neutron porosity and bulk density logs.
This function processes arrays of log data.
Args:
nphi (np.ndarray or float): Neutron porosity log.
rhob (np.ndarray or float): Bulk density log.
model (str, optional): Lithology model, either 'ssc' (Sand Silt Clay), 'ss' (Sand Shale) or 'carb' (Carbonate).
Defaults to 'ssc'.
dry_min1_point (tuple, optional): Neutron porosity and bulk density of dry min1 point. Defaults to ().
dry_silt_point (tuple, optional): Neutron porosity and bulk density of dry silt point. Defaults to ().
dry_clay_point (tuple, optional): Neutron porosity and bulk density of dry clay point. Defaults to ().
fluid_point (tuple, optional): Neutron porosity and bulk density of fluid point. Defaults to (1.0, 1.0).
Returns:
np.ndarray or float: Total porosity log.
"""
logger.debug(
f"Calculating neutron-density crossplot porosity for {len(nphi)} points with model: {model}"
)
assert model in ["ssc", "ss", "carb"], (
"Please specify either 'ssc', 'ss' or 'carb' model."
)
A = dry_min1_point
B = dry_silt_point
C = dry_clay_point
D = fluid_point
E = list(zip(nphi, rhob, strict=True))
phit = np.empty(0)
for _, point in enumerate(E):
if model == "ssc":
phit = np.append(
phit, neu_den_xplot_poro_pt(point[0], point[1], "ssc", A, B, C, D)
)
else:
phit = np.append(
phit, neu_den_xplot_poro_pt(point[0], point[1], "ss", A, (0, 0), C, D)
)
logger.debug(f"Crossplot porosity range: {phit.min():.3f} - {phit.max():.3f}")
return phit
[docs]
def porosity_correction_averaging(nphi, rhob, rho_ma=2.65, method="weighted"):
"""Correct porosity using averaging method.
Weighted average: (2 * dphi + nphi) / 3
Arithmetic average: (dphi + nphi) / 2
Gaymard average: sqrt((dphi**2 + nphi**2) / 2)
Gas average: sqrt((dphi**2 + nphi**2) / 2)
Args:
nphi (np.ndarray or float): Neutron porosity.
rhob (np.ndarray or float): Bulk density log.
rho_ma (float, optional): Matrix density. Defaults to 2.65.
method (str, optional): Averaging method selection from 'weighted', 'arithmetic' or 'gaymard'.
Defaults to 'weighted'.
Returns:
np.ndarray or float: Corrected porosity.
"""
logger.debug(f"Correcting porosity using {method} averaging method")
assert method in ["weighted", "arithmetic", "gaymard", "gas"], (
"method must be either \
'weighted', 'arithmetic', 'gaymard' or 'gas"
)
dphi = density_porosity(rhob, rho_ma, 1.0)
if method == "weighted":
phi_corr = (2 * dphi + nphi) / 3
elif method == "arithmetic":
phi_corr = (dphi + nphi) / 2
elif method == "gaymard":
phi_corr = np.sqrt((dphi**2 + nphi**2) / 2)
elif method == "gas":
phi_corr = ((nphi**2 + dphi**2) / 2) ** 0.5
logger.debug(
f"Corrected porosity range: {phi_corr.min():.3f} - {phi_corr.max():.3f}"
)
return phi_corr
[docs]
def porosity_trend(tvdss, unit="ft"):
"""Calculate porosity trend based on TVDSS (Schmoker, 1982)
Args:
tvdss (np.ndarray or float): True Vertical Depth Subsea.
unit (str, optional): Unit of depth, either 'ft' or 'm'. Defaults to 'ft'.
Returns:
np.ndarray or float: Porosity trend.
"""
logger.debug(f"Calculating porosity trend with unit: {unit}")
assert unit in ["ft", "m"], "Please specify either ft or m as unit."
if unit == "ft":
phi_trend = 41.73 * np.exp(-tvdss / 8197)
else:
phi_trend = 41.73 * np.exp(-tvdss / 2498)
logger.debug(f"Porosity trend range: {phi_trend.min():.3f} - {phi_trend.max():.3f}")
return phi_trend
[docs]
def nmr_porosity(t2_dist, t2_bins, t2_cutoff=33):
"""Calculate NMR porosity from T2 distribution.
Total porosity is the sum of all T2 distribution amplitudes.
Effective porosity is the sum of T2 distribution amplitudes above the T2 cutoff.
Args:
t2_dist (array-like): T2 distribution amplitudes
t2_bins (array-like): T2 time bins corresponding to distribution
t2_cutoff (float, optional): T2 cutoff time in ms. Defaults to 33ms.
Returns:
tuple: Total porosity and effective porosity (phit, phie)
"""
logger.debug(f"Calculating NMR porosity with T2 cutoff: {t2_cutoff}ms")
# Total porosity is sum of all T2 amplitudes
phit = np.sum(t2_dist)
# Effective porosity is sum of amplitudes above T2 cutoff
phie = np.sum(t2_dist[t2_bins >= t2_cutoff])
logger.debug(f"NMR total porosity range: {phit.min():.3f} - {phit.max():.3f}")
logger.debug(f"NMR effective porosity range: {phie.min():.3f} - {phie.max():.3f}")
return phit, phie