from matplotlib.colors import ListedColormap
import streamlit as st
from pathlib import Path
import pandas as pd
import altair as alt
import subprocess
import os
import numpy as np
import datetime
import colorcet as cc

## Save results path
COMP_CACHE = Path("competition_cache/safe-challenge")
results_path = Path("competition_cache/cached_results")

TASKS = {
    "video-challenge-pilot-config": ["source"],
    "video-challenge-task-1-config": ["source"],
    "video-challenge-task-2-config": ["source", "category"],
}

if os.environ.get("WORKSHOP_MODE", None):
    WORKSHOP_TEAMS = ["GRIP-UNINA", "ISPL-Realynx", "DASH", "Lemma1727", "Truebees", "Shahidmuneer", "Baseline", "DX"]


def get_default_teams(teams):
    """In workshop mode, default to the workshop teams that are present; otherwise keep every non-test team."""
    try:
        return [t for t in WORKSHOP_TEAMS if t in teams]
    except NameError:  # WORKSHOP_TEAMS is only defined when WORKSHOP_MODE is set
        return [t for t in teams if "test" not in t.lower()]


valid_splits = ["public", "private", "private_only"]

#####################################################################
##                           Data loading                          ##
#####################################################################


def get_max_score(group: pd.DataFrame, metric: str, use_selection: bool = True) -> pd.DataFrame:
    """Return the row of `group` with the highest `metric`, preferring explicitly selected submissions."""
    if use_selection:
        if group["selected"].any():
            subset = group[group["selected"]]
        else:
            subset = group
    else:
        subset = group
    max_idx = subset[metric].idxmax()
    return group.loc[max_idx]


@st.cache_data
def get_cmap(name):
    ccmap = cc.cm[name]
    mpl_cmap = ListedColormap(ccmap(np.linspace(0, 1, 256)), name=name)
    return mpl_cmap


def select_rows(df, metric: str = "balanced_accuracy"):
    """Keep one submission per team: the selected one if any, otherwise the best public score."""

    def select(group):
        if group["selected"].any():
            return group[group["selected"]].loc[group[group["selected"]][metric].idxmax()]
        else:
            return group.loc[group[f"{metric}_public"].idxmax()]

    return df.groupby("team", group_keys=False).apply(select)


@st.cache_data
def load_results(task_key, best_only, metric="balanced_accuracy", check_discrepancies=False):
    """Load the cached score CSVs for a task, optionally reduced to each team's best submission."""
    to_return = {}
    for split in valid_splits:
        for score in TASKS.get(task_key):
            file_path = f"{results_path}/{task_key}_{score}_{split}_score.csv"
            if os.path.exists(file_path):
                df = pd.read_csv(file_path)
                public_df = pd.read_csv(f"{results_path}/{task_key}_{score}_public_score.csv")
                if not best_only:
                    to_return[f"{split}_{score}_score"] = df
                else:
                    if split == "public":
                        df = df.sort_values(["team", metric], ascending=False).reset_index(drop=True)
                        selected_max = (
                            df.copy()
                            .groupby("team", group_keys=False)
                            .apply(get_max_score, metric=metric, use_selection=True)
                            .sort_values([metric], ascending=False)
                            .set_index("team")
                        )
                        df = (
                            df.copy()
                            .groupby("team", group_keys=False)
                            .apply(get_max_score, metric=metric, use_selection=False)
                            .sort_values([metric], ascending=False)
                            .set_index("team")
                        )
                        if check_discrepancies:
                            to_return[f"desc_{split}_{score}_score"] = df[metric] - selected_max[metric]
                    else:
                        public_df = (
                            public_df.sort_values(["team", metric], ascending=False)
                            .reset_index(drop=True)
                            .set_index("submission_id")[metric]
                        )
                        tmp = df.set_index("submission_id").copy()
                        tmp = tmp.join(public_df, on=["submission_id"], rsuffix="_public")
                        tmp = tmp.reset_index()
                        df = select_rows(tmp, metric=metric)
                        df = df.sort_values([metric], ascending=False).set_index("team")
                    to_return[f"{split}_{score}_score"] = df
    return to_return


@st.cache_data
def load_submission():
    out = []
    for task in TASKS:
        data = pd.read_csv(f"{results_path}/{task}_source_submissions.csv")
        data["task"] = task
        out.append(data)
    return pd.concat(out, ignore_index=True)


def get_updated_time(file="competition_cache/updated.txt"):
    if os.path.exists(file):
        return open(file).read()
    else:
        return "no time file found"
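# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the dashboard): the helpers above reduce all
# of a team's submissions to a single "best" row. `get_max_score` prefers rows
# the team explicitly selected, and `select_rows` falls back to the submission
# with the highest *public* score when nothing is selected. The toy example
# below uses made-up data and a hypothetical ILLUSTRATE_SELECTION environment
# variable, so it never runs during normal app startup.
if os.environ.get("ILLUSTRATE_SELECTION"):
    _toy = pd.DataFrame(
        {
            "team": ["A", "A", "B", "B"],
            "selected": [True, False, False, False],
            "balanced_accuracy": [0.80, 0.90, 0.70, 0.75],
        }
    )
    # Team A keeps its selected row (0.80) even though 0.90 scores higher;
    # team B has no selection, so its best row (0.75) is kept.
    print(_toy.groupby("team", group_keys=False).apply(get_max_score, metric="balanced_accuracy"))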
@st.cache_data def get_volume(): subs = pd.concat( [pd.read_csv(f"{results_path}/{task}_source_submissions.csv") for task in TASKS], ignore_index=True, ) subs["datetime"] = pd.DatetimeIndex(subs["datetime"]) subs["date"] = subs["datetime"].dt.date subs = subs.groupby(["date", "status_reason"]).size().unstack().fillna(0).reset_index() return subs @st.cache_data def make_heatmap(results, label="generated", symbol="👤"): # Assuming df is your wide-format DataFrame (models as rows, datasets as columns) df_long = results.set_index("team") team_order = results.index.tolist() df_long = df_long.loc[:, [c for c in df_long.columns if c.startswith(label) and "accuracy" not in c]] df_long.columns = [c.replace(f"{label}_", "") for c in df_long.columns] if "none" in df_long.columns: df_long = df_long.drop(columns=["none"]) df_long = df_long.reset_index().melt(id_vars="team", var_name="source", value_name="acc") # Base chart for rectangles base = alt.Chart(df_long).encode( x=alt.X("source:O", title="Source", axis=alt.Axis(orient="top", labelAngle=-60)), y=alt.Y("team:O", title="Team", sort=team_order), ) # Heatmap rectangles heatmap = base.mark_rect().encode( color=alt.Color("acc:Q", scale=alt.Scale(scheme="greens"), title=f"{label} Accuracy") ) # Text labels text = base.mark_text(baseline="middle", fontSize=16).encode( text=alt.Text("acc:Q", format=".2f"), color=alt.condition( alt.datum.acc < 0.5, # you can tune this for readability alt.value("black"), alt.value("white"), ), ) # Combine heatmap and text chart = (heatmap + text).properties(width=600, height=500, title=f"Accuracy on {symbol} {label} sources heatmap") return chart @st.cache_data def load_roc_file(task, submission_ids): rocs = pd.read_csv(f"{results_path}/{task}_source_rocs.csv") rocs = rocs[rocs["submission_id"].isin(submission_ids)] return rocs @st.cache_data def get_unique_teams(teams): return teams.unique().tolist() @st.cache_data def filter_teams(temp, selected_team): mask = temp.loc[:, "team"].isin(selected_team) return temp.loc[mask] def make_roc_curves(task, submission_ids): rocs = load_roc_file(task, submission_ids) # if rocs["team"].nunique() > 1: color_field = "team:N" roc_chart = ( alt.Chart(rocs) .mark_line() .encode( x="fpr", y="tpr", color=alt.Color(color_field, scale=alt.Scale(scheme=color_map)), detail="submission_id:N" ) ) return roc_chart ##################################################################### ## Page definition ## ##################################################################### ## Set title st.set_page_config( page_title="Leaderboard", initial_sidebar_state="collapsed", layout="wide", # This makes the app use the full width of the screen ) ## Pull new results or toggle private public if you are an owner with st.sidebar: color_map = st.selectbox("colormap", ["paired", "category20", "category20b", "category20c", "set2", "set3"]) st.session_state["colormap"] = color_map temp = list(cc.cm_n.keys()) colormap_heatmap = st.selectbox("Color Map",options=temp, index = temp.index("gouldian")) st.session_state["colormap_heatmap"] = colormap_heatmap top_n_value = st.slider( "Mean of top N elements", min_value=2, max_value=10, value=5, step=1, help="Calculate the mean of the top N elements in each column", key="top_n_value", ) st.session_state["top_n"] = top_n_value hf_token = os.getenv("HF_TOKEN") st.session_state["hf_token"] = hf_token password = st.text_input("Admin login:", type="password") dataset_options = ["public"] if password == hf_token: dataset_options = ["public", "private", "private_only"] if 
st.button("Pull New Results"): with st.spinner("Pulling new results", show_time=True): try: process = subprocess.Popen( ["python3", "utils.py"], text=True, # Decode stdout/stderr as text ) st.info(f"Background task started with PID: {process.pid}") process.wait() process.kill() if process.returncode != 0: st.error("The process did not finish successfully.") else: st.success(f"PID {process.pid} finished!") # If a user has the right perms, then this clears the cache load_results.clear() get_volume.clear() load_submission.clear() st.rerun() except Exception as e: st.error(f"Error starting background task: {e}") ## Initialize the dataset view state in session_state if it doesn't exist if "dataset_view" not in st.session_state: st.session_state.dataset_view = "public" # Create the selectbox, ensuring the index is valid current_view = st.session_state.dataset_view valid_index = dataset_options.index(current_view) if current_view in dataset_options else 0 dataset_view = st.selectbox("Dataset View", options=dataset_options, index=valid_index, key="dataset_view") # Display the current dataset view if dataset_view == "private": st.success("Showing **PRIVATE** scores (all data).") # Visual indicator for admins in the UI if password == hf_token: st.info("🔐 Admin View: You have access to all data") # Initialize the top_n parameter if not in session_state # if "top_n_value" not in st.session_state: # st.session_state.top_n_value = 3 # Add a slider to select the number of top elements to average elif dataset_view == "private_only": st.success("Showing **PRIVATE ONLY** scores (excluding public data).") # Visual indicator for admins in the UI if password == hf_token: st.info("🔒 Admin View: You have access to private-only data") # Initialize the top_n parameter if not in session_state # if "top_n_value" not in st.session_state: # st.session_state.top_n_value = 5 # Add a slider to select the number of top elements to average else: st.info("Showing **PUBLIC** scores.") st.session_state["top_n"] = None # Ensure only admin users can access private data if dataset_view in ["private", "private_only"] and password == hf_token: split = dataset_view # Clear the cache when the dataset view changes previous_view = st.session_state.get("previous_dataset_view") if previous_view != dataset_view: load_results.clear() st.session_state["previous_dataset_view"] = dataset_view else: split = "public" else: split = "public" st.session_state["split"] = split def show_dataframe_w_format(df, format="compact", top_n=None,colormap_table = False, sort_columns = True, sort_by_top = True, transpose = False, subset = None): """ Display a dataframe with formatted columns. If in private mode and top_n is provided, adds a row showing the mean of the top n values for each column. 
Args: df: Pandas dataframe to display format: Format string for number columns (default: "compact") top_n: Optional number of top values to average per column """ split = st.session_state.get("split", "public") # Only add top-n mean row in private mode if split in ["private", "private_only"] and top_n is not None and isinstance(top_n, int) and top_n > 0: # Create a copy to avoid modifying the original df_display = df.copy() # Calculate the mean of top n values for each column top_n_means = {} for col in df.columns: sorted_values = df[col] # .sort_values(ascending=False) # Ensure we don't try to take more values than available actual_n = min(top_n, len(sorted_values)) if actual_n > 0: top_n_means[col] = sorted_values.iloc[:actual_n].mean() else: top_n_means[col] = float("nan") # Add the mean row as a new row in the dataframe top_n_means_df = pd.DataFrame([top_n_means], index=[f"Top-{top_n} Mean"]) df_display = pd.concat([top_n_means_df, df_display]) else: df_display = df sort_by_top = sort_by_top and (top_n is not None) if sort_columns: df_display = df_display.sort_index(axis=1) if sort_by_top: sorted_top_n = df_display.iloc[0].sort_values(ascending=False) df_display = df_display.loc[:,sorted_top_n.index] if sort_columns and sort_by_top: public = [c for c in df_display.columns if c.startswith("📢")] private = [c for c in df_display.columns if not c.startswith("📢")] sorted_top_n_pub = df_display.iloc[0].loc[public].sort_values(ascending=False) sorted_top_n_pri = df_display.iloc[0].loc[private].sort_values(ascending=False) df_display = df_display.loc[:,sorted_top_n_pub.index.tolist() + sorted_top_n_pri.index.tolist()] if transpose: df_display = df_display.transpose() column_config = {c: st.column_config.NumberColumn(c, format=format) for c in df_display.columns} if colormap_table: cmap = st.session_state.get("colormap_heatmap") df_display = df_display.style.highlight_max(axis=int(transpose) , props="font-weight: bold;") df_display = df_display.background_gradient(cmap=get_cmap(cmap),subset = subset, axis=int(transpose),vmin = 0.5,vmax=1) # df_display = df_display.set_table_styles([{'selector':'th', # 'props':[('word-wrap', ' break-word'), # ('max-width','10px'), # ( 'text-align', 'left') # ] # }]) return st.dataframe(df_display, column_config=column_config) @st.fragment def show_leaderboard(task, score: str = "source"): split = st.session_state.get("split", "public") results = load_results(task, best_only=True) source_split_map = {} if split in ["private", "private_only"]: _sol_df = pd.read_csv(COMP_CACHE / task / "solution-processed.csv") pairs_df = _sol_df[["source_og", "split"]].drop_duplicates() source_split_map = {x: y for x, y in zip(pairs_df["source_og"], pairs_df["split"])} cols = [ "balanced_accuracy", "generated_accuracy", "real_accuracy", # "pristine_accuracy", "auc", "total_time", "datetime", "fail_rate", ] results_for_split_score = results[f"{split}_{score}_score"] all_teams = get_unique_teams(results_for_split_score.index.to_series()) default = get_default_teams(all_teams) # st.write(default) with st.expander("Display Options",expanded=False): teams = st.multiselect("Teams", options=all_teams, default=default,key=f"ms_lead_{task}") colormap_table = st.checkbox("Colormap",value=True, key = f"{task}_colormap_table") transpose = st.checkbox("Transpose", value=True, key = f"{task}_transpose_table") results_for_split_score = results_for_split_score.loc[results_for_split_score.index.isin(teams)] column_config = { "balanced_accuracy": st.column_config.NumberColumn( "⚖️ Balanced 
Accruacy", format="compact", min_value=0, # pinned=True, max_value=1.0, # width="small", ), "generated_accuracy": st.column_config.NumberColumn( "👤 True Postive Rate", format="compact", min_value=0, # pinned=True, max_value=1.0, # width="small", ), "real_accuracy": st.column_config.NumberColumn( "🧑‍🎤 True Negative Rate", format="compact", min_value=0, # pinned=True, max_value=1.0, # width="small", ), "auc": st.column_config.NumberColumn( "📐 AUC", format="compact", min_value=0, # pinned=True, max_value=1.0, # width="small", ), "total_time": st.column_config.NumberColumn( "🕒 Inference Time (s)", format="compact", # pinned=True, # width="small", ), "datetime": st.column_config.DatetimeColumn( "🗓️ Submission Date", format="YYYY-MM-DD", # width="small", ), "fail_rate": st.column_config.NumberColumn( "❌ Fail Rate", format="compact", # width="small", ), } labels = {"real": "🧑‍🎤", "generated": "👤"} for c in results_for_split_score.columns: if "accuracy" in c: continue if any(p in c for p in ["generated", "real"]): s = c.split("_") pred = s[0] source = " ".join(s[1:]) column_config[c] = st.column_config.NumberColumn( labels[pred] + " " + source, help=c, format="compact", min_value=0, max_value=1.0, ) sum_tab, by_source_tab = st.tabs(["Summary","By Source"]) with sum_tab: "#### Summary" df_summary = results_for_split_score.loc[:, cols] if colormap_table: cmap = st.session_state.get("colormap_heatmap") df_summary = df_summary.style.highlight_max(axis=0 , props="font-weight: bold;", subset = cols[:4]) df_summary = df_summary.background_gradient(cmap=get_cmap(cmap),axis=0,vmin = 0.5,vmax=1,subset = cols[:4] ) st.dataframe(df_summary, column_config=column_config) with by_source_tab: f"##### Accuracy Breakdown by Source" accuracy_types = { "True positive/negative rate": 0, "Conditional balanced accuracy": 1, "AUC": 2, } granularity = st.radio( "accuracy type", list(accuracy_types.keys()), key=f"granularity-{task}-{score}", horizontal=True, label_visibility="collapsed", index=0, ) ## Subset the dataset cols = [ c for c in results_for_split_score.columns if "generated_" in c and "accuracy" not in c and "conditional" not in c ] col_names = [ ( f"📢 {c.replace('generated_', '')}" if source_split_map.get(c.replace("generated_", ""), "public") == "public" else f"🔐 {c.replace('generated_', '')}" ) for c in results_for_split_score.columns if "generated_" in c and "accuracy" not in c and "conditional" not in c ] gen_tmp = results_for_split_score.loc[:, cols].copy() gen_tmp.columns = col_names cols = [ c for c in results_for_split_score.columns if "real_" in c and "accuracy" not in c and "conditional" not in c ] col_names = [ ( f"📢 {c.replace('real_', '')}" if source_split_map.get(c.replace("real_", ""), "public") == "public" else f"🔐 {c.replace('real_', '')}" ) for c in results_for_split_score.columns if "real_" in c and "accuracy" not in c and "conditional" not in c ] real_tmp = results_for_split_score.loc[:, cols].copy() real_tmp.columns = col_names ## Check cases if accuracy_types[granularity] == 0: "#### 👤 True Positive Rate | Generated Source" # st.dataframe(gen_tmp, column_config=column_config) top_n = st.session_state.get("top_n", None) show_dataframe_w_format(gen_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose) "#### 🧑‍🎤 True Negative Rate | Real Source" # st.dataframe(real_tmp, column_config=column_config) show_dataframe_w_format(real_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose) elif accuracy_types[granularity] == 1: "#### 👤 Balanced Accuracy | Generated Source" 
tnr = results_for_split_score.loc[:, ["real_accuracy"]] gen_tmp[:] = (gen_tmp.values + tnr.values) / 2.0 # st.dataframe(gen_tmp, column_config=column_config) top_n = st.session_state.get("top_n", None) show_dataframe_w_format(gen_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose) "#### 🧑‍🎤 Balanced Accuracy | Real Source" tpr = results_for_split_score.loc[:, ["generated_accuracy"]] real_tmp[:] = (real_tmp.values + tpr.values) / 2.0 # st.dataframe(real_tmp, column_config=column_config) show_dataframe_w_format(real_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose) else: cols = [c for c in results_for_split_score.columns if "generated_conditional_auc" in c] col_names = [ ( f"📢 {c.replace('generated_conditional_auc_', '')}" if source_split_map.get(c.replace("generated_conditional_auc_", ""), "public") == "public" else f"🔐 {c.replace('generated_conditional_auc_', '')}" ) for c in results_for_split_score.columns if "generated_conditional_auc_" in c ] gen_tmp = results_for_split_score.loc[:, cols].copy() gen_tmp.columns = col_names cols = [c for c in results_for_split_score.columns if "real_conditional_auc" in c] col_names = [ ( f"📢 {c.replace('real_conditional_auc_', '')}" if source_split_map.get(c.replace("real_conditional_auc_", ""), "public") == "public" else f"🔐 {c.replace('real_conditional_auc_', '')}" ) for c in results_for_split_score.columns if "real_conditional_auc" in c ] real_tmp = results_for_split_score.loc[:, cols].copy() real_tmp.columns = col_names "#### 👤 Conditional AUC | Generated Source" # st.dataframe(gen_tmp, column_config=column_config) top_n = st.session_state.get("top_n", None) show_dataframe_w_format(gen_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose) "#### 🧑‍🎤 Conditional AUC | Real Source" # st.dataframe(real_tmp, column_config=column_config) show_dataframe_w_format(real_tmp, top_n=top_n,colormap_table=colormap_table, transpose=transpose) def make_roc(results, show_text=False, log_x=False): results["FA"] = 1.0 - results["real_accuracy"] chart = ( alt.Chart(results) .mark_point(filled=True) .encode( x=alt.X("FA:Q", title="🧑‍🎤 False Positive Rate", scale=alt.Scale(type = "log" if log_x else "linear",domain=[.005, 1])), y=alt.Y("generated_accuracy:Q", title="👤 True Positive Rate", scale=alt.Scale(domain=[0.0, 1.0])), color=alt.Color("team:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field size=alt.Size( "total_time:Q", title="🕒 Inference Time", scale=alt.Scale(rangeMin=100) ), # Size by quantitative field shape=alt.Shape("split:N", title="Split"), detail=["submission_id", "auc", "balanced_accuracy"], ) .properties(width=400, height=400, title="Detection vs False Alarm vs Inference Time") ) if show_text: text = ( alt.Chart(results) .mark_text( align="right", fontSize=14, dx=-5, # shift text to right of point dy=-5, # shift text slightly up ) .encode( x=alt.X("FA:Q"),#, title="🧑‍🎤 False Positive Rate", scale=alt.Scale(domain=[0, 1])), y=alt.Y("generated_accuracy:Q", title="👤 True Positive Rate", scale=alt.Scale(domain=[0, 1])), color=alt.Color("team:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field text="team", ) ) chart = chart + text diag_line = ( alt.Chart(pd.DataFrame(dict(tpr=np.linspace(0,1,100), fpr=np.linspace(0,1,100)))) .mark_line(color="lightgray", strokeDash=[8, 4], size=1) .encode(x="fpr", y="tpr") ) diag_line2 = ( alt.Chart(pd.DataFrame(dict(tpr=np.linspace(1,0,100), fpr=np.linspace(0,1,100)))) .mark_line(color="lightblue", strokeDash=[8, 4], size=1) 
.encode(x="fpr", y="tpr") ) return chart + diag_line + diag_line2 def make_acc(results, show_text=False, metric_spec=("balanced_accuracy", "Balanced Accuracy")): metric, metric_title = metric_spec results = results.loc[results["total_time"] >= 0] chart = ( alt.Chart(results) .mark_point(size=200, filled=True) .encode( x=alt.X("total_time:Q", title="🕒 Inference Time (sec)", scale=alt.Scale(type="log", domain=[1000, 20000])), y=alt.Y( f"{metric}:Q", title=metric_title, scale=alt.Scale(domain=[0.4, 1]), ), shape=alt.Shape("split:N", title="Split"), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field ) .properties(width=400, height=400, title=f"Inference Time vs {metric_title}") ) if show_text: text = ( alt.Chart(results) .mark_text( align="right", dx=-5, # shift text to right of point dy=-5, # shift text slightly up fontSize=14, ) .encode( x="total_time:Q", y=alt.Y( f"{metric}:Q", title=metric_title, scale=alt.Scale(domain=[0.4, 1]), ), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field text="team", ) ) chart = chart + text diag_line = ( alt.Chart(pd.DataFrame(dict(t=[100, 20000], y=[0.5, 0.5]))) .mark_line(color="lightgray", strokeDash=[8, 4]) .encode(x="t", y="y") ) diag_line2 = ( alt.Chart(pd.DataFrame(dict(t=np.linspace(1000,20000,100), y=np.linspace(.5,1.,100)))) .mark_line(color="lightgray", strokeDash=[8, 4]) .encode(x="t", y="y") ) return chart + diag_line+diag_line2 def make_acc_vs_auc(results, show_text=False, flip=False): # results = results.loc[results["total_time"] >= 0] chart = ( alt.Chart(results) .mark_point(size=200, filled=True) .encode( x=alt.X("auc:Q", title="Area Under Curve", scale=alt.Scale(domain=[0.4, 1])), y=alt.Y( "balanced_accuracy:Q", title="Balanced Accuracy", scale=alt.Scale(domain=[0.4, 1]), ), shape=alt.Shape("split:N", title="Split"), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field ) .properties(width=400, height=400, title="AUC vs Balanced Accuracy") ) if flip: chart = chart.encode(x=chart.encoding.y, y=chart.encoding.x) if show_text: text = ( alt.Chart(results) .mark_text( align="right", dx=-5, # shift text to right of point dy=-5, # shift text slightly up fontSize=14, ) .encode( x=alt.X("auc:Q", title="Area Under Curve", scale=alt.Scale(domain=[0.4, 1])), y=alt.Y( "balanced_accuracy:Q", title="Balanced Accuracy", scale=alt.Scale(domain=[0.4, 1]), ), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field text="team", ) ) if flip: text = text.encode(x=text.encoding.y, y=text.encoding.x) chart = chart + text diag_line = ( alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0]))) .mark_line(color="lightgray", strokeDash=[8, 4]) .encode(x="x", y="y") ) if flip: diag_line = diag_line.encode(x=diag_line.encoding.y, y=diag_line.encoding.x) full_chart = chart + diag_line return full_chart def make_vs_public(results, show_text=False, other_split=None): # results = results.loc[results["total_time"] >= 0] # results.groupby() chart = ( alt.Chart(results) .mark_point(size=200, filled=True) .encode( x=alt.X("public:Q", title="public", scale=alt.Scale(domain=[0.6, 1])), y=alt.Y(f"{other_split}:Q", title=f"{other_split}", scale=alt.Scale(domain=[0.6, 1])), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field ) 
.properties(width=500, height=500, title=f"public vs {other_split}") ) if show_text: text = ( alt.Chart(results) .mark_text( align="right", dx=-5, # shift text to right of point dy=-5, # shift text slightly up fontSize=14, ) .encode( x=alt.X("public:Q"), y=alt.Y(f"{other_split}:Q"), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field text="team", ) ) chart = chart + text diag_line = ( alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0]))) .mark_line(color="lightgray", strokeDash=[8, 4]) .encode(x="x", y="y") ) full_chart = chart + diag_line return full_chart def show_aug_plot(results,log_x = False,show_text=True): chart = ( alt.Chart(results) .mark_point(filled=True,size = 200) .encode( x=alt.X("fpr:Q", title="🧑‍🎤 False Positive Rate", scale=alt.Scale(type = "log" if log_x else "linear",domain=[.1, 1])), y=alt.Y("tpr:Q", title="👤 True Positive Rate", scale=alt.Scale(domain=[0.4, 1.0])), color=alt.Color("aug:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field detail=["fpr", "tpr", "aug","team"], ) .properties(width=800, height=600, title="Detection vs False Alarm Per Augmentation") ) if show_text: text = ( alt.Chart(results.loc[results.team.str.startswith("top")]) .mark_text( align="right", fontSize=14, dx=-8, # shift text to right of point dy=-5, # shift text slightly up ) .encode( x=alt.X("fpr:Q"),#, title="🧑‍🎤 False Positive Rate", scale=alt.Scale(domain=[0, 1])), y=alt.Y("tpr:Q"),#, title="👤 True Positive Rate", scale=alt.Scale(domain=[0, 1])), color=alt.Color("aug:N", scale=alt.Scale(scheme=color_map)), # Color by categorical field text="aug", ) ) chart = chart + text diag_line = ( alt.Chart(pd.DataFrame(dict(tpr=np.linspace(0,1,100), fpr=np.linspace(0,1,100)))) .mark_line(color="lightgray", strokeDash=[8, 4], size=1) .encode(x="fpr", y="tpr") ) diag_line2 = ( alt.Chart(pd.DataFrame(dict(tpr=np.linspace(1,0,100), fpr=np.linspace(0,1,100)))) .mark_line(color="lightblue", strokeDash=[8, 4], size=1) .encode(x="fpr", y="tpr") ) return (chart + diag_line + diag_line2).interactive() def get_heatmaps(temp): h1 = make_heatmap(temp, "generated", symbol="👤") h2 = make_heatmap(temp, "real", symbol="🧑‍🎤") st.altair_chart(h1, use_container_width=True) st.altair_chart(h2, use_container_width=True) if temp.columns.str.contains("aug", case=False).any(): h3 = make_heatmap(temp, "aug", symbol="🛠️") st.altair_chart(h3, use_container_width=True) @st.fragment def show_augmentations(task, score): split = st.session_state.get("split", "public") results = load_results(task, best_only=True) results_for_split_score = results[f"{split}_{score}_score"] all_teams = get_unique_teams(results_for_split_score.index.to_series()) teams = st.multiselect("Teams", options=all_teams, default=get_default_teams(all_teams),key=f"ms_aug_{task}") results_for_split_score = results_for_split_score.loc[results_for_split_score.index.isin(teams)] # st.dataframe(results_for_split_score) f"##### Accuracy Breakdown by Category" accuracy_types = { "Accuracy": 0, "AUC": 1, } # Create a row with two columns for controls col1, col2 = st.columns([0.1, 0.9]) with col1: granularity = st.radio( "accuracy type", list(accuracy_types.keys()), key=f"granularity-{task}-{score}", horizontal=True, label_visibility="collapsed", index=0, ) show_deltas = False if split in ["private", "private_only"]: with col2: # Add toggle for showing deltas from "none" column show_deltas = st.toggle( "Show deltas from 'none' (higher values mean 'none' was **lower**)", value=False, 
key=f"deltas-{task}-{score}", ) with col2: colormap_table = st.checkbox("Colormap",value=True, key = f"{task}_colormap_table_aug") sort_by_top = st.checkbox("Sort by Top N",value=False ) transpose = st.checkbox("Transpose", value=True, key = f"{task}_transpose_aug_table") ## Check cases if accuracy_types[granularity] == 0: "#### Balanced Accuracy" gen_cols = [ c for c in results_for_split_score.columns if "generated_" in c and "accuracy" not in c and "conditional" not in c ] gen_tmp = results_for_split_score.loc[:, gen_cols].copy() gen_tmp.columns = [ c.replace("generated_", "") for c in results_for_split_score.columns if "generated_" in c and "accuracy" not in c and "conditional" not in c ] real_cols = [ c for c in results_for_split_score.columns if "real_" in c and "accuracy" not in c and "conditional" not in c ] real_tmp = results_for_split_score.loc[:, real_cols].copy() real_tmp.columns = [ c.replace("real_", "") for c in results_for_split_score.columns if "real_" in c and "accuracy" not in c and "conditional" not in c ] tmp = (gen_tmp + real_tmp) / 2.0 # If toggle is on and "none" column exists, calculate deltas from "none" column if show_deltas and "none" in tmp.columns: # Get the "none" column values none_values = tmp["none"].copy() # Calculate deltas: none - current_column for col in tmp.columns: if col != "none": tmp[col] = -none_values + tmp[col] # st.dataframe(tmp) top_n = st.session_state.get("top_n", None) show_dataframe_w_format(tmp, top_n=top_n, colormap_table=colormap_table,sort_columns=False, sort_by_top=sort_by_top,transpose=transpose) # st.dataframe(tmp) top_n_teams = tmp.sort_values("none", ascending = False).index[:top_n] # gen_tmp = gen_tmp.sort_values("none", ascending = False) gen_tmp.loc[f"top-{top_n}",:] = gen_tmp.loc[top_n_teams,:].mean(0) gen_tmp.columns.name = "aug" gen_tmp = gen_tmp.stack().to_frame("tpr")#.set_index(["team","aug"]) real_tmp = real_tmp.sort_values("none", ascending = False) real_tmp.loc[f"top-{top_n}",:] = real_tmp.loc[top_n_teams,:].mean(0) real_tmp.columns.name = "aug" real_tmp = real_tmp.stack() real_tmp = 1-real_tmp real_tmp = real_tmp.to_frame("fpr")#.set_index(["team","aug"]) tmp = pd.concat([real_tmp,gen_tmp],axis = 1 ).reset_index() # st.write(tmp) only_top = st.toggle("Only Top") if only_top: tmp = tmp.loc[tmp.team == f"top-{top_n}"] else: tmp = tmp.loc[tmp.team.isin( [f"top-{top_n}"] + top_n_teams.tolist())] def short_names(n): n = n.replace("none","NONE") n = n.replace("compression","cm") n = n.replace("interpolation","interp") n = n.replace("adjustment","adj") return n tmp["aug"] = tmp["aug"].apply(short_names) show_text = st.toggle("Show Labels") log_x = st.toggle("FPR on log scale") tpr_fpr = show_aug_plot(tmp,show_text = show_text, log_x = log_x) st.altair_chart(tpr_fpr,use_container_width=False) else: cols = [c for c in results_for_split_score.columns if "conditional_auc" in c] col_names = [ c.replace("conditional_auc_", "") for c in results_for_split_score.columns if "conditional_auc" in c ] tmp = results_for_split_score.loc[:, cols].copy() tmp.columns = col_names "#### Conditional AUC" # If toggle is on and "none" column exists, calculate deltas from "none" column if show_deltas and "none" in tmp.columns: # Get the "none" column values none_values = tmp["none"].copy() # Calculate deltas: none - current_column for col in tmp.columns: if col != "none": tmp[col] = -none_values + tmp[col] # st.dataframe(tmp) top_n = st.session_state.get("top_n", None) show_dataframe_w_format(tmp, top_n=top_n, 
colormap_table=colormap_table,sort_columns=False, sort_by_top=sort_by_top,transpose=transpose) @st.fragment def show_charts(task, score="source"): show_auc = st.toggle("Show Best w.r.t. AUC", value=False, key=f"toggle auc {task}") log_x = st.toggle("FPR on Log Scale",value=False,key=f"toggle log {task}") metric = "auc" if show_auc else "balanced_accuracy" split = st.session_state.get("split", "public") hf_token = st.session_state.get("hf_token", None) results = load_results(task, best_only=True, metric=metric) temp = results[f"{split}_source_score"].reset_index() temp_public = results[f"public_source_score"].reset_index() temp["split"] = split temp_public["split"] = "public" teams = get_unique_teams(temp["team"]) default = get_default_teams(teams) best_only = True compare = False if split != "public": b1, b2 = st.columns([0.2, 0.8]) with b1: best_only = st.toggle("Best Only", value=True, key=f"best only {task} {score} {split}") full_curves = st.toggle("Full curve", value=True, key=f"all curves {task}") compare = st.toggle(f"Compare vs Public",value=False, key=f"compare {task}") if not best_only: results = load_results(task, best_only=best_only, metric=metric) temp = results[f"{split}_source_score"].reset_index() temp_public = results["public_source_score"].reset_index() # selected_team = st.pills( # "Team", ["ALL"] + teams, key=f"teams {task} 1", default=["ALL"], selection_mode="multi" # ) with b2: # selected_team = st.pills( # "Team", ["ALL"] + teams, key=f"teams {task} 2", default=default, selection_mode="multi" # ) default = get_default_teams(teams) selected_team = st.multiselect("Teams", options=teams, default=default,key=f"charts_{task}") if selected_team is None or len(selected_team) == 0: return # if "ALL" in selected_team: # selected_team = ["ALL"] # if "ALL" not in selected_team: temp = filter_teams(temp, selected_team) temp_public = filter_teams(temp_public, selected_team) # with st.spinner("making plots...", show_time=True): # st.write(temp) roc_scatter = make_roc(temp, show_text=best_only & (not compare), log_x = log_x) acc_vs_time = make_acc( temp, show_text=best_only & (not compare), metric_spec=("auc", "Area Under Curve") if show_auc else ("balanced_accuracy", "Balanced Accuracy"), ) acc_vs_auc = make_acc_vs_auc(temp, show_text=best_only & (not compare), flip=show_auc) if split == "private" and hf_token is not None: if full_curves: roc_scatter = make_roc_curves(task, temp["submission_id"].values.tolist()) + roc_scatter st.altair_chart((roc_scatter | acc_vs_time | acc_vs_auc).interactive(), use_container_width=False) # if compare: # st.altair_chart(public_vs_private, use_container_width=False) if compare: temp["split"] = split temp_public["split"] = "public" temp = pd.concat([temp, temp_public], ignore_index=True) # metric = "balanced_accuracy" if not show_auc else "auc" temp_vs_public = temp.set_index(["team", "submission_id", "split"])[metric].unstack().reset_index() best = st.toggle("best") if best: temp_vs_public = temp_vs_public.sort_values("public",ascending = False).drop_duplicates("team") c1,c2 = st.columns(2) with c1: public_vs_private = make_vs_public(temp_vs_public, show_text=best, other_split=split) st.altair_chart(public_vs_private.interactive(), use_container_width=False) with c2: diff = "% drop" temp_vs_public[diff] = 100*(temp_vs_public["public"] - temp_vs_public["private_only"])/temp_vs_public["public"] cmap = st.session_state.get("colormap_heatmap") temp_vs_public_style = 
temp_vs_public.set_index("team").loc[:,["public","private_only",diff]].sort_values("private_only",ascending = False).style.highlight_max(axis=0 , props="font-weight: bold;") temp_vs_public_style = temp_vs_public_style.background_gradient(cmap=get_cmap(cmap),subset = [diff], axis=1, vmin = 0,vmax = 10) st.dataframe(temp_vs_public_style, column_config={c:st.column_config.NumberColumn(format= "compact") for c in ["public","private_only",diff]}) st.info(f"loading {temp['submission_id'].nunique()} submissions") @st.cache_data def compute_running_max(result_df, teams, metric): # Group by team and sort by datetime result_df = result_df.copy() result_df = result_df.loc[result_df["team"].isin(teams)] result_df["datetime"] = pd.to_datetime(result_df["datetime"]) return ( result_df.groupby("team") .apply(lambda a: a.sort_values("datetime").set_index("datetime")[metric].cummax()) .reset_index() ) @st.fragment def show_timeline(task, score="source"): split = st.session_state.get("split", "public") hf_token = st.session_state.get("hf_token", None) results = load_results(task, best_only=False) temp = results[f"{split}_source_score"].reset_index() all_teams = get_unique_teams(temp["team"]) all_teams = list(filter(lambda a: a!="Baseline",all_teams)) default = [t for t in all_teams if ("test" not in t.lower())] teams = st.multiselect("Teams", options=all_teams, default=default) metric = st.selectbox("Metric", ["auc", "balanced_accuracy"], key=f"time {task}") baseline_val = temp.query("team=='Baseline'")[metric].max() df = compute_running_max(temp, teams, metric).dropna() # team_best = df.groupby("team")[metric].max().sort_values(ascending = False) team_best = df.sort_values([metric,"datetime"],ascending = False).drop_duplicates(["team"]) team_order = team_best["team"].tolist() + ["Baseline"] random_guess = ( alt.Chart(pd.DataFrame({"datetime": [df["datetime"].min(), df["datetime"].max()], metric: [0.5, 0.5]})) .mark_line(strokeDash=[4, 4], color="grey", strokeWidth=2) .encode( x="datetime:T", y=f"{metric}:Q", ) ) # st.write(st.session_state) baseline_chart = ( alt.Chart(pd.DataFrame({"datetime": [df["datetime"].min(), df["datetime"].max()], "team": "Baseline", metric: [baseline_val,baseline_val]})) .mark_line(strokeDash=[8, 8], color="darkgray", strokeWidth=2) .encode( x="datetime:T", y=f"{metric}:Q", color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")),sort=team_order), ) ) # Create main chart task_chart = ( alt.Chart(df) .mark_line(point=True, interpolate='step-after') .encode( x=alt.X( "datetime:T", title="Submission Date", ), y=alt.Y(f"{metric}:Q", scale=alt.Scale(domain=[0.5, 1.0])), color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")), sort=team_order), ) .properties(width=800, height=500, title="Best Performance Over Time (Original Content)") .interactive() ) if st.checkbox("Show Labels",value=True,key = f"{task} check show timeline"): team_best.loc[len(team_best)] = {"team":"Baseline", metric:baseline_val, "datetime": df["datetime"].max()} # st.write(team_best) text_chart = ( alt.Chart(team_best) .mark_text( align="left", fontSize=14, dx=5, # shift text to right of point dy=-5, # shift text slightly up ) .encode( x=alt.X( "datetime:T", title="Submission Date", scale = alt.Scale(domain=[df["datetime"].min(), df["datetime"].max() + datetime.timedelta(days = 4)]), ), y=alt.Y(f"{metric}:Q", scale=alt.Scale(domain=[0.5, 1.0])), color=alt.Color("team:N", scale=alt.Scale(scheme=st.session_state.get("colormap", "paired")), 
sort=team_order), text="team", ) ) # Combine charts and display st.altair_chart((task_chart +baseline_chart+text_chart).configure_legend(disable=True), use_container_width=True) # st.altair_chart(task_chart, use_container_width=True) def make_plots_for_task(task): if len(TASKS.get(task)) > 1: t1, t2, t3, t4 = st.tabs(["Tables", "Charts", "Timeline", "Augmentations"]) else: t1, t2, t3 = st.tabs(["Tables", "Charts", "Timeline"]) t4 = None with t1: show_leaderboard(task) with t2: show_charts(task, score="source") with t3: split = st.session_state.get("split", "public") if split != "public": show_timeline(task, score="source") else: st.info(f"not available in {split} in mode") if t4 is not None: with t4: show_augmentations(task, score="category") updated = get_updated_time() st.markdown(updated) @st.fragment def show_task_comparison(): """Show summary tables for Task 1 and Task 2 side by side.""" split = st.session_state.get("split", "public") color_map = st.session_state.get("colormap", "paired") metric = st.selectbox("Metric", ["balanced_accuracy", "auc"]) task1_key = list(TASKS.keys())[1] # video-challenge-task-1-config task2_key = list(TASKS.keys())[2] # video-challenge-task-2-config task1_results = load_results(task1_key, best_only=True, metric=metric) task2_results = load_results(task2_key, best_only=True, metric=metric) cols = ["balanced_accuracy", "auc","total_time","generated_accuracy","real_accuracy"] # st.write(task1_results[f"{split}_source_score"]) task1_results_split_source_score = task1_results[f"{split}_source_score"].loc[:,cols] task2_results_split_source_score = task2_results[f"{split}_source_score"].loc[:,cols] combined = pd.concat([task1_results_split_source_score, task2_results_split_source_score], axis=1, keys = ["Task 1", "Task 2"]) combined.columns.names = ["Task", "Metric"] combined = combined.sort_index(level = "Metric",axis = 1).swaplevel(axis=1) combined = combined.rename(columns={"generated_accuracy":"tpr","real_accuracy":"tnr"}) # .swaplevel(axis = 1) # st.write(combined.loc[:,["tpr"]]) # st.write(combined) all_teams = get_unique_teams(combined.index.to_series()) # all_teams_2 = get_unique_teams(task2_results_split_source_score.index.to_series()) # all_teams = list(set(all_teams_1 + all_teams_2)) default = get_default_teams(all_teams) teams = st.multiselect("Teams", options=all_teams, default=default,key=f"comp_lead") combined = combined.loc[combined.index.isin(teams)] task1_results_split_source_score = task1_results_split_source_score.loc[task1_results_split_source_score.index.isin(teams)] task2_results_split_source_score = task2_results_split_source_score.loc[task2_results_split_source_score.index.isin(teams)] column_config = { "balanced_accuracy": st.column_config.NumberColumn( "⚖️ Balanced Accuracy", format="compact", min_value=0, max_value=1.0, ), "generated_accuracy": st.column_config.NumberColumn( "👤 True Positive Rate", format="compact", min_value=0, max_value=1.0, ), "real_accuracy": st.column_config.NumberColumn( "🧑‍🎤 True Negative Rate", format="compact", min_value=0, max_value=1.0, ), "auc": st.column_config.NumberColumn( "📐 AUC", format="compact", min_value=0, max_value=1.0, ), "total_time": st.column_config.NumberColumn( "🕒 Inference Time (s)", format="compact", ), "datetime": st.column_config.DatetimeColumn( "🗓️ Submission Date", format="YYYY-MM-DD", ), "fail_rate": st.column_config.NumberColumn( "❌ Fail Rate", format="compact", ), "task1_balanced_accuracy": st.column_config.NumberColumn( "⚖️ Task 1 Balanced Accuracy", format="compact", min_value=0, 
max_value=1.0, ), "task2_balanced_accuracy": st.column_config.NumberColumn( "⚖️ Task 2 Balanced Accuracy", format="compact", min_value=0, max_value=1.0, ), "difference": st.column_config.NumberColumn( "⚖️ Difference (T1-T2)", format="compact", ), "percent_change": st.column_config.NumberColumn( "% Change", format="+.2%", ), } # Create tabs for different views tables_tab, charts_tab = st.tabs(["Tables", "Charts"]) with tables_tab: # Create two columns for side-by-side tables # st.subheader("Performance Comparison: Task 1 vs Task 2") # col1, col2 = st.columns(2) # with col1: # st.subheader("Task 1: Original Content") # st.dataframe( # task1_results_split_source_score.loc[:, cols], # column_config=column_config, # use_container_width=True, # ) # with col2: # st.subheader("Task 2: Post-processed Content") # st.dataframe( # task2_results_split_source_score.loc[:, cols], # column_config=column_config, # use_container_width=True, # ) # Add a section for comparison of task performance differences st.subheader("Performance Analysis") st.markdown( """ Performance comparison between Task 1 (original content) and Task 2 (post-processed content). A positive difference indicates degraded performance on post-processed content. """ ) # st.write(combined) # index = combined.columns.get_loc("auc") combined_styled = combined.loc[:,["balanced_accuracy", "auc","total_time"]].rename(columns={"auc":"📐 AUC", "balanced_accuracy":"⚖️ Balanced Accuracy","total_time":"🕒 Run Time"}) if st.checkbox("Colormap",value=True): cmap = st.session_state.get("colormap_heatmap") combined_styled = combined_styled.style.highlight_max(axis=0, subset = ["📐 AUC","⚖️ Balanced Accuracy"] , props="font-weight: bold;") combined_styled = combined_styled.background_gradient(cmap=get_cmap(cmap),axis=0,vmin = 0.5,vmax=1, subset =["📐 AUC","⚖️ Balanced Accuracy"] ) # optional: bold headers st.dataframe(combined_styled,column_config={"Task 1" :st.column_config.NumberColumn(format="compact"),"Task 2":st.column_config.NumberColumn(format="compact") } ,use_container_width=True) # show_dataframe_w_format(combined, top_n=0) # # Get the datasets for both tasks # task1_df = task1_results_split_source_score.reset_index() # task2_df = task2_results_split_source_score.reset_index() # # Create a combined dataframe for analysis # common_teams = set(task1_df["team"]) & set(task2_df["team"]) # if common_teams: # # Filter to teams that appear in both tasks # task1_filtered = task1_df[task1_df["team"].isin(common_teams)] # task2_filtered = task2_df[task2_df["team"].isin(common_teams)] # # Create a comparison dataframe # comparison_df = pd.DataFrame( # { # "team": list(common_teams), # "task1_balanced_accuracy": [ # task1_filtered[task1_filtered["team"] == team]["balanced_accuracy"].values[0] # for team in common_teams # ], # "task2_balanced_accuracy": [ # task2_filtered[task2_filtered["team"] == team]["balanced_accuracy"].values[0] # for team in common_teams # ], # } # ) # # Calculate differences and percentage changes # comparison_df["difference"] = ( # comparison_df["task1_balanced_accuracy"] - comparison_df["task2_balanced_accuracy"] # ) # comparison_df["percent_change"] = comparison_df["difference"] / comparison_df["task1_balanced_accuracy"] # # Sort by the absolute difference (to show biggest performance changes first) # comparison_df = ( # comparison_df.sort_values(by="difference", ascending=False).reset_index(drop=True).set_index("team") # ) # # Display the comparison table # show_dataframe_w_format(comparison_df, top_n=0) # else: # st.warning("No 
common teams found across both tasks.") with charts_tab: st.subheader("Team Performance Across Tasks") # Get the datasets for both tasks if not already done # if "task1_df" not in locals(): # task1_df = task1_results_split_source_score.reset_index() # task2_df = task2_results_split_source_score.reset_index() # common_teams = set(task1_df["team"]) & set(task2_df["team"]) # if common_teams: # Prepare data for the plot # plot_data = [] # for team in common_teams: # # Get team's balanced accuracy for each task # task1_acc = task1_df[task1_df["team"] == team][metric].values[0] # task2_acc = task2_df[task2_df["team"] == team][metric].values[0] # # Add points for Task 1 # plot_data.append({"team": team, "task": "Task 1", metric: task1_acc}) # # Add points for Task 2 # plot_data.append({"team": team, "task": "Task 2", metric: task2_acc}) # plot_df = pd.DataFrame(plot_data).set_index(["team", "task"])[metric].unstack().reset_index() # st.write(combined) plot_df = combined.loc[:,"auc"] # plot_df.index.name = "team" plot_df = plot_df.reset_index() # st.write(plot_df) chart = ( alt.Chart(plot_df) .mark_circle(size=200) .encode( x=alt.X("Task 1:Q", title=f"Task 1 AUC", scale=alt.Scale(domain=[0.4, 1])), y=alt.Y("Task 2:Q", title=f"Task 2 AUC", scale=alt.Scale(domain=[0.4, 1])), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field ) .properties(width=400, height=400, title="Task 1 vs Task 2: AUC") .interactive() ) # if show_text: text = ( alt.Chart(plot_df) .mark_text( align="right", dx=-5, # shift text to right of point dy=-5, # shift text slightly up fontSize=14, ) .encode( x=alt.X("Task 1:Q"), y=alt.Y("Task 2:Q"), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field text="team", ) ) chart = chart + text diag_line = ( alt.Chart(pd.DataFrame(dict(x=[0.4, 1.0], y=[0.4, 1.0]))) .mark_line(color="lightgray", strokeDash=[8, 4]) .encode(x="x", y="y") ) # combined[:,"fpr"] = 1 - combined[:,"tpr"] chart1 = chart + diag_line # st.altair_chart(, use_container_width=False) plot_df = combined.unstack().reset_index().set_index(["Task","team","Metric"]).loc[:,0].unstack().reset_index() plot_df["fpr"] = 1. 
- plot_df["tnr"] # st.write(plot_df) base = ( alt.Chart(plot_df) .encode( x=alt.X("fpr", title=f"False Positive Rate", scale=alt.Scale(type = "linear", domain=[0.001, 1])), y=alt.Y("tpr", title=f"True Positive Rate", scale=alt.Scale(domain=[0., 1])), shape = alt.Shape("Task:N",scale=alt.Scale(domain=['Task 2', 'Task 1'])), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field ) .properties(width=400, height=400, title="Task 1 vs Task 2: TPR vs FPR") .interactive() ) chart = base.mark_line() point = base.mark_point(filled=True, size = 200) chart = chart + point # if show_text: text = ( alt.Chart(plot_df) .mark_text( align="right", dx=-5, # shift text to right of point dy=-5, # shift text slightly up fontSize=14, ) .encode( x=alt.X("fpr", title=f"False Positive Rate", scale=alt.Scale(domain=[0., 1])), y=alt.Y("tpr", title=f"True Positive Rate", scale=alt.Scale(domain=[0., 1])), color=alt.Color( "team:N", scale=alt.Scale(scheme=color_map) ), # Color by categorical field # Size by quantitative field text="team", ) ) # chart = chart + text diag_line = ( alt.Chart(pd.DataFrame(dict(tpr=np.linspace(0,1,100), fpr=np.linspace(0,1,100)))) .mark_line(color="lightgray", strokeDash=[8, 4], size=1) .encode(x="fpr", y="tpr") ) diag_line2 = ( alt.Chart(pd.DataFrame(dict(tpr=np.linspace(1,0,100), fpr=np.linspace(0,1,100)))) .mark_line(color="lightblue", strokeDash=[8, 4], size=1) .encode(x="fpr", y="tpr") ) # combined[:,"fpr"] = 1 - combined[:,"tpr"] chart2 = chart + diag_line + diag_line2 st.altair_chart(chart1 | chart2, use_container_width=False) # Create line chart connecting team performances # lines = ( # alt.Chart(plot_df) # .mark_line(point=alt.OverlayMarkDef(filled=True, size=100), strokeDash=[4, 2], strokeWidth=2) # .encode( # x=alt.X("task:N", title="Task", sort=["Task 1", "Task 2"]), # y=alt.Y("balanced_accuracy:Q", title="Balanced Accuracy", scale=alt.Scale(domain=[0.4, 1.0])), # color=alt.Color( # "team:N", scale=alt.Scale(scheme=color_map_choice), legend=alt.Legend(title="Teams") # ), # tooltip=["team:N", "task:N", "balanced_accuracy:Q"], # ) # .properties(width=700, height=500, title="Performance Changes Across Tasks") # ) # st.altair_chart(lines, use_container_width=False) t1, t2, tp, comparison_tab, volume_tab, all_submission_tab, san_check, data_desc = st.tabs( ["**Task 1**", "**Task 2**", "**Pilot Task**", "**Compare Tasks**", "**Submission Volume**", "**All Submissions**","**Sanity Check**","**Data Description**"] ) with t1: "*Detection of Synthetic Video Content. Video files are unmodified from the original output from the models or the real sources.*" make_plots_for_task(list(TASKS.keys())[1]) with t2: "*Detection of Post-processed Synthetic Video Content. A subset of Task 1 data files are modified with standard post-processing techniques (compression, resizing, etc).*" make_plots_for_task(list(TASKS.keys())[2]) with tp: "*Detection of Synthetic Video Content. 
Video files are unmodified from the original output from the models or the real sources.*" make_plots_for_task(list(TASKS.keys())[0]) if split in ["private", "private_only"]: with comparison_tab: "**Task 1 to Task 2 performance comparison.**" show_task_comparison() with volume_tab: subs = get_volume() status_lookup = "QUEUED,PROCESSING,SUCCESS,FAILED".split(",") found_columns = subs.columns.values.tolist() status_lookup = list(set(status_lookup) & set(found_columns)) st.bar_chart(subs, x="date", y=status_lookup, stack=True) total_submissions = int(subs.loc[:, status_lookup].fillna(0).values.sum()) st.metric("Total Submissions", value=total_submissions) st.metric("Duration", f'{(subs["date"].max() - subs["date"].min()).days} days') @st.fragment def show_all_submissions(): show_all = st.toggle("Show All Columns", value=False) data = load_submission() fields = ["task", "team", "status_reason"] field_values = {f: data[f].unique().tolist() for f in fields} selected_fields = {} for f, v in field_values.items(): selected_fields[f] = st.multiselect(f"Select {f} to Display", v, default=v) mask = np.ones(len(data)).astype(bool) for fs, vs in selected_fields.items(): mask &= data[fs].isin(vs) data = data.loc[mask] search_str = st.text_input("search", value="") if search_str != "": mask_search = ( data.select_dtypes(include=["object"]) .apply(lambda x: x.str.contains(search_str, case=False, na=False)) .any(axis=1) ) data = data.loc[mask_search] if not show_all: columns_to_show = "task,team,datetime,status_reason,submission_repo,submission_id,space_id".split(",") data = data.loc[:, columns_to_show] data = data.sort_values("datetime", ascending=False) # st.write(",".join(data.columns)) st.dataframe(data, hide_index=True) @st.fragment def show_san_check(): for task in list(TASKS.keys()): f"## {task}" out = load_results(task,best_only=True, metric="balanced_accuracy",check_discrepancies=True) for k,v in out.items(): if k.startswith("desc"): f"### {k}" st.write(v) if split == "private": with all_submission_tab: show_all_submissions() with san_check: show_san_check() @st.fragment def show_data_desc(): sources = pd.read_csv("competition_cache/safe-challenge/video-challenge-task-1-config/solution-processed.csv") sources = sources.drop_duplicates(subset = ["source","source_og"]) def fix(el): s = el["source"] if s == "r_09": return "documentary-2" elif s == "r_07": return "documentary-1" else: return el["source_og"] sources["source_og"] = sources.apply(fix,axis = 1) sources = sources.set_index("source_og")["split"] # st.write(sources) def color_rows(row): if row["Split"] == "public": return ["background-color: darkblue"] * len(row) else: return ["background-color: lightcoral"] * len(row) tab_real, tab_gen, tab_aug = tabs = st.tabs(["Real","Generated","Augmentations"]) with tab_real: "### Real Sources" data =pd.read_csv("competition_cache/data_desc/real_video_stats.csv") data["Avg Duration"] = data["Avg Duration"].apply(lambda a: float(a[:-1])) data["Source"] = data["Source"].apply(lambda a: a.replace(" ","-")) data["Split"] = sources.loc[data["Source"].values].values data = data.sort_values(["Split","Source"],ascending = False) # data_styled = data.style.apply(color_rows,axis=1) st.dataframe(data, hide_index = True, height = 800) with tab_gen: "### Synthetic Sources" data =pd.read_csv("competition_cache/data_desc/generated_video_stats.csv") data = data.drop(columns=["Description"]) data["Avg Duration"] = data["Avg Duration"].apply(lambda a: float(a[:-1])) data["Source"] = data["Source"].apply(lambda a: 
a.replace(" ","-").lower()) data["Split"] = sources.loc[data["Source"].values].values data = data.sort_values(["Split","Source"],ascending = False) st.dataframe(data, hide_index = True, height = 800) with tab_aug: "### Augmentations" data =pd.read_csv("competition_cache/data_desc/post_processing_stats.csv",on_bad_lines="warn") st.dataframe(data, hide_index = True, height = 800) if split == "private": with data_desc: show_data_desc()