# Author: aamanlamba
# Phase 2: Enhanced lineage extraction with export to data catalogs
# (commit 0510038)
"""
Alation Exporter - Export to Alation Data Catalog format.
Alation is an enterprise data catalog and data governance platform.
https://www.alation.com/
"""
from typing import Dict, Any, List
from datetime import datetime
import uuid
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge
class AlationExporter(LineageExporter):
    """Export lineage to Alation format.

    Builds a single JSON document (labelled "Alation Bulk API" in its
    ``exportInfo``) containing the datasource, its schemas, tables, columns,
    table-level lineage links and the dataflows (jobs) behind those links.
    """

    # Internal node type (lower-cased) -> Alation object type.
    # Unknown or empty types fall back to "table".
    _OTYPE_MAPPING: Dict[str, str] = {
        "table": "table",
        "view": "view",
        "model": "table",
        "source": "datasource",
        "destination": "table",
        "column": "attribute",
        "database": "schema",
        "schema": "schema",
        "report": "bi_report",
        "dimension": "table",
        "fact": "table",
        "feature_set": "table",
        "semantic_model": "bi_datasource",
        "external_api": "datasource",
        "extract": "table",
    }

    def __init__(self, graph: LineageGraph, datasource_id: int = 1,
                 datasource_name: str = "Lineage Accelerator"):
        """Create an exporter for ``graph``.

        Args:
            graph: The lineage graph to export.
            datasource_id: Alation datasource id. It prefixes every object key
                and is written as ``ds_id`` on tables and schemas.
            datasource_name: Title written to the ``datasource`` section.
        """
        super().__init__(graph)
        self.datasource_id = datasource_id
        self.datasource_name = datasource_name

    @property
    def format_name(self) -> str:
        """Human-readable name of this export format."""
        return "Alation"

    @property
    def file_extension(self) -> str:
        """File extension used for the exported document."""
        return ".json"

    def _node_type_to_alation_otype(self, node_type: str) -> str:
        """Map an internal node type to an Alation object type.

        Matching is case-insensitive. ``None``, empty or unknown types map to
        ``"table"``.
        """
        if not node_type:
            return "table"
        return self._OTYPE_MAPPING.get(node_type.lower(), "table")

    def _create_table_object(self, node: LineageNode) -> Dict[str, Any]:
        """Create an Alation table object from a node.

        Optional node metadata (category, owner, tags, database) is exported as
        ``custom_fields``. The key is only present when at least one of those
        attributes is set.
        """
        obj: Dict[str, Any] = {
            "key": self._get_key(node),
            "title": node.name,
            "description": node.description or "",
            "ds_id": self.datasource_id,
            "schema_name": node.schema or "default",
            "table_name": node.name,
            "table_type": node.type.upper() if node.type else "TABLE",
        }

        custom_fields: List[Dict[str, Any]] = []
        if node.category:
            custom_fields.append({"field_name": "Data Layer", "value": node.category})
        if node.owner:
            custom_fields.append({"field_name": "Data Owner", "value": node.owner})
        if node.tags:
            # map(str, ...) so non-string tags don't make join() raise TypeError.
            custom_fields.append({"field_name": "Tags", "value": ", ".join(map(str, node.tags))})
        if node.database:
            custom_fields.append({"field_name": "Database", "value": node.database})
        if custom_fields:
            obj["custom_fields"] = custom_fields

        return obj

    def _get_key(self, node: LineageNode) -> str:
        """Return the Alation-style key ``<datasource_id>.<schema>.<name>``.

        Nodes without a schema are keyed under the ``"default"`` schema.
        """
        schema = node.schema or "default"
        return f"{self.datasource_id}.{schema}.{node.name}"

    def _create_column_objects(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Create Alation column objects from a node's columns.

        Each entry of ``node.columns`` is read with ``.get``, so it is assumed
        to be a dict (keys used: name, type/data_type, description,
        isPrimaryKey, isForeignKey, references). Positions are 1-based in
        column order.
        """
        if not node.columns:
            return []

        table_key = self._get_key(node)
        column_objects: List[Dict[str, Any]] = []
        for position, col in enumerate(node.columns, start=1):
            name = col.get("name")
            col_obj: Dict[str, Any] = {
                "key": f"{table_key}.{name}",
                "column_name": name,
                # Prefer "type", then "data_type". Present-but-empty values also
                # fall back to "string" rather than leaking None/"" into the export.
                "column_type": col.get("type") or col.get("data_type") or "string",
                # `or ""` so an explicit None description becomes "" (as for tables).
                "description": col.get("description") or "",
                "table_key": table_key,
                "position": position,
            }
            if col.get("isPrimaryKey"):
                col_obj["is_primary_key"] = True
            if col.get("isForeignKey"):
                col_obj["is_foreign_key"] = True
                if col.get("references"):
                    col_obj["fk_reference"] = col.get("references")
            column_objects.append(col_obj)

        return column_objects

    def _create_lineage_object(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an Alation lineage object from an edge.

        Endpoints resolve to table keys via ``_get_key``. When
        ``graph.get_node`` returns nothing for an endpoint, the raw edge
        endpoint value is used as the key instead.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        lineage: Dict[str, Any] = {
            "source_key": self._get_key(source_node) if source_node else edge.source,
            "target_key": self._get_key(target_node) if target_node else edge.target,
            "lineage_type": edge.type or "DIRECT",
        }
        # Job / transformation details are attached only when present.
        if edge.job_name:
            lineage["dataflow_name"] = edge.job_name
        if edge.job_id:
            lineage["dataflow_id"] = edge.job_id
        if edge.transformation:
            lineage["transformation_description"] = edge.transformation
        return lineage

    @staticmethod
    def _dataflow_name(edge: LineageEdge) -> str:
        """Return the dataflow name for an edge: its job name, else a synthetic one.

        Shared by ``_create_dataflow`` and the dedup in ``_to_dict`` so that
        naming and deduplication always agree.
        """
        return edge.job_name or f"dataflow_{edge.source}_to_{edge.target}"

    def _create_dataflow(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an Alation dataflow object from an edge.

        When the edge has no ``job_id``, the ``external_id`` is a UUIDv5 derived
        from the datasource id and the dataflow name. Re-exporting the same
        graph therefore yields the same id; a random uuid4 changed on every run.
        """
        dataflow_name = self._dataflow_name(edge)
        external_id = edge.job_id or str(
            uuid.uuid5(uuid.NAMESPACE_URL, f"{self.datasource_id}:{dataflow_name}")
        )
        # Use the same "ETL" fallback as dataflow_type instead of rendering "None".
        transformation_type = edge.type or "ETL"
        return {
            "external_id": external_id,
            "title": dataflow_name,
            "description": f"Data transformation: {transformation_type}",
            "dataflow_type": edge.type.upper() if edge.type else "ETL",
        }

    def export(self) -> str:
        """Export to Alation JSON format (2-space indented)."""
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the graph to the Alation bulk import dictionary."""
        tables: List[Dict[str, Any]] = []
        columns: List[Dict[str, Any]] = []
        for node in self.graph.nodes:
            tables.append(self._create_table_object(node))
            columns.extend(self._create_column_objects(node))

        lineage_objects = [self._create_lineage_object(edge) for edge in self.graph.edges]

        # One dataflow per distinct dataflow name; the first edge seen defines it.
        dataflows: List[Dict[str, Any]] = []
        seen_dataflows = set()
        for edge in self.graph.edges:
            dataflow_name = self._dataflow_name(edge)
            if dataflow_name in seen_dataflows:
                continue
            seen_dataflows.add(dataflow_name)
            dataflows.append(self._create_dataflow(edge))

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Alation Bulk API",
                "version": "1.0",
            },
            "datasource": {
                "id": self.datasource_id,
                "title": self.datasource_name,
                "ds_type": "custom",
            },
            "schemas": self._extract_schemas(),
            "tables": tables,
            "columns": columns,
            "lineage": lineage_objects,
            "dataflows": dataflows,
            "summary": {
                "totalTables": len(tables),
                "totalColumns": len(columns),
                "totalLineageEdges": len(lineage_objects),
                "totalDataflows": len(dataflows),
                # Sorted so the output is deterministic (set iteration order is not).
                "schemas": sorted({t.get("schema_name", "default") for t in tables}),
            },
        }

    def _extract_schemas(self) -> List[Dict[str, Any]]:
        """Extract one schema object per distinct schema name, in first-seen order.

        Nodes without a schema are grouped under ``"default"``.
        NOTE(review): schemas are keyed by name only. If nodes from different
        databases share a schema name they collapse into one entry, and
        ``db_name`` ends up as the database of the last such node that has one.
        """
        schemas: Dict[str, Dict[str, Any]] = {}
        for node in self.graph.nodes:
            schema_name = node.schema or "default"
            if schema_name not in schemas:
                schemas[schema_name] = {
                    "key": f"{self.datasource_id}.{schema_name}",
                    "schema_name": schema_name,
                    "ds_id": self.datasource_id,
                    "description": f"Schema: {schema_name}",
                }
            if node.database:
                schemas[schema_name]["db_name"] = node.database
        return list(schemas.values())