import streamlit as st
from pathlib import Path
import pandas as pd
import altair as alt
import subprocess
import os
import numpy as np
import datetime
## Paths to cached competition data and results
COMP_CACHE = Path("competition_cache/safe-challenge")
results_path = Path("competition_cache/cached_results")
TASKS = {
"video-challenge-pilot-config": ["source"],
"video-challenge-task-1-config": ["source"],
"video-challenge-task-2-config": ["source", "category"],
}
valid_splits = ["public", "private", "private_only"]
#####################################################################
## Data loading ##
#####################################################################
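# Best-scoring row of a per-team group: restrict to user-selected submissions
# when `use_selection` is set and any exist, otherwise consider all rows.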
def get_max_score(group: pd.DataFrame, metric: str, use_selection: bool = True) -> pd.DataFrame:
if use_selection:
if group["selected"].any():
subset = group[group["selected"]]
else:
subset = group
else:
subset = group
max_idx = subset[metric].idxmax()
return group.loc[max_idx]
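# Per-team selection: keep the user-selected submission with the highest `metric`,
# falling back to the best public score for teams with no selected submission.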
def select_rows(df, metric: str = "balanced_accuracy"):
def select(group):
if group["selected"].any():
return group[group["selected"]].loc[group[group["selected"]][metric].idxmax()]
else:
return group.loc[group[f"{metric}_public"].idxmax()]
return df.groupby("team", group_keys=False).apply(select)
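# Load the cached score CSVs for every split/score type of a task. With `best_only`,
# collapse to one (selection-aware) row per team; on the public split,
# `check_discrepancies` also records the gap between each team's overall best
# and their selected best.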
@st.cache_data
def load_results(task_key, best_only, metric="balanced_accuracy", check_discrepancies=False):
to_return = {}
for split in valid_splits:
for score in TASKS.get(task_key):
file_path = f"{results_path}/{task_key}_{score}_{split}_score.csv"
if os.path.exists(file_path):
df = pd.read_csv(file_path)
public_df = pd.read_csv(f"{results_path}/{task_key}_{score}_public_score.csv")
if not best_only:
to_return[f"{split}_{score}_score"] = df
else:
if split == "public":
df = df.sort_values(["team", metric], ascending=False).reset_index(drop=True)
selected_max = (
df.copy()
.groupby("team", group_keys=False)
.apply(get_max_score, metric=metric, use_selection=True)
.sort_values([metric], ascending=False)
.set_index("team")
)
df = (
df.copy()
.groupby("team", group_keys=False)
.apply(get_max_score, metric=metric, use_selection=False)
.sort_values([metric], ascending=False)
.set_index("team")
)
if check_discrepancies:
to_return[f"desc_{split}_{score}_score"] = df[metric] - selected_max[metric]
else:
public_df = (
public_df.sort_values(["team", metric], ascending=False)
.reset_index(drop=True)
.set_index("submission_id")[metric]
)
tmp = df.set_index("submission_id").copy()
tmp = tmp.join(public_df, on=["submission_id"], rsuffix="_public")
tmp = tmp.reset_index()
df = select_rows(tmp,metric = metric)
df = df.sort_values([metric], ascending=False).set_index("team")
to_return[f"{split}_{score}_score"] = df
return to_return
@st.cache_data
def load_submission():
out = []
for task in TASKS:
data = pd.read_csv(f"{results_path}/{task}_source_submissions.csv")
data["task"] = task
out.append(data)
return pd.concat(out, ignore_index=True)
def get_updated_time(file="competition_cache/updated.txt"):
if os.path.exists(file):
with open(file) as f:
return f.read()
else:
return "no time file found"
@st.cache_data
def get_volume():
subs = pd.concat(
[pd.read_csv(f"{results_path}/{task}_source_submissions.csv") for task in TASKS],
ignore_index=True,
)
subs["datetime"] = pd.DatetimeIndex(subs["datetime"])
subs["date"] = subs["datetime"].dt.date
subs = subs.groupby(["date", "status_reason"]).size().unstack().fillna(0).reset_index()
return subs
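# Team-by-source heatmap (Altair) built from the columns prefixed with `label`,
# with the numeric value printed on each cell.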
@st.cache_data
def make_heatmap(results, label="generated", symbol="👤"):
# results is a wide-format DataFrame: teams as rows, per-source score columns
df_long = results.set_index("team")
team_order = df_long.index.tolist()  # team display order (index after set_index)
df_long = df_long.loc[:, [c for c in df_long.columns if c.startswith(label) and "accuracy" not in c]]
df_long.columns = [c.replace(f"{label}_", "") for c in df_long.columns]
if "none" in df_long.columns:
df_long = df_long.drop(columns=["none"])
df_long = df_long.reset_index().melt(id_vars="team", var_name="source", value_name="acc")
# Base chart for rectangles
base = alt.Chart(df_long).encode(
x=alt.X("source:O", title="Source", axis=alt.Axis(orient="top", labelAngle=-60)),
y=alt.Y("team:O", title="Team", sort=team_order),
)
# Heatmap rectangles
heatmap = base.mark_rect().encode(
color=alt.Color("acc:Q", scale=alt.Scale(scheme="greens"), title=f"{label} Accuracy")
)
# Text labels
text = base.mark_text(baseline="middle", fontSize=16).encode(
text=alt.Text("acc:Q", format=".2f"),
color=alt.condition(
alt.datum.acc < 0.5, # you can tune this for readability
alt.value("black"),
alt.value("white"),
),
)
# Combine heatmap and text
chart = (heatmap + text).properties(width=600, height=500, title=f"Accuracy on {symbol} {label} sources heatmap")
return chart
@st.cache_data
def load_roc_file(task, submission_ids):
rocs = pd.read_csv(f"{results_path}/{task}_source_rocs.csv")
rocs = rocs[rocs["submission_id"].isin(submission_ids)]
return rocs
@st.cache_data
def get_unique_teams(teams):
return teams.unique().tolist()
@st.cache_data
def filter_teams(temp, selected_team):
mask = temp.loc[:, "team"].isin(selected_team)
return temp.loc[mask]
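# Full ROC curves (fpr vs tpr) for the given submission ids, colored by team.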
def make_roc_curves(task, submission_ids):
rocs = load_roc_file(task, submission_ids)
# if rocs["team"].nunique() > 1:
color_field = "team:N"
roc_chart = (
alt.Chart(rocs)
.mark_line()
.encode(
x="fpr", y="tpr", color=alt.Color(color_field, scale=alt.Scale(scheme=color_map)), detail="submission_id:N"
)
)
return roc_chart
#####################################################################
## Page definition ##
#####################################################################
## Set title
st.set_page_config(
page_title="Leaderboard",
initial_sidebar_state="collapsed",
layout="wide", # This makes the app use the full width of the screen
)
## Pull new results or toggle private public if you are an owner
with st.sidebar:
color_map = st.selectbox("colormap", ["paired", "category20", "category20b", "category20c", "set2", "set3"])
st.session_state["colormap"] = color_map
hf_token = os.getenv("HF_TOKEN")
st.session_state["hf_token"] = hf_token
password = st.text_input("Admin login:", type="password")
dataset_options = ["public"]
if password == hf_token:
dataset_options = ["public", "private", "private_only"]
if st.button("Pull New Results"):
with st.spinner("Pulling new results", show_time=True):
try:
process = subprocess.Popen(
["python3", "utils.py"],
text=True, # Decode stdout/stderr as text
)
st.info(f"Background task started with PID: {process.pid}")
process.wait()
process.kill()
if process.returncode != 0:
st.error("The process did not finish successfully.")
else:
st.success(f"PID {process.pid} finished!")
# If a user has the right perms, then this clears the cache
load_results.clear()
get_volume.clear()
load_submission.clear()
st.rerun()
except Exception as e:
st.error(f"Error starting background task: {e}")
## Initialize the dataset view state in session_state if it doesn't exist
if "dataset_view" not in st.session_state:
st.session_state.dataset_view = "public"
# Create the selectbox, ensuring the index is valid
current_view = st.session_state.dataset_view
valid_index = dataset_options.index(current_view) if current_view in dataset_options else 0
dataset_view = st.selectbox("Dataset View", options=dataset_options, index=valid_index, key="dataset_view")
# Display the current dataset view
if dataset_view == "private":
st.success("Showing **PRIVATE** scores (all data).")
# Visual indicator for admins in the UI
if password == hf_token:
st.info("🔐 Admin View: You have access to all data")
# Initialize the top_n parameter if not in session_state
if "top_n_value" not in st.session_state:
st.session_state.top_n_value = 3
# Add a slider to select the number of top elements to average
top_n_value = st.slider(
"Mean of top N elements",
min_value=2,
max_value=10,
value=st.session_state.top_n_value,
step=1,
help="Calculate the mean of the top N elements in each column",
key="top_n_value",
)
st.session_state["top_n"] = top_n_value
elif dataset_view == "private_only":
st.success("Showing **PRIVATE ONLY** scores (excluding public data).")
# Visual indicator for admins in the UI
if password == hf_token:
st.info("🔒 Admin View: You have access to private-only data")
# Initialize the top_n parameter if not in session_state
if "top_n_value" not in st.session_state:
st.session_state.top_n_value = 3
# Add a slider to select the number of top elements to average
top_n_value = st.slider(
"Mean of top N elements",
min_value=2,
max_value=10,
value=st.session_state.top_n_value,
step=1,
help="Calculate the mean of the top N elements in each column",
key="top_n_value",
)
st.session_state["top_n"] = top_n_value
else:
st.info("Showing **PUBLIC** scores.")
st.session_state["top_n"] = None
# Ensure only admin users can access private data
if dataset_view in ["private", "private_only"] and password == hf_token:
split = dataset_view
# Clear the cache when the dataset view changes
previous_view = st.session_state.get("previous_dataset_view")
if previous_view != dataset_view:
load_results.clear()
st.session_state["previous_dataset_view"] = dataset_view
else:
split = "public"
else:
split = "public"
st.session_state["split"] = split
def show_dataframe_w_format(df, format="compact", top_n=None):
"""
Display a dataframe with formatted columns. If in private mode and top_n is provided,
adds a row showing the mean of the top n values for each column.
Args:
df: Pandas dataframe to display
format: Format string for number columns (default: "compact")
top_n: Optional number of top values to average per column
"""
split = st.session_state.get("split", "public")
# Only add top-n mean row in private mode
if split in ["private", "private_only"] and top_n is not None and isinstance(top_n, int) and top_n > 0:
# Create a copy to avoid modifying the original
df_display = df.copy()
# Calculate the mean of top n values for each column
top_n_means = {}
for col in df.columns:
sorted_values = df[col] # .sort_values(ascending=False)
# Ensure we don't try to take more values than available
actual_n = min(top_n, len(sorted_values))
if actual_n > 0:
top_n_means[col] = sorted_values.iloc[:actual_n].mean()
else:
top_n_means[col] = float("nan")
# Add the mean row as a new row in the dataframe
top_n_means_df = pd.DataFrame([top_n_means], index=[f"Top-{top_n} Mean"])
df_display = pd.concat([top_n_means_df, df_display])
else:
df_display = df
column_config = {c: st.column_config.NumberColumn(c, format=format) for c in df_display.columns}
return st.dataframe(df_display, column_config=column_config)
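# Leaderboard fragment: summary table plus per-source accuracy/AUC breakdowns
# for the current split, filtered by the selected teams.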
@st.fragment
def show_leaderboard(task, score: str = "source"):
split = st.session_state.get("split", "public")
results = load_results(task, best_only=True)
source_split_map = {}
if split in ["private", "private_only"]:
_sol_df = pd.read_csv(COMP_CACHE / task / "solution-processed.csv")
pairs_df = _sol_df[["source_og", "split"]].drop_duplicates()
source_split_map = {x: y for x, y in zip(pairs_df["source_og"], pairs_df["split"])}
cols = [
"balanced_accuracy",
"generated_accuracy",
"real_accuracy",
# "pristine_accuracy",
"auc",
"total_time",
"datetime",
"fail_rate",
]
results_for_split_score = results[f"{split}_{score}_score"]
all_teams = get_unique_teams(results_for_split_score.index.to_series())
default = [t for t in all_teams if "test" not in t.lower()]
teams = st.multiselect("Teams", options=all_teams, default=default,key=f"ms_lead_{task}")
results_for_split_score = results_for_split_score.loc[results_for_split_score.index.isin(teams)]
column_config = {
"balanced_accuracy": st.column_config.NumberColumn(
"⚖️ Balanced Accruacy",
format="compact",
min_value=0,
# pinned=True,
max_value=1.0,
# width="small",
),
"generated_accuracy": st.column_config.NumberColumn(
"👤 True Postive Rate",
format="compact",
min_value=0,
# pinned=True,
max_value=1.0,
# width="small",
),
"real_accuracy": st.column_config.NumberColumn(
"🧑‍🎤 True Negative Rate",
format="compact",
min_value=0,
# pinned=True,
max_value=1.0,
# width="small",
),
"auc": st.column_config.NumberColumn(
"📐 AUC",
format="compact",
min_value=0,
# pinned=True,
max_value=1.0,
# width="small",
),
"total_time": st.column_config.NumberColumn(
"🕒 Inference Time (s)",
format="compact",
# pinned=True,
# width="small",
),
"datetime": st.column_config.DatetimeColumn(
"🗓️ Submission Date",
format="YYYY-MM-DD",
# width="small",
),
"fail_rate": st.column_config.NumberColumn(
"❌ Fail Rate",
format="compact",
# width="small",
),
}
labels = {"real": "🧑‍🎤", "generated": "👤"}
for c in results_for_split_score.columns:
if "accuracy" in c:
continue
if any(p in c for p in ["generated", "real"]):
s = c.split("_")
pred = s[0]
source = " ".join(s[1:])
column_config[c] = st.column_config.NumberColumn(
labels[pred] + " " + source,
help=c,
format="compact",
min_value=0,
max_value=1.0,
)
"#### Summary"
st.dataframe(results_for_split_score.loc[:, cols], column_config=column_config)
f"##### Accuracy Breakdown by Source"
accuracy_types = {
"True positive/negative rate": 0,
"Conditional balanced accuracy": 1,
"AUC": 2,
}
granularity = st.radio(
"accuracy type",
list(accuracy_types.keys()),
key=f"granularity-{task}-{score}",
horizontal=True,
label_visibility="collapsed",
index=0,
)
## Subset the dataset
cols = [
c
for c in results_for_split_score.columns
if "generated_" in c and "accuracy" not in c and "conditional" not in c
]
col_names = [
(
f"📢 {c.replace('generated_', '')}"
if source_split_map.get(c.replace("generated_", ""), "public") == "public"
else f"🔐 {c.replace('generated_', '')}"
)
for c in results_for_split_score.columns
if "generated_" in c and "accuracy" not in c and "conditional" not in c
]
gen_tmp = results_for_split_score.loc[:, cols].copy()
gen_tmp.columns = col_names
cols = [
c
for c in results_for_split_score.columns
if "real_" in c and "accuracy" not in c and "conditional" not in c
]
col_names = [
(
f"📢 {c.replace('real_', '')}"
if source_split_map.get(c.replace("real_", ""), "public") == "public"
else f"🔐 {c.replace('real_', '')}"
)
for c in results_for_split_score.columns
if "real_" in c and "accuracy" not in c and "conditional" not in c
]
real_tmp = results_for_split_score.loc[:, cols].copy()
real_tmp.columns = col_names
## Check cases
if accuracy_types[granularity] == 0:
"#### 👤 True Positive Rate | Generated Source"
# st.dataframe(gen_tmp, column_config=column_config)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(gen_tmp, top_n=top_n)
"#### 🧑‍🎤 True Negative Rate | Real Source"
# st.dataframe(real_tmp, column_config=column_config)
show_dataframe_w_format(real_tmp, top_n=top_n)
elif accuracy_types[granularity] == 1:
"#### 👤 Balanced Accuracy | Generated Source"
tnr = results_for_split_score.loc[:, ["real_accuracy"]]
gen_tmp[:] = (gen_tmp.values + tnr.values) / 2.0
# st.dataframe(gen_tmp, column_config=column_config)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(gen_tmp, top_n=top_n)
"#### 🧑‍🎤 Balanced Accuracy | Real Source"
tpr = results_for_split_score.loc[:, ["generated_accuracy"]]
real_tmp[:] = (real_tmp.values + tpr.values) / 2.0
# st.dataframe(real_tmp, column_config=column_config)
show_dataframe_w_format(real_tmp, top_n=top_n)
else:
cols = [c for c in results_for_split_score.columns if "generated_conditional_auc" in c]
col_names = [
(
f"📢 {c.replace('generated_conditional_auc_', '')}"
if source_split_map.get(c.replace("generated_conditional_auc_", ""), "public") == "public"
else f"🔐 {c.replace('generated_conditional_auc_', '')}"
)
for c in results_for_split_score.columns
if "generated_conditional_auc_" in c
]
gen_tmp = results_for_split_score.loc[:, cols].copy()
gen_tmp.columns = col_names
cols = [c for c in results_for_split_score.columns if "real_conditional_auc" in c]
col_names = [
(
f"📢 {c.replace('real_conditional_auc_', '')}"
if source_split_map.get(c.replace("real_conditional_auc_", ""), "public") == "public"
else f"🔐 {c.replace('real_conditional_auc_', '')}"
)
for c in results_for_split_score.columns
if "real_conditional_auc" in c
]
real_tmp = results_for_split_score.loc[:, cols].copy()
real_tmp.columns = col_names
"#### 👤 Conditional AUC | Generated Source"
# st.dataframe(gen_tmp, column_config=column_config)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(gen_tmp, top_n=top_n)
"#### 🧑‍🎤 Conditional AUC | Real Source"
# st.dataframe(real_tmp, column_config=column_config)
show_dataframe_w_format(real_tmp, top_n=top_n)
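# Scatter of false positive rate vs true positive rate per submission, sized by
# inference time, with diagonal reference lines.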
def make_roc(results, show_text=False):
results["FA"] = 1.0 - results["real_accuracy"]
chart = (
alt.Chart(results)
.mark_point(filled=True)
.encode(
x=alt.X("FA:Q", title="🧑‍🎤 False Positive Rate", scale=alt.Scale(domain=[0.0, 1.0])),
y=alt.Y("generated_accuracy:Q", title="👤 True Positive Rate", scale=alt.Scale(domain=[0.0, 1.0])),
color=alt.Color("team:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field
size=alt.Size(
"total_time:Q", title="🕒 Inference Time", scale=alt.Scale(rangeMin=100)
), # Size by quantitative field
shape=alt.Shape("split:N", title="Split"),
detail=["submission_id", "auc", "balanced_accuracy"],
)
.properties(width=400, height=400, title="Detection vs False Alarm vs Inference Time")
)
if show_text:
text = (
alt.Chart(results)
.mark_text(
align="right",
fontSize=14,
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
)
.encode(
x=alt.X("FA:Q", title="🧑‍🎤 False Positive Rate", scale=alt.Scale(domain=[0, 1])),
y=alt.Y("generated_accuracy:Q", title="👤 True Positive Rate", scale=alt.Scale(domain=[0, 1])),
color=alt.Color("team:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field
text="team",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(tpr=[0, 1], fpr=[0, 1])))
.mark_line(color="lightgray", strokeDash=[8, 4], size=1)
.encode(x="fpr", y="tpr")
)
diag_line2 = (
alt.Chart(pd.DataFrame(dict(tpr=[1, 0], fpr=[0, 1])))
.mark_line(color="lightblue", strokeDash=[8, 4], size=1)
.encode(x="fpr", y="tpr")
)
return chart + diag_line + diag_line2
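# Inference time (log scale) vs the chosen metric per submission, with a
# chance-level (0.5) reference line.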
def make_acc(results, show_text=False, metric_spec=("balanced_accuracy", "Balanced Accuracy")):
metric, metric_title = metric_spec
results = results.loc[results["total_time"] >= 0]
chart = (
alt.Chart(results)
.mark_point(size=200, filled=True)
.encode(
x=alt.X("total_time:Q", title="🕒 Inference Time (sec)", scale=alt.Scale(type="log", domain=[100, 100000])),
y=alt.Y(
f"{metric}:Q",
title=metric_title,
scale=alt.Scale(domain=[0.4, 1]),
),
shape=alt.Shape("split:N", title="Split"),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=400, height=400, title=f"Inference Time vs {metric_title}")
)
if show_text:
text = (
alt.Chart(results)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x=alt.X(
"total_time:Q", title="🕒 Inference Time (sec)", scale=alt.Scale(type="log", domain=[100, 100000])
),
y=alt.Y(
f"{metric}:Q",
title=metric_title,
scale=alt.Scale(domain=[0.4, 1]),
),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(t=[100, 100000], y=[0.5, 0.5])))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="t", y="y")
)
return chart + diag_line
def make_acc_vs_auc(results, show_text=False, flip=False):
# results = results.loc[results["total_time"] >= 0]
chart = (
alt.Chart(results)
.mark_point(size=200, filled=True)
.encode(
x=alt.X("auc:Q", title="Area Under Curve", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y(
"balanced_accuracy:Q",
title="Balanced Accuracy",
scale=alt.Scale(domain=[0.4, 1]),
),
shape=alt.Shape("split:N", title="Split"),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=400, height=400, title="AUC vs Balanced Accuracy")
)
if flip:
chart = chart.encode(x=chart.encoding.y, y=chart.encoding.x)
if show_text:
text = (
alt.Chart(results)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x=alt.X("auc:Q", title="Area Under Curve", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y(
"balanced_accuracy:Q",
title="Balanced Accuracy",
scale=alt.Scale(domain=[0.4, 1]),
),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
if flip:
text = text.encode(x=text.encoding.y, y=text.encoding.x)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0])))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="x", y="y")
)
if flip:
diag_line = diag_line.encode(x=diag_line.encoding.y, y=diag_line.encoding.x)
full_chart = chart + diag_line
return full_chart
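# Scatter comparing each team's public score with the same metric on another split.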
def make_vs_public(results, show_text=False, other_split=None):
# results = results.loc[results["total_time"] >= 0]
# results.groupby()
chart = (
alt.Chart(results)
.mark_point(size=200, filled=True)
.encode(
x=alt.X("public:Q", title="public", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y(f"{other_split}:Q", title=f"{other_split}", scale=alt.Scale(domain=[0.4, 1])),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=400, height=400, title=f"public vs {other_split}")
)
if show_text:
text = (
alt.Chart(results)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x=alt.X("public:Q", title="public", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y(f"{other_split}:Q", title=f"{other_split}", scale=alt.Scale(domain=[0.4, 1])),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0])))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="x", y="y")
)
full_chart = chart + diag_line
return full_chart
def get_heatmaps(temp):
h1 = make_heatmap(temp, "generated", symbol="👤")
h2 = make_heatmap(temp, "real", symbol="🧑‍🎤")
st.altair_chart(h1, use_container_width=True)
st.altair_chart(h2, use_container_width=True)
if temp.columns.str.contains("aug", case=False).any():
h3 = make_heatmap(temp, "aug", symbol="🛠️")
st.altair_chart(h3, use_container_width=True)
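# Augmentation fragment: per-category balanced accuracy or conditional AUC,
# optionally shown as deltas from the unaugmented ("none") column.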
@st.fragment
def show_augmentations(task, score):
split = st.session_state.get("split", "public")
results = load_results(task, best_only=True)
results_for_split_score = results[f"{split}_{score}_score"]
all_teams = get_unique_teams(results_for_split_score.index.to_series())
teams = st.multiselect("Teams", options=all_teams, default=[t for t in all_teams if "test" not in t.lower()],key=f"ms_aug_{task}")
results_for_split_score = results_for_split_score.loc[results_for_split_score.index.isin(teams)]
f"##### Accuracy Breakdown by Category"
accuracy_types = {
"Accuracy": 0,
"AUC": 1,
}
# Create a row with two columns for controls
col1, col2 = st.columns([0.1, 0.9])
with col1:
granularity = st.radio(
"accuracy type",
list(accuracy_types.keys()),
key=f"granularity-{task}-{score}",
horizontal=True,
label_visibility="collapsed",
index=0,
)
show_deltas = False
if split in ["private", "private_only"]:
with col2:
# Add toggle for showing deltas from "none" column
show_deltas = st.toggle(
"Show deltas from 'none' (higher values mean 'none' was **lower**)",
value=False,
key=f"deltas-{task}-{score}",
)
## Check cases
if accuracy_types[granularity] == 0:
"#### Balanced Accuracy"
gen_cols = [
c
for c in results_for_split_score.columns
if "generated_" in c and "accuracy" not in c and "conditional" not in c
]
gen_tmp = results_for_split_score.loc[:, gen_cols].copy()
gen_tmp.columns = [
c.replace("generated_", "")
for c in results_for_split_score.columns
if "generated_" in c and "accuracy" not in c and "conditional" not in c
]
real_cols = [
c
for c in results_for_split_score.columns
if "real_" in c and "accuracy" not in c and "conditional" not in c
]
real_tmp = results_for_split_score.loc[:, real_cols].copy()
real_tmp.columns = [
c.replace("real_", "")
for c in results_for_split_score.columns
if "real_" in c and "accuracy" not in c and "conditional" not in c
]
tmp = (gen_tmp + real_tmp) / 2.0
# If toggle is on and "none" column exists, calculate deltas from "none" column
if show_deltas and "none" in tmp.columns:
# Get the "none" column values
none_values = tmp["none"].copy()
# Calculate deltas: none - current_column
for col in tmp.columns:
if col != "none":
tmp[col] = -none_values + tmp[col]
# st.dataframe(tmp)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(tmp, top_n=top_n)
else:
cols = [c for c in results_for_split_score.columns if "conditional_auc" in c]
col_names = [
c.replace("conditional_auc_", "")
for c in results_for_split_score.columns
if "conditional_auc" in c
]
tmp = results_for_split_score.loc[:, cols].copy()
tmp.columns = col_names
"#### Conditional AUC"
# If toggle is on and "none" column exists, calculate deltas from "none" column
if show_deltas and "none" in tmp.columns:
# Get the "none" column values
none_values = tmp["none"].copy()
# Calculate deltas: none - current_column
for col in tmp.columns:
if col != "none":
tmp[col] = -none_values + tmp[col]
# st.dataframe(tmp)
top_n = st.session_state.get("top_n", None)
show_dataframe_w_format(tmp, top_n=top_n)
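# Charts fragment: ROC scatter, time-vs-metric, and AUC-vs-accuracy views for the
# selected teams; the private split can additionally overlay full ROC curves.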
@st.fragment
def show_charts(task, score="source"):
show_auc = st.toggle("Show Best w.r.t. AUC", value=False, key=f"toggle auc {task}")
metric = "auc" if show_auc else "balanced_accuracy"
split = st.session_state.get("split", "public")
hf_token = st.session_state.get("hf_token", None)
results = load_results(task, best_only=True, metric=metric)
temp = results[f"{split}_source_score"].reset_index()
temp_public = results[f"public_source_score"].reset_index()
temp["split"] = split
temp_public["split"] = "public"
teams = get_unique_teams(temp["team"])
default = [t for t in teams if "test" not in t.lower()]
best_only = True
compare = False
if split != "public":
b1, b2 = st.columns([0.2, 0.8])
with b1:
best_only = st.toggle("Best Only", value=True, key=f"best only {task} {score} {split}")
full_curves = st.toggle("Full curve", value=True, key=f"all curves {task}")
# compare = st.toggle(f"Compare vs Public",value=False, key=f"compare {task}")
if not best_only:
results = load_results(task, best_only=best_only, metric=metric)
temp = results[f"{split}_source_score"].reset_index()
temp_public = results["public_source_score"].reset_index()
# selected_team = st.pills(
# "Team", ["ALL"] + teams, key=f"teams {task} 1", default=["ALL"], selection_mode="multi"
# )
with b2:
# selected_team = st.pills(
# "Team", ["ALL"] + teams, key=f"teams {task} 2", default=default, selection_mode="multi"
# )
default = [t for t in teams if "test" not in t.lower()]
selected_team = st.multiselect("Teams", options=teams, default=default,key=f"charts_{task}")
if selected_team is None or len(selected_team) == 0:
return
if "ALL" in selected_team:
selected_team = ["ALL"]
if "ALL" not in selected_team:
temp = filter_teams(temp, selected_team)
temp_public = filter_teams(temp_public, selected_team)
# with st.spinner("making plots...", show_time=True):
if compare:
temp["split"] = split
temp_public["split"] = "public"
temp = pd.concat([temp, temp_public], ignore_index=True)
metric = "balanced_accuracy" if not show_auc else "auc"
temp_vs_public = temp.set_index(["team", "submission_id", "split"])[metric].unstack().reset_index()
# st.write(temp_vs_public)
public_vs_private = make_vs_public(temp_vs_public, show_text=best_only, other_split=split)
# st.write(temp)
roc_scatter = make_roc(temp, show_text=best_only & (not compare))
acc_vs_time = make_acc(
temp,
show_text=best_only & (not compare),
metric_spec=("auc", "Area Under Curve") if show_auc else ("balanced_accuracy", "Balanced Accuracy"),
)
acc_vs_auc = make_acc_vs_auc(temp, show_text=best_only & (not compare), flip=show_auc)
if split == "private" and hf_token is not None:
if full_curves:
roc_scatter = make_roc_curves(task, temp["submission_id"].values.tolist()) + roc_scatter
st.altair_chart(roc_scatter | acc_vs_time | acc_vs_auc, use_container_width=False)
if compare:
st.altair_chart(public_vs_private, use_container_width=False)
st.info(f"loading {temp['submission_id'].nunique()} submissions")
@st.cache_data
def compute_running_max(result_df, teams, metric):
# Group by team and sort by datetime
result_df = result_df.copy()
result_df = result_df.loc[result_df["team"].isin(teams)]
result_df["datetime"] = pd.to_datetime(result_df["datetime"])
return (
result_df.groupby("team")
.apply(lambda a: a.sort_values("datetime").set_index("datetime")[metric].cummax())
.reset_index()
)
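# Timeline fragment: each team's cumulative best score over time, plotted against
# the Baseline team's best score.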
@st.fragment
def show_timeline(task, score="source"):
split = st.session_state.get("split", "public")
hf_token = st.session_state.get("hf_token", None)
results = load_results(task, best_only=False)
temp = results[f"{split}_source_score"].reset_index()
all_teams = get_unique_teams(temp["team"])
all_teams = [t for t in all_teams if t != "Baseline"]
default = [t for t in all_teams if ("test" not in t.lower())]
teams = st.multiselect("Teams", options=all_teams, default=default)
metric = st.selectbox("Metric", ["auc", "balanced_accuracy"], key=f"time {task}")
baseline_val = temp.query("team=='Baseline'")[metric].max()
df = compute_running_max(temp, teams, metric).dropna()
# team_best = df.groupby("team")[metric].max().sort_values(ascending = False)
team_best = df.sort_values([metric,"datetime"],ascending = False).drop_duplicates(["team"])
team_order = team_best["team"].tolist() + ["Baseline"]
random_guess = (
alt.Chart(pd.DataFrame({"datetime": [df["datetime"].min(), df["datetime"].max()], metric: [0.5, 0.5]}))
.mark_line(strokeDash=[4, 4], color="grey", strokeWidth=2)
.encode(
x="datetime:T",
y=f"{metric}:Q",
)
)
# st.write(st.session_state)
baseline_chart = (
alt.Chart(pd.DataFrame({"datetime": [df["datetime"].min(), df["datetime"].max()], "team": "Baseline", metric: [baseline_val,baseline_val]}))
.mark_line(strokeDash=[8, 8], color="darkgray", strokeWidth=2)
.encode(
x="datetime:T",
y=f"{metric}:Q",
color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")),sort=team_order),
)
)
# Create main chart
task_chart = (
alt.Chart(df)
.mark_line(point=True, interpolate='step-after')
.encode(
x=alt.X(
"datetime:T",
title="Submission Date",
),
y=alt.Y(f"{metric}:Q", scale=alt.Scale(domain=[0.5, 1.0])),
color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")),
sort=team_order),
)
.properties(width=800, height=500, title="Best Performance Over Time (Original Content)")
.interactive()
)
if st.checkbox("Show Labels",value=True,key = f"{task} check show timeline"):
team_best.loc[len(team_best)] = {"team":"Baseline", metric:baseline_val, "datetime": df["datetime"].max()}
# st.write(team_best)
text_chart = (
alt.Chart(team_best)
.mark_text(
align="left",
fontSize=14,
dx=5, # shift text to right of point
dy=-5, # shift text slightly up
)
.encode(
x=alt.X(
"datetime:T",
title="Submission Date",
scale = alt.Scale(domain=[df["datetime"].min(),
df["datetime"].max() + datetime.timedelta(days = 4)]),
),
y=alt.Y(f"{metric}:Q", scale=alt.Scale(domain=[0.5, 1.0])),
color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")),
sort=team_order),
text="team",
)
)
# Combine charts and display
st.altair_chart((task_chart +baseline_chart+text_chart).configure_legend(disable=True), use_container_width=True)
# st.altair_chart(task_chart, use_container_width=True)
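# Assemble the per-task tabs: tables, charts, timeline, and (when the task also has
# a category score) augmentations.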
def make_plots_for_task(task):
if len(TASKS.get(task)) > 1:
t1, t2, t3, t4 = st.tabs(["Tables", "Charts", "Timeline", "Augmentations"])
else:
t1, t2, t3 = st.tabs(["Tables", "Charts", "Timeline"])
t4 = None
with t1:
show_leaderboard(task)
with t2:
show_charts(task, score="source")
with t3:
split = st.session_state.get("split", "public")
if split != "public":
show_timeline(task, score="source")
else:
st.info(f"not available in {split} in mode")
if t4 is not None:
with t4:
show_augmentations(task, score="category")
updated = get_updated_time()
st.markdown(updated)
@st.fragment
def show_task_comparison():
"""Show summary tables for Task 1 and Task 2 side by side."""
split = st.session_state.get("split", "public")
color_map_choice = st.session_state.get("colormap", "paired")
task1_key = list(TASKS.keys())[1] # video-challenge-task-1-config
task2_key = list(TASKS.keys())[2] # video-challenge-task-2-config
task1_results = load_results(task1_key, best_only=True)
task2_results = load_results(task2_key, best_only=True)
cols = ["balanced_accuracy", "generated_accuracy", "real_accuracy", "auc", "total_time", "datetime", "fail_rate"]
column_config = {
"balanced_accuracy": st.column_config.NumberColumn(
"⚖️ Balanced Accuracy",
format="compact",
min_value=0,
max_value=1.0,
),
"generated_accuracy": st.column_config.NumberColumn(
"👤 True Positive Rate",
format="compact",
min_value=0,
max_value=1.0,
),
"real_accuracy": st.column_config.NumberColumn(
"🧑‍🎤 True Negative Rate",
format="compact",
min_value=0,
max_value=1.0,
),
"auc": st.column_config.NumberColumn(
"📐 AUC",
format="compact",
min_value=0,
max_value=1.0,
),
"total_time": st.column_config.NumberColumn(
"🕒 Inference Time (s)",
format="compact",
),
"datetime": st.column_config.DatetimeColumn(
"🗓️ Submission Date",
format="YYYY-MM-DD",
),
"fail_rate": st.column_config.NumberColumn(
"❌ Fail Rate",
format="compact",
),
"task1_balanced_accuracy": st.column_config.NumberColumn(
"⚖️ Task 1 Balanced Accuracy",
format="compact",
min_value=0,
max_value=1.0,
),
"task2_balanced_accuracy": st.column_config.NumberColumn(
"⚖️ Task 2 Balanced Accuracy",
format="compact",
min_value=0,
max_value=1.0,
),
"difference": st.column_config.NumberColumn(
"⚖️ Difference (T1-T2)",
format="compact",
),
"percent_change": st.column_config.NumberColumn(
"% Change",
format="+.2%",
),
}
# Create tabs for different views
tables_tab, charts_tab = st.tabs(["Tables", "Charts"])
with tables_tab:
# Create two columns for side-by-side tables
st.subheader("Performance Comparison: Task 1 vs Task 2")
col1, col2 = st.columns(2)
with col1:
st.subheader("Task 1: Original Content")
st.dataframe(
task1_results[f"{split}_source_score"].loc[:, cols],
column_config=column_config,
use_container_width=True,
)
with col2:
st.subheader("Task 2: Post-processed Content")
st.dataframe(
task2_results[f"{split}_source_score"].loc[:, cols],
column_config=column_config,
use_container_width=True,
)
# Add a section for comparison of task performance differences
st.subheader("Performance Analysis")
st.markdown(
"""
Performance comparison between Task 1 (original content) and
Task 2 (post-processed content). A positive difference indicates degraded performance
on post-processed content.
"""
)
# Get the datasets for both tasks
task1_df = task1_results[f"{split}_source_score"].reset_index()
task2_df = task2_results[f"{split}_source_score"].reset_index()
# Create a combined dataframe for analysis
common_teams = set(task1_df["team"]) & set(task2_df["team"])
if common_teams:
# Filter to teams that appear in both tasks
task1_filtered = task1_df[task1_df["team"].isin(common_teams)]
task2_filtered = task2_df[task2_df["team"].isin(common_teams)]
# Create a comparison dataframe
comparison_df = pd.DataFrame(
{
"team": list(common_teams),
"task1_balanced_accuracy": [
task1_filtered[task1_filtered["team"] == team]["balanced_accuracy"].values[0]
for team in common_teams
],
"task2_balanced_accuracy": [
task2_filtered[task2_filtered["team"] == team]["balanced_accuracy"].values[0]
for team in common_teams
],
}
)
# Calculate differences and percentage changes
comparison_df["difference"] = (
comparison_df["task1_balanced_accuracy"] - comparison_df["task2_balanced_accuracy"]
)
comparison_df["percent_change"] = comparison_df["difference"] / comparison_df["task1_balanced_accuracy"]
# Sort by the absolute difference (to show biggest performance changes first)
comparison_df = (
comparison_df.sort_values(by="difference", ascending=False).reset_index(drop=True).set_index("team")
)
# Display the comparison table
show_dataframe_w_format(comparison_df, top_n=0)
else:
st.warning("No common teams found across both tasks.")
with charts_tab:
st.subheader("Team Performance Across Tasks")
metric = st.selectbox("Metric", ["balanced_accuracy", "auc"])
# Get the datasets for both tasks if not already done
if "task1_df" not in locals():
task1_df = task1_results[f"{split}_source_score"].reset_index()
task2_df = task2_results[f"{split}_source_score"].reset_index()
common_teams = set(task1_df["team"]) & set(task2_df["team"])
if common_teams:
# Prepare data for the plot
plot_data = []
for team in common_teams:
# Get team's balanced accuracy for each task
task1_acc = task1_df[task1_df["team"] == team][metric].values[0]
task2_acc = task2_df[task2_df["team"] == team][metric].values[0]
# Add points for Task 1
plot_data.append({"team": team, "task": "Task 1", metric: task1_acc})
# Add points for Task 2
plot_data.append({"team": team, "task": "Task 2", metric: task2_acc})
plot_df = pd.DataFrame(plot_data).set_index(["team", "task"])[metric].unstack().reset_index()
# st.write(plot_df)
chart = (
alt.Chart(plot_df)
.mark_circle(size=200)
.encode(
x=alt.X("Task 1:Q", title=f"Task 1 {metric}", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y("Task 2:Q", title=f"Task 2 {metric}", scale=alt.Scale(domain=[0.4, 1])),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
)
.properties(width=600, height=600, title=f"Task 1 vs Task 2 {metric}")
)
# if show_text:
text = (
alt.Chart(plot_df)
.mark_text(
align="right",
dx=-5, # shift text to right of point
dy=-5, # shift text slightly up
fontSize=14,
)
.encode(
x=alt.X("Task 1:Q", title=f"Task 1 {metric}", scale=alt.Scale(domain=[0.4, 1])),
y=alt.Y("Task 2:Q", title=f"Task 2 {metric}", scale=alt.Scale(domain=[0.4, 1])),
color=alt.Color(
"team:N", scale=alt.Scale(scheme=color_map)
), # Color by categorical field # Size by quantitative field
text="team",
)
)
chart = chart + text
diag_line = (
alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0])))
.mark_line(color="lightgray", strokeDash=[8, 4])
.encode(x="x", y="y")
)
st.altair_chart(chart + diag_line, use_container_width=False)
# Create line chart connecting team performances
# lines = (
# alt.Chart(plot_df)
# .mark_line(point=alt.OverlayMarkDef(filled=True, size=100), strokeDash=[4, 2], strokeWidth=2)
# .encode(
# x=alt.X("task:N", title="Task", sort=["Task 1", "Task 2"]),
# y=alt.Y("balanced_accuracy:Q", title="Balanced Accuracy", scale=alt.Scale(domain=[0.4, 1.0])),
# color=alt.Color(
# "team:N", scale=alt.Scale(scheme=color_map_choice), legend=alt.Legend(title="Teams")
# ),
# tooltip=["team:N", "task:N", "balanced_accuracy:Q"],
# )
# .properties(width=700, height=500, title="Performance Changes Across Tasks")
# )
# st.altair_chart(lines, use_container_width=False)
else:
st.warning("No common teams found across both tasks.")
t1, t2, tp, comparison_tab, volume_tab, all_submission_tab, san_check = st.tabs(
["**Task 1**", "**Task 2**", "**Pilot Task**", "**Compare Tasks**", "**Submission Volume**", "**All Submissions**","**Sanity Check**"]
)
with t1:
"*Detection of Synthetic Video Content. Video files are unmodified from the original output from the models or the real sources.*"
make_plots_for_task(list(TASKS.keys())[1])
with t2:
"*Detection of Post-processed Synthetic Video Content. A subset of Task 1 data files are modified with standard post-processing techniques (compression, resizing, etc).*"
make_plots_for_task(list(TASKS.keys())[2])
with tp:
"*Detection of Synthetic Video Content. Video files are unmodified from the original output from the models or the real sources.*"
make_plots_for_task(list(TASKS.keys())[0])
if split in ["private", "private_only"]:
with comparison_tab:
"**Task 1 to Task 2 performance comparison.**"
show_task_comparison()
with volume_tab:
subs = get_volume()
status_lookup = "QUEUED,PROCESSING,SUCCESS,FAILED".split(",")
found_columns = subs.columns.values.tolist()
status_lookup = list(set(status_lookup) & set(found_columns))
st.bar_chart(subs, x="date", y=status_lookup, stack=True)
total_submissions = int(subs.loc[:, status_lookup].fillna(0).values.sum())
st.metric("Total Submissions", value=total_submissions)
st.metric("Duration", f'{(subs["date"].max() - subs["date"].min()).days} days')
@st.fragment
def show_all_submissions():
show_all = st.toggle("Show All Columns", value=False)
data = load_submission()
fields = ["task", "team", "status_reason"]
field_values = {f: data[f].unique().tolist() for f in fields}
selected_fields = {}
for f, v in field_values.items():
selected_fields[f] = st.multiselect(f"Select {f} to Display", v, default=v)
mask = np.ones(len(data)).astype(bool)
for fs, vs in selected_fields.items():
mask &= data[fs].isin(vs)
data = data.loc[mask]
search_str = st.text_input("search", value="")
if search_str != "":
mask_search = (
data.select_dtypes(include=["object"])
.apply(lambda x: x.str.contains(search_str, case=False, na=False))
.any(axis=1)
)
data = data.loc[mask_search]
if not show_all:
columns_to_show = "task,team,datetime,status_reason,submission_repo,submission_id,space_id".split(",")
data = data.loc[:, columns_to_show]
data = data.sort_values("datetime", ascending=False)
# st.write(",".join(data.columns))
st.dataframe(data, hide_index=True)
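# Admin view: per-task discrepancy check between each team's selected submission
# and their overall best (see load_results with check_discrepancies=True).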
@st.fragment
def show_san_check():
for task in list(TASKS.keys()):
f"## {task}"
out = load_results(task,best_only=True, metric="balanced_accuracy",check_discrepancies=True)
for k,v in out.items():
if k.startswith("desc"):
f"### {k}"
st.write(v)
if split == "private":
with all_submission_tab:
show_all_submissions()
with san_check:
show_san_check()