import importlib.util
import os
from pathlib import Path
import mlflow
import mlflow.sklearn as mlflow_sklearn
import pandas as pd
from mlflow.models.signature import infer_signature
from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from quick_pp import logger
from quick_pp.machine_learning.config import MODELLING_CONFIG
from quick_pp.machine_learning.feature_engineering import generate_fe_features
from quick_pp.machine_learning.utils import run_mlflow_server
# If a config.py exists in the current working directory and defines
# MODELLING_CONFIG, use it in place of the packaged default configuration.
# NOTE(review): this executes arbitrary Python from the working directory at
# import time; only import this module from trusted directories.
root_config_path = Path(os.getcwd(), "config.py")
if root_config_path.exists():
    try:
        spec = importlib.util.spec_from_file_location("root_config", root_config_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot build an import spec for {root_config_path}")
        root_config = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(root_config)
        if hasattr(root_config, "MODELLING_CONFIG"):
            # Materialise the override BEFORE touching the shared dict, so a
            # malformed MODELLING_CONFIG cannot leave the default config emptied.
            override_config = dict(root_config.MODELLING_CONFIG)
            logger.info("Updating MODELLING_CONFIG from root config.py")
            MODELLING_CONFIG.clear()
            MODELLING_CONFIG.update(override_config)
        else:
            logger.warning(
                "MODELLING_CONFIG not found in root config.py, using default MODELLING_CONFIG"
            )
    except Exception as e:
        # Best effort: a broken root config.py falls back to the packaged default.
        logger.warning(f"Could not import MODELLING_CONFIG from root config.py: {e}")
[docs]
def load_data(hash: str, data_dir="data/input/"):
    """Load a Parquet file whose filename contains the given hash.

    Args:
        hash (str): A unique hash string contained within the target Parquet
            filename. (The name shadows the ``hash`` builtin but is kept for
            backward compatibility with keyword callers.)
        data_dir (str | Path, optional): Directory searched for
            ``*{hash}*.parquet``. Defaults to ``"data/input/"``, relative to
            the current working directory.

    Raises:
        FileNotFoundError: If no file in ``data_dir`` matches the hash.

    Returns:
        pd.DataFrame: The loaded well log data as a DataFrame.
    """
    data_dir = Path(data_dir)
    # Path.glob yields entries in filesystem order, which varies between
    # platforms; sort so the selected file is deterministic.
    matching_files = sorted(data_dir.glob(f"*{hash}*.parquet"))
    if not matching_files:
        raise FileNotFoundError(f"No file found in {data_dir} containing hash '{hash}'")
    path = matching_files[0]
    if len(matching_files) > 1:
        logger.warning(
            f"Multiple files in {data_dir} contain hash '{hash}'; using {path.name}"
        )
    return pd.read_parquet(path)
[docs]
def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """Run feature engineering on ``df`` and strip duplicate rows and columns.

    Rows are de-duplicated on the (WELL_NAME, DEPTH) pair, keeping the first
    occurrence; afterwards any repeated column label keeps only its first column.

    Args:
        df (pd.DataFrame): Raw input data; WELL_NAME and DEPTH must be present
            once feature engineering has run.

    Returns:
        pd.DataFrame: The engineered, de-duplicated data.
    """
    engineered = generate_fe_features(df)
    # Row de-duplication happens first, while repeated column labels still exist.
    unique_rows = engineered.drop_duplicates(subset=["WELL_NAME", "DEPTH"])
    first_occurrence = ~unique_rows.columns.duplicated()
    return unique_rows.loc[:, first_occurrence]
[docs]
def split_data(
    df: pd.DataFrame,
    target_column: list[str],
    features: list[str],
    test_size=0.2,
    random_state=42,
) -> list:
    """Divide a DataFrame into train and test partitions.

    Rows missing a value in any target or feature column are discarded first;
    the remaining feature and target columns are cast to float.

    Args:
        df (pd.DataFrame): The DataFrame to be split.
        target_column (list[str]): Names of the target columns.
        features (list[str]): Names of the feature columns.
        test_size (float, optional): Fraction of rows placed in the test split.
            Defaults to 0.2.
        random_state (int, optional): Seed forwarded to ``train_test_split``.
            Defaults to 42.

    Returns:
        list: ``[X_train, X_test, y_train, y_test]`` as returned by
        ``train_test_split``.
    """
    required_columns = target_column + features
    complete_rows = df.dropna(subset=required_columns)
    feature_frame = complete_rows[features].astype("float")
    target_frame = complete_rows[target_column].astype("float")
    return train_test_split(
        feature_frame,
        target_frame,
        test_size=test_size,
        random_state=random_state,
    )
[docs]
def _accepts_keyword(factory, name: str) -> bool:
    """Return True if ``factory`` can be called with the keyword argument ``name``."""
    import inspect

    try:
        parameters = inspect.signature(factory).parameters
    except (TypeError, ValueError):
        # Signature cannot be introspected: keep the previous behaviour of
        # always passing the keyword.
        return True
    return name in parameters or any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in parameters.values()
    )


def train_model(
    alg, X_train: pd.DataFrame, y_train: pd.DataFrame, random_state=42, **model_params
):
    """Train the model using the specified algorithm.

    Args:
        alg (callable): The scikit-learn model class (or factory) to instantiate.
        X_train (pd.DataFrame): The training feature DataFrame.
        y_train (pd.DataFrame): The training target DataFrame.
        random_state (int, optional): Seed passed to the constructor when it
            accepts a ``random_state`` keyword. Defaults to 42.
        **model_params: Extra keyword arguments forwarded to ``alg``.

    Returns:
        object: The trained model instance.
    """
    logger.info(f"Training model: {getattr(alg, '__name__', str(alg))}")
    init_kwargs = dict(model_params)
    # Not every estimator is stochastic (e.g. LinearRegression takes no
    # random_state); passing it unconditionally would raise TypeError.
    if _accepts_keyword(alg, "random_state"):
        init_kwargs["random_state"] = random_state
    model = alg(**init_kwargs)
    model.fit(X_train, y_train)
    logger.debug("Model training complete")
    return model
