# Author: aamanlamba
# Commit message: Phase 2: Enhanced lineage extraction with export to data catalogs
# Commit: 0510038
"""
Collibra Exporter - Export to Collibra Data Intelligence format.
Collibra is an enterprise data governance and catalog platform.
https://www.collibra.com/
"""
from typing import Dict, Any, List
from datetime import datetime
import uuid
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge
class CollibraExporter(LineageExporter):
    """Export a lineage graph to a Collibra Data Intelligence import document.

    Every graph node becomes a Collibra asset, plus one "Column" asset per entry
    in the node's ``columns``. Every graph edge becomes a Collibra relation.
    All assets and relations are addressed by name inside the single
    domain/community pair given at construction time.
    """

    # Internal node type (compared lower-cased) -> Collibra asset type name.
    _NODE_TYPE_MAPPING: Dict[str, str] = {
        "table": "Table",
        "view": "View",
        "model": "Data Set",
        "source": "Data Source",
        "destination": "Data Target",
        "column": "Column",
        "database": "Database",
        "schema": "Schema",
        "report": "Report",
        "dimension": "Dimension Table",
        "fact": "Fact Table",
        "feature_set": "Data Set",
        "semantic_model": "Business Intelligence Report",
        "external_api": "Data Source",
        "extract": "Data Set",
    }
    # Asset type used for node types that are missing from the mapping (or None).
    _DEFAULT_ASSET_TYPE = "Data Set"

    # Internal edge type (compared lower-cased) -> Collibra relation role.
    _EDGE_ROLE_MAPPING: Dict[str, str] = {
        "transform": "is source of",
        "reference": "references",
        "ingest": "is source of",
        "export": "is target of",
        "join": "is source of",
        "aggregate": "is source of",
        "model": "is source of",
        "publish": "is target of",
        "reverse_etl": "is target of",
    }
    # Role used for edge types that are missing from the mapping (or None).
    _DEFAULT_ROLE = "is source of"

    # Role -> matching co-role (the relation read from the target's side).
    # Previously every relation used "has source", which contradicted the
    # "is target of" and "references" roles.
    _CO_ROLE_MAPPING: Dict[str, str] = {
        "is source of": "has source",
        "is target of": "has target",
        "references": "is referenced by",
    }

    def __init__(self, graph: LineageGraph, community_name: str = "Data Lineage",
                 domain_name: str = "Physical Data Dictionary"):
        """Create an exporter for ``graph``.

        Args:
            graph: The lineage graph to export.
            community_name: Collibra community that owns the target domain.
            domain_name: Collibra domain into which all assets and relations
                are imported.
        """
        super().__init__(graph)
        self.community_name = community_name
        self.domain_name = domain_name

    @property
    def format_name(self) -> str:
        """Human-readable name of this export format."""
        return "Collibra"

    @property
    def file_extension(self) -> str:
        """File extension for the exported document."""
        return ".json"

    def _domain_ref(self) -> Dict[str, Any]:
        """Return a new reference to the configured domain within its community."""
        return {
            "name": self.domain_name,
            "community": {"name": self.community_name},
        }

    def _asset_ref(self, name: Any) -> Dict[str, Any]:
        """Return a new by-name reference to an asset in the configured domain."""
        return {"name": name, "domain": self._domain_ref()}

    def _node_type_to_collibra_type(self, node_type: str) -> str:
        """Map an internal node type to a Collibra asset type name.

        The lookup is case-insensitive. Unknown or ``None`` types map to
        ``"Data Set"``.
        """
        key = (node_type or "").lower()
        return self._NODE_TYPE_MAPPING.get(key, self._DEFAULT_ASSET_TYPE)

    def _edge_type_to_collibra_relation(self, edge_type: str) -> str:
        """Map an internal edge type to a Collibra relation role.

        The lookup is case-insensitive. Unknown or ``None`` types map to
        ``"is source of"``.
        """
        key = (edge_type or "").lower()
        return self._EDGE_ROLE_MAPPING.get(key, self._DEFAULT_ROLE)

    def _create_asset(self, node: LineageNode) -> Dict[str, Any]:
        """Build a Collibra asset resource for ``node``.

        Optional node metadata is emitted as attributes only when it is truthy.
        """
        attributes: Dict[str, Any] = {}
        asset: Dict[str, Any] = {
            "resourceType": "Asset",
            "identifier": self._asset_ref(node.name),
            "type": {"name": self._node_type_to_collibra_type(node.type)},
            "displayName": node.name,
            "attributes": attributes,
        }

        if node.description:
            attributes["Description"] = [{"value": node.description}]
        # The database name gets its own attribute. "Technical Data Type" is the
        # attribute used for column data types (see _create_column_assets).
        if node.database:
            attributes["Database Name"] = [{"value": node.database}]
        if node.schema:
            attributes["Schema Name"] = [{"value": node.schema}]
        if node.owner:
            attributes["Data Owner"] = [{"value": node.owner}]
        # Tags are flattened into one comma-separated "Tags" attribute value.
        if node.tags:
            attributes["Tags"] = [{"value": ", ".join(str(tag) for tag in node.tags)}]
        if node.category:
            attributes["Category"] = [{"value": node.category}]
        return asset

    def _create_relation(self, edge: LineageEdge) -> Dict[str, Any]:
        """Build a Collibra relation resource for ``edge``.

        Endpoints are referenced by node name when the node exists in the graph.
        Otherwise the raw edge endpoint identifier is used, and the endpoint's
        asset type is assumed to be a table.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)
        role = self._edge_type_to_collibra_relation(edge.type)

        return {
            "resourceType": "Relation",
            "source": self._asset_ref(source_node.name if source_node else edge.source),
            "target": self._asset_ref(target_node.name if target_node else edge.target),
            "type": {
                "role": role,
                "coRole": self._CO_ROLE_MAPPING.get(role, "has source"),
                "sourceType": {
                    "name": self._node_type_to_collibra_type(
                        source_node.type if source_node else "table"
                    )
                },
                "targetType": {
                    "name": self._node_type_to_collibra_type(
                        target_node.type if target_node else "table"
                    )
                },
            },
        }

    def _create_column_assets(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Build one Collibra "Column" asset per entry in ``node.columns``.

        Each column is named ``"<node name>.<column name>"`` and is linked back
        to its parent through a "Column is part of Table" relation. Columns
        are read with ``.get``, so they are presumably dicts with optional
        ``name``, ``type``/``data_type`` and ``description`` keys.
        """
        if not node.columns:
            return []

        column_assets = []
        for col in node.columns:
            col_name = col.get("name")
            # Use `or` fallbacks so that a present-but-empty/None "data_type"
            # still falls back to "string".
            data_type = col.get("type") or col.get("data_type") or "string"
            column_asset: Dict[str, Any] = {
                "resourceType": "Asset",
                "identifier": self._asset_ref(f"{node.name}.{col_name}"),
                "type": {"name": "Column"},
                "displayName": col_name,
                "attributes": {
                    "Technical Data Type": [{"value": data_type}],
                },
                "relations": {
                    "Column is part of Table": [self._asset_ref(node.name)],
                },
            }
            description = col.get("description")
            if description:
                column_asset["attributes"]["Description"] = [{"value": description}]
            column_assets.append(column_asset)
        return column_assets

    def export(self) -> str:
        """Return the Collibra import document as indented JSON."""
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Build the Collibra import document as a plain dictionary."""
        assets: List[Dict[str, Any]] = []
        for node in self.graph.nodes:
            assets.append(self._create_asset(node))
            assets.extend(self._create_column_assets(node))

        relations = [self._create_relation(edge) for edge in self.graph.edges]

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Collibra Import API",
                "version": "2.0",
            },
            "community": {
                "name": self.community_name,
                "description": f"Data lineage imported from {self.graph.name}",
            },
            "domain": {
                "name": self.domain_name,
                "type": "Physical Data Dictionary",
                "community": {"name": self.community_name},
            },
            "assets": assets,
            "relations": relations,
            "summary": {
                "totalAssets": len(assets),
                "totalRelations": len(relations),
                # Derived from the emitted assets, so column assets are counted
                # too. Sorted so the export is deterministic across runs.
                "assetTypes": sorted({asset["type"]["name"] for asset in assets}),
            },
        }