|
|
""" |
|
|
Microsoft Purview Exporter - Export to Microsoft Purview format. |
|
|
|
|
|
Microsoft Purview is a unified data governance service. |
|
|
https://azure.microsoft.com/en-us/products/purview |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, List |
|
|
from datetime import datetime |
|
|
import uuid |
|
|
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge |
|
|
|
|
|
|
|
|
class PurviewExporter(LineageExporter): |
|
|
"""Export lineage to Microsoft Purview format.""" |
|
|
|
|
|
def __init__(self, graph: LineageGraph, collection_name: str = "lineage-accelerator"): |
|
|
super().__init__(graph) |
|
|
self.collection_name = collection_name |
|
|
|
|
|
@property |
|
|
def format_name(self) -> str: |
|
|
return "Microsoft Purview" |
|
|
|
|
|
@property |
|
|
def file_extension(self) -> str: |
|
|
return ".json" |
|
|
|
|
|
def _node_type_to_purview_type(self, node_type: str) -> str: |
|
|
"""Map internal node types to Purview entity types.""" |
|
|
type_mapping = { |
|
|
"table": "azure_sql_table", |
|
|
"view": "azure_sql_view", |
|
|
"model": "DataSet", |
|
|
"source": "DataSource", |
|
|
"destination": "DataSet", |
|
|
"column": "azure_sql_column", |
|
|
"database": "azure_sql_db", |
|
|
"schema": "azure_sql_schema", |
|
|
"report": "PowerBI_Report", |
|
|
"dimension": "azure_sql_table", |
|
|
"fact": "azure_sql_table", |
|
|
"feature_set": "DataSet", |
|
|
"semantic_model": "PowerBI_Dataset", |
|
|
"external_api": "DataSource", |
|
|
"extract": "DataSet" |
|
|
} |
|
|
return type_mapping.get(node_type.lower(), "DataSet") |
|
|
|
|
|
def _create_entity(self, node: LineageNode) -> Dict[str, Any]: |
|
|
"""Create a Purview entity from a node.""" |
|
|
qualified_name = self._get_qualified_name(node) |
|
|
|
|
|
entity = { |
|
|
"typeName": self._node_type_to_purview_type(node.type), |
|
|
"attributes": { |
|
|
"name": node.name, |
|
|
"qualifiedName": qualified_name, |
|
|
"description": node.description or f"Data asset: {node.name}" |
|
|
}, |
|
|
"guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, qualified_name)), |
|
|
"status": "ACTIVE" |
|
|
} |
|
|
|
|
|
|
|
|
if node.database: |
|
|
entity["attributes"]["databaseName"] = node.database |
|
|
if node.schema: |
|
|
entity["attributes"]["schemaName"] = node.schema |
|
|
|
|
|
|
|
|
if node.owner: |
|
|
entity["attributes"]["owner"] = node.owner |
|
|
|
|
|
|
|
|
entity["attributes"]["sourceSystem"] = "lineage-accelerator" |
|
|
if node.category: |
|
|
entity["attributes"]["layer"] = node.category |
|
|
if node.tags: |
|
|
entity["attributes"]["userTags"] = node.tags |
|
|
|
|
|
return entity |
|
|
|
|
|
def _get_qualified_name(self, node: LineageNode) -> str: |
|
|
"""Get Purview-style qualified name.""" |
|
|
parts = [self.collection_name] |
|
|
if node.database: |
|
|
parts.append(node.database) |
|
|
if node.schema: |
|
|
parts.append(node.schema) |
|
|
parts.append(node.name) |
|
|
return "://".join(parts[:1]) + "/" + "/".join(parts[1:]) |
|
|
|
|
|
def _create_column_entities(self, node: LineageNode) -> List[Dict[str, Any]]: |
|
|
"""Create Purview column entities from a node's columns.""" |
|
|
if not node.columns: |
|
|
return [] |
|
|
|
|
|
column_entities = [] |
|
|
parent_qualified_name = self._get_qualified_name(node) |
|
|
|
|
|
for col in node.columns: |
|
|
col_qualified_name = f"{parent_qualified_name}#{col.get('name')}" |
|
|
column_entity = { |
|
|
"typeName": "azure_sql_column", |
|
|
"attributes": { |
|
|
"name": col.get("name"), |
|
|
"qualifiedName": col_qualified_name, |
|
|
"data_type": col.get("type") or col.get("data_type", "string"), |
|
|
"description": col.get("description", "") |
|
|
}, |
|
|
"guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, col_qualified_name)), |
|
|
"status": "ACTIVE", |
|
|
"relationshipAttributes": { |
|
|
"table": { |
|
|
"typeName": self._node_type_to_purview_type(node.type), |
|
|
"guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, parent_qualified_name)) |
|
|
} |
|
|
} |
|
|
} |
|
|
column_entities.append(column_entity) |
|
|
|
|
|
return column_entities |
|
|
|
|
|
def _create_process(self, edge: LineageEdge) -> Dict[str, Any]: |
|
|
"""Create a Purview process entity for lineage.""" |
|
|
source_node = self.graph.get_node(edge.source) |
|
|
target_node = self.graph.get_node(edge.target) |
|
|
|
|
|
process_name = edge.job_name or f"process_{edge.source}_to_{edge.target}" |
|
|
process_qualified_name = f"{self.collection_name}://processes/{process_name}" |
|
|
|
|
|
process = { |
|
|
"typeName": "Process", |
|
|
"attributes": { |
|
|
"name": process_name, |
|
|
"qualifiedName": process_qualified_name, |
|
|
"description": f"Data transformation: {edge.type}" |
|
|
}, |
|
|
"guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, process_qualified_name)), |
|
|
"status": "ACTIVE", |
|
|
"relationshipAttributes": { |
|
|
"inputs": [], |
|
|
"outputs": [] |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if source_node: |
|
|
source_qualified_name = self._get_qualified_name(source_node) |
|
|
process["relationshipAttributes"]["inputs"].append({ |
|
|
"typeName": self._node_type_to_purview_type(source_node.type), |
|
|
"guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, source_qualified_name)), |
|
|
"qualifiedName": source_qualified_name |
|
|
}) |
|
|
|
|
|
|
|
|
if target_node: |
|
|
target_qualified_name = self._get_qualified_name(target_node) |
|
|
process["relationshipAttributes"]["outputs"].append({ |
|
|
"typeName": self._node_type_to_purview_type(target_node.type), |
|
|
"guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, target_qualified_name)), |
|
|
"qualifiedName": target_qualified_name |
|
|
}) |
|
|
|
|
|
return process |
|
|
|
|
|
def export(self) -> str: |
|
|
"""Export to Microsoft Purview JSON format.""" |
|
|
return self.to_json(indent=2) |
|
|
|
|
|
def _to_dict(self) -> Dict[str, Any]: |
|
|
"""Convert to Purview bulk import dictionary.""" |
|
|
|
|
|
entities = [] |
|
|
|
|
|
|
|
|
for node in self.graph.nodes: |
|
|
entities.append(self._create_entity(node)) |
|
|
|
|
|
entities.extend(self._create_column_entities(node)) |
|
|
|
|
|
|
|
|
processes = [self._create_process(edge) for edge in self.graph.edges] |
|
|
|
|
|
return { |
|
|
"exportInfo": { |
|
|
"producer": "Lineage Graph Accelerator", |
|
|
"exportedAt": self.graph.generated_at, |
|
|
"sourceLineageName": self.graph.name, |
|
|
"format": "Microsoft Purview Bulk Import", |
|
|
"version": "1.0" |
|
|
}, |
|
|
"collection": { |
|
|
"referenceName": self.collection_name, |
|
|
"type": "CollectionReference" |
|
|
}, |
|
|
"entities": entities, |
|
|
"processes": processes, |
|
|
"referredEntities": {}, |
|
|
"summary": { |
|
|
"totalEntities": len(entities), |
|
|
"totalProcesses": len(processes), |
|
|
"entityTypes": list(set(e["typeName"] for e in entities)) |
|
|
} |
|
|
} |
|
|
|