File size: 6,205 Bytes
0510038 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
"""
OpenLineage Exporter - Export to OpenLineage standard format.
OpenLineage is an open standard for metadata and lineage collection.
https://openlineage.io/
"""
from typing import Dict, Any, List
from datetime import datetime
import uuid
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge
class OpenLineageExporter(LineageExporter):
    """Export a lineage graph as an OpenLineage-style JSON document.

    Every edge of the graph is turned into one ``COMPLETE`` run event whose
    input is the edge's source node and whose output is its target node.
    All nodes are additionally listed as stand-alone datasets, together with
    a small summary section.

    NOTE(review): ``self.graph`` and ``self.to_json`` are not defined in this
    class; they are presumably provided by ``LineageExporter`` -- confirm
    against ``base.py``.
    """

    # Base fields carried by every facet. User-supplied metadata must never
    # be allowed to replace them.
    _RESERVED_FACET_KEYS = ("_producer", "_schemaURL")

    def __init__(self, graph: LineageGraph, namespace: str = "lineage-accelerator"):
        """Store the graph (via the base class) and the OpenLineage namespace.

        Args:
            graph: Lineage graph to export.
            namespace: Namespace written on every dataset and job.
        """
        super().__init__(graph)
        self.namespace = namespace

    @property
    def format_name(self) -> str:
        """Human-readable name of the export format."""
        return "OpenLineage"

    @property
    def file_extension(self) -> str:
        """File extension (including the leading dot) for exported files."""
        return ".json"

    def _create_dataset(self, node: LineageNode) -> Dict[str, Any]:
        """Create an OpenLineage dataset from a node.

        Facets are only emitted for information the node actually carries:
        ``schema`` (columns), ``documentation`` (description), ``ownership``
        (owner) and ``custom`` (type, category, tags and free-form metadata).

        Args:
            node: Graph node to convert.

        Returns:
            A dict with ``namespace``, ``name`` and ``facets`` keys.
        """
        dataset: Dict[str, Any] = {
            "namespace": self.namespace,
            "name": self._get_qualified_name(node),
            "facets": {}
        }
        facets = dataset["facets"]

        # Add schema facet if columns are present
        if node.columns:
            facets["schema"] = {
                "_producer": "lineage-accelerator",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SchemaDatasetFacet.json",
                "fields": [
                    {
                        "name": col.get("name"),
                        # Fall back to "string" whenever neither key holds a
                        # usable value, including a present-but-empty
                        # "data_type".
                        "type": col.get("type") or col.get("data_type") or "string",
                        "description": col.get("description")
                    }
                    for col in node.columns
                ]
            }

        # Add documentation facet
        if node.description:
            facets["documentation"] = {
                "_producer": "lineage-accelerator",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationDatasetFacet.json",
                "description": node.description
            }

        # Add ownership facet
        if node.owner:
            facets["ownership"] = {
                "_producer": "lineage-accelerator",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/OwnershipDatasetFacet.json",
                "owners": [{"name": node.owner, "type": "MAINTAINER"}]
            }

        # Add custom facet for additional metadata. Metadata entries are
        # merged last, so they may override nodeType/category/tags.
        custom_fields: Dict[str, Any] = {}
        if node.type:
            custom_fields["nodeType"] = node.type
        if node.category:
            custom_fields["category"] = node.category
        if node.tags:
            custom_fields["tags"] = node.tags
        if node.metadata:
            custom_fields.update(node.metadata)

        if custom_fields:
            custom_facet: Dict[str, Any] = {
                "_producer": "lineage-accelerator",
                "_schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/CustomFacet",
            }
            # Skip reserved keys so a metadata entry named "_producer" or
            # "_schemaURL" cannot clobber the facet's base fields.
            for key, value in custom_fields.items():
                if key not in self._RESERVED_FACET_KEYS:
                    custom_facet[key] = value
            facets["custom"] = custom_facet

        return dataset

    def _get_qualified_name(self, node: LineageNode) -> str:
        """Get the dot-separated qualified name for a node.

        The name is built as ``database.schema.name``; the database and
        schema parts are left out when they are empty or ``None``.
        """
        parts = []
        if node.database:
            parts.append(node.database)
        if node.schema:
            parts.append(node.schema)
        parts.append(node.name)
        return ".".join(parts)

    def _create_job(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an OpenLineage job from an edge.

        Uses ``edge.job_name`` when set, otherwise a generated
        ``transform_<source>_to_<target>`` name. A ``jobType`` facet is added
        when the edge has a type.

        Args:
            edge: Graph edge to convert.

        Returns:
            A dict with ``namespace``, ``name`` and ``facets`` keys.
        """
        job_name = edge.job_name or f"transform_{edge.source}_to_{edge.target}"

        job: Dict[str, Any] = {
            "namespace": self.namespace,
            "name": job_name,
            "facets": {}
        }

        # Add job type facet
        if edge.type:
            job["facets"]["jobType"] = {
                "_producer": "lineage-accelerator",
                "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/JobTypeJobFacet.json",
                "processingType": "BATCH",
                "integration": "CUSTOM",
                "jobType": edge.type.upper()
            }

        return job

    def _create_run_event(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an OpenLineage run event for an edge.

        The event gets a freshly generated random ``runId`` on every call, so
        two exports of the same graph differ in that field. Endpoints that
        ``self.graph.get_node`` does not resolve (falsy result) are omitted
        from ``inputs`` / ``outputs``.

        Args:
            edge: Graph edge to convert.

        Returns:
            A ``COMPLETE`` run event dict.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        event: Dict[str, Any] = {
            "eventType": "COMPLETE",
            "eventTime": self.graph.generated_at,
            "run": {
                "runId": str(uuid.uuid4()),
                "facets": {}
            },
            "job": self._create_job(edge),
            "inputs": [],
            "outputs": []
        }

        if source_node:
            event["inputs"].append(self._create_dataset(source_node))

        if target_node:
            output_dataset = self._create_dataset(target_node)
            # Add lineage facet to output. Only the facet shell is emitted;
            # no per-column mappings are filled in, so "fields" stays empty.
            if source_node:
                output_dataset["facets"]["columnLineage"] = {
                    "_producer": "lineage-accelerator",
                    "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/ColumnLineageDatasetFacet.json",
                    "fields": {}
                }
            event["outputs"].append(output_dataset)

        return event

    def export(self) -> str:
        """Export to OpenLineage JSON format.

        Returns:
            The result of ``self.to_json(indent=2)``.
        """
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the whole graph to a plain dictionary.

        Returns:
            A dict holding one run event per edge, one dataset per node and
            a summary with node/edge counts and the distinct node/edge types.
        """
        graph = self.graph

        # Create run events for each edge
        events = [self._create_run_event(edge) for edge in graph.edges]

        # dict.fromkeys removes duplicates while keeping first-seen order,
        # so the summary is stable across runs (iterating a set of strings
        # is not, because of hash randomization).
        node_types = list(dict.fromkeys(n.type for n in graph.nodes))
        edge_types = list(dict.fromkeys(e.type for e in graph.edges))

        # Create a summary structure
        return {
            "producer": "lineage-accelerator",
            "schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json",
            "generatedAt": graph.generated_at,
            "lineageName": graph.name,
            "namespace": self.namespace,
            "events": events,
            "datasets": [self._create_dataset(node) for node in graph.nodes],
            "summary": {
                "totalNodes": len(graph.nodes),
                "totalEdges": len(graph.edges),
                "nodeTypes": node_types,
                "edgeTypes": edge_types
            }
        }
|