Spaces:
Running
Running
File size: 9,089 Bytes
5f219fc e25b548 5f219fc 65b9482 fb9c306 65b9482 e25b548 65b9482 fb9c306 5f219fc b7519b4 e25b548 fb9c306 e25b548 fb9c306 e25b548 b7519b4 fb9c306 5f219fc 65b9482 e25b548 65b9482 fb9c306 65b9482 fb9c306 65b9482 e25b548 65b9482 fb9c306 65b9482 e25b548 65b9482 e25b548 65b9482 fb9c306 65b9482 e25b548 65b9482 fb9c306 65b9482 5f219fc 65b9482 5f219fc 65b9482 5f219fc 65b9482 5f219fc 65b9482 e25b548 5f219fc e25b548 5f219fc e25b548 b7519b4 e25b548 b7519b4 e25b548 b7519b4 e25b548 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
import asyncio
import os
import re
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from io import StringIO
from typing import Dict, Optional
from Bio import ExPASy, SeqIO, SwissProt, UniProt
from Bio.Blast import NCBIWWW, NCBIXML
from requests.exceptions import RequestException
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from graphgen.bases import BaseSearcher
from graphgen.utils import logger
@lru_cache(maxsize=None)
def _get_pool():
return ThreadPoolExecutor(max_workers=10)
# ensure only one BLAST searcher at a time
_blast_lock = asyncio.Lock()
class UniProtSearch(BaseSearcher):
"""
UniProt Search client to searcher with UniProt.
1) Get the protein by accession number.
2) Search with keywords or protein names (fuzzy searcher).
3) Search with FASTA sequence (BLAST searcher). Note that NCBIWWW does not support async.
"""
def __init__(self, use_local_blast: bool = False, local_blast_db: str = "sp_db"):
super().__init__()
self.use_local_blast = use_local_blast
self.local_blast_db = local_blast_db
if self.use_local_blast and not os.path.isfile(f"{self.local_blast_db}.phr"):
logger.error("Local BLAST database files not found. Please check the path.")
self.use_local_blast = False
def get_by_accession(self, accession: str) -> Optional[dict]:
try:
handle = ExPASy.get_sprot_raw(accession)
record = SwissProt.read(handle)
handle.close()
return self._swissprot_to_dict(record)
except RequestException: # network-related errors
raise
except Exception as exc: # pylint: disable=broad-except
logger.error("Accession %s not found: %s", accession, exc)
return None
@staticmethod
def _swissprot_to_dict(record: SwissProt.Record) -> dict:
"""error
Convert a SwissProt.Record to a dictionary.
"""
functions = []
for line in record.comments:
if line.startswith("FUNCTION:"):
functions.append(line[9:].strip())
return {
"molecule_type": "protein",
"database": "UniProt",
"id": record.accessions[0],
"entry_name": record.entry_name,
"gene_names": record.gene_name,
"protein_name": record.description.split(";")[0].split("=")[-1],
"organism": record.organism.split(" (")[0],
"sequence": str(record.sequence),
"function": functions,
"url": f"https://www.uniprot.org/uniprot/{record.accessions[0]}",
}
def get_best_hit(self, keyword: str) -> Optional[Dict]:
"""
Search UniProt with a keyword and return the best hit.
:param keyword: The searcher keyword.
:return: A dictionary containing the best hit information or None if not found.
"""
if not keyword.strip():
return None
try:
iterator = UniProt.search(keyword, fields=None, batch_size=1)
hit = next(iterator, None)
if hit is None:
return None
return self.get_by_accession(hit["primaryAccession"])
except RequestException:
raise
except Exception as e: # pylint: disable=broad-except
logger.error("Keyword %s not found: %s", keyword, e)
return None
def get_by_fasta(self, fasta_sequence: str, threshold: float) -> Optional[Dict]:
"""
Search UniProt with a FASTA sequence and return the best hit.
:param fasta_sequence: The FASTA sequence.
:param threshold: E-value threshold for BLAST searcher.
:return: A dictionary containing the best hit information or None if not found.
"""
try:
if fasta_sequence.startswith(">"):
seq = str(list(SeqIO.parse(StringIO(fasta_sequence), "fasta"))[0].seq)
else:
seq = fasta_sequence.strip()
except Exception as e: # pylint: disable=broad-except
logger.error("Invalid FASTA sequence: %s", e)
return None
if not seq:
logger.error("Empty FASTA sequence provided.")
return None
accession = None
if self.use_local_blast:
accession = self._local_blast(seq, threshold)
if accession:
logger.debug("Local BLAST found accession: %s", accession)
if not accession:
logger.debug("Falling back to NCBIWWW.qblast.")
# UniProtKB/Swiss-Prot BLAST API
try:
logger.debug(
"Performing BLAST searcher for the given sequence: %s", seq
)
result_handle = NCBIWWW.qblast(
program="blastp",
database="swissprot",
sequence=seq,
hitlist_size=1,
expect=threshold,
)
blast_record = NCBIXML.read(result_handle)
except RequestException:
raise
except Exception as e: # pylint: disable=broad-except
logger.error("BLAST searcher failed: %s", e)
return None
if not blast_record.alignments:
logger.info("No BLAST hits found for the given sequence.")
return None
best_alignment = blast_record.alignments[0]
best_hsp = best_alignment.hsps[0]
if best_hsp.expect > threshold:
logger.info("No BLAST hits below the threshold E-value.")
return None
hit_id = best_alignment.hit_id
# like sp|P01308.1|INS_HUMAN
accession = hit_id.split("|")[1].split(".")[0] if "|" in hit_id else hit_id
return self.get_by_accession(accession)
def _local_blast(self, seq: str, threshold: float) -> Optional[str]:
"""
Perform local BLAST search using local BLAST database.
:param seq: The protein sequence.
:param threshold: E-value threshold for BLAST searcher.
:return: The accession number of the best hit or None if not found.
"""
try:
with tempfile.NamedTemporaryFile(
mode="w+", suffix=".fa", delete=False
) as tmp:
tmp.write(f">query\n{seq}\n")
tmp_name = tmp.name
cmd = [
"blastp",
"-db",
self.local_blast_db,
"-query",
tmp_name,
"-evalue",
str(threshold),
"-max_target_seqs",
"1",
"-outfmt",
"6 sacc", # only return accession
]
logger.debug("Running local blastp: %s", " ".join(cmd))
out = subprocess.check_output(cmd, text=True).strip()
os.remove(tmp_name)
if out:
return out.split("\n", maxsplit=1)[0]
return None
except Exception as exc: # pylint: disable=broad-except
logger.error("Local blastp failed: %s", exc)
return None
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(RequestException),
reraise=True,
)
async def search(
self, query: str, threshold: float = 0.7, **kwargs
) -> Optional[Dict]:
"""
Search UniProt with either an accession number, keyword, or FASTA sequence.
:param query: The searcher query (accession number, keyword, or FASTA sequence).
:param threshold: E-value threshold for BLAST searcher.
:return: A dictionary containing the best hit information or None if not found.
"""
# auto detect query type
if not query or not isinstance(query, str):
logger.error("Empty or non-string input.")
return None
query = query.strip()
logger.debug("UniProt searcher query: %s", query)
loop = asyncio.get_running_loop()
# check if fasta sequence
if query.startswith(">") or re.fullmatch(
r"[ACDEFGHIKLMNPQRSTVWY\s]+", query, re.I
):
async with _blast_lock:
result = await loop.run_in_executor(
_get_pool(), self.get_by_fasta, query, threshold
)
# check if accession number
elif re.fullmatch(r"[A-NR-Z0-9]{6,10}", query, re.I):
result = await loop.run_in_executor(
_get_pool(), self.get_by_accession, query
)
else:
# otherwise treat as keyword
result = await loop.run_in_executor(_get_pool(), self.get_best_hit, query)
if result:
result["_search_query"] = query
return result
|