|
|
""" |
|
|
Apache Atlas Exporter - Export to Apache Atlas format. |
|
|
|
|
|
Apache Atlas is an open-source metadata management and data governance framework. |
|
|
https://atlas.apache.org/ |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, List |
|
|
from datetime import datetime |
|
|
import uuid |
|
|
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge |
|
|
|
|
|
|
|
|
class AtlasExporter(LineageExporter):
    """Export lineage to Apache Atlas bulk-import JSON format.

    Emits Atlas entities (``hive_db``, ``hive_table``, ``hive_column``,
    ``Process``, ...) whose qualified names follow the Atlas convention
    ``[db.][schema.]name@cluster``.
    """

    def __init__(self, graph: LineageGraph, cluster_name: str = "lineage_accelerator"):
        """
        Args:
            graph: The lineage graph to export.
            cluster_name: Atlas cluster name; used as the ``@cluster`` suffix
                of every qualified name and as the namespace for GUIDs.
        """
        super().__init__(graph)
        self.cluster_name = cluster_name

    @property
    def format_name(self) -> str:
        """Human-readable name of the export format."""
        return "Apache Atlas"

    @property
    def file_extension(self) -> str:
        """File extension used for exported files."""
        return ".json"

    def _node_type_to_atlas_type(self, node_type: str) -> str:
        """Map an internal node type to an Atlas type name.

        Unknown types fall back to the generic ``DataSet`` type.
        """
        type_mapping = {
            "table": "hive_table",
            "view": "hive_table",
            "model": "hive_table",
            "source": "hive_db",
            "destination": "hive_table",
            "column": "hive_column",
            "database": "hive_db",
            "schema": "hive_db",
            "report": "Report",
            "dimension": "hive_table",
            "fact": "hive_table",
            "feature_set": "hive_table",
            "semantic_model": "DataSet",
            "external_api": "api_endpoint",
            "extract": "hive_table",
            "task": "Process"
        }
        return type_mapping.get(node_type.lower(), "DataSet")

    def _generate_guid(self, identifier: str) -> str:
        """Generate a deterministic GUID (uuid5) scoped to the cluster name."""
        return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{self.cluster_name}.{identifier}"))

    def _create_qualified_name(self, node: LineageNode) -> str:
        """Create the Atlas qualified name ``[db.][schema.]name@cluster``.

        Fix: the cluster name was previously appended both as a dotted path
        part and as the ``@cluster`` suffix, duplicating it; the Atlas
        convention (e.g. ``db.table@cluster``) carries it only in the suffix.
        """
        parts = []
        if node.database:
            parts.append(node.database)
        if node.schema:
            parts.append(node.schema)
        parts.append(node.name)
        return ".".join(parts) + f"@{self.cluster_name}"

    def _column_qualified_name(self, node: LineageNode, col_name: str) -> str:
        """Qualified name for a column: ``[db.][schema.]table.col@cluster``.

        Shared by the table entity's column references and the column
        entities themselves so the references resolve in Atlas.
        """
        prefix, _, cluster = self._create_qualified_name(node).rpartition("@")
        return f"{prefix}.{col_name}@{cluster}"

    def _create_entity(self, node: LineageNode) -> Dict[str, Any]:
        """Create an Atlas entity dict from a node."""
        qualified_name = self._create_qualified_name(node)
        atlas_type = self._node_type_to_atlas_type(node.type)

        # Compute the timestamp once so create/modified times always agree.
        now_ms = int(datetime.now().timestamp() * 1000)

        entity = {
            "typeName": atlas_type,
            "guid": self._generate_guid(node.id),
            "attributes": {
                "qualifiedName": qualified_name,
                "name": node.name,
                "description": node.description or f"{node.type}: {node.name}",
                "owner": node.owner or "lineage_accelerator",
                "createTime": now_ms,
                "modifiedTime": now_ms
            },
            "status": "ACTIVE"
        }

        if atlas_type == "hive_table":
            entity["attributes"]["tableType"] = "MANAGED_TABLE"
            if node.database:
                # Reference the owning database by unique attributes; must
                # match the hive_db entity's qualifiedName built in _to_dict().
                entity["attributes"]["db"] = {
                    "typeName": "hive_db",
                    "uniqueAttributes": {
                        "qualifiedName": f"{node.database}@{self.cluster_name}"
                    }
                }

        if node.columns:
            # Fix: these references previously appended a second "@cluster"
            # suffix and so never matched the column entities produced by
            # _create_column_entities(); both now share one naming formula.
            entity["attributes"]["columns"] = [
                {
                    "typeName": "hive_column",
                    "uniqueAttributes": {
                        "qualifiedName": self._column_qualified_name(
                            node, col.get("name", f"column_{idx}")
                        )
                    }
                }
                for idx, col in enumerate(node.columns)
            ]

        if node.tags:
            entity["classifications"] = [
                {"typeName": tag} for tag in node.tags
            ]

        if node.metadata:
            entity["attributes"]["additionalProperties"] = node.metadata

        if node.category:
            entity["attributes"]["dataLayer"] = node.category

        return entity

    def _create_column_entities(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Create Atlas hive_column entities for each of a node's columns."""
        if not node.columns:
            return []

        table_qualified_name = self._create_qualified_name(node)
        columns = []

        for idx, col in enumerate(node.columns):
            col_name = col.get("name", f"column_{idx}")
            # Same formula as the table's column references (see
            # _column_qualified_name) so Atlas can link them.
            col_qualified_name = self._column_qualified_name(node, col_name)

            column_entity = {
                "typeName": "hive_column",
                "guid": self._generate_guid(f"{node.id}.{col_name}"),
                "attributes": {
                    "qualifiedName": col_qualified_name,
                    "name": col_name,
                    "type": col.get("type") or col.get("data_type", "string"),
                    "description": col.get("description", ""),
                    "position": idx,
                    "table": {
                        "typeName": self._node_type_to_atlas_type(node.type),
                        "uniqueAttributes": {
                            "qualifiedName": table_qualified_name
                        }
                    }
                },
                "status": "ACTIVE"
            }

            columns.append(column_entity)

        return columns

    def _create_process_entity(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an Atlas Process entity from an edge (carries the lineage)."""
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        process_name = edge.job_name or f"process_{edge.source}_to_{edge.target}"
        process_qualified_name = f"{process_name}@{self.cluster_name}"

        inputs = []
        outputs = []

        # Endpoints missing from the graph are skipped; the Process is still
        # emitted so the job itself is recorded.
        if source_node:
            inputs.append({
                "typeName": self._node_type_to_atlas_type(source_node.type),
                "uniqueAttributes": {
                    "qualifiedName": self._create_qualified_name(source_node)
                }
            })

        if target_node:
            outputs.append({
                "typeName": self._node_type_to_atlas_type(target_node.type),
                "uniqueAttributes": {
                    "qualifiedName": self._create_qualified_name(target_node)
                }
            })

        process = {
            "typeName": "Process",
            "guid": self._generate_guid(f"process.{edge.source}.{edge.target}"),
            "attributes": {
                "qualifiedName": process_qualified_name,
                "name": process_name,
                "description": edge.transformation or f"Data flow: {edge.type}",
                "inputs": inputs,
                "outputs": outputs,
                "operationType": edge.type.upper() if edge.type else "ETL"
            },
            "status": "ACTIVE"
        }

        if edge.job_id:
            process["attributes"]["jobId"] = edge.job_id

        return process

    def export(self) -> str:
        """Export the graph to Apache Atlas JSON (2-space indented)."""
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the graph to an Atlas bulk-import dictionary."""
        entities = []
        referred_entities = {}  # kept (empty) for the bulk-import payload shape

        # One hive_db entity per distinct database referenced by any node.
        databases = set()
        for node in self.graph.nodes:
            if node.database and node.database not in databases:
                databases.add(node.database)
                entities.append({
                    "typeName": "hive_db",
                    "guid": self._generate_guid(f"db.{node.database}"),
                    "attributes": {
                        "qualifiedName": f"{node.database}@{self.cluster_name}",
                        "name": node.database,
                        "description": f"Database: {node.database}",
                        "clusterName": self.cluster_name
                    },
                    "status": "ACTIVE"
                })

        # Node entities, each followed by its column entities.
        for node in self.graph.nodes:
            entities.append(self._create_entity(node))
            entities.extend(self._create_column_entities(node))

        # One Process entity per edge carries the actual lineage links.
        for edge in self.graph.edges:
            entities.append(self._create_process_entity(edge))

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Apache Atlas",
                "version": "2.0"
            },
            "atlasVersion": "2.3.0",
            "cluster": self.cluster_name,
            "entities": entities,
            "referredEntities": referred_entities,
            "summary": {
                "totalEntities": len(entities),
                # Sorted for deterministic output across runs (sets have
                # no stable iteration order).
                "databases": sorted(databases),
                "entityTypes": sorted({e["typeName"] for e in entities}),
                "processCount": len(self.graph.edges)
            }
        }
|
|
|