"""
Collect evaluation scores from trending models' model-index metadata.

Scans trending text-generation models on the Hub and extracts benchmark
scores from their model-index metadata or open pull requests.

Results are saved to a dataset for the evals leaderboard.

Usage:
    python collect_evals.py [--push-to-hub]
"""

from __future__ import annotations

import argparse
import json
import os
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
import yaml
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import HfHubHTTPError

API_BASE = "https://huggingface.co/api"
PIPELINE_FILTER = "text-generation"
TRENDING_LIMIT = 50  # trending models actually processed after filtering
TRENDING_FETCH_LIMIT = 100  # models requested from the API before pipeline filtering
PR_SCAN_LIMIT = 40  # discussions requested per repo when scanning for PRs
USER_AGENT = "skills-evals-leaderboard/0.3"


def _normalize(text: Optional[str]) -> str:
    """Lowercase text and collapse non-alphanumeric runs to single spaces."""
    if not text:
        return ""
    text = text.lower()
    text = re.sub(r"[^a-z0-9]+", " ", text)
    return text.strip()


def _coerce_score(value: Any) -> Optional[float]:
    """Best-effort conversion of a metric value to float, tolerating a trailing '%'."""
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        candidate = value.strip()
        if candidate.endswith("%"):
            candidate = candidate[:-1]
        try:
            return float(candidate)
        except ValueError:
            return None
    return None
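
# Illustrative behaviour of _coerce_score (values derived from the code above):
#   _coerce_score(71.2)    -> 71.2
#   _coerce_score("85.3%") -> 85.3   (trailing percent sign is stripped)
#   _coerce_score("n/a")   -> None   (non-numeric strings are skipped)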


@dataclass(frozen=True)
class BenchmarkSpec:
    """A tracked benchmark and the aliases used to recognize it in metadata."""

    key: str
    label: str
    aliases: tuple[str, ...]

    def matches(self, fields: List[str]) -> bool:
        """Return True if any normalized alias is a substring of any field."""
        for alias in self.aliases:
            alias_norm = _normalize(alias)
            if not alias_norm:
                continue
            for field in fields:
                if alias_norm in field:
                    return True
        return False


BENCHMARKS: Dict[str, BenchmarkSpec] = {
    "mmlu": BenchmarkSpec(
        key="mmlu",
        label="MMLU",
        aliases=("mmlu", "massive multitask language understanding"),
    ),
    "bigcodebench": BenchmarkSpec(
        key="bigcodebench",
        label="BigCodeBench",
        aliases=("bigcodebench", "big code bench"),
    ),
    "arc_mc": BenchmarkSpec(
        key="arc_mc",
        label="ARC MC",
        aliases=(
            "arc mc",
            "arc-challenge",
            "arc challenge",
            "arc multiple choice",
            "arc c",
        ),
    ),
}
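
# Matching example (hypothetical metadata): a dataset named "ARC-Challenge (25-shot)"
# normalizes to "arc challenge 25 shot", which contains the "arc challenge" alias,
# so _match_benchmark maps it to the "arc_mc" benchmark key.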


class EvalsCollector:
    """Collects evaluation scores from model-index metadata."""

    def __init__(self, token: str | None = None) -> None:
        self.token = token
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})
        if token:
            self.session.headers.update({"Authorization": f"Bearer {token}"})
        self.logs: List[str] = []
        self.results: List[Dict[str, Any]] = []

    def log(self, message: str) -> None:
        """Add a log message."""
        print(message)
        self.logs.append(message)

    def collect_all(self) -> List[Dict[str, Any]]:
        """Collect evaluation scores from trending models."""
        self.log("🔍 Fetching trending text-generation models...")
        trending = self._fetch_trending_models()

        for entry in trending:
            repo_id = entry.get("modelId") or entry.get("id")
            if not repo_id:
                continue
            scores = self._collect_scores(repo_id)
            if scores["scores"]:
                self.results.extend(self._format_scores(repo_id, scores["scores"]))

        self.log(f"✅ Collected {len(self.results)} evaluation entries")
        return self.results

    def _fetch_trending_models(self) -> List[Dict[str, Any]]:
        """Fetch models sorted by trending score and keep the text-generation ones."""
        params = {"sort": "trendingScore", "limit": TRENDING_FETCH_LIMIT}
        response = self.session.get(
            f"{API_BASE}/models",
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        if not isinstance(data, list):
            raise ValueError("Unexpected trending response.")
        filtered = [
            model
            for model in data
            if (
                model.get("pipeline_tag") == PIPELINE_FILTER
                or PIPELINE_FILTER in (model.get("tags") or [])
            )
        ]
        if not filtered:
            self.log("⚠️ No text-generation models in trending feed.")
            return []
        limited = filtered[:TRENDING_LIMIT]
        self.log(f"📊 Found {len(limited)} trending text-generation models")
        return limited
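
    # Note: trending entries are only read through the keys used above and in
    # collect_all(): "modelId"/"id", "pipeline_tag", and "tags".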

    def _collect_scores(self, repo_id: str) -> Dict[str, Any]:
        """Collect benchmark scores for one repo, preferring the model card over PR revisions."""
        owner = repo_id.split("/")[0]
        card_meta = self._read_model_card(repo_id)
        model_index = card_meta.get("model-index")
        if model_index:
            self.log(f"✅ {repo_id}: model card metadata found.")
            scores = self._extract_scores(
                repo_id=repo_id,
                model_index=model_index,
                contributor=owner,
                source_type="model-card",
                source_url=f"https://huggingface.co/{repo_id}",
                revision="main",
            )
            if scores:
                return {"model_id": repo_id, "scores": scores}

        prs = self._fetch_pull_requests(repo_id)
        for pr in prs:
            revision = f"refs/pr/{pr['num']}"
            pr_meta = self._read_model_card(repo_id, revision=revision)
            pr_index = pr_meta.get("model-index")
            if not pr_index:
                continue
            author_info = pr.get("author", {}) or {}
            contributor = author_info.get("name") or author_info.get("fullname") or "unknown-author"
            discussion_path = f"{repo_id}/discussions/{pr['num']}"
            source_url = f"https://huggingface.co/{discussion_path}"
            scores = self._extract_scores(
                repo_id=repo_id,
                model_index=pr_index,
                contributor=contributor,
                source_type="pull-request",
                source_url=source_url,
                revision=revision,
            )
            if scores:
                note = f"🔀 {repo_id}: PR #{pr['num']} by {contributor}."
                self.log(note)
                return {"model_id": repo_id, "scores": scores}

        self.log(f"⚠️ {repo_id}: no target benchmarks located.")
        return {"model_id": repo_id, "scores": {}}

    def _read_model_card(
        self,
        repo_id: str,
        revision: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Download a repo's README.md (optionally at a PR revision) and parse its front matter."""
        try:
            path = hf_hub_download(
                repo_id=repo_id,
                filename="README.md",
                repo_type="model",
                revision=revision,
                token=self.token,
            )
        except HfHubHTTPError as err:
            ctx = f"{repo_id} ({revision or 'main'})"
            self.log(f"🚫 {ctx}: README download failed ({err}).")
            return {}
        text = Path(path).read_text(encoding="utf-8", errors="ignore")
        return self._parse_front_matter(text)

    @staticmethod
    def _parse_front_matter(content: str) -> Dict[str, Any]:
        """Parse the YAML front matter delimited by '---' lines at the top of a model card."""
        content = content.lstrip("\ufeff")
        if not content.startswith("---"):
            return {}
        lines = content.splitlines()
        end_idx = None
        for idx, line in enumerate(lines[1:], start=1):
            if line.strip() == "---":
                end_idx = idx
                break
        if end_idx is None:
            return {}
        front_matter = "\n".join(lines[1:end_idx])
        try:
            data = yaml.safe_load(front_matter) or {}
            return data if isinstance(data, dict) else {}
        except yaml.YAMLError:
            return {}

    def _fetch_pull_requests(self, repo_id: str) -> List[Dict[str, Any]]:
        """Fetch a repo's discussions and return the pull requests, newest first."""
        url = f"{API_BASE}/models/{repo_id}/discussions"
        try:
            response = self.session.get(
                url,
                params={"limit": PR_SCAN_LIMIT},
                timeout=30,
            )
            response.raise_for_status()
        except requests.RequestException as err:
            self.log(f"🚫 {repo_id}: PR list request failed ({err}).")
            return []

        payload = response.json()
        discussions = payload.get("discussions", [])
        prs = [disc for disc in discussions if disc.get("isPullRequest")]
        prs.sort(key=lambda item: item.get("createdAt", ""), reverse=True)
        if prs:
            self.log(f"💬 {repo_id}: scanning {len(prs)} pull requests.")
        return prs
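
    # For reference, _extract_scores below expects `model-index` front matter
    # shaped roughly like this (an illustrative sketch, not a real model card):
    #
    #   model-index:
    #     - name: my-model
    #       results:
    #         - task:
    #             type: text-generation
    #           dataset:
    #             name: MMLU
    #             type: mmlu
    #           metrics:
    #             - name: accuracy
    #               type: acc
    #               value: "71.3%"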

    def _extract_scores(
        self,
        repo_id: str,
        model_index: Any,
        contributor: str,
        source_type: str,
        source_url: str,
        revision: str,
    ) -> Dict[str, Dict[str, Any]]:
        """Walk a model-index structure and keep the highest score per tracked benchmark."""
        if not isinstance(model_index, list):
            return {}
        scores: Dict[str, Dict[str, Any]] = {}
        for entry in model_index:
            if not isinstance(entry, dict):
                continue
            model_name = entry.get("name") or repo_id.split("/")[-1]
            for result in entry.get("results", []):
                dataset_info = result.get("dataset") or {}
                dataset_name = dataset_info.get("name")
                dataset_type = dataset_info.get("type")
                task_info = result.get("task") or {}
                task_type = task_info.get("type")
                for metric in result.get("metrics", []):
                    benchmark_key = self._match_benchmark(
                        dataset_name,
                        dataset_type,
                        metric,
                    )
                    if not benchmark_key:
                        continue
                    raw_value = metric.get("value")
                    value = _coerce_score(raw_value)
                    if value is None:
                        continue
                    unit = metric.get("unit") or ""
                    is_pct = isinstance(raw_value, str) and raw_value.strip().endswith("%")
                    if not unit and is_pct:
                        unit = "%"
                    metric_name = metric.get("name") or metric.get("type") or ""
                    payload = {
                        "model": repo_id,
                        "model_name": model_name,
                        "benchmark_key": benchmark_key,
                        "benchmark_label": BENCHMARKS[benchmark_key].label,
                        "value": value,
                        "unit": unit,
                        "dataset": dataset_name or dataset_type or "",
                        "task_type": task_type or "",
                        "metric_name": metric_name,
                        "contributor": contributor,
                        "source_type": source_type,
                        "source_url": source_url,
                        "revision": revision,
                    }
                    existing = scores.get(benchmark_key)
                    if not existing or value > existing["value"]:
                        scores[benchmark_key] = payload
        return scores

    def _match_benchmark(
        self,
        dataset_name: Optional[str],
        dataset_type: Optional[str],
        metric: Dict[str, Any],
    ) -> Optional[str]:
        """Map dataset/metric fields to a tracked benchmark key, or None if nothing matches."""
        fields = [
            _normalize(dataset_name),
            _normalize(dataset_type),
            _normalize(metric.get("name")),
            _normalize(metric.get("type")),
        ]
        fields = [field for field in fields if field]
        for key, spec in BENCHMARKS.items():
            if spec.matches(fields):
                return key
        return None

    def _format_scores(self, model_id: str, scores: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format scores as flat records for the dataset."""
        rows = []
        for benchmark_key, payload in scores.items():
            rows.append(
                {
                    "model_id": model_id,
                    "benchmark": payload["benchmark_label"],
                    "benchmark_key": benchmark_key,
                    "score": round(payload["value"], 2),
                    "source_type": payload["source_type"],
                    "source_url": payload["source_url"],
                    "contributor": payload["contributor"],
                    "collected_at": datetime.now(timezone.utc).isoformat(),
                }
            )
        return rows
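
    # A single flattened record produced above looks roughly like this (illustrative):
    #   {"model_id": "org/model", "benchmark": "MMLU", "benchmark_key": "mmlu",
    #    "score": 71.3, "source_type": "model-card", "source_url": "...",
    #    "contributor": "org", "collected_at": "2024-01-01T00:00:00+00:00"}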

    def get_leaderboard(self) -> List[Dict[str, Any]]:
        """Get results sorted by score descending."""
        return sorted(self.results, key=lambda x: x["score"], reverse=True)

    def save_json(self, filepath: str) -> None:
        """Save the leaderboard to a JSON file."""
        leaderboard = self.get_leaderboard()
        output = {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "total_entries": len(leaderboard),
            "benchmarks": list(BENCHMARKS.keys()),
            "leaderboard": leaderboard,
        }
        with open(filepath, "w") as f:
            json.dump(output, f, indent=2)
        self.log(f"💾 Saved leaderboard to {filepath}")

    def push_to_hub(self, repo_id: str = "hf-skills/evals-leaderboard") -> None:
        """Push the leaderboard data to a HF dataset."""
        try:
            from huggingface_hub import HfApi
        except ImportError:
            self.log("❌ huggingface_hub not installed. Run: pip install huggingface_hub")
            return

        api = HfApi(token=self.token)
        leaderboard = self.get_leaderboard()

        # Serialize as JSON Lines: one record per line.
        jsonl_content = "\n".join(json.dumps(row) for row in leaderboard)

        metadata = {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "total_entries": len(leaderboard),
            "models_with_scores": len(set(r["model_id"] for r in leaderboard)),
            "benchmarks": list(BENCHMARKS.keys()),
        }

        try:
            api.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True)
            self.log(f"📁 Ensured dataset repo exists: {repo_id}")

            api.upload_file(
                path_or_fileobj=jsonl_content.encode(),
                path_in_repo="data/leaderboard.jsonl",
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Update leaderboard - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')} UTC",
            )

            api.upload_file(
                path_or_fileobj=json.dumps(metadata, indent=2).encode(),
                path_in_repo="data/metadata.json",
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Update metadata - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')} UTC",
            )

            self.log(f"🚀 Pushed leaderboard to {repo_id}")
        except Exception as e:
            self.log(f"❌ Failed to push to hub: {e}")


def main() -> None:
    parser = argparse.ArgumentParser(description="Collect evaluation scores from model-index metadata")
    parser.add_argument(
        "--push-to-hub",
        action="store_true",
        help="Push results to HF dataset",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="leaderboard.json",
        help="Output JSON file path",
    )
    parser.add_argument(
        "--repo-id",
        type=str,
        default="hf-skills/evals-leaderboard",
        help="HF dataset repo ID for pushing",
    )
    args = parser.parse_args()

    token = os.environ.get("HF_TOKEN")
    if not token:
        print("⚠️ No HF_TOKEN found. Some requests may be rate-limited.")

    collector = EvalsCollector(token=token)
    collector.collect_all()

    print("\n" + "=" * 60)
    print("🏆 EVALUATION LEADERBOARD")
    print("=" * 60)

    leaderboard = collector.get_leaderboard()
    for entry in leaderboard[:20]:
        print(f"{entry['model_id']:40} | {entry['benchmark']:12} | {entry['score']:6.2f}")

    if len(leaderboard) > 20:
        print(f" ... and {len(leaderboard) - 20} more entries")

    print("=" * 60)
    print(f"Total entries: {len(leaderboard)}")
    print(f"Models with scores: {len(set(r['model_id'] for r in leaderboard))}")

    collector.save_json(args.output)

    if args.push_to_hub:
        collector.push_to_hub(args.repo_id)


if __name__ == "__main__":
    main()