# backend/main.py
import os
import re
import io
import sqlite3
from datetime import datetime, timezone
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, status, Header, Depends, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, EmailStr
from passlib.context import CryptContext
import jwt
# File parsing libs
from docx import Document as DocxDocument
import PyPDF2
# ML / NLP libs
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np
# TF-IDF
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Semantic embeddings for plagiarism (combined approach)
try:
from sentence_transformers import SentenceTransformer
except Exception:
SentenceTransformer = None
# LanguageTool (may require Java)
try:
import language_tool_python
except Exception:
language_tool_python = None
# GECToR (neural grammatical error correction)
try:
from gector import GECToR, predict as gector_predict, load_verb_dict
except Exception:
GECToR = None
gector_predict = None
load_verb_dict = None
# ------------------ ENV & DB SETUP ------------------
load_dotenv()
JWT_SECRET = os.getenv("JWT_SECRET", "super_secret_key_change_this")
JWT_ALGO = os.getenv("JWT_ALGO", "HS256")
DB_PATH = os.getenv("DB_PATH", "truewrite.db")
CORPUS_DIR = os.getenv("CORPUS_DIR", "corpus")
CORPUS_RAW = os.getenv("CORPUS_RAW", "corpus_raw")
# Combined plagiarism weights
PLAG_ALPHA = float(os.getenv("PLAG_ALPHA", "0.4")) # TF-IDF weight; (1-alpha) for embeddings
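# How the weight is used further down in corpus_plagiarism_combined():
#   combined = PLAG_ALPHA * tfidf_cosine + (1 - PLAG_ALPHA) * embedding_cosine
# Worked example with the default alpha of 0.4: a corpus document scoring
# 0.80 on TF-IDF and 0.60 on embeddings gets 0.4 * 0.80 + 0.6 * 0.60 = 0.68,
# reported as 68.0%.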
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
# SQLite DB (simple demo)
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
cur = conn.cursor()
# Create tables if not exist
cur.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
created_at TEXT NOT NULL
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
tool TEXT NOT NULL,
input_text TEXT,
result_summary TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id)
)
""")
conn.commit()
# ------------------ FASTAPI APP ------------------
app = FastAPI(title="TrueWrite Scan (Python Backend)")
app.add_middleware(
CORSMiddleware,
# This regex allows ANY URL (HTTP or HTTPS) to connect
allow_origin_regex=r"https?://.*",
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ------------------ MODELS ------------------
class SignupRequest(BaseModel):
name: str
email: EmailStr
password: str
class LoginRequest(BaseModel):
email: EmailStr
password: str
class TextRequest(BaseModel):
text: str
# ------------------ AUTH HELPERS ------------------
def hash_password(pw: str) -> str:
return pwd_context.hash(pw)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_token(user_id: int, email: str) -> str:
payload = {"user_id": user_id, "email": email}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGO)
if isinstance(token, bytes):
token = token.decode("utf-8")
return token
def decode_token(token: str):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGO])
return payload
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
def get_current_user(authorization: str = Header(None)):
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing token"
)
token = authorization.split(" ", 1)[1]
payload = decode_token(token)
user_id = payload.get("user_id")
cur.execute("SELECT * FROM users WHERE id = ?", (user_id,))
row = cur.fetchone()
if not row:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
return {"id": row["id"], "name": row["name"], "email": row["email"]}
def now_iso():
return datetime.now(timezone.utc).isoformat()
def save_history(user_id: int, tool: str, input_text: str, summary: str):
trimmed = (input_text[:500] + "...") if len(input_text) > 500 else input_text
cur.execute(
"INSERT INTO history (user_id, tool, input_text, result_summary, created_at) VALUES (?, ?, ?, ?, ?)",
(user_id, tool, trimmed, summary, now_iso()),
)
conn.commit()
# ------------------ TEXT HELPERS ------------------
def count_words(text: str) -> int:
tokens = text.strip().split()
return len(tokens) if text.strip() else 0
def simple_grammar_correct(text: str):
"""Old heuristic grammar fixer (kept as fallback)."""
corrections = 0
original_words = count_words(text)
before = text
text = re.sub(r"\s{2,}", " ", text)
if text != before:
corrections += 1
before = text
text = re.sub(r"\bi\b", "I", text)
if text != before:
corrections += 1
def cap_match(m):
return m.group(0).upper()
before = text
text = re.sub(r"(^\s*\w|[.!?]\s+\w)", cap_match, text)
if text != before:
corrections += 1
if text.strip() and not re.search(r"[.!?]\s*$", text.strip()):
text = text.strip() + "."
corrections += 1
return text, corrections, original_words
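# Worked example of the heuristic pass (no external tools involved):
#   simple_grammar_correct("i  like python")
#   -> collapses the double space, capitalises the lone "i", appends a full stop
#   -> ("I like python.", 3, 3)   # corrected text, 3 corrections, 3 original words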
# ------------------ CORPUS BUILDING (from corpus_raw -> corpus) ------------------
def extract_from_docx_path(path: str) -> str:
doc = DocxDocument(path)
paragraphs = [p.text for p in doc.paragraphs]
return "\n".join(paragraphs)
def extract_from_pdf_path(path: str) -> str:
with open(path, "rb") as f:
reader = PyPDF2.PdfReader(f)
texts = []
for pg in range(len(reader.pages)):
try:
texts.append(reader.pages[pg].extract_text() or "")
except Exception:
texts.append("")
return "\n".join(texts)
def build_corpus_from_raw(raw_dir: str = CORPUS_RAW, out_dir: str = CORPUS_DIR):
"""
Convert any .pdf / .docx / .txt files from corpus_raw/ into .txt files in corpus/.
This mirrors the standalone build_corpus.py logic but is called automatically at startup.
"""
os.makedirs(raw_dir, exist_ok=True)
os.makedirs(out_dir, exist_ok=True)
for fname in os.listdir(raw_dir):
inpath = os.path.join(raw_dir, fname)
if not os.path.isfile(inpath):
continue
outname = os.path.splitext(fname)[0] + ".txt"
outpath = os.path.join(out_dir, outname)
try:
lowered = fname.lower()
if lowered.endswith(".docx"):
text = extract_from_docx_path(inpath)
elif lowered.endswith(".pdf"):
text = extract_from_pdf_path(inpath)
elif lowered.endswith(".txt"):
with open(inpath, "r", encoding="utf-8", errors="ignore") as f:
text = f.read()
else:
print("[CorpusRaw] Skipping unsupported:", fname)
continue
text = text.strip()
with open(outpath, "w", encoding="utf-8") as fo:
fo.write(text)
print("[CorpusRaw] Wrote:", outpath)
except Exception as e:
print("[CorpusRaw] Failed", fname, "->", e)
# ------------------ TF-IDF CORPUS LOADING ------------------
vectorizer = None
corpus_tfidf = None
corpus_titles = []
corpus_texts = []
def load_corpus(corpus_dir=CORPUS_DIR):
"""
Load .txt corpus files from CORPUS_DIR, build TF-IDF index.
Semantic embeddings are built separately in load_embeddings().
"""
global vectorizer, corpus_tfidf, corpus_titles, corpus_texts
corpus_titles = []
corpus_texts = []
if not os.path.isdir(corpus_dir):
os.makedirs(corpus_dir, exist_ok=True)
print("[Corpus] Created empty corpus directory:", corpus_dir)
vectorizer = None
corpus_tfidf = None
return
for fname in os.listdir(corpus_dir):
if fname.lower().endswith(".txt"):
path = os.path.join(corpus_dir, fname)
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
txt = f.read()
corpus_titles.append(fname)
corpus_texts.append(txt)
except Exception as e:
print(f"[Corpus] Failed to read {path}: {e}")
if corpus_texts:
try:
vectorizer = TfidfVectorizer(
ngram_range=(1, 3),
stop_words="english",
max_features=50000
)
corpus_tfidf = vectorizer.fit_transform(corpus_texts)
print(f"[Corpus] Loaded {len(corpus_texts)} documents into TF-IDF index")
except Exception as e:
print("[Corpus] TF-IDF build failed:", e)
vectorizer = None
corpus_tfidf = None
else:
vectorizer = None
corpus_tfidf = None
print("[Corpus] No .txt documents found in", corpus_dir)
# ------------------ SEMANTIC EMBEDDINGS (SentenceTransformers) ------------------
emb_model = None
corpus_emb = None
EMB_MODEL_NAME = os.getenv("PLAG_EMB_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
def load_embeddings():
"""
Build semantic embedding index for plagiarism using sentence-transformers.
"""
global emb_model, corpus_emb
if SentenceTransformer is None:
print("[Embeddings] sentence-transformers not installed; skipping semantic index.")
emb_model = None
corpus_emb = None
return
if not corpus_texts:
print("[Embeddings] No corpus texts available; semantic index not built.")
emb_model = None
corpus_emb = None
return
try:
emb_model = SentenceTransformer(EMB_MODEL_NAME)
corpus_emb = emb_model.encode(
corpus_texts,
convert_to_numpy=True,
show_progress_bar=False,
normalize_embeddings=True,
)
print(f"[Embeddings] Loaded '{EMB_MODEL_NAME}' and encoded {len(corpus_texts)} corpus docs.")
except Exception as e:
emb_model = None
corpus_emb = None
print("[Embeddings] Failed to load or encode corpus:", e)
# Build corpus & embeddings at startup
build_corpus_from_raw()
load_corpus()
load_embeddings()
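# Note on ordering: load_embeddings() encodes the corpus_texts list that
# load_corpus() fills, so it must run after it. If the corpus is empty it
# skips building the semantic index, and the TF-IDF / Jaccard paths still
# work on their own.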
# ------------------ HF MODEL LOADING (AI Detector) ------------------
AI_DETECTOR_MODEL = "openai-community/roberta-base-openai-detector"
tokenizer = None
model = None
device = None
try:
tokenizer = AutoTokenizer.from_pretrained(AI_DETECTOR_MODEL)
model = AutoModelForSequenceClassification.from_pretrained(AI_DETECTOR_MODEL)
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"[AI Detector] Loaded {AI_DETECTOR_MODEL} on {device}")
except Exception as e:
tokenizer = None
model = None
device = None
print("[AI Detector] Failed to load HF model — using heuristic fallback. Error:", e)
# ------------------ GECToR LOADING (Neural GEC) ------------------
GEC_MODEL = None
try:
# Import specific classes from the installed library
from gector.gec_model import GecBERTModel
from gector.utils.helpers import load_verb_dict
print("[GECToR] Initializing model... (This may take 30s)")
GEC_MODEL = GecBERTModel(
vocab_path="/app/data", # Directory containing verb-form-vocab.txt
model_paths=["/app/data/gector_model.th"],
model_name='roberta-base',
max_len=50,
min_len=3,
iterations=5,
min_error_probability=0.0,
lowercase_tokens=0,
special_tokens_fix=1,
log=False,
is_ensemble=0,
weigths=None,
confidence=0,
del_confidence=0
)
# 2. Load and Attach the Verb Dictionary
# This maps verb forms (e.g., "go" -> "gone")
encode, decode = load_verb_dict("/app/data/verb-form-vocab.txt")
GEC_MODEL.encode = encode
GEC_MODEL.decode = decode
print(f"[GECToR] Model & Verb Dict Loaded Successfully!")
except Exception as e:
GEC_MODEL = None
print(f"[GECToR] Failed to load. Error: {e}")
print("[GECToR] Ensure you updated Dockerfile to download 'gector_model.th'")
def gector_correct(text: str):
"""
Run neural grammatical error correction using GECToR.
"""
# 1. Check if model is loaded
if GEC_MODEL is None:
print("[GECToR] Model not loaded, skipping.")
return text, 0, len(text.split())
# 2. Safety Truncate (Server protection)
parts = text.strip().split()
if len(parts) > 1000:
text_proc = " ".join(parts[:1000])
else:
text_proc = text.strip()
if not text_proc:
return text_proc, 0, 0
# 3. Split into sentences and then tokens
# GECToR expects a list of token lists: [['Hello', 'world'], ['How', 'are', 'you']]
sentences = re.split(r"(?<=[.!?])\s+", text_proc)
batch = [s.strip().split() for s in sentences if s.strip()]
if not batch:
return text_proc, 0, 0
try:
# 4. Run Prediction
# We pass the encode/decode maps we loaded earlier
final_batch, total_updates = GEC_MODEL.handle_batch(
batch,
encode_mapping=GEC_MODEL.encode,
decode_mapping=GEC_MODEL.decode
)
# 5. Reconstruct Text
corrected_sentences = [" ".join(tokens) for tokens in final_batch]
corrected_text = " ".join(corrected_sentences)
# 6. Count Corrections
# Simple word-by-word comparison
original_words = len(text_proc.split())
corrections = sum(1 for a, b in zip(text_proc.split(), corrected_text.split()) if a != b)
return corrected_text, corrections, original_words
except Exception as e:
print(f"[GECToR] Prediction error: {e}")
# Fallback to original text if crash
return text_proc, 0, len(text_proc.split())
# ------------------ FILE EXTRACTION HELPERS ------------------
MAX_FILE_SIZE = 15 * 1024 * 1024 # 15 MB
def extract_text_from_upload(upload: UploadFile) -> str:
filename = (upload.filename or "").lower()
content_type = (upload.content_type or "").lower()
data = upload.file.read()
try:
upload.file.seek(0)
except Exception:
pass
if len(data) > MAX_FILE_SIZE:
raise HTTPException(status_code=413, detail="File too large (max 15MB)")
# TXT
if filename.endswith(".txt") or content_type == "text/plain":
try:
try:
return data.decode("utf-8")
except UnicodeDecodeError:
return data.decode("latin-1")
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to decode text file: {e}")
# DOCX
if filename.endswith(".docx") or "wordprocessingml" in content_type:
# Basic sanity check: valid .docx is a ZIP (PK header)
if not data.startswith(b"PK"):
raise HTTPException(
status_code=400,
detail="Uploaded file is not a valid .docx package (it might be an old .doc file or a corrupted document). "
"Please open it in Word/Google Docs and re-save as .docx or export as PDF, then upload again."
)
try:
f = io.BytesIO(data)
doc = DocxDocument(f)
paragraphs = [p.text for p in doc.paragraphs]
text = "\n".join(paragraphs).strip()
if not text:
raise ValueError("DOCX contained no readable text.")
return text
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"Failed to parse docx file: {e}. Try opening it in Word/Google Docs and exporting again as .docx or PDF."
)
# PDF
if filename.endswith(".pdf") or "pdf" in content_type:
try:
f = io.BytesIO(data)
reader = PyPDF2.PdfReader(f)
texts = []
for pg in range(len(reader.pages)):
try:
txt = reader.pages[pg].extract_text() or ""
except Exception:
txt = ""
texts.append(txt)
return "\n".join(texts)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to parse PDF file: {e}")
raise HTTPException(
status_code=415,
detail="Unsupported file type. Use .txt, .pdf, or .docx",
)
# ------------------ GRAMMAR (LANGUAGETOOL INTEGRATION) ------------------
lt_tool = None
if language_tool_python is not None:
try:
lt_tool = language_tool_python.LanguageTool("en-US")
print("[LanguageTool] Loaded (local Java-backed checker)")
except Exception as e:
lt_tool = None
print("[LanguageTool] Could not start local LanguageTool — falling back. Error:", e)
else:
print("[LanguageTool] library not installed; falling back to heuristics.")
def grammar_with_languagetool(text: str):
parts = text.strip().split()
if len(parts) > 1000:
text_proc = " ".join(parts[:1000])
else:
text_proc = text.strip()
matches = lt_tool.check(text_proc)
corrected = language_tool_python.utils.correct(text_proc, matches)
corrections = len(matches)
return corrected, corrections, len(text_proc.split())
# ------------------ PLAGIARISM HELPERS (COMBINED ENGINE) ------------------
def _clean_for_jaccard(t: str):
t = t.lower()
t = re.sub(r"[^a-z0-9\s]", " ", t)
return [w for w in t.split() if w]
def _jaccard_similarity(a, b):
sa = set(a)
sb = set(b)
if not sa or not sb:
return 0.0
return len(sa & sb) / len(sa | sb)
def demo_plagiarism_fallback(text: str):
"""
Simple Jaccard-based fallback using a tiny built-in sample set.
Used when no TF-IDF / semantic corpus is available.
"""
SAMPLE_DOCS = [
{"title": "AI for Social Good",
"text": "Artificial intelligence is transforming multiple industries by automating routine tasks and enabling data driven decision making for social impact and efficiency."},
{"title": "IoT in Smart Cities",
"text": "The Internet of Things connects sensors, devices, and cloud platforms to enable real time monitoring and control in smart cities including lighting, traffic, and waste management."},
{"title": "Climate & Renewable Energy",
"text": "Climate change is a critical global challenge that demands renewable energy, efficient resource management, and international cooperation to ensure a sustainable future."},
]
input_words = _clean_for_jaccard(text)
best_score = 0.0
matches = []
for doc in SAMPLE_DOCS:
doc_words = _clean_for_jaccard(doc["text"])
score = _jaccard_similarity(input_words, doc_words)
matches.append({"title": doc["title"], "score": round(score * 100, 2)})
if score > best_score:
best_score = score
matches.sort(key=lambda x: x["score"], reverse=True)
plagiarism_percent = round(best_score * 100, 2)
summary = f"Plagiarism estimate (demo Jaccard): {plagiarism_percent}%"
return {"plagiarism_percent": plagiarism_percent, "matches": matches[:5], "summary": summary}
def corpus_plagiarism_combined(text: str):
"""
Combined plagiarism score using:
- TF-IDF cosine similarity
- Semantic embedding cosine similarity (SentenceTransformers)
Returns dict matching API schema:
{ plagiarism_percent, matches, summary }
"""
if not corpus_texts:
raise ValueError("No corpus texts loaded")
sims_tfidf = None
sims_emb = None
words = text.split()
if len(words) > 3000:
text_proc = " ".join(words[:3000])
else:
text_proc = text
# TF-IDF similarity
if vectorizer is not None and corpus_tfidf is not None:
q = vectorizer.transform([text_proc])
sims_tfidf = cosine_similarity(q, corpus_tfidf)[0]
# Semantic similarity
if emb_model is not None and corpus_emb is not None:
q_emb = emb_model.encode(
[text_proc],
convert_to_numpy=True,
normalize_embeddings=True,
show_progress_bar=False,
)[0]
sims_emb = corpus_emb @ q_emb # normalized → dot = cosine
if sims_tfidf is None and sims_emb is None:
raise ValueError("No plagiarism backends (TF-IDF / embeddings) are available")
n_docs = len(corpus_texts)
combined_rows = []
alpha = PLAG_ALPHA # TF-IDF weight
for i in range(n_docs):
tf = float(sims_tfidf[i]) if sims_tfidf is not None else None
se = float(sims_emb[i]) if sims_emb is not None else None
if tf is None and se is None:
continue
if tf is not None and se is not None:
score = alpha * tf + (1.0 - alpha) * se
elif tf is not None:
score = tf
else:
score = se
combined_rows.append({
"index": i,
"combined": score,
"tfidf": tf,
"semantic": se,
})
if not combined_rows:
raise ValueError("No scores computed for corpus documents")
combined_rows.sort(key=lambda x: x["combined"], reverse=True)
top = combined_rows[:10]
best = top[0]["combined"]
plagiarism_percent = round(best * 100, 2)
matches = []
for row in top:
matches.append({
"title": corpus_titles[row["index"]],
"score": round(row["combined"] * 100, 2),
"tfidf_score": round(row["tfidf"] * 100, 2) if row["tfidf"] is not None else None,
"semantic_score": round(row["semantic"] * 100, 2) if row["semantic"] is not None else None,
})
components = []
if sims_tfidf is not None:
components.append("TF-IDF")
if sims_emb is not None:
components.append("semantic embeddings")
comp_str = " + ".join(components)
summary = f"Plagiarism estimate (combined {comp_str}): {plagiarism_percent}%"
return {"plagiarism_percent": plagiarism_percent, "matches": matches, "summary": summary}
# ------------------ ENDPOINTS ------------------
@app.post("/api/signup")
def signup(req: SignupRequest):
cur.execute("SELECT id FROM users WHERE email = ?", (req.email,))
if cur.fetchone():
raise HTTPException(status_code=400, detail="Email already registered")
pw_hash = hash_password(req.password)
created_at = now_iso()
cur.execute(
"INSERT INTO users (name, email, password_hash, created_at) VALUES (?, ?, ?, ?)",
(req.name, req.email, pw_hash, created_at),
)
conn.commit()
user_id = cur.lastrowid
token = create_token(user_id, req.email)
return {
"message": "Signup successful",
"token": token,
"name": req.name,
"email": req.email,
}
@app.post("/api/login")
def login(req: LoginRequest):
cur.execute("SELECT * FROM users WHERE email = ?", (req.email,))
row = cur.fetchone()
if not row or not verify_password(req.password, row["password_hash"]):
raise HTTPException(status_code=401, detail="Invalid email or password")
token = create_token(row["id"], row["email"])
return {
"message": "Login successful",
"token": token,
"name": row["name"],
"email": row["email"],
}
@app.post("/api/grammar-check")
def api_grammar_check(req: TextRequest, user=Depends(get_current_user)):
text = req.text or ""
if not text.strip():
raise HTTPException(status_code=400, detail="Text is required")
# Prefer GECToR → LanguageTool → heuristics
if GEC_MODEL is not None:
corrected, corrections, original_words = gector_correct(text)
summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
elif lt_tool is not None:
corrected, corrections, original_words = grammar_with_languagetool(text)
summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
else:
corrected, corrections, original_words = simple_grammar_correct(text)
summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"
save_history(user["id"], "grammar", text, summary)
return {
"original_words": original_words,
"corrections": corrections,
"corrected_text": corrected,
"summary": summary,
}
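# The request body is simply {"text": "..."} plus the Authorization header; the
# engine actually used (GECToR, LanguageTool, or the heuristic) is echoed back in
# "summary", so clients can tell which backend produced the corrections.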
@app.post("/api/grammar-check-file")
def api_grammar_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
text = extract_text_from_upload(file).strip()
if not text:
raise HTTPException(status_code=400, detail="Uploaded file contains no text")
if GEC_MODEL is not None:
corrected, corrections, original_words = gector_correct(text)
summary = f"GECToR neural GEC: {corrections} edits; words analysed: {original_words}"
elif lt_tool is not None:
corrected, corrections, original_words = grammar_with_languagetool(text)
summary = f"LanguageTool corrections: {corrections}; words analysed: {original_words}"
else:
parts = text.strip().split()
if len(parts) > 1000:
text = " ".join(parts[:1000])
corrected, corrections, original_words = simple_grammar_correct(text)
summary = f"HEURISTIC corrections: {corrections}; words analysed: {original_words}"
save_history(user["id"], "grammar", text, summary)
return {
"original_words": original_words,
"corrections": corrections,
"corrected_text": corrected,
"summary": summary,
}
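# Example call (host/port assume a local uvicorn run, e.g. uvicorn app:app --port 8000):
#   curl -X POST http://localhost:8000/api/grammar-check-file \
#        -H "Authorization: Bearer <token>" \
#        -F "file=@essay.docx"
# Accepted uploads are .txt, .pdf and .docx, capped at 15 MB by extract_text_from_upload().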
# ------------------ PLAGIARISM ENDPOINTS (COMBINED) ------------------
@app.post("/api/plagiarism-check")
def api_plagiarism_check(req: TextRequest, user=Depends(get_current_user)):
text = req.text or ""
if not text.strip():
raise HTTPException(status_code=400, detail="Text is required")
# First try full combined engine (TF-IDF + embeddings) with corpus
try:
result = corpus_plagiarism_combined(text)
save_history(user["id"], "plagiarism", text, result["summary"])
return result
except Exception as e:
print("[Plagiarism] Combined corpus engine failed, falling back to demo:", e)
# Fallback: small Jaccard demo
result = demo_plagiarism_fallback(text)
save_history(user["id"], "plagiarism", text, result["summary"])
return result
@app.post("/api/plagiarism-check-file")
def api_plagiarism_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
text = extract_text_from_upload(file).strip()
if not text:
raise HTTPException(status_code=400, detail="Uploaded file contains no text")
try:
result = corpus_plagiarism_combined(text)
save_history(user["id"], "plagiarism", text, result["summary"])
return result
except Exception as e:
print("[Plagiarism-file] Combined corpus engine failed, falling back to demo:", e)
# Fallback to demo if corpus/engines unavailable
result = demo_plagiarism_fallback(text)
save_history(user["id"], "plagiarism", text, result["summary"])
return result
# ------------------ AI CHECK (TEXT & FILE) ------------------
def heuristic_ai_score(text: str):
words = re.sub(r"[^a-z0-9\s]", " ", text.lower()).split()
word_count = len(words)
unique_ratio = len(set(words)) / (word_count or 1)
sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
avg_sentence_length = word_count / (len(sentences) or 1)
ai_score = 0
if unique_ratio < 0.45:
ai_score += 40
elif unique_ratio < 0.6:
ai_score += 20
if avg_sentence_length > 25:
ai_score += 40
elif avg_sentence_length > 18:
ai_score += 25
if word_count > 400:
ai_score += 10
ai_score = min(100, round(ai_score))
human_score = 100 - ai_score
return ai_score, human_score, word_count, avg_sentence_length, unique_ratio
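# Worked example of the heuristic scoring: a 450-word text with a unique-word
# ratio of 0.50 and an average sentence length of 30 words scores
#   20 (ratio < 0.6) + 40 (length > 25) + 10 (> 400 words) = 70
# so it is reported as 70% AI / 30% human.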
@app.post("/api/ai-check")
def api_ai_check(req: TextRequest, user=Depends(get_current_user)):
text = (req.text or "").strip()
if not text:
raise HTTPException(status_code=400, detail="Text is required")
if model is not None and tokenizer is not None:
try:
max_len = getattr(tokenizer, "model_max_length", 512)
if max_len is None or max_len > 1024:
max_len = 512
words = text.split()
chunk_size = min(400, max_len - 10)
chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
probs = []
for chunk in chunks:
inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_len)
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
p = torch.softmax(logits, dim=1).cpu().numpy()[0]
ai_prob = float(p[1]) if p.shape[0] > 1 else float(p[0])
probs.append(ai_prob)
avg_ai_prob = float(np.mean(probs)) if probs else 0.0
ai_percent = round(avg_ai_prob * 100, 2)
human_percent = round(100 - ai_percent, 2)
words_count = len(words)
sentences = [s.strip() for s in re.split(r"[.!?]+", text) if s.strip()]
avg_sentence_len = round(words_count / (len(sentences) or 1), 2)
summary = f"Model: {AI_DETECTOR_MODEL}; AI probability: {ai_percent}%"
save_history(user["id"], "ai", text, summary)
return {
"ai_percent": ai_percent,
"human_percent": human_percent,
"word_count": words_count,
"avg_sentence_length": avg_sentence_len,
"summary": summary,
}
except Exception as e:
print("[AI-check] model inference failed:", e)
ai_percent, human_percent, wc, avg_len, uniq = heuristic_ai_score(text)
summary = f"HEURISTIC fallback — AI probability: {ai_percent}%"
save_history(user["id"], "ai", text, summary)
return {
"ai_percent": ai_percent,
"human_percent": human_percent,
"word_count": wc,
"avg_sentence_length": avg_len,
"unique_ratio": round(uniq, 3),
"summary": summary,
}
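# Chunking note for the model path above: the input is split into 400-word chunks,
# each chunk is tokenized with truncation at the model's max length and scored
# separately, and the reported ai_percent is the plain mean of the per-chunk AI
# probabilities (a 1000-word essay becomes chunks of 400 / 400 / 200 words).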
@app.post("/api/ai-check-file")
def api_ai_check_file(file: UploadFile = File(...), user=Depends(get_current_user)):
text = extract_text_from_upload(file).strip()
if not text:
raise HTTPException(status_code=400, detail="Uploaded file contains no text")
# Reuse the text endpoint's logic directly; FastAPI's route decorator returns the
# original function, so it can be called like any other Python function.
return api_ai_check(TextRequest(text=text), user)
# ------------------ HISTORY ------------------
@app.get("/api/history")
def api_history(user=Depends(get_current_user)):
cur.execute(
"SELECT id, tool, input_text, result_summary, created_at "
"FROM history WHERE user_id = ? "
"ORDER BY created_at DESC LIMIT 50",
(user["id"],),
)
rows = cur.fetchall()
items = []
for r in rows:
items.append(
{
"id": r["id"],
"tool": r["tool"],
"input_text": r["input_text"],
"summary": r["result_summary"],
"created_at": r["created_at"],
}
)
return {"items": items}
@app.get("/")
def read_root():
return {"status": "Backend is running with 16GB RAM!"}