|
|
""" |
|
|
OpenLineage Exporter - Export to OpenLineage standard format. |
|
|
|
|
|
OpenLineage is an open standard for metadata and lineage collection. |
|
|
https://openlineage.io/ |
|
|
""" |
|
|
|
|
|
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List

from .base import LineageEdge, LineageExporter, LineageGraph, LineageNode
|
|
|
|
|
|
|
|
class OpenLineageExporter(LineageExporter):
    """Export a lineage graph to an OpenLineage-style JSON document.

    Each graph edge becomes one ``COMPLETE`` run event. Its job represents the
    transformation from the edge's source node (the input dataset) to its
    target node (the output dataset). Every graph node is also emitted as a
    standalone dataset under the top-level ``"datasets"`` key.
    """

    # Value written to every ``producer`` / ``_producer`` field.
    PRODUCER = "lineage-accelerator"

    # Schema URL advertised by each individual run event (required by the spec).
    RUN_EVENT_SCHEMA_URL = (
        "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/RunEvent"
    )

    def __init__(self, graph: LineageGraph, namespace: str = "lineage-accelerator"):
        """Create an exporter for ``graph``.

        Args:
            graph: The lineage graph to export.
            namespace: OpenLineage namespace applied to every job and dataset.
        """
        super().__init__(graph)
        self.namespace = namespace

    @property
    def format_name(self) -> str:
        """Human-readable name of this export format."""
        return "OpenLineage"

    @property
    def file_extension(self) -> str:
        """File extension used for exported documents."""
        return ".json"

    def _facet(self, schema_url: str, body: Dict[str, Any]) -> Dict[str, Any]:
        """Build a facet dict with the standard ``_producer``/``_schemaURL`` keys.

        Keys from ``body`` are appended after the reserved keys. A key in
        ``body`` that collides with a reserved key is ignored, so arbitrary
        metadata cannot clobber them.
        """
        facet: Dict[str, Any] = {"_producer": self.PRODUCER, "_schemaURL": schema_url}
        for key, value in body.items():
            facet.setdefault(key, value)
        return facet

    @staticmethod
    def _schema_field(col: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one column dict into a SchemaDatasetFacet field entry.

        ``type`` falls back to ``data_type`` and then to ``"string"``, even when
        the earlier keys are present but empty or ``None``. ``description`` is
        omitted when absent instead of being emitted as ``null``.
        """
        field: Dict[str, Any] = {
            "name": col.get("name"),
            "type": col.get("type") or col.get("data_type") or "string",
        }
        description = col.get("description")
        if description is not None:
            field["description"] = description
        return field

    def _create_dataset(self, node: LineageNode) -> Dict[str, Any]:
        """Create an OpenLineage dataset (with facets) from a node.

        Facets are added only when the node carries the corresponding data:
        schema (columns), documentation (description), ownership (owner), and a
        custom facet for type, category, tags and free-form metadata.
        """
        facets: Dict[str, Any] = {}

        if node.columns:
            facets["schema"] = self._facet(
                "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
                {"fields": [self._schema_field(col) for col in node.columns]},
            )

        if node.description:
            facets["documentation"] = self._facet(
                "https://openlineage.io/spec/facets/1-0-0/DocumentationDatasetFacet.json",
                {"description": node.description},
            )

        if node.owner:
            facets["ownership"] = self._facet(
                "https://openlineage.io/spec/facets/1-0-0/OwnershipDatasetFacet.json",
                {"owners": [{"name": node.owner, "type": "MAINTAINER"}]},
            )

        custom: Dict[str, Any] = {}
        if node.type:
            custom["nodeType"] = node.type
        if node.category:
            custom["category"] = node.category
        if node.tags:
            custom["tags"] = node.tags
        if node.metadata:
            # Metadata may override nodeType/category/tags, as before.
            custom.update(node.metadata)

        if custom:
            facets["custom"] = self._facet(
                "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/CustomFacet",
                custom,
            )

        return {
            "namespace": self.namespace,
            "name": self._get_qualified_name(node),
            "facets": facets,
        }

    def _get_qualified_name(self, node: LineageNode) -> str:
        """Return ``database.schema.name``, skipping empty database/schema parts."""
        parts = [part for part in (node.database, node.schema) if part]
        parts.append(node.name)
        return ".".join(parts)

    def _create_job(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an OpenLineage job from an edge.

        The job name is ``edge.job_name`` or, failing that, a synthetic
        ``transform_<source>_to_<target>`` name.
        """
        job_name = edge.job_name or f"transform_{edge.source}_to_{edge.target}"

        facets: Dict[str, Any] = {}
        if edge.type:
            facets["jobType"] = self._facet(
                "https://openlineage.io/spec/facets/1-0-0/JobTypeJobFacet.json",
                {
                    "processingType": "BATCH",
                    "integration": "CUSTOM",
                    "jobType": edge.type.upper(),
                },
            )

        return {"namespace": self.namespace, "name": job_name, "facets": facets}

    def _event_time(self) -> Any:
        """Return the ``eventTime`` value for run events.

        A ``datetime`` is converted to an ISO-8601 string, and other truthy
        values are passed through unchanged. When the graph has no timestamp,
        the current UTC time is used, because ``eventTime`` is required.
        """
        generated_at = self.graph.generated_at
        if isinstance(generated_at, datetime):
            # NOTE(review): a naive datetime serialises without a UTC offset.
            return generated_at.isoformat()
        if generated_at:
            return generated_at
        return datetime.now(timezone.utc).isoformat()

    def _create_run_event(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create a ``COMPLETE`` OpenLineage run event for an edge.

        The source node (if it resolves) becomes the single input dataset and
        the target node (if it resolves) becomes the single output dataset.
        Each call generates a fresh random ``runId``.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        inputs: List[Dict[str, Any]] = []
        if source_node:
            inputs.append(self._create_dataset(source_node))

        outputs: List[Dict[str, Any]] = []
        if target_node:
            output_dataset = self._create_dataset(target_node)
            if source_node:
                # Placeholder column-lineage facet: no per-column mapping is
                # derived here, so "fields" is left empty.
                output_dataset["facets"]["columnLineage"] = self._facet(
                    "https://openlineage.io/spec/facets/1-0-0/ColumnLineageDatasetFacet.json",
                    {"fields": {}},
                )
            outputs.append(output_dataset)

        return {
            "eventType": "COMPLETE",
            "eventTime": self._event_time(),
            "run": {"runId": str(uuid.uuid4()), "facets": {}},
            "job": self._create_job(edge),
            "inputs": inputs,
            "outputs": outputs,
            # producer/schemaURL are required on every OpenLineage event.
            "producer": self.PRODUCER,
            "schemaURL": self.RUN_EVENT_SCHEMA_URL,
        }

    def export(self) -> str:
        """Export to OpenLineage JSON format (pretty-printed, indent=2)."""
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the graph to the exported document structure.

        Contains one run event per edge, one dataset per node, and a summary
        with counts and the distinct node and edge types. The type lists are
        sorted (by ``str``, so ``None`` is tolerated), which keeps the output
        deterministic across runs.
        """
        nodes = self.graph.nodes
        edges = self.graph.edges

        return {
            "producer": self.PRODUCER,
            "schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json",
            "generatedAt": self.graph.generated_at,
            "lineageName": self.graph.name,
            "namespace": self.namespace,
            "events": [self._create_run_event(edge) for edge in edges],
            "datasets": [self._create_dataset(node) for node in nodes],
            "summary": {
                "totalNodes": len(nodes),
                "totalEdges": len(edges),
                "nodeTypes": sorted({n.type for n in nodes}, key=str),
                "edgeTypes": sorted({e.type for e in edges}, key=str),
            },
        }
|
|
|