File size: 7,617 Bytes
0510038 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
"""
Microsoft Purview Exporter - Export to Microsoft Purview format.
Microsoft Purview is a unified data governance service.
https://azure.microsoft.com/en-us/products/purview
"""
from typing import Dict, Any, List
from datetime import datetime
import uuid
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge
class PurviewExporter(LineageExporter):
    """Export lineage to Microsoft Purview format.

    The export contains one entity per graph node, one column entity per
    column declared on a node, and one ``Process`` entity per graph edge
    that links its source node (input) to its target node (output).

    Every GUID is derived from the entity's qualified name with
    ``uuid.uuid5``, so exporting the same graph twice yields the same GUIDs.
    """

    # Entity type used when a node type is missing or has no mapping.
    _DEFAULT_PURVIEW_TYPE = "DataSet"

    # Internal node type (lower-case) -> Purview entity type name.
    _PURVIEW_TYPE_MAP = {
        "table": "azure_sql_table",
        "view": "azure_sql_view",
        "model": "DataSet",
        "source": "DataSource",
        "destination": "DataSet",
        "column": "azure_sql_column",
        "database": "azure_sql_db",
        "schema": "azure_sql_schema",
        "report": "PowerBI_Report",
        "dimension": "azure_sql_table",
        "fact": "azure_sql_table",
        "feature_set": "DataSet",
        "semantic_model": "PowerBI_Dataset",
        "external_api": "DataSource",
        "extract": "DataSet",
    }

    def __init__(self, graph: LineageGraph, collection_name: str = "lineage-accelerator"):
        """Create an exporter for ``graph``.

        Args:
            graph: Lineage graph to export.
            collection_name: Collection reference name; also used as the
                prefix of every qualified name produced by this exporter.
        """
        super().__init__(graph)
        self.collection_name = collection_name

    @property
    def format_name(self) -> str:
        """Human-readable name of the export format."""
        return "Microsoft Purview"

    @property
    def file_extension(self) -> str:
        """File extension (including the leading dot) for exported files."""
        return ".json"

    @staticmethod
    def _guid_for(qualified_name: str) -> str:
        """Return the deterministic GUID string for a qualified name."""
        return str(uuid.uuid5(uuid.NAMESPACE_DNS, qualified_name))

    def _node_type_to_purview_type(self, node_type: str) -> str:
        """Map an internal node type to a Purview entity type.

        The lookup is case-insensitive. A missing (``None``/empty) or
        unmapped type yields ``"DataSet"``.
        """
        if not node_type:
            return self._DEFAULT_PURVIEW_TYPE
        return self._PURVIEW_TYPE_MAP.get(node_type.lower(), self._DEFAULT_PURVIEW_TYPE)

    def _entity_reference(self, node: LineageNode) -> Dict[str, Any]:
        """Build the reference dict used to point a process at ``node``."""
        qualified_name = self._get_qualified_name(node)
        return {
            "typeName": self._node_type_to_purview_type(node.type),
            "guid": self._guid_for(qualified_name),
            "qualifiedName": qualified_name,
        }

    def _create_entity(self, node: LineageNode) -> Dict[str, Any]:
        """Create a Purview entity from a node.

        Optional node fields (database, schema, owner, category, tags) are
        only written to the entity's attributes when they are truthy.
        """
        qualified_name = self._get_qualified_name(node)
        attributes: Dict[str, Any] = {
            "name": node.name,
            "qualifiedName": qualified_name,
            "description": node.description or f"Data asset: {node.name}",
        }

        # Database-specific attributes.
        if node.database:
            attributes["databaseName"] = node.database
        if node.schema:
            attributes["schemaName"] = node.schema

        # Ownership.
        if node.owner:
            attributes["owner"] = node.owner

        # Custom attributes; sourceSystem is always present.
        attributes["sourceSystem"] = "lineage-accelerator"
        if node.category:
            attributes["layer"] = node.category
        if node.tags:
            attributes["userTags"] = node.tags

        return {
            "typeName": self._node_type_to_purview_type(node.type),
            "attributes": attributes,
            "guid": self._guid_for(qualified_name),
            "status": "ACTIVE",
        }

    def _get_qualified_name(self, node: LineageNode) -> str:
        """Get the Purview-style qualified name of a node.

        The format is ``<collection>://<database>/<schema>/<name>``, with the
        database and schema segments omitted when the node does not set
        them. The ``://`` separator matches the one used for process
        qualified names in ``_create_process``.
        """
        path_parts = [part for part in (node.database, node.schema) if part]
        path_parts.append(node.name)
        return f"{self.collection_name}://" + "/".join(path_parts)

    def _create_column_entities(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Create Purview column entities from a node's columns.

        Each column is read as a mapping with the keys ``name``, ``type`` or
        ``data_type``, and ``description``. The data type falls back to
        ``"string"`` and the description to ``""`` when absent or empty.

        Returns:
            One ``azure_sql_column`` entity per column, each referencing the
            parent node through ``relationshipAttributes["table"]``; an empty
            list when the node has no columns.
        """
        if not node.columns:
            return []

        # The parent reference is identical for every column: compute it once.
        parent_qualified_name = self._get_qualified_name(node)
        parent_type = self._node_type_to_purview_type(node.type)
        parent_guid = self._guid_for(parent_qualified_name)

        column_entities = []
        for col in node.columns:
            col_name = col.get("name")
            col_qualified_name = f"{parent_qualified_name}#{col_name}"
            column_entities.append({
                "typeName": "azure_sql_column",
                "attributes": {
                    "name": col_name,
                    "qualifiedName": col_qualified_name,
                    # Chain with `or` so a present-but-empty value still
                    # falls through to the default.
                    "data_type": col.get("type") or col.get("data_type") or "string",
                    "description": col.get("description") or "",
                },
                "guid": self._guid_for(col_qualified_name),
                "status": "ACTIVE",
                "relationshipAttributes": {
                    "table": {
                        "typeName": parent_type,
                        "guid": parent_guid,
                    }
                },
            })
        return column_entities

    def _create_process(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create a Purview process entity for one lineage edge.

        The process is named after ``edge.job_name`` when set, otherwise
        ``process_<source>_to_<target>``. The source node becomes the single
        input and the target node the single output; an endpoint that the
        graph cannot resolve is left out.

        NOTE(review): edges sharing a ``job_name`` produce process entities
        with the same qualified name and GUID - confirm whether the consumer
        expects these to be merged into a single process.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        process_name = edge.job_name or f"process_{edge.source}_to_{edge.target}"
        process_qualified_name = f"{self.collection_name}://processes/{process_name}"

        inputs = [self._entity_reference(source_node)] if source_node else []
        outputs = [self._entity_reference(target_node)] if target_node else []

        return {
            "typeName": "Process",
            "attributes": {
                "name": process_name,
                "qualifiedName": process_qualified_name,
                "description": f"Data transformation: {edge.type}",
            },
            "guid": self._guid_for(process_qualified_name),
            "status": "ACTIVE",
            "relationshipAttributes": {
                "inputs": inputs,
                "outputs": outputs,
            },
        }

    def export(self) -> str:
        """Export to Microsoft Purview JSON format (2-space indented)."""
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the graph to a Purview bulk import dictionary.

        Returns:
            A dict with export metadata, the collection reference, the node
            and column entities (each node followed by its columns), the
            process entities, and a summary of counts and entity types.
        """
        entities: List[Dict[str, Any]] = []
        for node in self.graph.nodes:
            entities.append(self._create_entity(node))
            entities.extend(self._create_column_entities(node))

        processes = [self._create_process(edge) for edge in self.graph.edges]

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Microsoft Purview Bulk Import",
                "version": "1.0",
            },
            "collection": {
                "referenceName": self.collection_name,
                "type": "CollectionReference",
            },
            "entities": entities,
            "processes": processes,
            "referredEntities": {},
            "summary": {
                "totalEntities": len(entities),
                "totalProcesses": len(processes),
                # Sorted so repeated exports of the same graph are identical;
                # plain set iteration order varies between interpreter runs.
                "entityTypes": sorted({e["typeName"] for e in entities}),
            },
        }
|