from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from tqdm import tqdm
from quick_pp import logger
from quick_pp.config import Config
from quick_pp.porosity import density_porosity, neu_den_xplot_poro_pt
[docs]
class MultiMineral:
"""Optimization-based multi-mineral model for lithology estimation with fluid volumes."""
def __init__(
self,
minerals: Optional[List[str]] = None,
fluid_properties: Optional[Dict] = None,
porosity_method: str = "density",
porosity_endpoints: Optional[Dict] = None,
):
"""Initialize the MultiMineral optimizer.
Args:
minerals (list, optional): A list of minerals for the optimization.
Defaults to ['QUARTZ', 'CALCITE', 'DOLOMITE', 'SHALE'].
fluid_properties (dict, optional): Properties for oil, gas, and water. Defaults to standard values.
porosity_method (str, optional): The method for initial porosity estimation
('neutron_density', 'density', 'sonic'). Defaults to 'density'.
porosity_endpoints (dict, optional): Endpoints for the 'neutron_density' porosity method.
"""
self.responses = Config.MINERALS_LOG_VALUE
available_minerals = list(self.responses.keys())
# Default minerals if none specified
if minerals is None:
minerals = ["QUARTZ", "CALCITE", "DOLOMITE", "SHALE"]
for mineral in minerals:
if mineral not in available_minerals:
raise ValueError(
f"Mineral '{mineral}' is not defined in Config.MINERALS_LOG_VALUE."
)
self.minerals = minerals
self.porosity_method = porosity_method
self.porosity_endpoints = porosity_endpoints or {}
# Default fluid properties if none specified
if fluid_properties is None:
fluid_properties = {
"OIL": {"density": 0.8, "nphi": 0.0, "pef": 0.0, "dtc": 200.0},
"GAS": {"density": 0.2, "nphi": 0.0, "pef": 0.0, "dtc": 400.0},
"WATER": {"density": 1.0, "nphi": 1.0, "pef": 0.0, "dtc": 190.0},
}
self.fluid_properties = fluid_properties
self.fluids = ["OIL", "GAS", "WATER"]
# Define bounds for each mineral type
mineral_bounds = dict.fromkeys(available_minerals, (0, 1))
# Define bounds for fluid volumes (0 to 1 for each fluid)
fluid_bounds = {"OIL": (0, 1), "GAS": (0, 1), "WATER": (0, 1)}
# Set bounds based on selected minerals and fluids
self.mineral_bounds = tuple(
mineral_bounds[mineral] for mineral in self.minerals
)
self.fluid_bounds = tuple(fluid_bounds[fluid] for fluid in self.fluids)
# Combined bounds: minerals first, then fluids
self.bounds = self.mineral_bounds + self.fluid_bounds
# Constraint: sum of mineral volumes + fluid volumes must equal 1
self.constraints = [{"type": "eq", "fun": self._volume_constraint}]
self.scaling_factors = {}
[docs]
def _volume_constraint(self, volumes: np.ndarray) -> float:
"""Ensure that the sum of all mineral and fluid volumes equals 1."""
return np.sum(volumes) - 1
[docs]
def _calculate_porosity(
self, nphi: float, rhob: float, dtc: Optional[float] = None
) -> float:
"""Calculate an initial porosity estimate based on the selected method.
Args:
nphi (float): Neutron porosity value.
rhob (float): Bulk density value.
dtc (float, optional): Sonic transit time value. Defaults to None.
Returns:
float: The initial porosity estimate.
"""
if self.porosity_method == "density":
# Calculate an initial rho_ma based on the selected minerals
initial_rho_ma = sum(
self.responses[mineral].get("RHOB", 0)
for mineral in self.minerals
if mineral in self.responses
)
count_minerals_with_density = sum(
1
for mineral in self.minerals
if mineral in self.responses and "RHOB" in self.responses[mineral]
)
if count_minerals_with_density > 0:
rho_ma = initial_rho_ma / count_minerals_with_density
else:
rho_ma = 2.65 # Default matrix density (Quartz)
# Calculate average fluid density for porosity calculation
rho_fluid_avg = np.mean(
[prop["density"] for prop in self.fluid_properties.values()]
)
return density_porosity(rhob, rho_ma, rho_fluid_avg)
elif self.porosity_method == "neutron_density":
# Use neutron-density crossplot porosity
dry_min1_point = self.porosity_endpoints.get(
"dry_min1_point", (-0.02, 2.65)
)
dry_clay_point = self.porosity_endpoints.get("dry_clay_point", (0.37, 2.41))
fluid_point = self.porosity_endpoints.get("fluid_point", (1.0, 1.0))
return neu_den_xplot_poro_pt(
nphi,
rhob,
model="ss",
dry_min1_point=dry_min1_point,
dry_clay_point=dry_clay_point,
fluid_point=fluid_point,
)
elif self.porosity_method == "sonic":
# Sonic porosity calculation (simplified)
if dtc is None:
return 0.0
dt_ma = 55.0 # Default matrix slowness
dt_fluid = 190.0 # Default fluid slowness
return (dtc - dt_ma) / (dt_fluid - dt_ma)
else:
raise ValueError(f"Unknown porosity method: {self.porosity_method}")
[docs]
def _calculate_matrix_density(self, mineral_volumes: np.ndarray) -> float:
"""Calculate the average matrix density from mineral volumes.
Args:
mineral_volumes (np.ndarray): An array of mineral volumes.
Returns:
float: The calculated matrix density.
"""
rho_ma = 0.0
total_mineral_vol = np.sum(mineral_volumes)
if total_mineral_vol < 1e-6:
return 2.65 # Return default if no minerals
for i, mineral in enumerate(self.minerals):
if mineral in self.responses and "RHOB" in self.responses[mineral]:
rho_ma += mineral_volumes[i] * self.responses[mineral]["RHOB"]
return rho_ma / total_mineral_vol
[docs]
def _calculate_fluid_density(self, fluid_volumes: np.ndarray) -> float:
"""Calculate the average fluid density from fluid volumes.
Args:
fluid_volumes (np.ndarray): An array of fluid volumes.
Returns:
float: The calculated average fluid density.
"""
rho_fluid = 0.0
for i, fluid in enumerate(self.fluids):
rho_fluid += fluid_volumes[i] * self.fluid_properties[fluid]["density"]
return rho_fluid
[docs]
def _calculate_log_response(self, volumes: np.ndarray, log_type: str) -> float:
"""Calculate the reconstructed log response for a given log type.
Args:
volumes (np.ndarray): An array of combined mineral and fluid volumes.
log_type (str): The type of log to reconstruct (e.g., 'NPHI', 'RHOB').
Returns:
float: The reconstructed log value.
"""
response = 0.0
# Add mineral contributions
n_minerals = len(self.minerals)
for i, mineral in enumerate(self.minerals):
if mineral in self.responses and log_type in self.responses[mineral]:
response += volumes[i] * self.responses[mineral][log_type]
# Add fluid contributions
for i, fluid in enumerate(self.fluids):
fluid_idx = n_minerals + i
fluid_vol = volumes[fluid_idx]
if log_type == "NPHI":
response += fluid_vol * self.fluid_properties[fluid]["nphi"]
elif log_type == "RHOB":
response += fluid_vol * self.fluid_properties[fluid]["density"]
elif log_type == "DTC":
response += fluid_vol * self.fluid_properties[fluid]["dtc"]
elif log_type == "PEF":
response += fluid_vol * self.fluid_properties[fluid]["pef"]
# GR doesn't change with fluid type (assuming fresh water)
return response
[docs]
def _calculate_average_error(
self, volumes: np.ndarray, log_values: Dict[str, float]
) -> float:
"""Calculate the average absolute error between measured and reconstructed logs.
Args:
volumes (np.ndarray): An array of combined mineral and fluid volumes.
log_values (dict): A dictionary of measured log values.
Returns:
float: The average absolute error.
"""
total_error = 0.0
valid_count = 0
for log_type, measured_value in log_values.items():
if measured_value is not None and not np.isnan(measured_value):
reconstructed_value = self._calculate_log_response(volumes, log_type)
error = abs(measured_value - reconstructed_value)
total_error += error
valid_count += 1
return total_error / valid_count if valid_count > 0 else np.nan
[docs]
def _error_function(
self, volumes: np.ndarray, log_values: Dict[str, float]
) -> float:
"""Calculate the sum of squared errors for the optimization.
Args:
volumes (np.ndarray): An array of combined mineral and fluid volumes.
log_values (dict): A dictionary of measured log values.
Returns:
float: The total squared error.
"""
total_error = 0.0
for log_type, measured_value in log_values.items():
if measured_value is not None and not np.isnan(measured_value):
# Special handling for RHOB when using density porosity method
if self.porosity_method == "density" and log_type == "RHOB":
mineral_vols = volumes[: len(self.minerals)]
fluid_vols = volumes[len(self.minerals) :]
porosity = np.sum(fluid_vols)
# For density porosity, RHOB is reconstructed from porosity and matrix density
rho_ma = self._calculate_matrix_density(mineral_vols)
rho_fluid = self._calculate_fluid_density(fluid_vols)
reconstructed_value = rho_ma * (1 - porosity) + rho_fluid * porosity
else:
reconstructed_value = self._calculate_log_response(
volumes, log_type
)
scaling = self.scaling_factors.get(log_type, 1.0)
error = (scaling * (measured_value - reconstructed_value)) ** 2
total_error += error
return total_error
[docs]
def _optimize_volumes(
self, log_values: Dict[str, float]
) -> Tuple[np.ndarray, np.ndarray, bool]:
"""Perform the optimization to find the best-fit mineral and fluid volumes.
Args:
log_values (dict): A dictionary of measured log values for a single depth point.
Returns:
tuple: A tuple containing the optimized mineral volumes, fluid volumes, and a success flag.
"""
# Filter out None and NaN values
valid_logs = {
k: v for k, v in log_values.items() if v is not None and not np.isnan(v)
}
if not valid_logs:
raise ValueError("No valid log measurements provided")
# Calculate initial porosity estimate for fluid volume initialization
nphi = log_values.get("NPHI", 0.0)
rhob = log_values.get("RHOB", 2.65)
dtc = log_values.get("DTC", None)
if nphi is not None and rhob is not None:
initial_porosity = self._calculate_porosity(nphi, rhob, dtc)
initial_porosity = max(
0.0, min(0.4, initial_porosity)
) # Constrain to reasonable range
else:
initial_porosity = 0.1 # Default porosity
n_minerals = len(self.minerals)
n_fluids = len(self.fluids)
# Distribute volume between minerals and fluids
mineral_volume = (1.0 - initial_porosity) / n_minerals
fluid_volume = initial_porosity / n_fluids
initial_guess = np.concatenate(
[np.full(n_minerals, mineral_volume), np.full(n_fluids, fluid_volume)]
)
# Run optimization
result = minimize(
self._error_function,
initial_guess,
args=(valid_logs,),
bounds=self.bounds,
constraints=self.constraints,
method="SLSQP",
)
# Split results into mineral and fluid volumes
mineral_volumes = result.x[:n_minerals]
fluid_volumes = result.x[n_minerals:]
return mineral_volumes, fluid_volumes, result.success
[docs]
def estimate_lithology(
self,
gr: np.ndarray,
nphi: np.ndarray,
rhob: np.ndarray,
pef: Optional[np.ndarray] = None,
dtc: Optional[np.ndarray] = None,
auto_scale: bool = True,
) -> pd.DataFrame:
"""Estimate mineral and fluid volumes for a set of log data points.
This function iterates through each depth point, performing an optimization
to determine the volumetric fractions of minerals and fluids that best
match the measured log responses.
Args:
gr (np.ndarray): Gamma Ray log [GAPI].
nphi (np.ndarray): Neutron Porosity log [v/v].
rhob (np.ndarray): Bulk Density log [g/cc].
pef (np.ndarray, optional): Photoelectric Factor log [barns/electron]. Defaults to None.
dtc (np.ndarray, optional): Compressional slowness log [us/ft]. Defaults to None.
auto_scale (bool, optional): If True, automatically calculates scaling factors. Defaults to True.
Returns:
pd.DataFrame: A DataFrame containing the estimated mineral and fluid volumes,
constructed porosity, error metrics, and reconstructed logs for each depth point.
"""
# Initialize output arrays
n_samples = len(gr)
# Initialize mineral volume arrays
mineral_volumes = {}
for mineral in self.minerals:
mineral_volumes[mineral] = np.full(n_samples, np.nan)
# Initialize fluid volume arrays
fluid_volumes = {}
for fluid in self.fluids:
fluid_volumes[fluid] = np.full(n_samples, np.nan)
# Initialize porosity array (sum of fluid volumes)
porosity_constructed = np.full(n_samples, np.nan)
# Initialize the reconstructed logs
gr_reconstructed = np.full(n_samples, np.nan)
nphi_reconstructed = np.full(n_samples, np.nan)
rhob_reconstructed = np.full(n_samples, np.nan)
pef_reconstructed = np.full(n_samples, np.nan)
dtc_reconstructed = np.full(n_samples, np.nan)
# Initialize error array
average_errors = np.full(n_samples, np.nan)
# Initialize QC flags
success_flags = np.full(n_samples, False, dtype=bool)
# Dynamic scaling based on log ranges for robustness
if auto_scale:
all_logs = {"GR": gr, "NPHI": nphi, "RHOB": rhob, "PEF": pef, "DTC": dtc}
for log_type, log_data in all_logs.items():
if log_data is not None:
# Convert to float array and handle None/NaN values
try:
# Convert to float, replacing None with NaN
log_array = np.array(
[np.nan if v is None else float(v) for v in log_data],
dtype=float,
)
# Use 5th and 95th percentiles to calculate a robust range, ignoring NaNs
valid_data = log_array[~np.isnan(log_array)]
if len(valid_data) > 1:
p05 = np.percentile(valid_data, 5)
p95 = np.percentile(valid_data, 95)
log_range = p95 - p05
if log_range > 1e-6: # Avoid division by zero for flat logs
self.scaling_factors[log_type] = round(
1.0 / log_range, 4
)
else:
self.scaling_factors[log_type] = (
1.0 # Default if range is zero
)
except (ValueError, TypeError):
# Skip if conversion fails
self.scaling_factors[log_type] = 1.0
logger.info(f"Scaling factors: {self.scaling_factors}")
# Process each depth point
for i in tqdm(range(n_samples), desc="Processing depth points", unit="points"):
# Prepare log values for current depth
log_values = {
"GR": gr[i],
"NPHI": nphi[i],
"RHOB": rhob[i],
"PEF": pef[i] if pef is not None and not np.isnan(pef[i]) else None,
"DTC": dtc[i] if dtc is not None and not np.isnan(dtc[i]) else None,
}
# Check if we have valid measurements
valid_measurements = [
v for v in log_values.values() if v is not None and not np.isnan(v)
]
if len(valid_measurements) >= 3: # Need at least 3 measurements
try:
# Optimize mineral and fluid volumes
mineral_vols, fluid_vols, success = self._optimize_volumes(
log_values
)
# Store results for each mineral
for j, mineral in enumerate(self.minerals):
mineral_volumes[mineral][i] = mineral_vols[j]
# Store results for each fluid
for j, fluid in enumerate(self.fluids):
fluid_volumes[fluid][i] = fluid_vols[j]
# Store total porosity (sum of fluid volumes)
porosity_constructed[i] = np.sum(fluid_vols)
success_flags[i] = success
# Combine volumes for error calculation and log reconstruction
combined_volumes = np.concatenate([mineral_vols, fluid_vols])
# Calculate and store average error
average_errors[i] = self._calculate_average_error(
combined_volumes, log_values
)
# Store the reconstructed logs
gr_reconstructed[i] = (
self._calculate_log_response(combined_volumes, "GR")
if log_values["GR"] is not None
else np.nan
)
nphi_reconstructed[i] = (
self._calculate_log_response(combined_volumes, "NPHI")
if log_values["NPHI"] is not None
else np.nan
)
rhob_reconstructed[i] = (
self._calculate_log_response(combined_volumes, "RHOB")
if log_values["RHOB"] is not None
else np.nan
)
pef_reconstructed[i] = (
self._calculate_log_response(combined_volumes, "PEF")
if log_values["PEF"] is not None
else np.nan
)
dtc_reconstructed[i] = (
self._calculate_log_response(combined_volumes, "DTC")
if log_values["DTC"] is not None
else np.nan
)
except Exception as e:
tqdm.write(f"\rError at depth {i}: {e}", end="\r")
continue
# Create output DataFrame
output_data = {}
# Add mineral volumes with appropriate column names
mineral_column_mapping = {
mineral: Config.MINERALS_NAME_MAPPING.get(mineral, f"V{mineral}")
for mineral in self.minerals
}
for mineral in self.minerals:
column_name = mineral_column_mapping.get(mineral, f"V{mineral}")
output_data[column_name] = mineral_volumes[mineral]
# Add fluid volumes with appropriate column names
fluid_column_mapping = {"OIL": "VOIL", "GAS": "VGAS", "WATER": "VWATER"}
for fluid in self.fluids:
column_name = fluid_column_mapping.get(fluid, f"V{fluid}")
output_data[column_name] = fluid_volumes[fluid]
# Add total porosity (sum of fluid volumes)
output_data["PHIT_CONSTRUCTED"] = porosity_constructed
# Add error and reconstructed logs
output_data.update(
{
"AVG_ERROR": average_errors,
"OPTIMIZER_SUCCESS": success_flags,
"GR_RECONSTRUCTED": gr_reconstructed,
"NPHI_RECONSTRUCTED": nphi_reconstructed,
"RHOB_RECONSTRUCTED": rhob_reconstructed,
"PEF_RECONSTRUCTED": pef_reconstructed,
"DTC_RECONSTRUCTED": dtc_reconstructed,
}
)
return pd.DataFrame(output_data)