import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.stats import gmean, hmean, truncnorm
from tqdm import tqdm
from quick_pp import logger
from quick_pp.utils import minmax_scale
[docs]
def calc_reservoir_summary(depth, vshale, phit, swt, perm, zones, cutoffs=None):
"""Calculate reservoir summary based on cutoffs on vshale, phit, and swt.
This function aggregates petrophysical properties over specified zones and flags
(all, rock, reservoir, pay) to provide a comprehensive reservoir summary.
Args:
depth (np.ndarray or float): Depth, either MD or TVD.
vshale (np.ndarray or float): Volume of shale in fraction.
phit (np.ndarray or float): Total porosity in fraction.
swt (np.ndarray or float): Total water saturation in fraction.
perm (np.ndarray or float): Permeability.
zones (np.ndarray or str): Zone names.
cutoffs (dict, optional): A dictionary with 'VSHALE', 'PHIT', and 'SWT' cutoffs.
Defaults to `dict(VSHALE=0.4, PHIT=0.01, SWT=0.9)`.
Returns:
pd.DataFrame: A DataFrame containing the reservoir summary in tabular format.
"""
logger.debug(
f"Starting reservoir summary calculation with {len(depth)} data points"
)
cutoffs = cutoffs or {"VSHALE": 0.4, "PHIT": 0.01, "SWT": 0.9}
df = pd.DataFrame(
{
"depth": depth,
"vshale": vshale,
"phit": phit,
"swt": swt,
"perm": perm,
"zones": zones,
}
)
step = df["depth"].diff().mean()
df["bvhc"] = df["phit"] * (1 - df["swt"])
df["rock_flag"], df["reservoir_flag"], df["pay_flag"] = flag_interval(
df["vshale"], df["phit"], df["swt"], cutoffs
)
df["all_flag"] = 1
logger.debug(
f"Data flags calculated - Rock: {df['rock_flag'].sum()}, "
f"Reservoir: {df['reservoir_flag'].sum()}, Pay: {df['pay_flag'].sum()}"
)
ressum_df = pd.DataFrame()
for flag in ["all", "rock", "reservoir", "pay"]:
temp_df = pd.DataFrame()
# Calculate net thickness
temp_df[["zones", "net"]] = (
df.groupby(["zones"])[[f"{flag}_flag"]]
.agg(lambda x: np.nansum(x) * step)
.reset_index()
)
# Average the properties and merge
flag_df = df[df[f"{flag}_flag"] == 1].copy()
avg_df = (
flag_df.groupby(["zones"])
.agg(
{
"vshale": lambda x: np.nanmean(x),
"phit": lambda x: np.nanmean(x),
"swt": lambda x: np.nanmean(x),
"bvhc": lambda x: np.nanmean(x),
}
)
.reset_index()
)
avg_df["perm_am"] = (
flag_df.groupby("zones")["perm"].agg("mean").reset_index(drop=True)
)
avg_df["perm_gm"] = (
flag_df.groupby("zones")["perm"]
.agg(gmean, nan_policy="omit")
.reset_index(drop=True)
)
avg_df["perm_hm"] = (
flag_df.groupby("zones")["perm"]
.agg(hmean, nan_policy="omit")
.reset_index(drop=True)
)
avg_df = avg_df.rename(
columns={
"vshale": "av_vshale",
"phit": "av_phit",
"swt": "av_swt",
"bvhc": "av_bvhc",
"perm_gm": "av_perm_gm",
"perm_am": "av_perm_am",
"perm_hm": "av_perm_hm",
}
)
temp_df = temp_df.merge(avg_df, on=["zones"], how="left", validate="1:1")
# Set the maximum depth as bottom depth
bottom = (
df.groupby(["zones"])[["depth"]]
.agg(lambda x: np.nanmax(x))
.reset_index()
.rename(columns={"depth": "bottom"})
)
temp_df = temp_df.merge(
bottom[["zones", "bottom"]], on=["zones"], how="left", validate="1:1"
)
# Set the minimum depth as top depth
top = (
df.groupby(["zones"])[["depth"]]
.agg(lambda x: np.nanmin(x))
.reset_index()
.rename(columns={"depth": "top"})
)
temp_df = temp_df.merge(
top[["zones", "top"]], on=["zones"], how="left", validate="1:1"
)
temp_df["flag"] = flag
# Calculate gross thickness
temp_df["gross"] = temp_df["bottom"] - temp_df["top"]
# Calculate net thickness
temp_df["ntg"] = temp_df["net"] / temp_df["gross"]
# Concat to ressum_df
ressum_df = pd.concat([ressum_df, temp_df], ignore_index=True)
ressum_df = ressum_df.round(3)
ressum_df = ressum_df.sort_values(by=["top"], ignore_index=True)
# Sort the columns
cols = [
"zones",
"flag",
"top",
"bottom",
"gross",
"net",
"ntg",
"av_vshale",
"av_phit",
"av_swt",
"av_bvhc",
"av_perm_am",
"av_perm_gm",
"av_perm_hm",
]
logger.debug(
f"Reservoir summary calculation completed with {len(ressum_df)} summary rows"
)
return ressum_df[cols]
[docs]
def flag_interval(vshale, phit, swt, cutoffs: dict):
"""Flag interval based on cutoffs.
Args:
vshale (np.ndarray or float): Vshale values.
phit (np.ndarray or float): Total porosity values.
swt (np.ndarray or float): Water saturation values.
cutoffs (dict): A dictionary with 'VSHALE', 'PHIT', and 'SWT' cutoffs.
Returns:
tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing the rock, reservoir, and pay flags.
"""
if len(cutoffs) != 3:
logger.error(
f"Invalid cutoffs format: expected 3 key-value pairs, got {len(cutoffs)}"
)
raise AssertionError(
"cutoffs must be 3 key-value pairs: {VSHALE: x, PHIT: y, SWT: z}."
)
logger.debug(
f"Flagging intervals with cutoffs: VSHALE={cutoffs.get('VSHALE')}, "
f"PHIT={cutoffs.get('PHIT')}, SWT={cutoffs.get('SWT')}"
)
rock_flag = np.where(vshale < cutoffs["VSHALE"], 1, 0)
reservoir_flag = np.where(rock_flag == 1, np.where(phit > cutoffs["PHIT"], 1, 0), 0)
pay_flag = np.where(reservoir_flag == 1, np.where(swt < cutoffs["SWT"], 1, 0), 0)
return rock_flag, reservoir_flag, pay_flag
[docs]
def volumetric_method(
area_bound: tuple,
thickness_bound: tuple,
porosity_bound: tuple,
water_saturation_bound: tuple,
volume_factor_bound: tuple,
recovery_factor_bound: tuple = None,
random_state=None,
):
"""Generate random samples for volumetric parameters based on specified distributions.
This function uses truncated normal distributions for area, thickness, porosity,
water saturation, and recovery factor, and a uniform distribution for the
volume factor.
Args:
area_bound (tuple): (min, max, mean, std) for area in acres.
thickness_bound (tuple): (min, max, mean, std) for thickness in feet.
porosity_bound (tuple): (min, max, mean, std) for porosity in fraction.
water_saturation_bound (tuple): (min, max, mean, std) for water saturation in fraction.
volume_factor_bound (tuple): (min, max) for the volume factor (unitless).
recovery_factor_bound (tuple, optional): (min, max, mean, std) for the recovery factor. Defaults to None.
random_state (int, optional): A seed for the random number generator. Defaults to None.
Returns:
tuple: A tuple of the generated random samples for (area, thickness, porosity, sw, bo, rf).
"""
logger.debug(
f"Starting volumetric method calculation with random_state={random_state}"
)
random.seed(random_state)
a_min_transformed = (area_bound[0] - area_bound[2]) / area_bound[3]
a_max_transformed = (area_bound[1] - area_bound[2]) / area_bound[3]
a = truncnorm.rvs(
a_min_transformed,
a_max_transformed,
loc=area_bound[2],
scale=area_bound[3],
random_state=random_state,
)
h_min_transformed = (thickness_bound[0] - thickness_bound[2]) / thickness_bound[3]
h_max_transformed = (thickness_bound[1] - thickness_bound[2]) / thickness_bound[3]
h = truncnorm.rvs(
h_min_transformed,
h_max_transformed,
loc=thickness_bound[2],
scale=thickness_bound[3],
random_state=random_state,
)
poro_min_transformed = (porosity_bound[0] - porosity_bound[2]) / porosity_bound[3]
poro_max_transformed = (porosity_bound[1] - porosity_bound[2]) / porosity_bound[3]
poro = truncnorm.rvs(
poro_min_transformed,
poro_max_transformed,
loc=porosity_bound[2],
scale=porosity_bound[3],
random_state=random_state,
)
sw_min_transformed = (
water_saturation_bound[0] - water_saturation_bound[2]
) / water_saturation_bound[3]
sw_max_transformed = (
water_saturation_bound[1] - water_saturation_bound[2]
) / water_saturation_bound[3]
sw = truncnorm.rvs(
sw_min_transformed,
sw_max_transformed,
loc=water_saturation_bound[2],
scale=water_saturation_bound[3],
random_state=random_state,
)
bo = random.uniform(volume_factor_bound[0], volume_factor_bound[1])
if recovery_factor_bound is not None:
rf_min_transformed = (
recovery_factor_bound[0] - recovery_factor_bound[2]
) / recovery_factor_bound[3]
rf_max_transformed = (
recovery_factor_bound[1] - recovery_factor_bound[2]
) / recovery_factor_bound[3]
rf = truncnorm.rvs(
rf_min_transformed,
rf_max_transformed,
loc=recovery_factor_bound[2],
scale=recovery_factor_bound[3],
random_state=random_state,
)
else:
rf = 1.0
logger.debug(
f"Volumetric parameters generated - Area: {a:.2f}, Thickness: {h:.2f}, "
f"Porosity: {poro:.3f}, Sw: {sw:.3f}, Bo: {bo:.3f}, RF: {rf:.3f}"
)
return a, h, poro, sw, bo, rf
[docs]
def mc_volumetric_method(
area_bound: tuple,
thickness_bound: tuple,
porosity_bound: tuple,
water_saturation_bound: tuple,
volume_factor_bound: tuple,
n_try=10000,
percentile=None,
):
"""Monte Carlo simulation for volumetric method.
This function performs a Monte Carlo simulation to estimate the distribution of
in-place hydrocarbon volumes and generates diagnostic plots.
Args:
area_bound (tuple): (min, max, mean, std) for area in acres.
thickness_bound (tuple): (min, max, mean, std) for thickness in feet.
porosity_bound (tuple): (min, max, mean, std) for porosity in fraction.
water_saturation_bound (tuple): (min, max, mean, std) for water saturation in fraction.
volume_factor_bound (tuple): (min, max) for the volume factor (unitless).
n_try (int, optional): Number of trials. Defaults to 10000.
percentile (list, optional): Percentiles to calculate and display. Defaults to [10, 50, 90].
Returns:
tuple: A tuple containing arrays of the generated volumes and input parameters.
"""
import ptitprince as pt
logger.info(f"Starting Monte Carlo simulation with {n_try} trials")
percentile = percentile or [10, 50, 90]
area = np.empty(0)
thickness = np.empty(0)
porosity = np.empty(0)
water_saturation = np.empty(0)
volume_factor = np.empty(0)
volumes = np.empty(0)
for _ in tqdm(range(n_try), desc="Monte Carlo simulation", unit="trials"):
a, h, poro, sw, bo, _ = volumetric_method(
area_bound=area_bound,
thickness_bound=thickness_bound,
porosity_bound=porosity_bound,
water_saturation_bound=water_saturation_bound,
volume_factor_bound=volume_factor_bound,
)
area = np.append(area, a)
thickness = np.append(thickness, h)
porosity = np.append(porosity, poro)
water_saturation = np.append(water_saturation, sw)
volume_factor = np.append(volume_factor, bo)
result = (43560 * 0.1781) * a * h * poro * (1 - sw) / bo
volumes = np.append(volumes, result * 1e-6)
logger.info(
f"Monte Carlo simulation completed. Volume statistics - "
f"Mean: {np.mean(volumes):.2f} MM bbl, Std: {np.std(volumes):.2f} MM bbl"
)
# Plot tornado chart
data = pd.DataFrame(
{
"Area": area,
"Thickness": thickness,
"Porosity": porosity,
"Water Saturation": water_saturation,
"Volume Factor": volume_factor,
"Volume": volumes,
}
)
fig, axs = plt.subplots(5, 1, figsize=(7, 11), constrained_layout=True)
pt.RainCloud(
data=data,
y="Area",
ax=axs[0],
orient="h",
bw=0.1,
width_viol=0.5,
alpha=0.6,
dodge=True,
)
axs[0].set_ylabel("Area (acre)")
pt.RainCloud(
data=data,
y="Thickness",
ax=axs[1],
orient="h",
bw=0.1,
width_viol=0.5,
alpha=0.6,
dodge=True,
)
axs[1].set_ylabel("Thickness (ft)")
pt.RainCloud(
data=data,
y="Porosity",
ax=axs[2],
orient="h",
bw=0.1,
width_viol=0.5,
alpha=0.6,
dodge=True,
)
axs[2].set_ylabel("Porosity (frac)")
pt.RainCloud(
data=data,
y="Water Saturation",
ax=axs[3],
orient="h",
bw=0.1,
width_viol=0.5,
alpha=0.6,
dodge=True,
)
axs[3].set_ylabel("Water Saturation (frac)")
pt.RainCloud(
data=data,
y="Volume Factor",
ax=axs[4],
orient="h",
bw=0.1,
width_viol=0.5,
alpha=0.6,
dodge=True,
)
axs[4].set_ylabel("Volume Factor")
fig.set_facecolor("aliceblue")
fig.show()
plt.figure(figsize=(12, 6))
a = np.hstack(volumes.tolist())
_ = plt.hist(a, density=True, bins="auto") # arguments are passed to np.histogram
plt.title(f"Volume Estimation ({n_try} samples)")
plt.xlabel("Volume (MM bbl)")
for i, pct in enumerate(np.percentile(volumes, percentile)):
plt.axvline(
x=pct,
color="r",
linestyle="dashed",
label=f"P{percentile[i]}: {round(pct)} MM bbl",
)
plt.legend(bbox_to_anchor=(1.04, 1), loc="upper left")
return volumes, area, thickness, porosity, water_saturation, volume_factor
[docs]
def calculate_volume(
x, area, thickness, porosity, water_saturation, volume_factor, recovery_factor=1
):
"""Calculate volume using the volumetric method.
This function is a helper for sensitivity analysis, applying a perturbation `x`
to the calculation.
Args:
x (float): A perturbation factor.
area (float): Area in acres.
thickness (float): Thickness in feet.
porosity (float): Porosity in fraction.
water_saturation (float): Water saturation in fraction.
volume_factor (float): Volume factor.
recovery_factor (float, optional): Recovery factor in fraction. Defaults to 1.
Returns:
float: Estimated volume in million barrels (MMbbl).
"""
return (
x
* (43560 * 0.1781)
* area
* thickness
* porosity
* (1 - water_saturation)
/ (volume_factor)
* recovery_factor
* 1e-6
)
[docs]
def sensitivity_analysis(
area_bound: tuple,
thickness_bound: tuple,
porosity_bound: tuple,
water_saturation_bound: tuple,
volume_factor_bound: tuple,
):
"""Sensitivity analysis for volumetric method.
This function performs a Sobol sensitivity analysis to determine the influence
of each input parameter on the final volume calculation and displays the
results as tornado charts (both volume-based and sensitivity indices).
Args:
area_bound (tuple): (min, max) for area.
thickness_bound (tuple): (min, max) for thickness.
porosity_bound (tuple): (min, max) for porosity.
water_saturation_bound (tuple): (min, max) for water saturation.
volume_factor_bound (tuple): (min, max) for the volume factor.
"""
from SALib.analyze.sobol import analyze
from SALib.sample.saltelli import sample
logger.info("Starting sensitivity analysis for volumetric method")
# Define the model inputs
problem = {
"num_vars": 5,
"names": ["area", "thickness", "porosity", "water_saturation", "volume_factor"],
"bounds": [
[area_bound[0], area_bound[1]],
[thickness_bound[0], thickness_bound[1]],
[porosity_bound[0], porosity_bound[1]],
[water_saturation_bound[0], water_saturation_bound[1]],
[volume_factor_bound[0], volume_factor_bound[1]],
],
}
logger.debug(
f"Problem defined with {problem['num_vars']} variables and {2**10} samples"
)
# Generate samples
param_values = sample(problem, 2**10, calc_second_order=False)
# Evaluate model at each sample point
x = np.linspace(-1, 1, 100)
y = np.array([calculate_volume(x, *params) for params in param_values])
# Perform Sobol analysis
Si = analyze(problem, y[:, 0], calc_second_order=False)
# Extract the sensitivity indices
S1 = Si["S1"]
S1_conf = Si["S1_conf"]
names = problem["names"]
logger.info(
f"Sensitivity analysis completed. Top parameter: {names[np.argmax(S1)]} "
f"(S1={S1[np.argmax(S1)]:.3f})"
)
# ===== VOLUME-BASED TORNADO CHART (PRIMARY - MORE INTUITIVE) =====
# Calculate baseline volume at mean parameter values
baseline_params = {
name: (bound[0] + bound[1]) / 2
for name, bound in zip(problem["names"], problem["bounds"], strict=True)
}
baseline_volume = calculate_volume(1, **baseline_params)
# Calculate volume impacts for each parameter
volume_impacts = []
for param_name, (min_val, max_val) in zip(
problem["names"], problem["bounds"], strict=True
):
params_at_min = baseline_params.copy()
params_at_min[param_name] = min_val
vol_min = calculate_volume(1, **params_at_min)
params_at_max = baseline_params.copy()
params_at_max[param_name] = max_val
vol_max = calculate_volume(1, **params_at_max)
volume_impacts.append(
{
"name": param_name,
"vol_min": vol_min,
"vol_max": vol_max,
"low_impact": vol_min - baseline_volume,
"high_impact": vol_max - baseline_volume,
"range": abs(vol_max - vol_min),
}
)
# Sort by impact range for visualization
volume_impacts_sorted = sorted(volume_impacts, key=lambda x: x["range"])
# Create volume-based tornado chart
plt.figure(figsize=(12, 7))
y_pos = np.arange(len(volume_impacts_sorted))
param_names_sorted = [imp["name"] for imp in volume_impacts_sorted]
low_impacts = np.array([imp["low_impact"] for imp in volume_impacts_sorted])
high_impacts = np.array([imp["high_impact"] for imp in volume_impacts_sorted])
plt.barh(
y_pos,
low_impacts,
height=0.6,
align="center",
alpha=0.8,
color="steelblue",
edgecolor="black",
linewidth=1.5,
label="At Minimum",
)
plt.barh(
y_pos,
high_impacts,
height=0.6,
align="center",
alpha=0.8,
color="steelblue",
edgecolor="black",
linewidth=1.5,
label="At Maximum",
)
# Add baseline reference line
plt.axvline(x=0, color="red", linestyle="--", linewidth=2.5, alpha=0.8)
plt.text(
0,
len(volume_impacts_sorted) - 0.5,
f"Baseline: {baseline_volume:.1f} MMbbl",
ha="center",
fontsize=10,
fontweight="bold",
color="red",
bbox={"boxstyle": "round", "facecolor": "yellow", "alpha": 0.3},
)
# Add value labels on bars
for i, (low, high) in enumerate(zip(low_impacts, high_impacts, strict=True)):
if low < 0:
plt.text(
low - 5,
i,
f"{low:.0f}",
ha="right",
va="center",
fontweight="bold",
fontsize=9,
)
else:
plt.text(
low + 5,
i,
f"{low:.0f}",
ha="left",
va="center",
fontweight="bold",
fontsize=9,
)
if high > 0:
plt.text(
high + 5,
i,
f"{high:+.0f}",
ha="left",
va="center",
fontweight="bold",
fontsize=9,
)
else:
plt.text(
high - 5,
i,
f"{high:.0f}",
ha="right",
va="center",
fontweight="bold",
fontsize=9,
)
plt.yticks(y_pos, [p.replace("_", " ").title() for p in param_names_sorted])
plt.xlabel("Volume Change from Baseline (MMbbl)", fontsize=12, fontweight="bold")
plt.title(
"Tornado Chart - STOIIP Volume Impacts\n(Practical View: Shows Actual Volume Ranges)",
fontsize=13,
fontweight="bold",
)
plt.grid(True, alpha=0.3, axis="x")
plt.legend(loc="lower right")
plt.tight_layout()
plt.show()
# ===== SENSITIVITY INDEX TORNADO CHART (SECONDARY - TECHNICAL REFERENCE) =====
# Sort by sensitivity index for comparison
sorted_indices = np.argsort(S1)
sorted_S1 = S1[sorted_indices]
sorted_S1_conf = S1_conf[sorted_indices]
sorted_names = [names[i] for i in sorted_indices]
plt.figure(figsize=(10, 6))
plt.barh(
range(len(sorted_S1)),
sorted_S1,
xerr=sorted_S1_conf,
align="center",
alpha=0.7,
color="darkorange",
ecolor="black",
capsize=5,
)
plt.yticks(
range(len(sorted_S1)), [n.replace("_", " ").title() for n in sorted_names]
)
plt.xlabel("First-order Sensitivity Index (S1)", fontsize=12, fontweight="bold")
plt.title(
"Tornado Chart - Sensitivity Indices\n(Technical Reference: Normalized Importance)",
fontsize=13,
fontweight="bold",
)
plt.grid(True, alpha=0.3, axis="x")
plt.tight_layout()
plt.show()
[docs]
def cutoffs_analysis(df, percentile=95):
"""Perform sensitivity analysis on Vshale, PHIT and SWT cutoffs against hydrocarbon pore volume.
Args:
df (pd.DataFrame): A DataFrame containing 'VSHALE', 'PHIT', and 'SWT' columns.
percentile (int, optional): The percentile to use for determining the optimal cutoff point
from the cumulative HCPV curve. Defaults to 95.
Returns:
tuple[float, float, float]: A tuple containing the optimal cutoffs for VSHALE, PHIT, and SWT.
"""
logger.debug("Starting cutoffs sensitivity analysis")
# Calculate distances from each point to line between endpoints
def find_percentile_point(x, y, percentile):
# Sort values by y
sorted_indices = y.sort_values().index
sorted_y = y.iloc[sorted_indices].round(3)
sorted_x = x.iloc[sorted_indices].round(3)
# Remove duplicates and find index closest to specified percentile
unique_y = sorted_y.drop_duplicates()
target_value = np.percentile(unique_y, percentile)
idx = (np.abs(sorted_y - target_value)).argmin()
# Return x value at percentile
return sorted_x.iloc[idx]
# Define parameter ranges
vsh_range = np.linspace(1.0, 0.0, 100)
phit_range = np.linspace(0.0, 1.0, 100)
swt_range = np.linspace(1.0, 0.0, 100)
# Calculate total hydrocarbon pore volume
df["HCPV"] = df["PHIT"] * (1 - df["SWT"])
# Sensitivity analysis for VSHALE
results = []
phi = 0
sw = 1
for vsh in tqdm(vsh_range, desc="Analyzing cutoffs"):
cutoffs = {"VSHALE": vsh, "PHIT": phi, "SWT": sw}
rock_flag, _, _ = flag_interval(df["VSHALE"], df["PHIT"], df["SWT"], cutoffs)
# Calculate total hydrocarbon pore volume
hcpv = np.sum(df.loc[rock_flag == 1, "HCPV"])
results.append(
{
"flag": "rock",
"vshale_cut": vsh,
"phit_cut": phi,
"swt_cut": sw,
"HCPV": hcpv,
}
)
temp_df = pd.DataFrame(results).reset_index(drop=True)
temp_df["HCPV"] = minmax_scale(temp_df["HCPV"])
optimal_vsh_cut = find_percentile_point(
temp_df["vshale_cut"], temp_df["HCPV"], percentile
)
# Sensitivity analysis for PHIT
vsh = optimal_vsh_cut
sw = 1
for phi in phit_range:
cutoffs = {"VSHALE": vsh, "PHIT": phi, "SWT": sw}
_, res_flag, _ = flag_interval(df["VSHALE"], df["PHIT"], df["SWT"], cutoffs)
# Calculate total hydrocarbon pore volume
hcpv = np.sum(df.loc[res_flag == 1, "HCPV"])
results.append(
{
"flag": "res",
"vshale_cut": vsh,
"phit_cut": phi,
"swt_cut": sw,
"HCPV": hcpv,
}
)
temp_df = pd.DataFrame(results)
temp_df = temp_df[temp_df["flag"] == "res"].reset_index(drop=True)
temp_df["HCPV"] = minmax_scale(temp_df["HCPV"])
optimal_phit_cut = find_percentile_point(
temp_df["phit_cut"], temp_df["HCPV"], percentile
)
# Sensitivity analysis for SWT
vsh = optimal_vsh_cut
phi = optimal_phit_cut
for sw in swt_range:
cutoffs = {"VSHALE": vsh, "PHIT": phi, "SWT": sw}
_, _, pay_flag = flag_interval(df["VSHALE"], df["PHIT"], df["SWT"], cutoffs)
# Calculate total hydrocarbon pore volume
hcpv = np.sum(df.loc[pay_flag == 1, "HCPV"])
results.append(
{
"flag": "pay",
"vshale_cut": vsh,
"phit_cut": phi,
"swt_cut": sw,
"HCPV": hcpv,
}
)
# Convert results to DataFrame
results_df = pd.DataFrame(results)
temp_df = results_df[results_df["flag"] == "pay"].reset_index(drop=True)
temp_df["HCPV"] = minmax_scale(temp_df["HCPV"])
optimal_sw_cut = find_percentile_point(
temp_df["swt_cut"], temp_df["HCPV"], percentile
)
logger.info(
f"Optimal cutoffs found - Vshale: {optimal_vsh_cut:.3f}, "
f"PHIT: {optimal_phit_cut:.3f}, SWT: {optimal_sw_cut:.3f}"
)
# Plot sensitivity crossplots
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for ax, (flag, data) in zip(axes, results_df.groupby("flag"), strict=True):
param = (
"vshale_cut"
if flag == "rock"
else "phit_cut"
if flag == "res"
else "swt_cut"
)
data["HCPV"] = minmax_scale(data["HCPV"])
ax.plot(data[param], data["HCPV"], "b-", label="HCPV")
cut_off = (
optimal_vsh_cut
if flag == "rock"
else optimal_phit_cut
if flag == "res"
else optimal_sw_cut
)
ax.axvline(
x=cut_off,
color="r",
linestyle="--",
label=f"Optimal Cutoff for {param}={round(cut_off, 3)}",
)
ax.set_xlabel(param)
ax.set_ylabel("HCPV")
ax.set_title("HCPV vs " + param + " with Optimal Cutoff Point")
ax.grid(True)
ax.legend()
plt.tight_layout()
plt.show()
return optimal_vsh_cut, optimal_phit_cut, optimal_sw_cut