graphgen/operators/generate/generate_service.py
import pandas as pd

from graphgen.bases import BaseLLMWrapper, BaseOperator
from graphgen.common import init_llm
from graphgen.models import (
    AggregatedGenerator,
    AtomicGenerator,
    CoTGenerator,
    MultiHopGenerator,
    VQAGenerator,
)
from graphgen.utils import logger, run_concurrent

class GenerateService(BaseOperator):
    """
    Generate question-answer pairs based on nodes and edges.
    """

    def __init__(
        self,
        working_dir: str = "cache",
        method: str = "aggregated",
        data_format: str = "ChatML",
    ):
        super().__init__(working_dir=working_dir, op_name="generate_service")
        self.llm_client: BaseLLMWrapper = init_llm("synthesizer")
        self.method = method
        self.data_format = data_format

        # Pick the generator implementation that matches the requested method.
        if self.method == "atomic":
            self.generator = AtomicGenerator(self.llm_client)
        elif self.method == "aggregated":
            self.generator = AggregatedGenerator(self.llm_client)
        elif self.method == "multi_hop":
            self.generator = MultiHopGenerator(self.llm_client)
        elif self.method == "cot":
            self.generator = CoTGenerator(self.llm_client)
        elif self.method == "vqa":
            self.generator = VQAGenerator(self.llm_client)
        else:
            raise ValueError(f"Unsupported generation mode: {method}")

    def process(self, batch: pd.DataFrame) -> pd.DataFrame:
        items = batch.to_dict(orient="records")
        return pd.DataFrame(self.generate(items))

    def generate(self, items: list[dict]) -> list[dict]:
        """
        Generate question-answer pairs based on nodes and edges.

        :param items: records that each provide the "nodes" and "edges" of one batch
        :return: QA pairs formatted according to ``self.data_format``
        """
        logger.info("[Generation] mode: %s, batches: %d", self.method, len(items))

        items = [(item["nodes"], item["edges"]) for item in items]
        results = run_concurrent(
            self.generator.generate,
            items,
            desc="[4/4] Generating QAs",
            unit="batch",
        )
        results = self.generator.format_generation_results(
            results, output_data_format=self.data_format
        )
        return results
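

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original module: it assumes that
# init_llm("synthesizer") is configured for the current environment and that
# the upstream partition step yields records with "nodes" and "edges" fields.
# The node/edge dicts below are placeholder shapes, not the exact schema the
# generators expect.
if __name__ == "__main__":
    sample_batch = pd.DataFrame(
        [
            {
                "nodes": [{"id": "n1", "description": "example node"}],
                "edges": [
                    {"source": "n1", "target": "n2", "description": "example edge"}
                ],
            }
        ]
    )
    service = GenerateService(
        working_dir="cache", method="aggregated", data_format="ChatML"
    )
    qa_pairs = service.process(sample_batch)
    logger.info("Generated %d QA records", len(qa_pairs))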