from pathlib import Path
from typing import Any, List, Optional, Union

import ray

from graphgen.models import (
    CSVReader,
    JSONReader,
    ParquetReader,
    PDFReader,
    PickleReader,
    RDFReader,
    TXTReader,
)
from graphgen.utils import compute_mm_hash, logger

from .parallel_file_scanner import ParallelFileScanner

_MAPPING = {
    "jsonl": JSONReader,
    "json": JSONReader,
    "txt": TXTReader,
    "csv": CSVReader,
    "md": TXTReader,
    "pdf": PDFReader,
    "parquet": ParquetReader,
    "pickle": PickleReader,
    "rdf": RDFReader,
    "owl": RDFReader,
    "ttl": RDFReader,
}
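# To support a new file type, map its suffix to a reader class above, e.g.
# (hypothetical, not part of this module): _MAPPING["html"] = HTMLReader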


def _build_reader(suffix: str, cache_dir: Optional[str], **reader_kwargs):
    """Factory function to build the appropriate reader instance."""
    suffix = suffix.lower()
    reader_cls = _MAPPING.get(suffix)
    if not reader_cls:
        raise ValueError(f"Unsupported file suffix: {suffix}")
    # Special handling for PDFReader, which needs an output directory
    if suffix == "pdf":
        if cache_dir is None:
            raise ValueError("cache_dir must be provided for PDFReader")
        return reader_cls(output_dir=cache_dir, **reader_kwargs)
    return reader_cls(**reader_kwargs)
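
# A hedged sketch of how the factory above behaves, based on _MAPPING:
#   _build_reader("pdf", "cache")  -> PDFReader(output_dir="cache")
#   _build_reader("txt", None)     -> TXTReader()
#   _build_reader("docx", None)    -> raises ValueError (suffix not registered)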


def read(
    input_path: Union[str, List[str]],
    allowed_suffix: Optional[List[str]] = None,
    cache_dir: Optional[str] = "cache",
    parallelism: int = 4,
    recursive: bool = True,
    **reader_kwargs: Any,
) -> ray.data.Dataset:
    """
    Unified entry point to read files of multiple types using Ray Data.

    :param input_path: File or directory path(s) to read from
    :param allowed_suffix: List of allowed file suffixes (e.g., ['pdf', 'txt'])
    :param cache_dir: Directory to cache intermediate files (PDF processing)
    :param parallelism: Number of parallel workers
    :param recursive: Whether to scan directories recursively
    :param reader_kwargs: Additional kwargs passed to readers
    :return: Ray Dataset containing all documents
    """
    try:
        # 1. Scan all paths to discover files
        logger.info("[READ] Scanning paths: %s", input_path)
        scanner = ParallelFileScanner(
            cache_dir=cache_dir,
            allowed_suffix=allowed_suffix,
            rescan=False,
            max_workers=parallelism if parallelism > 0 else 1,
        )

        all_files = []
        scan_results = scanner.scan(input_path, recursive=recursive)
        for result in scan_results.values():
            all_files.extend(result.get("files", []))

        logger.info("[READ] Found %d files to process", len(all_files))
        if not all_files:
            raise ValueError("No files found to read.")

        # 2. Group files by suffix so each group uses the appropriate reader
        files_by_suffix = {}
        for file_info in all_files:
            suffix = Path(file_info["path"]).suffix.lower().lstrip(".")
            if allowed_suffix and suffix not in [
                s.lower().lstrip(".") for s in allowed_suffix
            ]:
                continue
            files_by_suffix.setdefault(suffix, []).append(file_info["path"])
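        # Illustrative result: ["a.pdf", "b.txt", "notes.md"] would group into
        # {"pdf": ["a.pdf"], "txt": ["b.txt"], "md": ["notes.md"]}.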

        # 3. Create read tasks
        read_tasks = []
        for suffix, file_paths in files_by_suffix.items():
            reader = _build_reader(suffix, cache_dir, **reader_kwargs)
            ds = reader.read(file_paths)
            read_tasks.append(ds)

        # 4. Combine all datasets
        if not read_tasks:
            raise ValueError("No datasets created from the provided files.")
        if len(read_tasks) == 1:
            combined_ds = read_tasks[0]
        else:
            combined_ds = read_tasks[0].union(*read_tasks[1:])

        # Attach a content-derived document id to every record
        combined_ds = combined_ds.map(
            lambda record: {
                **record,
                "_doc_id": compute_mm_hash(record, prefix="doc-"),
            }
        )
logger.info("[READ] Successfully read files from %s", input_path)
return combined_ds
except Exception as e:
logger.error("[READ] Failed to read files from %s: %s", input_path, e)
raise
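

# Hedged usage sketch (illustrative values; assumes Ray is initialized and the
# graphgen readers are importable):
#
#     import ray
#     ray.init(ignore_reinit_error=True)
#     ds = read(
#         ["data/corpus", "notes.md"],
#         allowed_suffix=["pdf", "txt", "md"],
#         cache_dir="cache",
#         parallelism=8,
#     )
#     print(ds.count())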