# aamanlamba's picture
# Phase 2: Enhanced lineage extraction with export to data catalogs
# 0510038
"""
Microsoft Purview Exporter - Export to Microsoft Purview format.
Microsoft Purview is a unified data governance service.
https://azure.microsoft.com/en-us/products/purview
"""
from typing import Dict, Any, List
from datetime import datetime
import uuid
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge
class PurviewExporter(LineageExporter):
    """Export a lineage graph as a Microsoft Purview-style JSON document.

    Every graph node becomes an entity, every column listed on a node becomes
    an ``azure_sql_column`` entity, and every graph edge becomes a ``Process``
    entity whose inputs/outputs reference the edge's source/target nodes.

    GUIDs are derived with ``uuid.uuid5`` from each entity's qualified name,
    so the same qualified name always yields the same GUID.
    """

    # Internal node type (lower-case) -> Purview entity type name.
    # Kept at class level so the table is built once, not on every lookup.
    _PURVIEW_TYPE_BY_NODE_TYPE: Dict[str, str] = {
        "table": "azure_sql_table",
        "view": "azure_sql_view",
        "model": "DataSet",
        "source": "DataSource",
        "destination": "DataSet",
        "column": "azure_sql_column",
        "database": "azure_sql_db",
        "schema": "azure_sql_schema",
        "report": "PowerBI_Report",
        "dimension": "azure_sql_table",
        "fact": "azure_sql_table",
        "feature_set": "DataSet",
        "semantic_model": "PowerBI_Dataset",
        "external_api": "DataSource",
        "extract": "DataSet",
    }

    # Entity type used when a node type has no entry in the table above.
    _DEFAULT_PURVIEW_TYPE = "DataSet"

    def __init__(self, graph: LineageGraph, collection_name: str = "lineage-accelerator"):
        """Store the graph (via the base class) and the target collection name.

        Args:
            graph: Lineage graph to export.
            collection_name: Name used as the collection reference and as the
                prefix of every generated qualified name.
        """
        super().__init__(graph)
        self.collection_name = collection_name

    @property
    def format_name(self) -> str:
        """Human-readable name of this export format."""
        return "Microsoft Purview"

    @property
    def file_extension(self) -> str:
        """File extension (including the leading dot) for exported files."""
        return ".json"

    @staticmethod
    def _guid(qualified_name: str) -> str:
        """Return the deterministic GUID string for a qualified name."""
        return str(uuid.uuid5(uuid.NAMESPACE_DNS, qualified_name))

    def _node_type_to_purview_type(self, node_type: str) -> str:
        """Map an internal node type to a Purview entity type.

        The lookup is case-insensitive. Unknown, empty or ``None`` node types
        map to ``"DataSet"``.
        """
        # ``or ""`` keeps a missing node type from raising on ``.lower()``.
        normalized = (node_type or "").lower()
        return self._PURVIEW_TYPE_BY_NODE_TYPE.get(normalized, self._DEFAULT_PURVIEW_TYPE)

    def _create_entity(self, node: LineageNode) -> Dict[str, Any]:
        """Create a Purview entity dictionary from a graph node.

        Optional attributes (database, schema, owner, layer, tags) are only
        emitted when the corresponding node field is truthy.
        """
        qualified_name = self._get_qualified_name(node)
        attributes: Dict[str, Any] = {
            "name": node.name,
            "qualifiedName": qualified_name,
            "description": node.description or f"Data asset: {node.name}",
        }

        # Database-specific attributes
        if node.database:
            attributes["databaseName"] = node.database
        if node.schema:
            attributes["schemaName"] = node.schema

        # Owner
        if node.owner:
            attributes["owner"] = node.owner

        # Custom attributes
        attributes["sourceSystem"] = "lineage-accelerator"
        if node.category:
            attributes["layer"] = node.category
        if node.tags:
            attributes["userTags"] = node.tags

        return {
            "typeName": self._node_type_to_purview_type(node.type),
            "attributes": attributes,
            "guid": self._guid(qualified_name),
            "status": "ACTIVE",
        }

    def _get_qualified_name(self, node: LineageNode) -> str:
        """Return the qualified name ``<collection>://<database>/<schema>/<name>``.

        The database and schema segments are omitted when the node does not
        define them.
        """
        path_parts = []
        if node.database:
            path_parts.append(node.database)
        if node.schema:
            path_parts.append(node.schema)
        path_parts.append(node.name)
        # The "://" separator is written explicitly so node names use the same
        # "<collection>://..." shape as the process names in _create_process.
        path = "/".join(path_parts)
        return f"{self.collection_name}://{path}"

    def _create_column_entities(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Create one ``azure_sql_column`` entity per column on ``node``.

        Each column is read with ``.get`` for the keys ``name``, ``type`` /
        ``data_type`` and ``description``. The column's qualified name is the
        parent's qualified name followed by ``#<column name>``, and the entity
        points back at its parent through ``relationshipAttributes["table"]``.

        Returns:
            A list of column entities; empty when the node has no columns.
        """
        if not node.columns:
            return []

        parent_qualified_name = self._get_qualified_name(node)
        # The parent reference is identical for every column of this node.
        parent_type = self._node_type_to_purview_type(node.type)
        parent_guid = self._guid(parent_qualified_name)

        column_entities = []
        for col in node.columns:
            col_name = col.get("name")
            col_qualified_name = f"{parent_qualified_name}#{col_name}"
            # Fall through to "string" when neither key carries a usable value
            # (missing, None or empty).
            data_type = col.get("type") or col.get("data_type") or "string"
            column_entities.append({
                "typeName": "azure_sql_column",
                "attributes": {
                    "name": col_name,
                    "qualifiedName": col_qualified_name,
                    "data_type": data_type,
                    "description": col.get("description", ""),
                },
                "guid": self._guid(col_qualified_name),
                "status": "ACTIVE",
                "relationshipAttributes": {
                    "table": {
                        "typeName": parent_type,
                        "guid": parent_guid,
                    }
                },
            })
        return column_entities

    def _entity_reference(self, node: LineageNode) -> Dict[str, Any]:
        """Build the reference (type, GUID, qualified name) to a node's entity."""
        qualified_name = self._get_qualified_name(node)
        return {
            "typeName": self._node_type_to_purview_type(node.type),
            "guid": self._guid(qualified_name),
            "qualifiedName": qualified_name,
        }

    def _create_process(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create a Purview ``Process`` entity representing one lineage edge.

        The process is named after ``edge.job_name`` when set, otherwise
        ``process_<source>_to_<target>``. An input/output reference is added
        only when the graph can resolve the edge's source/target node.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        process_name = edge.job_name or f"process_{edge.source}_to_{edge.target}"
        process_qualified_name = f"{self.collection_name}://processes/{process_name}"

        inputs = []
        if source_node:
            inputs.append(self._entity_reference(source_node))

        outputs = []
        if target_node:
            outputs.append(self._entity_reference(target_node))

        return {
            "typeName": "Process",
            "attributes": {
                "name": process_name,
                "qualifiedName": process_qualified_name,
                "description": f"Data transformation: {edge.type}",
            },
            "guid": self._guid(process_qualified_name),
            "status": "ACTIVE",
            "relationshipAttributes": {
                "inputs": inputs,
                "outputs": outputs,
            },
        }

    def export(self) -> str:
        """Export to Microsoft Purview JSON format (2-space indented)."""
        # to_json is not defined in this class; presumably inherited from
        # LineageExporter and built on _to_dict -- confirm in .base.
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the graph to a Purview bulk-import style dictionary.

        Returns:
            A dictionary with export metadata, the collection reference, the
            node and column entities, the process entities, and a summary.
        """
        entities: List[Dict[str, Any]] = []
        for node in self.graph.nodes:
            # Each node entity is immediately followed by its column entities.
            entities.append(self._create_entity(node))
            entities.extend(self._create_column_entities(node))

        # One process entity per lineage edge.
        processes = [self._create_process(edge) for edge in self.graph.edges]

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Microsoft Purview Bulk Import",
                "version": "1.0",
            },
            "collection": {
                "referenceName": self.collection_name,
                "type": "CollectionReference",
            },
            "entities": entities,
            "processes": processes,
            "referredEntities": {},
            "summary": {
                "totalEntities": len(entities),
                "totalProcesses": len(processes),
                # Sorted so repeated exports of the same graph are identical;
                # bare set iteration order varies between interpreter runs.
                "entityTypes": sorted({e["typeName"] for e in entities}),
            },
        }