aamanlamba's picture
Add Apache Atlas export, graph interactivity, and polish
b304992
"""
Apache Atlas Exporter - Export to Apache Atlas format.
Apache Atlas is an open-source metadata management and data governance framework.
https://atlas.apache.org/
"""
from typing import Dict, Any, List
from datetime import datetime
import uuid
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge
class AtlasExporter(LineageExporter):
    """Export lineage to Apache Atlas format.

    Every graph node becomes an Atlas entity, plus one ``hive_column`` entity
    per declared column. Every distinct ``node.database`` becomes a
    ``hive_db`` entity. Every graph edge becomes a ``Process`` entity whose
    ``inputs``/``outputs`` reference the source and target node entities.

    All qualified names use the form ``<dotted.path>@<cluster_name>``. They
    are built by the shared helpers below, so the object references emitted
    here (``uniqueAttributes.qualifiedName``) match the ``qualifiedName`` of
    the entities they point to.
    """

    # Internal node type (lower-cased) -> Atlas type name. Unknown or missing
    # types fall back to ``_DEFAULT_ATLAS_TYPE``.
    _TYPE_MAPPING: Dict[str, str] = {
        "table": "hive_table",
        "view": "hive_table",
        "model": "hive_table",
        "source": "hive_db",
        "destination": "hive_table",
        "column": "hive_column",
        "database": "hive_db",
        "schema": "hive_db",
        "report": "Report",
        "dimension": "hive_table",
        "fact": "hive_table",
        "feature_set": "hive_table",
        "semantic_model": "DataSet",
        "external_api": "api_endpoint",
        "extract": "hive_table",
        "task": "Process",
    }
    _DEFAULT_ATLAS_TYPE = "DataSet"

    def __init__(self, graph: LineageGraph, cluster_name: str = "lineage_accelerator"):
        """Create an exporter for *graph*.

        Args:
            graph: The lineage graph to export.
            cluster_name: Used as the ``@`` suffix of every qualified name.
                It also salts the deterministic GUIDs.
        """
        super().__init__(graph)
        self.cluster_name = cluster_name

    @property
    def format_name(self) -> str:
        """Human-readable name of this export format."""
        return "Apache Atlas"

    @property
    def file_extension(self) -> str:
        """File extension used for exported documents."""
        return ".json"

    # ------------------------------------------------------------------
    # Naming helpers
    # ------------------------------------------------------------------

    def _node_type_to_atlas_type(self, node_type: str) -> str:
        """Map an internal node type to an Atlas type name.

        Matching is case-insensitive. ``None``, empty or unknown types map
        to ``DataSet``.
        """
        if not node_type:
            return self._DEFAULT_ATLAS_TYPE
        return self._TYPE_MAPPING.get(node_type.lower(), self._DEFAULT_ATLAS_TYPE)

    def _generate_guid(self, identifier: str) -> str:
        """Generate a deterministic GUID for an entity.

        Uses UUIDv5 over ``<cluster_name>.<identifier>``, so re-exporting the
        same graph yields the same GUIDs.
        """
        return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{self.cluster_name}.{identifier}"))

    def _qualified_path(self, node: LineageNode) -> str:
        """Return the dotted ``[database.][schema.]name`` path of *node* (no cluster suffix)."""
        parts = [part for part in (node.database, node.schema) if part]
        parts.append(node.name)
        return ".".join(parts)

    def _create_qualified_name(self, node: LineageNode) -> str:
        """Create the Atlas-style qualified name ``[database.][schema.]name@cluster``.

        The cluster name appears exactly once, as the ``@`` suffix. This
        keeps table names consistent with the ``<database>@<cluster>`` names
        given to ``hive_db`` entities.
        """
        return f"{self._qualified_path(node)}@{self.cluster_name}"

    def _db_qualified_name(self, database: str) -> str:
        """Return the qualified name ``<database>@<cluster>`` of a ``hive_db`` entity."""
        return f"{database}@{self.cluster_name}"

    def _column_qualified_name(self, node: LineageNode, col_name: str) -> str:
        """Return ``[database.][schema.]table.column@cluster`` for a column of *node*."""
        return f"{self._qualified_path(node)}.{col_name}@{self.cluster_name}"

    @staticmethod
    def _column_name(col: Dict[str, Any], idx: int) -> str:
        """Return the column's ``name``, or ``column_<idx>`` when it is missing or empty.

        Used for both column entities and the table's column references, so
        the two always agree.
        """
        return col.get("name") or f"column_{idx}"

    @staticmethod
    def _entity_ref(type_name: str, qualified_name: str) -> Dict[str, Any]:
        """Build an Atlas object reference by unique ``qualifiedName``."""
        return {
            "typeName": type_name,
            "uniqueAttributes": {"qualifiedName": qualified_name},
        }

    # ------------------------------------------------------------------
    # Entity builders
    # ------------------------------------------------------------------

    def _create_entity(self, node: LineageNode) -> Dict[str, Any]:
        """Create an Atlas entity from a node.

        ``hive_table`` entities also receive ``tableType`` and, when the node
        has a database, a ``db`` reference. Nodes with columns get
        ``columns`` references. Tags become classifications, and
        metadata/category become ``additionalProperties``/``dataLayer``.
        """
        qualified_name = self._create_qualified_name(node)
        atlas_type = self._node_type_to_atlas_type(node.type)
        # One timestamp, so createTime and modifiedTime are identical.
        now_ms = int(datetime.now().timestamp() * 1000)

        attributes: Dict[str, Any] = {
            "qualifiedName": qualified_name,
            "name": node.name,
            "description": node.description or f"{node.type}: {node.name}",
            "owner": node.owner or "lineage_accelerator",
            "createTime": now_ms,
            "modifiedTime": now_ms,
        }
        entity: Dict[str, Any] = {
            "typeName": atlas_type,
            "guid": self._generate_guid(node.id),
            "attributes": attributes,
            "status": "ACTIVE",
        }

        # Add type-specific attributes
        if atlas_type == "hive_table":
            attributes["tableType"] = "MANAGED_TABLE"
            if node.database:
                attributes["db"] = self._entity_ref(
                    "hive_db", self._db_qualified_name(node.database)
                )

        # Column references must match the qualifiedName of the entities
        # produced by _create_column_entities, or Atlas cannot resolve them.
        if node.columns:
            attributes["columns"] = [
                self._entity_ref(
                    "hive_column",
                    self._column_qualified_name(node, self._column_name(col, idx)),
                )
                for idx, col in enumerate(node.columns)
            ]

        # Add classification tags
        if node.tags:
            entity["classifications"] = [{"typeName": tag} for tag in node.tags]

        # Add custom attributes
        if node.metadata:
            attributes["additionalProperties"] = node.metadata
        if node.category:
            attributes["dataLayer"] = node.category
        return entity

    def _create_column_entities(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Create Atlas ``hive_column`` entities from a node's columns.

        Each column references its owning node entity via ``table``. Its
        ``position`` is the column's index in ``node.columns``. Returns an
        empty list when the node has no columns.
        """
        if not node.columns:
            return []
        table_type = self._node_type_to_atlas_type(node.type)
        table_qualified_name = self._create_qualified_name(node)

        columns = []
        for idx, col in enumerate(node.columns):
            col_name = self._column_name(col, idx)
            columns.append({
                "typeName": "hive_column",
                "guid": self._generate_guid(f"{node.id}.{col_name}"),
                "attributes": {
                    "qualifiedName": self._column_qualified_name(node, col_name),
                    "name": col_name,
                    # `or` chain so explicit nulls also fall back.
                    "type": col.get("type") or col.get("data_type") or "string",
                    "description": col.get("description") or "",
                    "position": idx,
                    "table": self._entity_ref(table_type, table_qualified_name),
                },
                "status": "ACTIVE",
            })
        return columns

    def _create_process_entity(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an Atlas ``Process`` entity from an edge (for lineage).

        ``inputs`` holds a reference to the edge's source node and
        ``outputs`` to its target node. Either list is empty when the graph
        has no node for that id.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        process_name = edge.job_name or f"process_{edge.source}_to_{edge.target}"
        # NOTE(review): edges sharing a job_name yield Process entities with the
        # same qualifiedName but different GUIDs; Atlas treats qualifiedName as
        # unique per type - confirm whether such edges should be merged.
        process_qualified_name = f"{process_name}@{self.cluster_name}"

        inputs = []
        if source_node:
            inputs.append(self._entity_ref(
                self._node_type_to_atlas_type(source_node.type),
                self._create_qualified_name(source_node),
            ))
        outputs = []
        if target_node:
            outputs.append(self._entity_ref(
                self._node_type_to_atlas_type(target_node.type),
                self._create_qualified_name(target_node),
            ))

        process: Dict[str, Any] = {
            "typeName": "Process",
            "guid": self._generate_guid(f"process.{edge.source}.{edge.target}"),
            "attributes": {
                "qualifiedName": process_qualified_name,
                "name": process_name,
                "description": edge.transformation or f"Data flow: {edge.type}",
                "inputs": inputs,
                "outputs": outputs,
                "operationType": edge.type.upper() if edge.type else "ETL",
            },
            "status": "ACTIVE",
        }
        if edge.job_id:
            process["attributes"]["jobId"] = edge.job_id
        return process

    def _create_database_entities(self) -> List[Dict[str, Any]]:
        """Create one ``hive_db`` entity per distinct ``node.database``, in first-seen order."""
        seen = set()
        db_entities = []
        for node in self.graph.nodes:
            database = node.database
            if not database or database in seen:
                continue
            seen.add(database)
            db_entities.append({
                "typeName": "hive_db",
                "guid": self._generate_guid(f"db.{database}"),
                "attributes": {
                    "qualifiedName": self._db_qualified_name(database),
                    "name": database,
                    "description": f"Database: {database}",
                    "clusterName": self.cluster_name,
                },
                "status": "ACTIVE",
            })
        return db_entities

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    def export(self) -> str:
        """Export to Apache Atlas JSON format."""
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert to an Atlas bulk-import dictionary.

        Entity order is: databases, then each node followed by its columns,
        then one process per edge. Summary lists are sorted so the output is
        deterministic.
        """
        db_entities = self._create_database_entities()
        entities: List[Dict[str, Any]] = list(db_entities)
        referred_entities: Dict[str, Any] = {}

        # Create table/dataset entities, each followed by its column entities
        for node in self.graph.nodes:
            entities.append(self._create_entity(node))
            entities.extend(self._create_column_entities(node))

        # Create process entities for lineage
        entities.extend(self._create_process_entity(edge) for edge in self.graph.edges)

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Apache Atlas",
                "version": "2.0",
            },
            "atlasVersion": "2.3.0",
            "cluster": self.cluster_name,
            "entities": entities,
            "referredEntities": referred_entities,
            "summary": {
                "totalEntities": len(entities),
                "databases": sorted(e["attributes"]["name"] for e in db_entities),
                "entityTypes": sorted({e["typeName"] for e in entities}),
                "processCount": len(self.graph.edges),
            },
        }