[docs]
def evaluate_model(
    model, X_test: pd.DataFrame, y_test: pd.DataFrame, multiclass_average="weighted"
) -> dict:
    """Evaluate the model using the test data.

    Classifiers are scored with F1 and accuracy; every other model is scored
    with R² and mean absolute error.

    Args:
        model (object): The trained model to evaluate.
        X_test (pd.DataFrame): The testing feature DataFrame.
        y_test (pd.DataFrame): The testing target DataFrame.
        multiclass_average (str, optional): ``average`` passed to ``f1_score``
            when more than two class labels are present. Two-label problems keep
            scikit-learn's default ``"binary"`` averaging. Defaults to "weighted".

    Returns:
        dict: ``{'f1_score', 'accuracy'}`` for classifiers or
        ``{'r2_score', 'mean_absolute_error'}`` for regressors.
    """
    from sklearn.base import is_classifier

    logger.info(f"Evaluating model: {type(model).__name__}")
    y_pred = model.predict(X_test)
    # Some classifiers (RidgeClassifier, LinearSVC, SVC(probability=False))
    # expose no predict_proba, so also ask scikit-learn directly.
    if is_classifier(model) or hasattr(model, "predict_proba"):
        # f1_score's default average="binary" raises on multiclass targets,
        # so pick the averaging from the labels actually present.
        observed_labels = set(pd.DataFrame(y_test).to_numpy().ravel()) | set(
            pd.DataFrame(y_pred).to_numpy().ravel()
        )
        average = "binary" if len(observed_labels) <= 2 else multiclass_average
        logger.debug("Classification metrics calculated")
        return {
            "f1_score": f1_score(y_test, y_pred, average=average),
            "accuracy": accuracy_score(y_test, y_pred),
        }
    logger.debug("Regression metrics calculated")
    return {
        "r2_score": r2_score(y_test, y_pred),
        "mean_absolute_error": mean_absolute_error(y_test, y_pred),
    }
# 6. Train pipeline
[docs]
def _require_str_list(label: str, values) -> None:
    """Log and raise TypeError unless ``values`` is a list of strings."""
    if not (isinstance(values, list) and all(isinstance(v, str) for v in values)):
        message = f"{label} must be a list of strings, got {values}"
        logger.error(message)
        raise TypeError(message)


def train_pipeline(model_config: str, data_hash: str, env: str = "local"):
    """Execute the end-to-end model training pipeline.

    For every model entry under ``MODELLING_CONFIG[model_config]`` this trains
    the configured algorithm, logs its evaluation metrics to MLflow, and logs
    and registers the fitted model as ``"{model_config}_{model_key}_{data_hash}"``.
    Entries whose target/feature columns are missing from the data are skipped
    with a warning.

    Args:
        model_config (str): The key for the model configuration (e.g., 'clastic');
            also used as the MLflow experiment name.
        data_hash (str): The unique hash identifying the data file.
        env (str, optional): The MLflow environment ('local' or 'remote').
            Defaults to 'local'.

    Raises:
        ValueError: If ``model_config`` is not a key of MODELLING_CONFIG.
        TypeError: If the targets or features are not lists of strings.
        FileNotFoundError: If no data file matches ``data_hash``.
    """
    logger.info(
        f"Starting train_pipeline with model_config={model_config}, data_hash={data_hash}, env={env}"
    )
    if model_config not in MODELLING_CONFIG:
        error_msg = f"Model configuration '{model_config}' not found in MODELLING_CONFIG"
        logger.error(error_msg)
        raise ValueError(error_msg)

    run_mlflow_server(env)

    # Loading and feature engineering do not depend on the model entry, so run
    # them once (lazily, on the first entry that passes validation) rather than
    # once per model.
    df = None
    for model_key, model_values in MODELLING_CONFIG[model_config].items():
        alg = model_values["alg"]
        targets = model_values["targets"]
        features = model_values["features"]
        logger.info(f"Processing model: {model_key}")
        _require_str_list("Targets", targets)
        _require_str_list("Features", features)

        if df is None:
            df = preprocess_data(load_data(data_hash))

        missing_cols = [col for col in targets + features if col not in df.columns]
        if missing_cols:
            logger.warning(
                f"Skipping model {model_key} due to missing columns: {missing_cols}"
            )
            continue

        X_train, X_test, y_train, y_test = split_data(df, targets, features)
        os.makedirs(Path("./mlruns"), exist_ok=True)
        mlflow.set_experiment(model_config)
        with mlflow.start_run(
            run_name=model_key, description=str(model_values.get("description", ""))
        ):
            model = train_model(alg, X_train, y_train)

            metrics_dict = evaluate_model(model, X_test, y_test)
            for metric_name, metric_value in metrics_dict.items():
                mlflow.log_metric(metric_name, float(metric_value))
                logger.info(f"Logged metric: {metric_name}={metric_value}")

            reg_model_name = f"{model_config}_{model_key}_{data_hash}"
            signature = infer_signature(X_train, y_train)
            # DataFrame.sample raises when asked for more rows than exist; cap at
            # the test-set size and seed it so the logged example is reproducible.
            input_example = X_test.sample(n=min(5, len(X_test)), random_state=42)
            mlflow_sklearn.log_model(
                model,
                "model",
                signature=signature,
                input_example=input_example,
                registered_model_name=reg_model_name,
            )
            logger.info(f"Model logged and registered as: {reg_model_name}")