# aamanlamba's picture
# Phase 2: Enhanced lineage extraction with export to data catalogs
# 0510038
"""
OpenLineage Exporter - Export to OpenLineage standard format.
OpenLineage is an open standard for metadata and lineage collection.
https://openlineage.io/
"""
from typing import Dict, Any, List
from datetime import datetime
import uuid
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge
class OpenLineageExporter(LineageExporter):
    """Export a lineage graph as OpenLineage-style JSON.

    Each edge of the graph becomes one ``COMPLETE`` run event whose input is
    the edge's source node and whose output is the edge's target node. Every
    node of the graph is additionally listed as a standalone dataset, and a
    small summary (counts and distinct node/edge types) is attached.
    """

    # Value written into the ``_producer`` / ``producer`` fields.
    _PRODUCER = "lineage-accelerator"

    def __init__(self, graph: LineageGraph, namespace: str = "lineage-accelerator"):
        """Store the graph (via the base class) and the dataset/job namespace.

        Args:
            graph: Lineage graph to export.
            namespace: Namespace written on every dataset and job.
        """
        super().__init__(graph)
        self.namespace = namespace

    @property
    def format_name(self) -> str:
        """Human-readable name of the export format."""
        return "OpenLineage"

    @property
    def file_extension(self) -> str:
        """File extension (including the dot) for exported files."""
        return ".json"

    def _facet(self, schema_url: str, content: Dict[str, Any]) -> Dict[str, Any]:
        """Build a facet: the producer/schema header followed by ``content``.

        Entries of ``content`` whose key is ``_producer`` or ``_schemaURL``
        are skipped so that caller-supplied data (e.g. free-form node
        metadata) can never replace the facet header.
        """
        facet: Dict[str, Any] = {
            "_producer": self._PRODUCER,
            "_schemaURL": schema_url,
        }
        for key, value in content.items():
            if key not in ("_producer", "_schemaURL"):
                facet[key] = value
        return facet

    def _create_dataset(self, node: LineageNode) -> Dict[str, Any]:
        """Create an OpenLineage dataset dict from a graph node.

        Facets are only added for information the node actually carries:
        ``schema`` (columns), ``documentation`` (description), ``ownership``
        (owner) and ``custom`` (type, category, tags, metadata).

        Args:
            node: Node to convert.

        Returns:
            Dict with ``namespace``, ``name`` and ``facets`` keys.
        """
        facets: Dict[str, Any] = {}

        # Schema facet, only when the node lists columns.
        if node.columns:
            facets["schema"] = self._facet(
                "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
                {
                    "fields": [
                        {
                            "name": col.get("name"),
                            # Fall back to "string" when neither key holds a
                            # usable (non-empty) value.
                            "type": col.get("type") or col.get("data_type") or "string",
                            "description": col.get("description"),
                        }
                        for col in node.columns
                    ]
                },
            )

        # Documentation facet.
        if node.description:
            facets["documentation"] = self._facet(
                "https://openlineage.io/spec/facets/1-0-0/DocumentationDatasetFacet.json",
                {"description": node.description},
            )

        # Ownership facet.
        if node.owner:
            facets["ownership"] = self._facet(
                "https://openlineage.io/spec/facets/1-0-0/OwnershipDatasetFacet.json",
                {"owners": [{"name": node.owner, "type": "MAINTAINER"}]},
            )

        # Custom facet for the remaining node attributes.
        custom: Dict[str, Any] = {}
        if node.type:
            custom["nodeType"] = node.type
        if node.category:
            custom["category"] = node.category
        if node.tags:
            custom["tags"] = node.tags
        if node.metadata:
            # Merged last, so metadata keys win over nodeType/category/tags.
            custom.update(node.metadata)
        if custom:
            facets["custom"] = self._facet(
                "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/CustomFacet",
                custom,
            )

        return {
            "namespace": self.namespace,
            "name": self._get_qualified_name(node),
            "facets": facets,
        }

    def _get_qualified_name(self, node: LineageNode) -> str:
        """Return ``database.schema.name``, omitting empty leading parts."""
        parts = [part for part in (node.database, node.schema) if part]
        parts.append(node.name)
        return ".".join(parts)

    def _create_job(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an OpenLineage job dict from a graph edge.

        The job name is ``edge.job_name`` when set, otherwise it is derived
        from the edge's source and target identifiers. A ``jobType`` facet is
        added when the edge has a type.
        """
        job: Dict[str, Any] = {
            "namespace": self.namespace,
            "name": edge.job_name or f"transform_{edge.source}_to_{edge.target}",
            "facets": {},
        }

        if edge.type:
            job["facets"]["jobType"] = self._facet(
                "https://openlineage.io/spec/facets/1-0-0/JobTypeJobFacet.json",
                {
                    "processingType": "BATCH",
                    "integration": "CUSTOM",
                    "jobType": edge.type.upper(),
                },
            )

        return job

    def _create_run_event(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create a ``COMPLETE`` run event for a graph edge.

        The source node (if the graph resolves it) becomes the single input
        and the target node (if resolved) the single output. Note that
        ``runId`` is a fresh random UUID on every call, so two exports of the
        same graph differ in their run ids.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        event: Dict[str, Any] = {
            "eventType": "COMPLETE",
            "eventTime": self.graph.generated_at,
            "run": {
                "runId": str(uuid.uuid4()),
                "facets": {},
            },
            "job": self._create_job(edge),
            "inputs": [],
            "outputs": [],
        }

        if source_node:
            event["inputs"].append(self._create_dataset(source_node))

        if target_node:
            output_dataset = self._create_dataset(target_node)
            if source_node:
                # Placeholder column-lineage facet: attached only when both
                # ends of the edge resolved; no per-column mapping is filled in.
                output_dataset["facets"]["columnLineage"] = self._facet(
                    "https://openlineage.io/spec/facets/1-0-0/ColumnLineageDatasetFacet.json",
                    {"fields": {}},
                )
            event["outputs"].append(output_dataset)

        return event

    def export(self) -> str:
        """Export to an OpenLineage JSON string (2-space indent).

        Delegates to ``to_json``, which is not defined in this class
        (presumably inherited from ``LineageExporter`` — verify in base).
        """
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the graph to the exported dictionary structure.

        Returns:
            Dict holding one run event per edge, one dataset per node, and a
            summary. ``nodeTypes`` / ``edgeTypes`` list each distinct type
            once, in first-seen order, so the output is stable across runs.
        """
        # One run event per edge.
        events: List[Dict[str, Any]] = [
            self._create_run_event(edge) for edge in self.graph.edges
        ]

        return {
            "producer": self._PRODUCER,
            "schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json",
            "generatedAt": self.graph.generated_at,
            "lineageName": self.graph.name,
            "namespace": self.namespace,
            "events": events,
            "datasets": [self._create_dataset(node) for node in self.graph.nodes],
            "summary": {
                "totalNodes": len(self.graph.nodes),
                "totalEdges": len(self.graph.edges),
                # dict.fromkeys de-duplicates while keeping insertion order;
                # list(set(...)) ordering varies with string hash randomisation.
                "nodeTypes": list(dict.fromkeys(n.type for n in self.graph.nodes)),
                "edgeTypes": list(dict.fromkeys(e.type for e in self.graph.edges)),
            },
        }