Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import faiss | |
| import numpy as np | |
| from typing import List, Tuple | |
| from text2vec import SentenceModel | |
| class JSONLIndexer(object): | |
| """ | |
| JSONL 文件检索器(基于预计算embedding) | |
| """ | |
| def __init__(self, vector_sz: int, n_subquantizers=0, n_bits=8, model: SentenceModel = None, **kwargs): | |
| """ | |
| 初始化索引器,选择使用FAISS的类型 | |
| :param vector_sz: 嵌入向量的大小 | |
| :param n_subquantizers: 子量化器数量 | |
| :param n_bits: 每个子向量的位数 | |
| :param model: SentenceModel 模型,用于对query重新embedding | |
| """ | |
| if n_subquantizers > 0: | |
| self.index = faiss.IndexPQ(vector_sz, n_subquantizers, n_bits, faiss.METRIC_INNER_PRODUCT) | |
| else: | |
| self.index = faiss.IndexFlatIP(vector_sz) | |
| self.index_id_to_data = [] # FAISS索引ID到JSON记录索引的映射 | |
| self.data = [] # 存储所有JSON对象 | |
| self.model = model | |
| print(f'Initialized FAISS index of type {type(self.index)}') | |
| def load_jsonl(self, dataset_path: str, embedding_field: str = "embedding", id_field: str = "id") -> None: | |
| """ | |
| 加载JSONL文件并构建FAISS索引(使用预计算embedding) | |
| :param dataset_path: JSONL文件路径 | |
| :param embedding_field: JSON对象中存放embedding的字段名 | |
| :param id_field: JSON对象中作为待检索文本的字段(这里认为为id) | |
| """ | |
| print(f'📂 Loading JSONL file: {dataset_path}...') | |
| # 逐行读取JSONL文件 | |
| with open(dataset_path, 'r', encoding='utf-8') as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| record = json.loads(line) | |
| self.data.append(record) | |
| total = len(self.data) | |
| print(f'✅ Loaded {total} records from {dataset_path}.') | |
| # 直接从每个JSON对象中提取预计算embedding | |
| embeddings_list = [] | |
| for rec in self.data: | |
| emb = rec.get(embedding_field, []) | |
| # 检查embedding长度是否符合预期 | |
| if len(emb) != self.index.d and self.index.ntotal == 0: | |
| # 如果第一次添加且长度不匹配,可以根据需要进行处理,比如报错或跳过 | |
| raise ValueError(f"Embedding length mismatch. Expected {self.index.d}, got {len(emb)}.") | |
| embeddings_list.append(np.array(emb, dtype=np.float32)) | |
| embeddings = np.stack(embeddings_list, axis=0) | |
| print(f'✅ Embeddings loaded, shape: {embeddings.shape}. Indexing data...') | |
| # 用数据在FAISS中建立索引 | |
| ids = list(range(total)) | |
| self.index_data(ids, embeddings) | |
| print('🎉 Indexing complete!') | |
| def index_data(self, ids: List[int], embeddings: np.array, **kwargs): | |
| """ | |
| 将预先计算好的embedding添加到FAISS索引中 | |
| :param ids: 每个记录的索引号(这里用list(range(total))) | |
| :param embeddings: 记录对应的embedding矩阵 | |
| """ | |
| self._update_id_mapping(ids) | |
| embeddings = embeddings.astype('float32') | |
| # 如果索引未训练,则先训练 | |
| if not self.index.is_trained: | |
| print('⚙️ Training FAISS index...') | |
| self.index.train(embeddings) | |
| print('✅ FAISS index trained.') | |
| self.index.add(embeddings) | |
| print(f'✅ Indexed {len(self.index_id_to_data)} records.') | |
| def _update_id_mapping(self, row_ids: List[int]): | |
| """更新FAISS索引ID到JSON记录索引的映射""" | |
| self.index_id_to_data.extend(row_ids) | |
| def search_return_id(self, query: str, top_docs: int) -> Tuple[List[str], List[float]]: | |
| """ | |
| 根据query返回最相似的JSON记录的id和相似度分数 | |
| :param query: 查询文本 | |
| :param top_docs: 返回的最近邻记录数量 | |
| :return: (记录的id列表, 分数列表) | |
| """ | |
| db_indices, scores = self.search(query, top_docs) | |
| # 这里假设待检索文本就是json对象中的id字段 | |
| result_ids = [self.data[i]["id"] for i in db_indices] | |
| return result_ids, scores | |
| def search(self, query: str, top_docs: int) -> Tuple[List[int], List[float]]: | |
| """ | |
| 对query重新embedding后,在FAISS索引中检索 | |
| :param query: 查询文本 | |
| :param top_docs: 返回的最近邻记录数量 | |
| :return: (JSON记录的索引列表, 相似度分数列表) | |
| """ | |
| # 仅对query重新计算embedding | |
| query_vector = self.model.encode(query).astype('float32').reshape(1, -1) | |
| scores, indexes = self.index.search(query_vector, top_docs) | |
| scores = scores[0] | |
| indexes = indexes[0] | |
| db_indices = [self.index_id_to_data[i] for i in indexes] | |
| return db_indices, scores | |
| # 示例用法 | |
| if __name__ == '__main__': | |
| model = SentenceModel("BAAI/bge-base-en-v1.5") | |
| vector_size = 768 # 请根据你的模型确定嵌入向量维度 | |
| indexer = JSONLIndexer(vector_sz=vector_size, model=model) | |
| jsonl_path = "tool-embedding.jsonl" # 替换为实际JSONL文件路径 | |
| indexer.load_jsonl(jsonl_path) | |
| query = "your search query here" | |
| ids, scores = indexer.search_return_id(query, top_docs=5) | |
| print("检索结果:", list(zip(ids, scores))) | |