from collections import defaultdict
from typing import Any, Dict

import numpy as np
import pandas as pd
from huggingface_hub import hf_hub_download
from sklearn.metrics import roc_auc_score, roc_curve
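
# Scoring script in the style of a Hugging Face `competitions` custom metric:
# `compute(params)` downloads the ground truth (`solution.csv`) and a team's
# submission from the competition dataset repo, aligns them on the id column,
# and returns accuracy / AUC metrics for the public and private splits.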
|
|
def check_if_score_ok(df):
    # If every score lies in [0.5, 1.0], assume the submission reports the
    # probability of the *predicted* class rather than P(generated), and
    # convert it so that higher scores always mean "more likely generated".
    if df["score"].min() >= 0.5 and df["score"].max() <= 1.0:
        print("assuming max prob is reported... recomputing")

        pred_generated = df["submission_pred"] == "generated"
        pred_real = df["submission_pred"] == "real"

        # Rows predicted "real" get flipped: P(generated) = 1 - P(real).
        df.loc[pred_real, "score"] = 1.0 - df.loc[pred_real, "score"]

        # Rows with an unrecognised label get a random score.
        not_predicted = ~(pred_generated | pred_real)
        if not_predicted.any():
            df.loc[not_predicted, "score"] = np.random.rand(not_predicted.sum())

    return df
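
# Example of the convention enforced above (illustrative values only): a row
# predicted "real" with a reported max-probability of 0.8 becomes
# score = 1.0 - 0.8 = 0.2, i.e. P(generated) = 0.2, so that higher scores
# always mean "more likely generated" before roc_auc_score is applied.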
|
|
def compute_auc(df: pd.DataFrame) -> float:
    try:
        isna = df["score"].isna()
        if isna.all():
            return float("nan")
        df = df.loc[~isna].copy()

        ytrue = df["pred"] == "generated"
        # AUC is undefined when only one class is present.
        if ytrue.all() or (~ytrue).all():
            return float("nan")

        df = check_if_score_ok(df)
        return roc_auc_score(ytrue, df["score"])
    except Exception as e:
        print(f"AUC exception: {e}")
        return float("nan")
|
|
def compute_roc_curve(df: pd.DataFrame, keep_every: int = 10) -> Dict[Any, Any]:
    try:
        isna = df["score"].isna()
        if isna.all():
            return {"fpr": [], "tpr": [], "threshold": []}
        # Work on a copy so check_if_score_ok can modify scores without
        # pandas chained-assignment warnings.
        df = df.loc[~isna].copy()

        df = check_if_score_ok(df)

        fpr, tpr, threshold = roc_curve(df["pred"] == "generated", df["score"])
        if len(fpr) < keep_every:
            return {"fpr": fpr.tolist(), "tpr": tpr.tolist(), "threshold": threshold.tolist()}

        # Downsample the curve to keep the payload small.
        return {
            "fpr": fpr.tolist()[::keep_every],
            "tpr": tpr.tolist()[::keep_every],
            "threshold": threshold.tolist()[::keep_every],
        }
    except Exception as e:
        print(f"ROC exception: {e}")
        return {"fpr": [], "tpr": [], "threshold": []}
|
|
def compute_metrics(df: pd.DataFrame, score_name: str, use_all: bool) -> Dict[Any, Any]:
    metrics = defaultdict(dict)

    # Per-class accuracy against the ground-truth labels.
    df["correct"] = df["pred"] == df["submission_pred"]
    metrics["generated_accuracy"] = float(df.query("pred=='generated'")["correct"].mean())
    metrics["real_accuracy"] = float(df.query("pred=='real'")["correct"].mean())
    metrics["balanced_accuracy"] = (metrics["generated_accuracy"] + metrics["real_accuracy"]) / 2

    # Score-based metrics are only available when the submission provides a score column.
    if "score" in df.columns:
        metrics["auc"] = compute_auc(df=df.copy())
        metrics["roc"] = compute_roc_curve(df=df.copy())
        metrics["fail_rate"] = float(df["score"].isna().mean())

    if use_all:
        # Accuracy broken down by (label, source).
        df["score_name"] = df["pred"] + "_" + df[score_name]
        scores_by_source = df.copy().groupby(["score_name"])["correct"].mean()
        metrics.update(scores_by_source.to_dict())

        # Conditional AUC: each source is evaluated against all samples of the opposite label.
        source_pred = df[[score_name, "pred"]].drop_duplicates().values
        all_reals = df["pred"] == "real"
        all_generated = df["pred"] == "generated"
        for s, pred in source_pred:
            source_mask = df[score_name] == s
            if pred == "generated":
                mask = all_reals | source_mask
            elif pred == "real":
                mask = all_generated | source_mask
            else:
                raise ValueError(f"{pred} not allowed")
            metrics[f"{pred}_conditional_auc_{s}"] = compute_auc(df.loc[mask])
    else:
        df["score_name"] = df["pred"] + "_" + df[score_name]
        scores_by_source = df.copy().groupby(["score_name"])["correct"].mean()
        metrics.update(scores_by_source.to_dict())
        # Conditional AUC restricted to the samples of a single source.
        for s in df[score_name].unique():
            mask = df[score_name] == s
            metrics[f"conditional_auc_{s}"] = compute_auc(df.loc[mask])
    return metrics
|
|
def _metric(
    solution_df: pd.DataFrame, submission_df: pd.DataFrame, score_name: str = "score", use_all: bool = True
) -> Dict[Any, Any]:
    """
    Calculates accuracy and AUC metrics against the ground truth.

    Args:
        solution_df (pd.DataFrame): Ground truth data, indexed by sample id.
        submission_df (pd.DataFrame): Predicted data, indexed by sample id.
        score_name (str): Column used to group samples when reporting per-source
            metrics; the private splits use ``f"{score_name}_og"``.
        use_all (bool): Whether to compute the full set of per-source conditional AUCs.

    Returns:
        dict: Metrics for the public, private, and private-only splits.
    """
    evaluation = defaultdict(dict)
    solution_df, submission_df = solution_df.copy(), submission_df.copy()

    # Align the submission to the solution via the shared index.
    solution_df["submission_pred"] = solution_df.join(submission_df, lsuffix="_solution", rsuffix="_submission")[
        "pred_submission"
    ].values
    if "score" in submission_df.columns:
        solution_df["score"] = solution_df.join(submission_df, lsuffix="_solution", rsuffix="_submission")[
            "score"
        ].values

    evaluation["public_score"]["proportion"] = len(solution_df.query("split=='public'")) / len(solution_df)
    evaluation["private_score"]["proportion"] = 1.0
    evaluation["private_only_score"]["proportion"] = len(solution_df.query("split=='private'")) / len(solution_df)

    public_df = solution_df.query("split=='public'").copy()
    private_df = solution_df.copy()
    private_only_df = solution_df.query("split=='private'").copy()

    for split, dataframe in zip(["public", "private", "private_only"], [public_df, private_df, private_only_df]):
        metrics = compute_metrics(
            df=dataframe.copy(), score_name=score_name if split == "public" else f"{score_name}_og", use_all=use_all
        )
        # Merge rather than assign so the proportions computed above are kept.
        evaluation[f"{split}_score"].update(metrics)
    return evaluation
|
|
def compute(params):
    # Download the ground-truth file from the competition dataset repository.
    solution_file = hf_hub_download(
        repo_id=params.competition_id,
        filename="solution.csv",
        token=params.token,
        repo_type="dataset",
    )
    solution_df = pd.read_csv(solution_file).set_index(params.submission_id_col)

    # Download the team's submission file.
    submission_filename = f"submissions/{params.team_id}-{params.submission_id}.csv"
    submission_file = hf_hub_download(
        repo_id=params.competition_id,
        filename=submission_filename,
        token=params.token,
        repo_type="dataset",
    )
    submission_df = pd.read_csv(submission_file).set_index(params.submission_id_col)

    return _metric(solution_df, submission_df)
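

if __name__ == "__main__":
    # Minimal local smoke test (a sketch, not part of the scoring pipeline):
    # builds tiny synthetic solution/submission frames and calls _metric
    # directly, skipping the Hub download in compute(). The "model" /
    # "model_og" source columns and all values below are assumptions for
    # illustration; the real solution.csv schema may differ.
    solution = pd.DataFrame(
        {
            "id": list(range(8)),
            "pred": ["generated", "real"] * 4,
            "split": ["public"] * 4 + ["private"] * 4,
            "model": ["sdxl", "camera", "midjourney", "camera"] * 2,
            "model_og": ["sdxl", "camera", "midjourney", "camera"] * 2,
        }
    ).set_index("id")
    submission = pd.DataFrame(
        {
            "id": list(range(8)),
            "pred": ["generated", "real"] * 4,
            "score": [0.9, 0.2, 0.7, 0.4, 0.8, 0.1, 0.6, 0.3],
        }
    ).set_index("id")
    print(_metric(solution, submission, score_name="model", use_all=False))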