from pathlib import Path
from typing import Any, List, Optional, Union

import ray

from graphgen.models import (
    CSVReader,
    JSONReader,
    ParquetReader,
    PDFReader,
    PickleReader,
    RDFReader,
    TXTReader,
)
from graphgen.utils import compute_mm_hash, logger

from .parallel_file_scanner import ParallelFileScanner
_MAPPING = {
    "jsonl": JSONReader,
    "json": JSONReader,
    "txt": TXTReader,
    "csv": CSVReader,
    "md": TXTReader,
    "pdf": PDFReader,
    "parquet": ParquetReader,
    "pickle": PickleReader,
    "rdf": RDFReader,
    "owl": RDFReader,
    "ttl": RDFReader,
}

def _build_reader(suffix: str, cache_dir: str | None, **reader_kwargs):
    """Factory function to build the appropriate reader instance for a suffix."""
    suffix = suffix.lower()
    reader_cls = _MAPPING.get(suffix)
    if not reader_cls:
        raise ValueError(f"Unsupported file suffix: {suffix}")
    # PDFReader is special-cased: it needs a directory for the
    # intermediate files it produces while parsing.
    if suffix == "pdf":
        if cache_dir is None:
            raise ValueError("cache_dir must be provided for PDFReader")
        return reader_cls(output_dir=cache_dir, **reader_kwargs)
    return reader_cls(**reader_kwargs)
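
# Illustrative dispatch of the factory above (a sketch; reader constructor
# signatures other than PDFReader's output_dir are assumed to take no
# required arguments, per the bare **reader_kwargs call):
#   _build_reader("ttl", None)    -> RDFReader()
#   _build_reader("pdf", "cache") -> PDFReader(output_dir="cache")
#   _build_reader("docx", None)   -> ValueError: Unsupported file suffix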

def read(
    input_path: Union[str, List[str]],
    allowed_suffix: Optional[List[str]] = None,
    cache_dir: Optional[str] = "cache",
    parallelism: int = 4,
    recursive: bool = True,
    **reader_kwargs: Any,
) -> ray.data.Dataset:
    """
    Unified entry point to read files of multiple types using Ray Data.

    :param input_path: File or directory path(s) to read from
    :param allowed_suffix: List of allowed file suffixes (e.g., ['pdf', 'txt'])
    :param cache_dir: Directory to cache intermediate files (PDF processing)
    :param parallelism: Number of parallel workers
    :param recursive: Whether to scan directories recursively
    :param reader_kwargs: Additional kwargs passed to the readers
    :return: Ray Dataset containing all documents
    """
    try:
        # 1. Scan all paths to discover files
        logger.info("[READ] Scanning paths: %s", input_path)
        scanner = ParallelFileScanner(
            cache_dir=cache_dir,
            allowed_suffix=allowed_suffix,
            rescan=False,
            max_workers=parallelism if parallelism > 0 else 1,
        )
        all_files = []
        scan_results = scanner.scan(input_path, recursive=recursive)
        for result in scan_results.values():
            all_files.extend(result.get("files", []))

        logger.info("[READ] Found %d files to process", len(all_files))
        if not all_files:
            raise ValueError("No files found to read.")

        # 2. Group files by suffix to use the appropriate reader
        files_by_suffix = {}
        for file_info in all_files:
            suffix = Path(file_info["path"]).suffix.lower().lstrip(".")
            if allowed_suffix and suffix not in [
                s.lower().lstrip(".") for s in allowed_suffix
            ]:
                continue
            files_by_suffix.setdefault(suffix, []).append(file_info["path"])
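        # For a mixed directory this yields a dict like (illustrative,
        # hypothetical paths): {"txt": ["a.txt", "b.txt"], "pdf": ["c.pdf"]}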
        # 3. Create read tasks, one dataset per suffix group
        read_tasks = []
        for suffix, file_paths in files_by_suffix.items():
            reader = _build_reader(suffix, cache_dir, **reader_kwargs)
            ds = reader.read(file_paths)
            read_tasks.append(ds)

        # 4. Combine all datasets
        if not read_tasks:
            raise ValueError("No datasets created from the provided files.")
        if len(read_tasks) == 1:
            combined_ds = read_tasks[0]
        else:
            combined_ds = read_tasks[0].union(*read_tasks[1:])

        # Attach a content-hash document id to every record
        combined_ds = combined_ds.map(
            lambda record: {
                **record,
                "_doc_id": compute_mm_hash(record, prefix="doc-"),
            }
        )
        logger.info("[READ] Successfully read files from %s", input_path)
        return combined_ds
    except Exception as e:
        logger.error("[READ] Failed to read files from %s: %s", input_path, e)
        raise
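
if __name__ == "__main__":
    # Minimal usage sketch, not part of the module's public surface: the
    # directory, suffix filter, and cache path below are hypothetical.
    # Ray is initialized lazily by Ray Data on first use.
    dataset = read(
        "data/docs",
        allowed_suffix=["txt", "md", "pdf"],
        cache_dir="cache",
        parallelism=4,
    )
    # Each record carries the "_doc_id" content hash added in read()
    for record in dataset.take(3):
        print(record["_doc_id"])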