import os
import re
import io
import sqlite3
from datetime import datetime, timezone

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, status, Header, Depends, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, EmailStr
from passlib.context import CryptContext
import jwt

from docx import Document as DocxDocument
import PyPDF2

from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Optional dependencies: each one degrades gracefully to a fallback if missing.
try:
    from sentence_transformers import SentenceTransformer
except Exception:
    SentenceTransformer = None

try:
    import language_tool_python
except Exception:
    language_tool_python = None

try:
    from gector import GECToR, predict as gector_predict, load_verb_dict
except Exception:
    GECToR = None
    gector_predict = None
    load_verb_dict = None

from pdf_reports import generate_report

load_dotenv()

JWT_SECRET = os.getenv("JWT_SECRET", "super_secret_key_change_this")
JWT_ALGO = os.getenv("JWT_ALGO", "HS256")
DB_PATH = os.getenv("DB_PATH", "truewrite.db")
CORPUS_DIR = os.getenv("CORPUS_DIR", "corpus")
CORPUS_RAW = os.getenv("CORPUS_RAW", "corpus_raw")

# Weight of TF-IDF similarity in the combined plagiarism score;
# the remaining (1 - PLAG_ALPHA) goes to semantic-embedding similarity.
PLAG_ALPHA = float(os.getenv("PLAG_ALPHA", "0.4"))
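# Illustrative arithmetic only (not real output), assuming the default PLAG_ALPHA of 0.4:
# a corpus document with TF-IDF similarity 0.80 and semantic similarity 0.60 scores
# 0.4 * 0.80 + 0.6 * 0.60 = 0.68, reported as 68.0%.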

pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")

# SQLite storage: a single shared connection (check_same_thread=False) with Row access.
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
cur = conn.cursor()

cur.execute("""
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    email TEXT NOT NULL UNIQUE,
    password_hash TEXT NOT NULL,
    created_at TEXT NOT NULL
)
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER NOT NULL,
    tool TEXT NOT NULL,
    input_text TEXT,
    result_summary TEXT,
    created_at TEXT NOT NULL,
    FOREIGN KEY (user_id) REFERENCES users(id)
)
""")

conn.commit()

app = FastAPI(title="TrueWrite Scan (Python Backend)")

app.add_middleware(
    CORSMiddleware,
    allow_origin_regex=r"https?://.*",
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class SignupRequest(BaseModel):
    name: str
    email: EmailStr
    password: str


class LoginRequest(BaseModel):
    email: EmailStr
    password: str


class TextRequest(BaseModel):
    text: str


def hash_password(pw: str) -> str:
    return pwd_context.hash(pw)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_token(user_id: int, email: str) -> str:
    payload = {"user_id": user_id, "email": email}
    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGO)
    # Older PyJWT versions return bytes; normalise to str.
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return token


def decode_token(token: str):
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
        return payload
    except jwt.PyJWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token"
        )


def get_current_user(authorization: str = Header(None)):
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing token"
        )
    token = authorization.split(" ", 1)[1]
    payload = decode_token(token)
    user_id = payload.get("user_id")
    cur.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    row = cur.fetchone()
    if not row:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found"
        )
    return {"id": row["id"], "name": row["name"], "email": row["email"]}


def now_iso():
    return datetime.now(timezone.utc).isoformat()
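# Illustrative client usage (host and port are assumptions, e.g. a local uvicorn run):
#   curl -H "Authorization: Bearer <token>" http://localhost:8000/api/history
# where <token> is the JWT returned by /api/signup or /api/login.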


def save_history(user_id: int, tool: str, input_text: str, summary: str):
    trimmed = (input_text[:500] + "...") if len(input_text) > 500 else input_text
    cur.execute(
        "INSERT INTO history (user_id, tool, input_text, result_summary, created_at) VALUES (?, ?, ?, ?, ?)",
        (user_id, tool, trimmed, summary, now_iso()),
    )
    conn.commit()


def count_words(text: str) -> int:
    tokens = text.strip().split()
    return len(tokens) if text.strip() else 0


def simple_grammar_correct(text: str):
    """Old heuristic grammar fixer (kept as fallback)."""
    corrections = 0
    original_words = count_words(text)

    # Collapse runs of whitespace.
    before = text
    text = re.sub(r"\s{2,}", " ", text)
    if text != before:
        corrections += 1

    # Capitalise the standalone pronoun "i".
    before = text
    text = re.sub(r"\bi\b", "I", text)
    if text != before:
        corrections += 1

    def cap_match(m):
        return m.group(0).upper()

    # Capitalise the first letter of the text and of each sentence.
    before = text
    text = re.sub(r"(^\s*\w|[.!?]\s+\w)", cap_match, text)
    if text != before:
        corrections += 1

    # Ensure the text ends with terminal punctuation.
    if text.strip() and not re.search(r"[.!?]\s*$", text.strip()):
        text = text.strip() + "."
        corrections += 1

    return text, corrections, original_words


def extract_from_docx_path(path: str) -> str:
    doc = DocxDocument(path)
    paragraphs = [p.text for p in doc.paragraphs]
    return "\n".join(paragraphs)


def extract_from_pdf_path(path: str) -> str:
    with open(path, "rb") as f:
        reader = PyPDF2.PdfReader(f)
        texts = []
        for pg in range(len(reader.pages)):
            try:
                texts.append(reader.pages[pg].extract_text() or "")
            except Exception:
                texts.append("")
    return "\n".join(texts)


def build_corpus_from_raw(raw_dir: str = CORPUS_RAW, out_dir: str = CORPUS_DIR):
    """
    Convert any .pdf / .docx / .txt files from corpus_raw/ into .txt files in corpus/.
    This mirrors the standalone build_corpus.py logic but is called automatically at startup.
    """
    os.makedirs(raw_dir, exist_ok=True)
    os.makedirs(out_dir, exist_ok=True)

    for fname in os.listdir(raw_dir):
        inpath = os.path.join(raw_dir, fname)
        if not os.path.isfile(inpath):
            continue
        outname = os.path.splitext(fname)[0] + ".txt"
        outpath = os.path.join(out_dir, outname)
        try:
            ext = fname.lower()
            if ext.endswith(".docx"):
                text = extract_from_docx_path(inpath)
            elif ext.endswith(".pdf"):
                text = extract_from_pdf_path(inpath)
            elif ext.endswith(".txt"):
                with open(inpath, "r", encoding="utf-8", errors="ignore") as f:
                    text = f.read()
            else:
                print("[CorpusRaw] Skipping unsupported:", fname)
                continue

            text = text.strip()
            with open(outpath, "w", encoding="utf-8") as fo:
                fo.write(text)
            print("[CorpusRaw] Wrote:", outpath)
        except Exception as e:
            print("[CorpusRaw] Failed", fname, "->", e)
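# Sketch of the conversion this performs (file names are examples only):
#   corpus_raw/paper1.pdf   ->  corpus/paper1.txt
#   corpus_raw/notes.docx   ->  corpus/notes.txt
#   corpus_raw/sources.txt  ->  corpus/sources.txt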


vectorizer = None
corpus_tfidf = None
corpus_titles = []
corpus_texts = []


def load_corpus(corpus_dir=CORPUS_DIR):
    """
    Load .txt corpus files from CORPUS_DIR, build TF-IDF index.
    Semantic embeddings are built separately in load_embeddings().
    """
    global vectorizer, corpus_tfidf, corpus_titles, corpus_texts
    corpus_titles = []
    corpus_texts = []
    if not os.path.isdir(corpus_dir):
        os.makedirs(corpus_dir, exist_ok=True)
        print("[Corpus] Created empty corpus directory:", corpus_dir)
        vectorizer = None
        corpus_tfidf = None
        return

    for fname in os.listdir(corpus_dir):
        if fname.lower().endswith(".txt"):
            path = os.path.join(corpus_dir, fname)
            try:
                with open(path, "r", encoding="utf-8", errors="ignore") as f:
                    txt = f.read()
                corpus_titles.append(fname)
                corpus_texts.append(txt)
            except Exception as e:
                print(f"[Corpus] Failed to read {path}: {e}")

    if corpus_texts:
        try:
            vectorizer = TfidfVectorizer(
                ngram_range=(1, 3),
                stop_words="english",
                max_features=50000
            )
            corpus_tfidf = vectorizer.fit_transform(corpus_texts)
            print(f"[Corpus] Loaded {len(corpus_texts)} documents into TF-IDF index")
        except Exception as e:
            print("[Corpus] TF-IDF build failed:", e)
            vectorizer = None
            corpus_tfidf = None
    else:
        vectorizer = None
        corpus_tfidf = None
        print("[Corpus] No .txt documents found in", corpus_dir)


emb_model = None
corpus_emb = None
EMB_MODEL_NAME = os.getenv("PLAG_EMB_MODEL", "sentence-transformers/all-MiniLM-L6-v2")


def load_embeddings():
    """
    Build semantic embedding index for plagiarism using sentence-transformers.
    """
    global emb_model, corpus_emb
    if SentenceTransformer is None:
        print("[Embeddings] sentence-transformers not installed; skipping semantic index.")
        emb_model = None
        corpus_emb = None
        return

    if not corpus_texts:
        print("[Embeddings] No corpus texts available; semantic index not built.")
        emb_model = None
        corpus_emb = None
        return

    try:
        emb_model = SentenceTransformer(EMB_MODEL_NAME)
        corpus_emb = emb_model.encode(
            corpus_texts,
            convert_to_numpy=True,
            show_progress_bar=False,
            normalize_embeddings=True,
        )
        print(f"[Embeddings] Loaded '{EMB_MODEL_NAME}' and encoded {len(corpus_texts)} corpus docs.")
    except Exception as e:
        emb_model = None
        corpus_emb = None
        print("[Embeddings] Failed to load or encode corpus:", e)


# Build the local corpus and indexes once at import/startup time.
build_corpus_from_raw()
load_corpus()
load_embeddings()


AI_DETECTOR_MODEL = "openai-community/roberta-base-openai-detector"
tokenizer = None
model = None
device = None

try:
    tokenizer = AutoTokenizer.from_pretrained(AI_DETECTOR_MODEL)
    model = AutoModelForSequenceClassification.from_pretrained(AI_DETECTOR_MODEL)
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    print(f"[AI Detector] Loaded {AI_DETECTOR_MODEL} on {device}")
except Exception as e:
    tokenizer = None
    model = None
    device = None
    print("[AI Detector] Failed to load HF model; using heuristic fallback. Error:", e)


GEC_MODEL = None
GEC_TOKENIZER = None
GEC_ENCODE = None
GEC_DECODE = None
GEC_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if GECToR is not None and gector_predict is not None and load_verb_dict is not None:
    try:
        print("[GECToR] Initializing model... (This may take a bit on first run)")
        GEC_MODEL_ID = os.getenv("GEC_MODEL_ID", "gotutiyan/gector-roberta-base-5k")
        VERB_DICT_PATH = os.getenv("GEC_VERB_DICT", "/app/data/verb-form-vocab.txt")

        GEC_MODEL = GECToR.from_pretrained(GEC_MODEL_ID).to(GEC_DEVICE)
        GEC_TOKENIZER = AutoTokenizer.from_pretrained(GEC_MODEL_ID)
        GEC_ENCODE, GEC_DECODE = load_verb_dict(VERB_DICT_PATH)

        print(f"[GECToR] Model & verb dict loaded: {GEC_MODEL_ID}")
    except Exception as e:
        print(f"[GECToR] Failed to load. Error: {e}")
        GEC_MODEL = None
        GEC_TOKENIZER = None
        GEC_ENCODE = None
        GEC_DECODE = None
else:
    print("[GECToR] Library not available; skipping neural GEC.")
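# Example .env overrides (the values shown are just the defaults used above):
#   GEC_MODEL_ID=gotutiyan/gector-roberta-base-5k
#   GEC_VERB_DICT=/app/data/verb-form-vocab.txt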


def gector_correct(text: str):
    """
    Run neural grammatical error correction using GECToR (gotutiyan implementation).
    """
    if GEC_MODEL is None or GEC_TOKENIZER is None or GEC_ENCODE is None or GEC_DECODE is None:
        print("[GECToR] Model not loaded, skipping.")
        return text, 0, len(text.split()) if text.strip() else 0

    # Cap the input at 1000 words to keep inference time bounded.
    parts = text.strip().split()
    if len(parts) > 1000:
        text_proc = " ".join(parts[:1000])
    else:
        text_proc = text.strip()

    if not text_proc:
        return text_proc, 0, 0

    srcs = [text_proc]

    try:
        corrected_list = gector_predict(
            GEC_MODEL,
            GEC_TOKENIZER,
            srcs,
            GEC_ENCODE,
            GEC_DECODE,
            keep_confidence=0.0,
            min_error_prob=0.0,
            n_iteration=5,
            batch_size=2,
        )
        corrected_text = corrected_list[0]

        # Rough edit count: number of positions where the corrected tokens differ.
        orig_tokens = text_proc.split()
        corr_tokens = corrected_text.split()
        corrections = sum(1 for a, b in zip(orig_tokens, corr_tokens) if a != b)
        original_words = len(orig_tokens)

        return corrected_text, corrections, original_words

    except Exception as e:
        print(f"[GECToR] Prediction error: {e}")
        return text_proc, 0, len(text_proc.split())


MAX_FILE_SIZE = 15 * 1024 * 1024  # 15 MB upload limit


def extract_text_from_upload(upload: UploadFile) -> str:
    filename = (upload.filename or "").lower()
    content_type = (upload.content_type or "").lower()
    data = upload.file.read()
    try:
        upload.file.seek(0)
    except Exception:
        pass

    if len(data) > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large (max 15MB)")

    if filename.endswith(".txt") or content_type == "text/plain":
        try:
            try:
                return data.decode("utf-8")
            except UnicodeDecodeError:
                return data.decode("latin-1")
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Failed to decode text file: {e}")

    if filename.endswith(".docx") or "wordprocessingml" in content_type:
        # A real .docx is a ZIP archive, so it must start with the "PK" magic bytes.
        if not data.startswith(b"PK"):
            raise HTTPException(
                status_code=400,
                detail="Uploaded file is not a valid .docx package (it might be an old .doc file or a corrupted document). "
                       "Please open it in Word/Google Docs and re-save as .docx or export as PDF, then upload again."
            )
        try:
            f = io.BytesIO(data)
            doc = DocxDocument(f)
            paragraphs = [p.text for p in doc.paragraphs]
            text = "\n".join(paragraphs).strip()
            if not text:
                raise ValueError("DOCX contained no readable text.")
            return text
        except Exception as e:
            raise HTTPException(
                status_code=400,
                detail=f"Failed to parse docx file: {e}. Try opening it in Word/Google Docs and exporting again as .docx or PDF."
            )

    if filename.endswith(".pdf") or "pdf" in content_type:
        try:
            f = io.BytesIO(data)
            reader = PyPDF2.PdfReader(f)
            texts = []
            for pg in range(len(reader.pages)):
                try:
                    txt = reader.pages[pg].extract_text() or ""
                except Exception:
                    txt = ""
                texts.append(txt)
            return "\n".join(texts)
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Failed to parse PDF file: {e}")

    raise HTTPException(
        status_code=415,
        detail="Unsupported file type. Use .txt, .pdf, or .docx",
    )


lt_tool = None
if language_tool_python is not None:
    try:
        lt_tool = language_tool_python.LanguageTool("en-US")
        print("[LanguageTool] Loaded (local Java-backed checker)")
    except Exception as e:
        lt_tool = None
        print("[LanguageTool] Could not start local LanguageTool; falling back. Error:", e)
else:
    print("[LanguageTool] library not installed; falling back to heuristics.")


def grammar_with_languagetool(text: str):
    # Cap the input at 1000 words to keep checking time bounded.
    parts = text.strip().split()
    if len(parts) > 1000:
        text_proc = " ".join(parts[:1000])
    else:
        text_proc = text.strip()

    matches = lt_tool.check(text_proc)
    corrected = language_tool_python.utils.correct(text_proc, matches)
    corrections = len(matches)
    return corrected, corrections, len(text_proc.split())


def _clean_for_jaccard(t: str):
    t = t.lower()
    t = re.sub(r"[^a-z0-9\s]", " ", t)
    return [w for w in t.split() if w]


def _jaccard_similarity(a, b):
    sa = set(a)
    sb = set(b)
    if not sa or not sb:
        return 0.0
    return len(sa & sb) / len(sa | sb)


def demo_plagiarism_fallback(text: str):
    """
    Simple Jaccard-based fallback using a tiny built-in sample set.
    Used when no TF-IDF / semantic corpus is available.
    """
    SAMPLE_DOCS = [
        {"title": "AI for Social Good",
         "text": "Artificial intelligence is transforming multiple industries by automating routine tasks and enabling data driven decision making for social impact and efficiency."},
        {"title": "IoT in Smart Cities",
         "text": "The Internet of Things connects sensors, devices, and cloud platforms to enable real time monitoring and control in smart cities including lighting, traffic, and waste management."},
        {"title": "Climate & Renewable Energy",
         "text": "Climate change is a critical global challenge that demands renewable energy, efficient resource management, and international cooperation to ensure a sustainable future."},
    ]

    input_words = _clean_for_jaccard(text)
    best_score = 0.0
    matches = []
    for doc in SAMPLE_DOCS:
        doc_words = _clean_for_jaccard(doc["text"])
        score = _jaccard_similarity(input_words, doc_words)
        matches.append({"title": doc["title"], "score": round(score * 100, 2)})
        if score > best_score:
            best_score = score

    matches.sort(key=lambda x: x["score"], reverse=True)
    plagiarism_percent = round(best_score * 100, 2)
    summary = f"Plagiarism estimate (demo Jaccard): {plagiarism_percent}%"
    return {"plagiarism_percent": plagiarism_percent, "matches": matches[:5], "summary": summary}


def corpus_plagiarism_combined(text: str):
    """
    Combined plagiarism score using:
      - TF-IDF cosine similarity
      - Semantic embedding cosine similarity (SentenceTransformers)

    Returns dict matching API schema:
      { plagiarism_percent, matches, summary }
    """
    if not corpus_texts:
        raise ValueError("No corpus texts loaded")

    sims_tfidf = None
    sims_emb = None

    # Cap the input at 3000 words to keep scoring time bounded.
    words = text.split()
    if len(words) > 3000:
        text_proc = " ".join(words[:3000])
    else:
        text_proc = text

    if vectorizer is not None and corpus_tfidf is not None:
        q = vectorizer.transform([text_proc])
        sims_tfidf = cosine_similarity(q, corpus_tfidf)[0]

    if emb_model is not None and corpus_emb is not None:
        q_emb = emb_model.encode(
            [text_proc],
            convert_to_numpy=True,
            normalize_embeddings=True,
            show_progress_bar=False,
        )[0]
        # Embeddings are normalised, so the dot product equals cosine similarity.
        sims_emb = corpus_emb @ q_emb

    if sims_tfidf is None and sims_emb is None:
        raise ValueError("No plagiarism backends (TF-IDF / embeddings) are available")

    n_docs = len(corpus_texts)
    combined_rows = []
    alpha = PLAG_ALPHA

    for i in range(n_docs):
        tf = float(sims_tfidf[i]) if sims_tfidf is not None else None
        se = float(sims_emb[i]) if sims_emb is not None else None
        if tf is None and se is None:
            continue

        if tf is not None and se is not None:
            score = alpha * tf + (1.0 - alpha) * se
        elif tf is not None:
            score = tf
        else:
            score = se

        combined_rows.append({
            "index": i,
            "combined": score,
            "tfidf": tf,
            "semantic": se,
        })

    if not combined_rows:
        raise ValueError("No scores computed for corpus documents")

    combined_rows.sort(key=lambda x: x["combined"], reverse=True)
    top = combined_rows[:10]

    best = top[0]["combined"]
    plagiarism_percent = round(best * 100, 2)

    matches = []
    for row in top:
        matches.append({
            "title": corpus_titles[row["index"]],
            "score": round(row["combined"] * 100, 2),
            "tfidf_score": round(row["tfidf"] * 100, 2) if row["tfidf"] is not None else None,
            "semantic_score": round(row["semantic"] * 100, 2) if row["semantic"] is not None else None,
        })

    components = []
    if sims_tfidf is not None:
        components.append("TF-IDF")
    if sims_emb is not None:
        components.append("semantic embeddings")
    comp_str = " + ".join(components)

    summary = f"Plagiarism estimate (combined {comp_str}): {plagiarism_percent}%"
    return {"plagiarism_percent": plagiarism_percent, "matches": matches, "summary": summary}
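# Shape of the dict returned above (numbers are illustrative, not real output):
#   {
#       "plagiarism_percent": 42.16,
#       "matches": [{"title": "doc1.txt", "score": 42.16,
#                    "tfidf_score": 38.5, "semantic_score": 44.6}, ...],
#       "summary": "Plagiarism estimate (combined TF-IDF + semantic embeddings): 42.16%"
#   }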


@app.post("/api/signup")
def signup(req: SignupRequest):
    cur.execute("SELECT id FROM users WHERE email = ?", (req.email,))
    if cur.fetchone():
        raise HTTPException(status_code=400, detail="Email already registered")

    pw_hash = hash_password(req.password)
    created_at = now_iso()
    cur.execute(
        "INSERT INTO users (name, email, password_hash, created_at) VALUES (?, ?, ?, ?)",
        (req.name, req.email, pw_hash, created_at),
    )
    conn.commit()
    user_id = cur.lastrowid
    token = create_token(user_id, req.email)

    return {
        "message": "Signup successful",
        "token": token,
        "name": req.name,
        "email": req.email,
    }


@app.post("/api/login")
def login(req: LoginRequest):
    cur.execute("SELECT * FROM users WHERE email = ?", (req.email,))
    row = cur.fetchone()
    if not row or not verify_password(req.password, row["password_hash"]):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    token = create_token(row["id"], row["email"])
    return {
        "message": "Login successful",
        "token": token,
        "name": row["name"],
        "email": row["email"],
    }
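# Illustrative login request (host, port, and credentials are placeholders):
#   curl -X POST http://localhost:8000/api/login \
#        -H "Content-Type: application/json" \
#        -d '{"email": "user@example.com", "password": "secret"}'
# A successful response contains message, token, name, and email.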


@app.post("/api/grammar-check")
def api_grammar_check(req: TextRequest, user=Depends(get_current_user)):
    text = req.text or ""
    if not text.strip():
        raise HTTPException(status_code=400, detail="Text is required")

    # Prefer neural GEC, then LanguageTool, then the simple heuristics.
    if GEC_MODEL is not None:
        corrected, corrections, original_words = gector_correct(text)
        summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
    elif lt_tool is not None:
        corrected, corrections, original_words = grammar_with_languagetool(text)
        summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
    else:
        corrected, corrections, original_words = simple_grammar_correct(text)
        summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"

    save_history(user["id"], "grammar", text, summary)

    return {
        "original_words": original_words,
        "corrections": corrections,
        "corrected_text": corrected,
        "summary": summary,
    }


@app.post("/api/grammar-check-file")
def api_grammar_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
    text = extract_text_from_upload(file).strip()
    if not text:
        raise HTTPException(status_code=400, detail="Uploaded file contains no text")

    if GEC_MODEL is not None:
        corrected, corrections, original_words = gector_correct(text)
        summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
    elif lt_tool is not None:
        corrected, corrections, original_words = grammar_with_languagetool(text)
        summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
    else:
        parts = text.strip().split()
        if len(parts) > 1000:
            text = " ".join(parts[:1000])
        corrected, corrections, original_words = simple_grammar_correct(text)
        summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"

    save_history(user["id"], "grammar", text, summary)

    return {
        "original_words": original_words,
        "corrections": corrections,
        "corrected_text": corrected,
        "summary": summary,
    }
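# Illustrative call (token and host are placeholders):
#   curl -X POST http://localhost:8000/api/grammar-check \
#        -H "Authorization: Bearer <token>" -H "Content-Type: application/json" \
#        -d '{"text": "she go to school yesterday"}'
# The response carries original_words, corrections, corrected_text, and summary.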


@app.post("/api/plagiarism-check")
def api_plagiarism_check(req: TextRequest, user=Depends(get_current_user)):
    text = req.text or ""
    if not text.strip():
        raise HTTPException(status_code=400, detail="Text is required")

    try:
        result = corpus_plagiarism_combined(text)
        save_history(user["id"], "plagiarism", text, result["summary"])
        return result
    except Exception as e:
        print("[Plagiarism] Combined corpus engine failed, falling back to demo:", e)

    result = demo_plagiarism_fallback(text)
    save_history(user["id"], "plagiarism", text, result["summary"])
    return result


@app.post("/api/plagiarism-check-file")
def api_plagiarism_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
    text = extract_text_from_upload(file).strip()
    if not text:
        raise HTTPException(status_code=400, detail="Uploaded file contains no text")

    try:
        result = corpus_plagiarism_combined(text)
        save_history(user["id"], "plagiarism", text, result["summary"])
        return result
    except Exception as e:
        print("[Plagiarism-file] Combined corpus engine failed, falling back to demo:", e)

    result = demo_plagiarism_fallback(text)
    save_history(user["id"], "plagiarism", text, result["summary"])
    return result
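# Illustrative file upload (host and filename are placeholders):
#   curl -X POST http://localhost:8000/api/plagiarism-check-file \
#        -H "Authorization: Bearer <token>" -F "file=@thesis.pdf"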


def heuristic_ai_score(text: str):
    words = re.sub(r"[^a-z0-9\s]", " ", text.lower()).split()
    word_count = len(words)
    unique_ratio = len(set(words)) / (word_count or 1)
    sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
    avg_sentence_length = word_count / (len(sentences) or 1)

    # Crude signals: low lexical variety and long sentences push the score up.
    ai_score = 0
    if unique_ratio < 0.45:
        ai_score += 40
    elif unique_ratio < 0.6:
        ai_score += 20

    if avg_sentence_length > 25:
        ai_score += 40
    elif avg_sentence_length > 18:
        ai_score += 25

    if word_count > 400:
        ai_score += 10

    ai_score = min(100, round(ai_score))
    human_score = 100 - ai_score
    return ai_score, human_score, word_count, avg_sentence_length, unique_ratio


@app.post("/api/ai-check")
def api_ai_check(req: TextRequest, user=Depends(get_current_user)):
    text = (req.text or "").strip()
    if not text:
        raise HTTPException(status_code=400, detail="Text is required")

    if model is not None and tokenizer is not None:
        try:
            max_len = getattr(tokenizer, "model_max_length", 512)
            if max_len is None or max_len > 1024:
                max_len = 512

            # Score the text in word chunks and average the per-chunk AI probability.
            words = text.split()
            chunk_size = min(400, max_len - 10)
            chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
            probs = []
            for chunk in chunks:
                inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_len)
                inputs = {k: v.to(device) for k, v in inputs.items()}
                with torch.no_grad():
                    outputs = model(**inputs)
                logits = outputs.logits
                p = torch.softmax(logits, dim=1).cpu().numpy()[0]
                ai_prob = float(p[1]) if p.shape[0] > 1 else float(p[0])
                probs.append(ai_prob)
            avg_ai_prob = float(np.mean(probs)) if probs else 0.0
            ai_percent = round(avg_ai_prob * 100, 2)
            human_percent = round(100 - ai_percent, 2)
            words_count = len(words)
            sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
            avg_sentence_len = round(words_count / (len(sentences) or 1), 2)
            summary = f"Model: {AI_DETECTOR_MODEL}; AI probability: {ai_percent}%"
            save_history(user["id"], "ai", text, summary)
            return {
                "ai_percent": ai_percent,
                "human_percent": human_percent,
                "word_count": words_count,
                "avg_sentence_length": avg_sentence_len,
                "summary": summary,
            }
        except Exception as e:
            print("[AI-check] model inference failed:", e)

    ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text)
    summary = f"HEURISTIC fallback - AI probability: {ai_percent}%"
    save_history(user["id"], "ai", text, summary)
    return {
        "ai_percent": ai_percent,
        "human_percent": human_percent,
        "word_count": wc,
        "avg_sentence_length": avg_len,
        "unique_ratio": round(uniq, 3),
        "summary": summary,
    }


@app.post("/api/ai-check-file")
def api_ai_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
    text = extract_text_from_upload(file).strip()
    if not text:
        raise HTTPException(status_code=400, detail="Uploaded file contains no text")
    # Reuse the text endpoint directly: FastAPI's route decorator returns the
    # original function, so it can be called like a normal function.
    return api_ai_check(TextRequest(text=text), user)
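# Illustrative file upload (host and filename are placeholders):
#   curl -X POST http://localhost:8000/api/ai-check-file \
#        -H "Authorization: Bearer <token>" -F "file=@essay.docx"
# The response mirrors /api/ai-check: ai_percent, human_percent, word_count,
# avg_sentence_length, summary (plus unique_ratio on the heuristic fallback).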


@app.get("/api/history")
def api_history(user=Depends(get_current_user)):
    cur.execute(
        "SELECT id, tool, input_text, result_summary, created_at "
        "FROM history WHERE user_id = ? "
        "ORDER BY created_at DESC LIMIT 50",
        (user["id"],),
    )
    rows = cur.fetchall()
    items = []
    for r in rows:
        items.append(
            {
                "id": r["id"],
                "tool": r["tool"],
                "input_text": r["input_text"],
                "summary": r["result_summary"],
                "created_at": r["created_at"],
            }
        )
    return {"items": items}


@app.get("/report/grammar")
def report_grammar_get(text: str = "", user=Depends(get_current_user)):
    """
    Generate a Grammar Report PDF from the `text` query parameter.
    If you prefer file upload, use POST /report/grammar-file.
    """
    if not text.strip():
        raise HTTPException(status_code=400, detail="Text is required for report")

    if GEC_MODEL is not None:
        corrected, corrections, original_words = gector_correct(text)
        summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
    elif lt_tool is not None:
        corrected, corrections, original_words = grammar_with_languagetool(text)
        summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
    else:
        corrected, corrections, original_words = simple_grammar_correct(text)
        summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"

    save_history(user["id"], "grammar", text, summary)

    tiles = [
        {'value': str(corrections), 'label': 'Errors'},
        {'value': '-', 'label': 'Warnings'},
        {'value': '-', 'label': 'Suggestions'},
        {'value': '-', 'label': 'Readability'}
    ]
    counts = {
        'Words': str(original_words),
        'Characters': str(len(text)),
        'Sentences': str(len([s for s in re.split(r"[.!?]+", text) if s.strip()])),
        'Paragraphs': str(len([p for p in text.split("\n") if p.strip()])),
        'Read Time': f"{max(1, original_words//200)} minute(s)"
    }
    sections = [
        {'heading': 'Summary', 'paragraphs': [
            "This Grammar Report lists detected grammar issues and suggestions.",
            {'text': f"Corrections suggested: {corrections}", 'highlight': 'yellow' if corrections > 0 else None},
            {'text': corrected, 'highlight': None}
        ]},
        {'heading': 'Document', 'paragraphs': [text]}
    ]
    matched_sources = []
    footer = "Grammar suggestions are automated. Review before applying changes."

    pdf_path = generate_report("grammar", out_dir="/tmp",
                               title_text="Grammar Report",
                               tiles=tiles, counts=counts, sections=sections,
                               matched_sources=matched_sources, footer_text=footer)

    return FileResponse(pdf_path, media_type="application/pdf", filename="TrueWrite_GrammarReport.pdf")
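# Illustrative download (host is a placeholder; the text is URL-encoded by curl):
#   curl -G http://localhost:8000/report/grammar \
#        -H "Authorization: Bearer <token>" \
#        --data-urlencode "text=she go to school yesterday" \
#        -o TrueWrite_GrammarReport.pdf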


@app.post("/report/grammar-file")
def report_grammar_file(file: UploadFile = File(...), user=Depends(get_current_user)):
    text = extract_text_from_upload(file).strip()
    if not text:
        raise HTTPException(status_code=400, detail="Uploaded file contains no text")

    if GEC_MODEL is not None:
        corrected, corrections, original_words = gector_correct(text)
        summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
    elif lt_tool is not None:
        corrected, corrections, original_words = grammar_with_languagetool(text)
        summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
    else:
        corrected, corrections, original_words = simple_grammar_correct(text)
        summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"

    save_history(user["id"], "grammar", text, summary)

    tiles = [
        {'value': str(corrections), 'label': 'Errors'},
        {'value': '-', 'label': 'Warnings'},
        {'value': '-', 'label': 'Suggestions'},
        {'value': '-', 'label': 'Readability'}
    ]
    counts = {
        'Words': str(original_words),
        'Characters': str(len(text)),
        'Sentences': str(len([s for s in re.split(r"[.!?]+", text) if s.strip()])),
        'Paragraphs': str(len([p for p in text.split("\n") if p.strip()])),
        'Read Time': f"{max(1, original_words//200)} minute(s)"
    }
    sections = [
        {'heading': 'Summary', 'paragraphs': [
            "This Grammar Report lists detected grammar issues and suggestions.",
            {'text': f"Corrections suggested: {corrections}", 'highlight': 'yellow' if corrections > 0 else None},
            {'text': corrected, 'highlight': None}
        ]},
        {'heading': 'Document', 'paragraphs': [text]}
    ]
    matched_sources = []
    footer = "Grammar suggestions are automated. Review before applying changes."

    pdf_path = generate_report("grammar", out_dir="/tmp",
                               title_text="Grammar Report",
                               tiles=tiles, counts=counts, sections=sections,
                               matched_sources=matched_sources, footer_text=footer)

    return FileResponse(pdf_path, media_type="application/pdf", filename="TrueWrite_GrammarReport.pdf")


@app.get("/report/plagiarism")
def report_plagiarism_get(text: str = "", user=Depends(get_current_user)):
    """
    Generate a Plagiarism Report PDF from the `text` query parameter.
    If you prefer file upload, use POST /report/plagiarism-file.
    """
    if not text.strip():
        raise HTTPException(status_code=400, detail="Text is required for report")

    try:
        result = corpus_plagiarism_combined(text)
    except Exception:
        result = demo_plagiarism_fallback(text)

    save_history(user["id"], "plagiarism", text, result.get("summary", ""))

    plag_percent = f"{result.get('plagiarism_percent', 0)}%"
    top_matches = result.get("matches", [])[:5]
    tiles = [
        {'value': plag_percent, 'label': 'Plagiarism'},
        {'value': f"{top_matches[0]['score']}%" if top_matches else '0%', 'label': 'Top Match'},
        {'value': '-', 'label': 'Partial Match'},
        {'value': f"{round(100 - float(result.get('plagiarism_percent', 0)), 2)}%", 'label': 'Unique'}
    ]
    counts = {
        'Words': str(count_words(text)),
        'Characters': str(len(text)),
        'Sentences': str(len([s for s in re.split(r"[.!?]+", text) if s.strip()])),
        'Paragraphs': str(len([p for p in text.split("\n") if p.strip()])),
        'Read Time': f"{max(1, count_words(text)//200)} minute(s)"
    }

    sections = [
        {'heading': 'Summary', 'paragraphs': [
            result.get("summary", "Plagiarism analysis completed."),
            "Top matches are listed below."
        ]},
        {'heading': 'Document', 'paragraphs': [text]}
    ]

    matched_sources = []
    for m in top_matches:
        matched_sources.append({
            'title': m.get('title') or m.get('source', 'Source'),
            'url': m.get('url') or '',
            'similarity': f"{m.get('score', 0)}%"
        })

    footer = "Plagiarism detection results are estimates. Review sources for exact matches."

    pdf_path = generate_report("plagiarism", out_dir="/tmp",
                               title_text="Plagiarism Report",
                               tiles=tiles, counts=counts, sections=sections,
                               matched_sources=matched_sources, footer_text=footer)

    return FileResponse(pdf_path, media_type="application/pdf", filename="TrueWrite_PlagiarismReport.pdf")


@app.post("/report/plagiarism-file")
def report_plagiarism_file(file: UploadFile = File(...), user=Depends(get_current_user)):
    text = extract_text_from_upload(file).strip()
    if not text:
        raise HTTPException(status_code=400, detail="Uploaded file contains no text")

    try:
        result = corpus_plagiarism_combined(text)
    except Exception:
        result = demo_plagiarism_fallback(text)

    save_history(user["id"], "plagiarism", text, result.get("summary", ""))

    plag_percent = f"{result.get('plagiarism_percent', 0)}%"
    top_matches = result.get("matches", [])[:5]
    tiles = [
        {'value': plag_percent, 'label': 'Plagiarism'},
        {'value': f"{top_matches[0]['score']}%" if top_matches else '0%', 'label': 'Top Match'},
        {'value': '-', 'label': 'Partial Match'},
        {'value': f"{round(100 - float(result.get('plagiarism_percent', 0)), 2)}%", 'label': 'Unique'}
    ]
    counts = {
        'Words': str(count_words(text)),
        'Characters': str(len(text)),
        'Sentences': str(len([s for s in re.split(r"[.!?]+", text) if s.strip()])),
        'Paragraphs': str(len([p for p in text.split("\n") if p.strip()])),
        'Read Time': f"{max(1, count_words(text)//200)} minute(s)"
    }

    sections = [
        {'heading': 'Summary', 'paragraphs': [
            result.get("summary", "Plagiarism analysis completed."),
            "Top matches are listed below."
        ]},
        {'heading': 'Document', 'paragraphs': [text]}
    ]

    matched_sources = []
    for m in top_matches:
        matched_sources.append({
            'title': m.get('title') or m.get('source', 'Source'),
            'url': m.get('url') or '',
            'similarity': f"{m.get('score', 0)}%"
        })

    footer = "Plagiarism detection results are estimates. Review sources for exact matches."

    pdf_path = generate_report("plagiarism", out_dir="/tmp",
                               title_text="Plagiarism Report",
                               tiles=tiles, counts=counts, sections=sections,
                               matched_sources=matched_sources, footer_text=footer)

    return FileResponse(pdf_path, media_type="application/pdf", filename="TrueWrite_PlagiarismReport.pdf")


@app.get("/report/ai")
def report_ai_get(text: str = "", user=Depends(get_current_user)):
    """
    Generate an AI Content Report PDF from the `text` query parameter.
    If you prefer file upload, use POST /report/ai-file.
    """
    if not text.strip():
        raise HTTPException(status_code=400, detail="Text is required for report")

    if model is not None and tokenizer is not None:
        try:
            max_len = getattr(tokenizer, "model_max_length", 512)
            if max_len is None or max_len > 1024:
                max_len = 512

            words = text.split()
            chunk_size = min(400, max_len - 10)
            chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
            probs = []
            for chunk in chunks:
                inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_len)
                inputs = {k: v.to(device) for k, v in inputs.items()}
                with torch.no_grad():
                    outputs = model(**inputs)
                logits = outputs.logits
                p = torch.softmax(logits, dim=1).cpu().numpy()[0]
                ai_prob = float(p[1]) if p.shape[0] > 1 else float(p[0])
                probs.append(ai_prob)
            avg_ai_prob = float(np.mean(probs)) if probs else 0.0
            ai_percent = round(avg_ai_prob * 100, 2)
            human_percent = round(100 - ai_percent, 2)
            words_count = len(words)
            sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
            avg_sentence_len = round(words_count / (len(sentences) or 1), 2)
            summary = f"Model: {AI_DETECTOR_MODEL}; AI probability: {ai_percent}%"
        except Exception as e:
            print("[AI-report] model inference failed:", e)
            ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text)
            words_count = wc
            avg_sentence_len = avg_len
            summary = f"HEURISTIC fallback - AI probability: {ai_percent}%"
    else:
        ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text)
        words_count = wc
        avg_sentence_len = avg_len
        summary = f"HEURISTIC fallback - AI probability: {ai_percent}%"

    save_history(user["id"], "ai", text, summary)

    tiles = [
        {'value': f"{ai_percent}%", 'label': 'AI Likelihood'},
        {'value': '-', 'label': 'Plagiarism'},
        {'value': '-', 'label': 'Human-Like'},
        {'value': f"{human_percent}%", 'label': 'Human Likelihood'}
    ]
    counts = {
        'Words': str(words_count),
        'Characters': str(len(text)),
        'Sentences': str(len([s for s in re.split(r"[.!?]+", text) if s.strip()])),
        'Paragraphs': str(len([p for p in text.split("\n") if p.strip()])),
        'Avg Sentence Len': str(avg_sentence_len)
    }
    sections = [
        {'heading': 'Executive Summary', 'paragraphs': [
            summary,
            {'text': "This AI Content Report analyses the likelihood that portions of the submitted text were generated by AI.", 'highlight': None}
        ]},
        {'heading': 'Document Body', 'paragraphs': [text]}
    ]
    matched_sources = []
    footer = "AI detection is probabilistic. Use results as guidance."

    pdf_path = generate_report("ai", out_dir="/tmp",
                               title_text="AI Content Report",
                               tiles=tiles, counts=counts, sections=sections,
                               matched_sources=matched_sources, footer_text=footer)

    return FileResponse(pdf_path, media_type="application/pdf", filename="TrueWrite_AiReport.pdf")


@app.post("/report/ai-file")
def report_ai_file(file: UploadFile = File(...), user=Depends(get_current_user)):
    text = extract_text_from_upload(file).strip()
    if not text:
        raise HTTPException(status_code=400, detail="Uploaded file contains no text")

    if model is not None and tokenizer is not None:
        try:
            max_len = getattr(tokenizer, "model_max_length", 512)
            if max_len is None or max_len > 1024:
                max_len = 512

            words = text.split()
            chunk_size = min(400, max_len - 10)
            chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
            probs = []
            for chunk in chunks:
                inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_len)
                inputs = {k: v.to(device) for k, v in inputs.items()}
                with torch.no_grad():
                    outputs = model(**inputs)
                logits = outputs.logits
                p = torch.softmax(logits, dim=1).cpu().numpy()[0]
                ai_prob = float(p[1]) if p.shape[0] > 1 else float(p[0])
                probs.append(ai_prob)
            avg_ai_prob = float(np.mean(probs)) if probs else 0.0
            ai_percent = round(avg_ai_prob * 100, 2)
            human_percent = round(100 - ai_percent, 2)
            words_count = len(words)
            sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
            avg_sentence_len = round(words_count / (len(sentences) or 1), 2)
            summary = f"Model: {AI_DETECTOR_MODEL}; AI probability: {ai_percent}%"
        except Exception as e:
            print("[AI-report-file] model inference failed:", e)
            ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text)
            words_count = wc
            avg_sentence_len = avg_len
            summary = f"HEURISTIC fallback - AI probability: {ai_percent}%"
    else:
        ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text)
        words_count = wc
        avg_sentence_len = avg_len
        summary = f"HEURISTIC fallback - AI probability: {ai_percent}%"

    save_history(user["id"], "ai", text, summary)

    tiles = [
        {'value': f"{ai_percent}%", 'label': 'AI Likelihood'},
        {'value': '-', 'label': 'Plagiarism'},
        {'value': '-', 'label': 'Human-Like'},
        {'value': f"{human_percent}%", 'label': 'Human Likelihood'}
    ]
    counts = {
        'Words': str(words_count),
        'Characters': str(len(text)),
        'Sentences': str(len([s for s in re.split(r"[.!?]+", text) if s.strip()])),
        'Paragraphs': str(len([p for p in text.split("\n") if p.strip()])),
        'Avg Sentence Len': str(avg_sentence_len)
    }
    sections = [
        {'heading': 'Executive Summary', 'paragraphs': [
            summary,
            {'text': "This AI Content Report analyses the likelihood that portions of the submitted text were generated by AI.", 'highlight': None}
        ]},
        {'heading': 'Document Body', 'paragraphs': [text]}
    ]
    matched_sources = []
    footer = "AI detection is probabilistic. Use results as guidance."

    pdf_path = generate_report("ai", out_dir="/tmp",
                               title_text="AI Content Report",
                               tiles=tiles, counts=counts, sections=sections,
                               matched_sources=matched_sources, footer_text=footer)

    return FileResponse(pdf_path, media_type="application/pdf", filename="TrueWrite_AiReport.pdf")


@app.get("/")
def read_root():
    return {"status": "Backend is running with GECToR + 16GB RAM!"}