from collections import defaultdict
import traceback
from typing import Any, Dict

import numpy as np
import pandas as pd
from huggingface_hub import hf_hub_download
from sklearn.metrics import roc_auc_score, roc_curve


def check_if_score_ok(df):
    """Normalize submitted scores so that higher values mean "generated".

    If every score lies in [0.5, 1.0], the submission is assumed to report the
    probability of the predicted class, so scores of rows predicted "real" are
    flipped and rows without a valid prediction get a random score.
    """
    if df["score"].min() >= 0.5 and df["score"].max() <= 1.0:
        print("assuming max prob is reported... recomputing")
        ## assume in this case they are reporting max P(real), P(generated)
        pred_generated = df["submission_pred"] == "generated"
        pred_real = df["submission_pred"] == "real"
        df.loc[pred_real, "score"] = 1.0 - df.loc[pred_real, "score"]
        not_predicted = ~(pred_generated | pred_real)
        if not_predicted.any():
            df.loc[not_predicted, "score"] = np.random.rand(not_predicted.sum())
    return df


def compute_auc(df: pd.DataFrame) -> float:
    """ROC-AUC of the submitted scores against the "generated" label; NaN when undefined."""
    try:
        isna = df["score"].isna()
        ## All NaNs
        if isna.all():
            return float("nan")
        df = df.loc[~isna].copy()
        ytrue = df["pred"] == "generated"
        ## Only one class present
        if ytrue.all() or (~ytrue).all():
            return float("nan")
        df = check_if_score_ok(df)
        return roc_auc_score(ytrue, df["score"])
    except Exception as e:
        print(f"AUC exception: {e}")
        # traceback.print_exc()
        return float("nan")


def compute_roc_curve(df: pd.DataFrame, keep_every: int = 10) -> Dict[Any, Any]:
    """ROC curve points, downsampled to every `keep_every`-th threshold."""
    try:
        isna = df["score"].isna()
        ## All NaNs
        if isna.all():
            return {"fpr": [], "tpr": [], "threshold": []}
        df = df.loc[~isna].copy()
        df = check_if_score_ok(df)
        fpr, tpr, threshold = roc_curve(df["pred"] == "generated", df["score"])
        if len(fpr) < keep_every:
            return {"fpr": fpr.tolist(), "tpr": tpr.tolist(), "threshold": threshold.tolist()}
        # Sample every keep_every-th point
        return {
            "fpr": fpr.tolist()[::keep_every],
            "tpr": tpr.tolist()[::keep_every],
            "threshold": threshold.tolist()[::keep_every],
        }
    except Exception as e:
        print(f"ROC exception: {e}")
        return {"fpr": [], "tpr": [], "threshold": []}


def compute_metrics(df: pd.DataFrame, score_name: str, use_all: bool) -> Dict[Any, Any]:
    """Accuracy, AUC, and per-source conditional metrics for one data split."""
    metrics = defaultdict(dict)
    ## Accuracies
    df["correct"] = df["pred"] == df["submission_pred"]
    metrics["generated_accuracy"] = float(df.query("pred=='generated'")["correct"].mean())
    metrics["real_accuracy"] = float(df.query("pred=='real'")["correct"].mean())
    metrics["balanced_accuracy"] = (metrics["generated_accuracy"] + metrics["real_accuracy"]) / 2
    ## Score-based metrics
    if "score" in df.columns:
        metrics["auc"] = compute_auc(df=df.copy())
        metrics["roc"] = compute_roc_curve(df=df.copy())
        metrics["fail_rate"] = float(df["score"].isna().mean())
    if use_all:
        ## Accuracy split by source
        df["score_name"] = df["pred"] + "_" + df[score_name]
        scores_by_source = df.copy().groupby(["score_name"])["correct"].mean()
        metrics.update(scores_by_source.to_dict())
        ## Conditional AUC: each source against all samples of the opposite class
        source_pred = df[[score_name, "pred"]].drop_duplicates().values
        all_reals = df["pred"] == "real"
        all_generated = df["pred"] == "generated"
        for s, pred in source_pred:
            source_mask = df[score_name] == s
            if pred == "generated":
                mask = all_reals | source_mask
            elif pred == "real":
                mask = all_generated | source_mask
            else:
                raise ValueError(f"{pred} not allowed")
            metrics[f"{pred}_conditional_auc_{s}"] = compute_auc(df.loc[mask])
    else:
        ## Accuracy and AUC split by source only
        df["score_name"] = df["pred"] + "_" + df[score_name]
        scores_by_source = df.copy().groupby(["score_name"])["correct"].mean()
        metrics.update(scores_by_source.to_dict())
        for s in df[score_name].unique():
            mask = df[score_name] == s
            metrics[f"conditional_auc_{s}"] = compute_auc(df.loc[mask])
    return metrics


def _metric(
    solution_df: pd.DataFrame, submission_df: pd.DataFrame, score_name: str = "score", use_all: bool = True
) -> Dict[Any, Any]:
    """
    Calculates prediction accuracy against the ground truth.

    Args:
        solution_df (pd.DataFrame): Ground truth data.
        submission_df (pd.DataFrame): Predicted data.
        score_name (str): Column used to group rows by source for per-source metrics.
        use_all (bool): Whether to compute the full set of per-source conditional AUCs.

    Returns:
        dict: Accuracy scores per split; structure depends on `score_name` and `use_all`.
    """
    ## Allocate space
    evaluation = defaultdict(dict)
    solution_df, submission_df = solution_df.copy(), submission_df.copy()
    ## Align submission rows to the solution index and pull over the relevant columns
    solution_df["submission_pred"] = solution_df.join(submission_df, lsuffix="_solution", rsuffix="_submission")[
        "pred_submission"
    ].values
    if "score" in submission_df.columns:
        solution_df["score"] = solution_df.join(submission_df, lsuffix="_solution", rsuffix="_submission")[
            "score"
        ].values
    ## Record the proportion of each data split
    evaluation["public_score"]["proportion"] = len(solution_df.query("split=='public'")) / len(solution_df)
    evaluation["private_score"]["proportion"] = 1.0
    evaluation["private_only_score"]["proportion"] = len(solution_df.query("split=='private'")) / len(solution_df)
    ## Public, private, and private_only splits
    public_df = solution_df.query("split=='public'").copy()
    private_df = solution_df.copy()
    private_only_df = solution_df.query("split=='private'").copy()
    ## Score each split
    for split, dataframe in zip(["public", "private", "private_only"], [public_df, private_df, private_only_df]):
        metrics = compute_metrics(
            df=dataframe.copy(), score_name=score_name if split == "public" else f"{score_name}_og", use_all=use_all
        )
        evaluation[f"{split}_score"].update(metrics)
    return evaluation


def compute(params):
    """Download the solution and a team's submission from the competition dataset repo and score it."""
    solution_file = hf_hub_download(
        repo_id=params.competition_id,
        filename="solution.csv",
        token=params.token,
        repo_type="dataset",
    )
    solution_df = pd.read_csv(solution_file).set_index(params.submission_id_col)
    submission_filename = f"submissions/{params.team_id}-{params.submission_id}.csv"
    submission_file = hf_hub_download(
        repo_id=params.competition_id,
        filename=submission_filename,
        token=params.token,
        repo_type="dataset",
    )
    submission_df = pd.read_csv(submission_file).set_index(params.submission_id_col)
    return _metric(solution_df, submission_df)
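

## ---------------------------------------------------------------------------
## Illustrative local smoke test (not part of the competition pipeline).
## A minimal sketch under assumed column names: the toy solution carries
## "pred", "split", and made-up source columns "source" / "source_og", while
## the toy submission carries "pred" and a numeric "score". The real
## solution.csv / submission schema is defined by the competition and may
## differ from these assumptions.
if __name__ == "__main__":
    toy_solution = pd.DataFrame(
        {
            "id": [0, 1, 2, 3],
            "pred": ["real", "generated", "real", "generated"],
            "split": ["public", "public", "private", "private"],
            "source": ["camera", "modelA", "camera", "modelA"],
            "source_og": ["camera", "modelA", "camera", "modelA"],
        }
    ).set_index("id")
    toy_submission = pd.DataFrame(
        {
            "id": [0, 1, 2, 3],
            "pred": ["real", "generated", "generated", "generated"],
            # Scores stay clear of the [0.5, 1.0] band so check_if_score_ok does not rescale them
            "score": [0.1, 0.9, 0.4, 0.8],
        }
    ).set_index("id")
    result = _metric(toy_solution, toy_submission, score_name="source")
    print(result["public_score"]["balanced_accuracy"], result["private_score"]["auc"])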