""" Microsoft Purview Exporter - Export to Microsoft Purview format. Microsoft Purview is a unified data governance service. https://azure.microsoft.com/en-us/products/purview """ from typing import Dict, Any, List from datetime import datetime import uuid from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge class PurviewExporter(LineageExporter): """Export lineage to Microsoft Purview format.""" def __init__(self, graph: LineageGraph, collection_name: str = "lineage-accelerator"): super().__init__(graph) self.collection_name = collection_name @property def format_name(self) -> str: return "Microsoft Purview" @property def file_extension(self) -> str: return ".json" def _node_type_to_purview_type(self, node_type: str) -> str: """Map internal node types to Purview entity types.""" type_mapping = { "table": "azure_sql_table", "view": "azure_sql_view", "model": "DataSet", "source": "DataSource", "destination": "DataSet", "column": "azure_sql_column", "database": "azure_sql_db", "schema": "azure_sql_schema", "report": "PowerBI_Report", "dimension": "azure_sql_table", "fact": "azure_sql_table", "feature_set": "DataSet", "semantic_model": "PowerBI_Dataset", "external_api": "DataSource", "extract": "DataSet" } return type_mapping.get(node_type.lower(), "DataSet") def _create_entity(self, node: LineageNode) -> Dict[str, Any]: """Create a Purview entity from a node.""" qualified_name = self._get_qualified_name(node) entity = { "typeName": self._node_type_to_purview_type(node.type), "attributes": { "name": node.name, "qualifiedName": qualified_name, "description": node.description or f"Data asset: {node.name}" }, "guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, qualified_name)), "status": "ACTIVE" } # Add database-specific attributes if node.database: entity["attributes"]["databaseName"] = node.database if node.schema: entity["attributes"]["schemaName"] = node.schema # Add owner if node.owner: entity["attributes"]["owner"] = node.owner # Add custom attributes entity["attributes"]["sourceSystem"] = "lineage-accelerator" if node.category: entity["attributes"]["layer"] = node.category if node.tags: entity["attributes"]["userTags"] = node.tags return entity def _get_qualified_name(self, node: LineageNode) -> str: """Get Purview-style qualified name.""" parts = [self.collection_name] if node.database: parts.append(node.database) if node.schema: parts.append(node.schema) parts.append(node.name) return "://".join(parts[:1]) + "/" + "/".join(parts[1:]) def _create_column_entities(self, node: LineageNode) -> List[Dict[str, Any]]: """Create Purview column entities from a node's columns.""" if not node.columns: return [] column_entities = [] parent_qualified_name = self._get_qualified_name(node) for col in node.columns: col_qualified_name = f"{parent_qualified_name}#{col.get('name')}" column_entity = { "typeName": "azure_sql_column", "attributes": { "name": col.get("name"), "qualifiedName": col_qualified_name, "data_type": col.get("type") or col.get("data_type", "string"), "description": col.get("description", "") }, "guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, col_qualified_name)), "status": "ACTIVE", "relationshipAttributes": { "table": { "typeName": self._node_type_to_purview_type(node.type), "guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, parent_qualified_name)) } } } column_entities.append(column_entity) return column_entities def _create_process(self, edge: LineageEdge) -> Dict[str, Any]: """Create a Purview process entity for lineage.""" source_node = self.graph.get_node(edge.source) target_node = self.graph.get_node(edge.target) process_name = edge.job_name or f"process_{edge.source}_to_{edge.target}" process_qualified_name = f"{self.collection_name}://processes/{process_name}" process = { "typeName": "Process", "attributes": { "name": process_name, "qualifiedName": process_qualified_name, "description": f"Data transformation: {edge.type}" }, "guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, process_qualified_name)), "status": "ACTIVE", "relationshipAttributes": { "inputs": [], "outputs": [] } } # Add input reference if source_node: source_qualified_name = self._get_qualified_name(source_node) process["relationshipAttributes"]["inputs"].append({ "typeName": self._node_type_to_purview_type(source_node.type), "guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, source_qualified_name)), "qualifiedName": source_qualified_name }) # Add output reference if target_node: target_qualified_name = self._get_qualified_name(target_node) process["relationshipAttributes"]["outputs"].append({ "typeName": self._node_type_to_purview_type(target_node.type), "guid": str(uuid.uuid5(uuid.NAMESPACE_DNS, target_qualified_name)), "qualifiedName": target_qualified_name }) return process def export(self) -> str: """Export to Microsoft Purview JSON format.""" return self.to_json(indent=2) def _to_dict(self) -> Dict[str, Any]: """Convert to Purview bulk import dictionary.""" # Collect all entities entities = [] # Add node entities for node in self.graph.nodes: entities.append(self._create_entity(node)) # Add column entities entities.extend(self._create_column_entities(node)) # Add process entities for lineage processes = [self._create_process(edge) for edge in self.graph.edges] return { "exportInfo": { "producer": "Lineage Graph Accelerator", "exportedAt": self.graph.generated_at, "sourceLineageName": self.graph.name, "format": "Microsoft Purview Bulk Import", "version": "1.0" }, "collection": { "referenceName": self.collection_name, "type": "CollectionReference" }, "entities": entities, "processes": processes, "referredEntities": {}, "summary": { "totalEntities": len(entities), "totalProcesses": len(processes), "entityTypes": list(set(e["typeName"] for e in entities)) } }