|
|
""" |
|
|
Collibra Exporter - Export to Collibra Data Intelligence format.

Collibra is an enterprise data governance and catalog platform.
https://www.collibra.com/
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, List |
|
|
from datetime import datetime |
|
|
import uuid |
|
|
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge |
|
|
|
|
|
|
|
|
class CollibraExporter(LineageExporter):
    """Export lineage to Collibra import format.

    Every node of the graph becomes an asset, every entry of a node's
    ``columns`` becomes an additional ``Column`` asset, and every edge becomes
    a relation. All assets are placed in one domain inside one community,
    both identified by name.
    """

    # Collibra asset type used when a node type has no explicit mapping.
    _DEFAULT_ASSET_TYPE = "Data Set"

    # Internal node type (lower-case) -> Collibra asset type name.
    _ASSET_TYPES: Dict[str, str] = {
        "table": "Table",
        "view": "View",
        "model": "Data Set",
        "source": "Data Source",
        "destination": "Data Target",
        "column": "Column",
        "database": "Database",
        "schema": "Schema",
        "report": "Report",
        "dimension": "Dimension Table",
        "fact": "Fact Table",
        "feature_set": "Data Set",
        "semantic_model": "Business Intelligence Report",
        "external_api": "Data Source",
        "extract": "Data Set",
    }

    # Relation role used when an edge type has no explicit mapping.
    _DEFAULT_RELATION_ROLE = "is source of"

    # Internal edge type (lower-case) -> Collibra relation role.
    _RELATION_ROLES: Dict[str, str] = {
        "transform": "is source of",
        "reference": "references",
        "ingest": "is source of",
        "export": "is target of",
        "join": "is source of",
        "aggregate": "is source of",
        "model": "is source of",
        "publish": "is target of",
        "reverse_etl": "is target of",
    }

    # Relation role -> inverse reading (coRole), so both directions of a
    # relation agree with each other.
    # NOTE(review): these are the natural-language inverses of the roles above;
    # confirm them against the relation types of the target Collibra
    # operating model.
    _DEFAULT_RELATION_CO_ROLE = "has source"
    _RELATION_CO_ROLES: Dict[str, str] = {
        "is source of": "has source",
        "is target of": "has target",
        "references": "is referenced by",
    }

    def __init__(self, graph: LineageGraph, community_name: str = "Data Lineage",
                 domain_name: str = "Physical Data Dictionary"):
        """Store the graph and the Collibra location to export into.

        Args:
            graph: Lineage graph to export; passed on to the base class.
            community_name: Name of the community that holds the domain.
            domain_name: Name of the domain every asset is placed in.
        """
        super().__init__(graph)
        self.community_name = community_name
        self.domain_name = domain_name

    @property
    def format_name(self) -> str:
        """Name of the export format."""
        return "Collibra"

    @property
    def file_extension(self) -> str:
        """File extension, including the leading dot, for exported files."""
        return ".json"

    def _domain_reference(self) -> Dict[str, Any]:
        """Build a fresh reference to the configured domain and community."""
        # A new dict per call keeps the exported structures independent, so a
        # caller mutating one asset cannot affect another.
        return {
            "name": self.domain_name,
            "community": {
                "name": self.community_name
            }
        }

    def _asset_reference(self, name: Any) -> Dict[str, Any]:
        """Build a by-name reference to an asset in the configured domain."""
        return {
            "name": name,
            "domain": self._domain_reference()
        }

    def _node_type_to_collibra_type(self, node_type: str) -> str:
        """Map internal node types to Collibra asset types.

        The lookup is case-insensitive. Unknown, empty or missing node types
        map to ``"Data Set"``.
        """
        if not node_type:
            return self._DEFAULT_ASSET_TYPE
        return self._ASSET_TYPES.get(node_type.lower(), self._DEFAULT_ASSET_TYPE)

    def _edge_type_to_collibra_relation(self, edge_type: str) -> str:
        """Map internal edge types to Collibra relation roles.

        The lookup is case-insensitive. Unknown, empty or missing edge types
        map to ``"is source of"``.
        """
        if not edge_type:
            return self._DEFAULT_RELATION_ROLE
        return self._RELATION_ROLES.get(edge_type.lower(), self._DEFAULT_RELATION_ROLE)

    def _create_asset(self, node: LineageNode) -> Dict[str, Any]:
        """Create a Collibra asset from a node.

        The asset is identified by the node name within the configured domain.
        Optional node fields (description, database, schema, owner, tags,
        category) are added as attributes only when they are truthy.
        """
        attributes: Dict[str, Any] = {}

        if node.description:
            attributes["Description"] = [{"value": node.description}]

        if node.database:
            # Stored under its own label rather than "Technical Data Type",
            # which this exporter uses for the data type of column assets.
            attributes["Database Name"] = [{"value": node.database}]
        if node.schema:
            attributes["Schema Name"] = [{"value": node.schema}]

        if node.owner:
            attributes["Data Owner"] = [{"value": node.owner}]

        if node.tags:
            # Tags are flattened into one comma-separated attribute value.
            attributes["Tags"] = [{"value": ", ".join(map(str, node.tags))}]

        if node.category:
            attributes["Category"] = [{"value": node.category}]

        return {
            "resourceType": "Asset",
            "identifier": self._asset_reference(node.name),
            "type": {
                "name": self._node_type_to_collibra_type(node.type)
            },
            "displayName": node.name,
            "attributes": attributes
        }

    def _create_relation(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create a Collibra relation from an edge.

        Both endpoints are looked up in the graph. When a lookup returns
        nothing, the raw edge endpoint is used as the asset name and the
        endpoint is typed as a table.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        role = self._edge_type_to_collibra_relation(edge.type)

        return {
            "resourceType": "Relation",
            "source": self._asset_reference(
                source_node.name if source_node else edge.source
            ),
            "target": self._asset_reference(
                target_node.name if target_node else edge.target
            ),
            "type": {
                "role": role,
                # Derived from the role so the two readings never contradict
                # each other (e.g. "is target of" paired with "has source").
                "coRole": self._RELATION_CO_ROLES.get(
                    role, self._DEFAULT_RELATION_CO_ROLE
                ),
                "sourceType": {
                    "name": self._node_type_to_collibra_type(
                        source_node.type if source_node else "table"
                    )
                },
                "targetType": {
                    "name": self._node_type_to_collibra_type(
                        target_node.type if target_node else "table"
                    )
                }
            }
        }

    def _create_column_assets(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Create Collibra column assets from a node's columns.

        Each column is read with ``.get`` (keys ``name``, ``type`` /
        ``data_type`` and ``description``), so columns are expected to be
        dict-like. Returns an empty list when the node has no columns.
        """
        if not node.columns:
            return []

        column_assets = []
        for col in node.columns:
            column_name = col.get("name")
            # Fall back to "string" when neither key holds a usable value,
            # including the case where "data_type" is present but empty.
            data_type = col.get("type") or col.get("data_type") or "string"

            column_asset = {
                "resourceType": "Asset",
                # Qualified with the parent name so identically named columns
                # of different nodes get distinct identifiers.
                "identifier": self._asset_reference(f"{node.name}.{column_name}"),
                "type": {
                    "name": "Column"
                },
                "displayName": column_name,
                "attributes": {
                    "Technical Data Type": [{"value": data_type}]
                },
                "relations": {
                    "Column is part of Table": [self._asset_reference(node.name)]
                }
            }

            description = col.get("description")
            if description:
                column_asset["attributes"]["Description"] = [{"value": description}]

            column_assets.append(column_asset)

        return column_assets

    def export(self) -> str:
        """Export to Collibra JSON import format.

        Delegates to ``self.to_json``, which is not defined in this class
        (presumably inherited from ``LineageExporter`` - verify in ``.base``).
        """
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert to Collibra import dictionary.

        The result holds export metadata, the community and domain
        definitions, all assets (each node followed by its column assets),
        all relations and a summary.
        """
        assets: List[Dict[str, Any]] = []
        for node in self.graph.nodes:
            assets.append(self._create_asset(node))
            assets.extend(self._create_column_assets(node))

        relations = [self._create_relation(edge) for edge in self.graph.edges]

        # Derived from the emitted assets so that column assets are reflected,
        # and sorted so the exported document is identical between runs.
        asset_types = sorted({asset["type"]["name"] for asset in assets})

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Collibra Import API",
                "version": "2.0"
            },
            "community": {
                "name": self.community_name,
                "description": f"Data lineage imported from {self.graph.name}"
            },
            "domain": {
                "name": self.domain_name,
                "type": "Physical Data Dictionary",
                "community": {
                    "name": self.community_name
                }
            },
            "assets": assets,
            "relations": relations,
            "summary": {
                "totalAssets": len(assets),
                "totalRelations": len(relations),
                "assetTypes": asset_types
            }
        }
|
|
|