""" OpenLineage Exporter - Export to OpenLineage standard format. OpenLineage is an open standard for metadata and lineage collection. https://openlineage.io/ """ from typing import Dict, Any, List from datetime import datetime import uuid from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge class OpenLineageExporter(LineageExporter): """Export lineage to OpenLineage format.""" def __init__(self, graph: LineageGraph, namespace: str = "lineage-accelerator"): super().__init__(graph) self.namespace = namespace @property def format_name(self) -> str: return "OpenLineage" @property def file_extension(self) -> str: return ".json" def _create_dataset(self, node: LineageNode) -> Dict[str, Any]: """Create an OpenLineage dataset from a node.""" dataset = { "namespace": self.namespace, "name": self._get_qualified_name(node), "facets": {} } # Add schema facet if columns are present if node.columns: dataset["facets"]["schema"] = { "_producer": "lineage-accelerator", "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json", "fields": [ { "name": col.get("name"), "type": col.get("type") or col.get("data_type", "string"), "description": col.get("description") } for col in node.columns ] } # Add documentation facet if node.description: dataset["facets"]["documentation"] = { "_producer": "lineage-accelerator", "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationDatasetFacet.json", "description": node.description } # Add ownership facet if node.owner: dataset["facets"]["ownership"] = { "_producer": "lineage-accelerator", "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipDatasetFacet.json", "owners": [{"name": node.owner, "type": "MAINTAINER"}] } # Add custom facet for additional metadata custom_facet = {} if node.type: custom_facet["nodeType"] = node.type if node.category: custom_facet["category"] = node.category if node.tags: custom_facet["tags"] = node.tags if node.metadata: custom_facet.update(node.metadata) if custom_facet: dataset["facets"]["custom"] = { "_producer": "lineage-accelerator", "_schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/CustomFacet", **custom_facet } return dataset def _get_qualified_name(self, node: LineageNode) -> str: """Get fully qualified name for a node.""" parts = [] if node.database: parts.append(node.database) if node.schema: parts.append(node.schema) parts.append(node.name) return ".".join(parts) def _create_job(self, edge: LineageEdge) -> Dict[str, Any]: """Create an OpenLineage job from an edge.""" job_name = edge.job_name or f"transform_{edge.source}_to_{edge.target}" job = { "namespace": self.namespace, "name": job_name, "facets": {} } # Add job type facet if edge.type: job["facets"]["jobType"] = { "_producer": "lineage-accelerator", "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/JobTypeJobFacet.json", "processingType": "BATCH", "integration": "CUSTOM", "jobType": edge.type.upper() } return job def _create_run_event(self, edge: LineageEdge) -> Dict[str, Any]: """Create an OpenLineage run event for an edge.""" source_node = self.graph.get_node(edge.source) target_node = self.graph.get_node(edge.target) event = { "eventType": "COMPLETE", "eventTime": self.graph.generated_at, "run": { "runId": str(uuid.uuid4()), "facets": {} }, "job": self._create_job(edge), "inputs": [], "outputs": [] } if source_node: event["inputs"].append(self._create_dataset(source_node)) if target_node: output_dataset = self._create_dataset(target_node) # Add lineage facet to output if source_node: output_dataset["facets"]["columnLineage"] = { "_producer": "lineage-accelerator", "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ColumnLineageDatasetFacet.json", "fields": {} } event["outputs"].append(output_dataset) return event def export(self) -> str: """Export to OpenLineage JSON format.""" return self.to_json(indent=2) def _to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" # Create run events for each edge events = [self._create_run_event(edge) for edge in self.graph.edges] # Create a summary structure return { "producer": "lineage-accelerator", "schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json", "generatedAt": self.graph.generated_at, "lineageName": self.graph.name, "namespace": self.namespace, "events": events, "datasets": [self._create_dataset(node) for node in self.graph.nodes], "summary": { "totalNodes": len(self.graph.nodes), "totalEdges": len(self.graph.edges), "nodeTypes": list(set(n.type for n in self.graph.nodes)), "edgeTypes": list(set(e.type for e in self.graph.edges)) } }