""" Collibra Exporter - Export to Collibra Data Intelligence format. Collibra is an enterprise data governance and catalog platform. https://www.collibra.com/ """ from typing import Dict, Any, List from datetime import datetime import uuid from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge class CollibraExporter(LineageExporter): """Export lineage to Collibra import format.""" def __init__(self, graph: LineageGraph, community_name: str = "Data Lineage", domain_name: str = "Physical Data Dictionary"): super().__init__(graph) self.community_name = community_name self.domain_name = domain_name @property def format_name(self) -> str: return "Collibra" @property def file_extension(self) -> str: return ".json" def _node_type_to_collibra_type(self, node_type: str) -> str: """Map internal node types to Collibra asset types.""" type_mapping = { "table": "Table", "view": "View", "model": "Data Set", "source": "Data Source", "destination": "Data Target", "column": "Column", "database": "Database", "schema": "Schema", "report": "Report", "dimension": "Dimension Table", "fact": "Fact Table", "feature_set": "Data Set", "semantic_model": "Business Intelligence Report", "external_api": "Data Source", "extract": "Data Set" } return type_mapping.get(node_type.lower(), "Data Set") def _edge_type_to_collibra_relation(self, edge_type: str) -> str: """Map internal edge types to Collibra relation types.""" relation_mapping = { "transform": "is source of", "reference": "references", "ingest": "is source of", "export": "is target of", "join": "is source of", "aggregate": "is source of", "model": "is source of", "publish": "is target of", "reverse_etl": "is target of" } return relation_mapping.get(edge_type.lower(), "is source of") def _create_asset(self, node: LineageNode) -> Dict[str, Any]: """Create a Collibra asset from a node.""" asset = { "resourceType": "Asset", "identifier": { "name": node.name, "domain": { "name": self.domain_name, "community": { "name": self.community_name } } }, "type": { "name": self._node_type_to_collibra_type(node.type) }, "displayName": node.name, "attributes": {} } # Add description if node.description: asset["attributes"]["Description"] = [{"value": node.description}] # Add database and schema if node.database: asset["attributes"]["Technical Data Type"] = [{"value": node.database}] if node.schema: asset["attributes"]["Schema Name"] = [{"value": node.schema}] # Add owner if node.owner: asset["attributes"]["Data Owner"] = [{"value": node.owner}] # Add tags as business terms if node.tags: asset["attributes"]["Tags"] = [{"value": ", ".join(node.tags)}] # Add category if node.category: asset["attributes"]["Category"] = [{"value": node.category}] return asset def _create_relation(self, edge: LineageEdge) -> Dict[str, Any]: """Create a Collibra relation from an edge.""" source_node = self.graph.get_node(edge.source) target_node = self.graph.get_node(edge.target) relation = { "resourceType": "Relation", "source": { "name": source_node.name if source_node else edge.source, "domain": { "name": self.domain_name, "community": { "name": self.community_name } } }, "target": { "name": target_node.name if target_node else edge.target, "domain": { "name": self.domain_name, "community": { "name": self.community_name } } }, "type": { "role": self._edge_type_to_collibra_relation(edge.type), "coRole": "has source", "sourceType": { "name": self._node_type_to_collibra_type( source_node.type if source_node else "table" ) }, "targetType": { "name": self._node_type_to_collibra_type( target_node.type if target_node else "table" ) } } } return relation def _create_column_assets(self, node: LineageNode) -> List[Dict[str, Any]]: """Create Collibra column assets from a node's columns.""" if not node.columns: return [] column_assets = [] for col in node.columns: column_asset = { "resourceType": "Asset", "identifier": { "name": f"{node.name}.{col.get('name')}", "domain": { "name": self.domain_name, "community": { "name": self.community_name } } }, "type": { "name": "Column" }, "displayName": col.get("name"), "attributes": { "Technical Data Type": [{"value": col.get("type") or col.get("data_type", "string")}] }, "relations": { "Column is part of Table": [{ "name": node.name, "domain": { "name": self.domain_name, "community": { "name": self.community_name } } }] } } if col.get("description"): column_asset["attributes"]["Description"] = [{"value": col.get("description")}] column_assets.append(column_asset) return column_assets def export(self) -> str: """Export to Collibra JSON import format.""" return self.to_json(indent=2) def _to_dict(self) -> Dict[str, Any]: """Convert to Collibra import dictionary.""" # Collect all assets (nodes) assets = [] for node in self.graph.nodes: assets.append(self._create_asset(node)) # Add column assets if present assets.extend(self._create_column_assets(node)) # Collect all relations (edges) relations = [self._create_relation(edge) for edge in self.graph.edges] return { "exportInfo": { "producer": "Lineage Graph Accelerator", "exportedAt": self.graph.generated_at, "sourceLineageName": self.graph.name, "format": "Collibra Import API", "version": "2.0" }, "community": { "name": self.community_name, "description": f"Data lineage imported from {self.graph.name}" }, "domain": { "name": self.domain_name, "type": "Physical Data Dictionary", "community": { "name": self.community_name } }, "assets": assets, "relations": relations, "summary": { "totalAssets": len(assets), "totalRelations": len(relations), "assetTypes": list(set( self._node_type_to_collibra_type(n.type) for n in self.graph.nodes )) } }