|
|
""" |
|
|
Alation Exporter - Export to Alation Data Catalog format. |
|
|
|
|
|
Alation is an enterprise data catalog and data governance platform. |
|
|
https://www.alation.com/ |
|
|
""" |
|
|
|
|
|
from typing import Dict, Any, List |
|
|
from datetime import datetime |
|
|
import uuid |
|
|
from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge |
|
|
|
|
|
|
|
|
class AlationExporter(LineageExporter):
    """Export lineage to Alation format.

    The export is a single JSON document scoped to one (custom) datasource
    identified by ``datasource_id``. It contains ``schemas``, ``tables``,
    ``columns``, ``lineage`` and ``dataflows`` lists plus a ``summary``.
    """

    # Internal node type (lower-cased) -> Alation object type.
    # Unknown or missing types fall back to "table".
    _OTYPE_MAPPING: Dict[str, str] = {
        "table": "table",
        "view": "view",
        "model": "table",
        "source": "datasource",
        "destination": "table",
        "column": "attribute",
        "database": "schema",
        "schema": "schema",
        "report": "bi_report",
        "dimension": "table",
        "fact": "table",
        "feature_set": "table",
        "semantic_model": "bi_datasource",
        "external_api": "datasource",
        "extract": "table",
    }

    # Schema name used for nodes that have no schema. Keys, table objects and
    # the schema list all use it, so they stay consistent with each other.
    _DEFAULT_SCHEMA = "default"

    def __init__(self, graph: LineageGraph, datasource_id: int = 1,
                 datasource_name: str = "Lineage Accelerator"):
        """Create an exporter for ``graph``.

        Args:
            graph: The lineage graph to export.
            datasource_id: Alation datasource id. It prefixes every object key.
            datasource_name: Title written to the exported ``datasource`` entry.
        """
        super().__init__(graph)
        self.datasource_id = datasource_id
        self.datasource_name = datasource_name

    @property
    def format_name(self) -> str:
        """Human-readable name of this export format."""
        return "Alation"

    @property
    def file_extension(self) -> str:
        """File extension for the exported document."""
        return ".json"

    def _node_type_to_alation_otype(self, node_type: str) -> str:
        """Map an internal node type to an Alation object type.

        The match is case-insensitive. Unknown types and empty/``None`` types
        map to ``"table"``.
        """
        if not node_type:
            return "table"
        return self._OTYPE_MAPPING.get(node_type.lower(), "table")

    def _create_table_object(self, node: LineageNode) -> Dict[str, Any]:
        """Create an Alation table object from a node.

        Optional node metadata (category, owner, tags, database) is emitted
        as ``custom_fields`` only when at least one value is present.
        """
        obj: Dict[str, Any] = {
            "key": self._get_key(node),
            "title": node.name,
            "description": node.description or "",
            "ds_id": self.datasource_id,
            "schema_name": node.schema or self._DEFAULT_SCHEMA,
            "table_name": node.name,
            # NOTE(review): this is the raw internal type upper-cased (e.g.
            # "MODEL"), not the Alation otype from _node_type_to_alation_otype.
            "table_type": node.type.upper() if node.type else "TABLE",
        }

        custom_fields: List[Dict[str, Any]] = []
        if node.category:
            custom_fields.append({"field_name": "Data Layer", "value": node.category})
        if node.owner:
            custom_fields.append({"field_name": "Data Owner", "value": node.owner})
        if node.tags:
            # Stringify each tag so non-string tags don't crash the join.
            custom_fields.append({
                "field_name": "Tags",
                "value": ", ".join(str(tag) for tag in node.tags),
            })
        if node.database:
            custom_fields.append({"field_name": "Database", "value": node.database})

        if custom_fields:
            obj["custom_fields"] = custom_fields
        return obj

    def _get_key(self, node: LineageNode) -> str:
        """Return the Alation-style key ``<datasource_id>.<schema>.<name>``.

        The default schema name is substituted when the node has no schema.
        """
        schema = node.schema or self._DEFAULT_SCHEMA
        return f"{self.datasource_id}.{schema}.{node.name}"

    def _create_column_objects(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Create Alation column objects from a node's columns.

        Each entry of ``node.columns`` is read as a dict with optional keys
        ``name``, ``type``/``data_type``, ``description``, ``isPrimaryKey``,
        ``isForeignKey`` and ``references``. Positions are 1-based.
        """
        if not node.columns:
            return []

        table_key = self._get_key(node)
        column_objects: List[Dict[str, Any]] = []

        for position, col in enumerate(node.columns, start=1):
            col_obj: Dict[str, Any] = {
                "key": f"{table_key}.{col.get('name')}",
                "column_name": col.get("name"),
                # Fall through empty/None values so the type is never null.
                "column_type": col.get("type") or col.get("data_type") or "string",
                # Match table objects: a missing or None description becomes "".
                "description": col.get("description") or "",
                "table_key": table_key,
                "position": position,
            }

            if col.get("isPrimaryKey"):
                col_obj["is_primary_key"] = True

            if col.get("isForeignKey"):
                col_obj["is_foreign_key"] = True
                # A reference is only meaningful for foreign-key columns.
                if col.get("references"):
                    col_obj["fk_reference"] = col.get("references")

            column_objects.append(col_obj)

        return column_objects

    def _create_lineage_object(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an Alation lineage object from an edge.

        If an endpoint node is not found in the graph, the raw edge endpoint
        value is used as its key instead.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        lineage: Dict[str, Any] = {
            "source_key": self._get_key(source_node) if source_node else edge.source,
            "target_key": self._get_key(target_node) if target_node else edge.target,
            "lineage_type": edge.type or "DIRECT",
        }

        if edge.job_name:
            lineage["dataflow_name"] = edge.job_name
        if edge.job_id:
            lineage["dataflow_id"] = edge.job_id
        if edge.transformation:
            lineage["transformation_description"] = edge.transformation

        return lineage

    @staticmethod
    def _dataflow_name(edge: LineageEdge) -> str:
        """Return the dataflow name for ``edge`` (job name or a derived fallback)."""
        return edge.job_name or f"dataflow_{edge.source}_to_{edge.target}"

    def _create_dataflow(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create an Alation dataflow object from an edge.

        If the edge has no ``job_id``, the external id is a name-based UUID
        (uuid5) derived from the datasource id and dataflow name. Repeated
        exports of the same graph therefore produce the same id.
        """
        dataflow_name = self._dataflow_name(edge)
        # Fallback used both for the type and the description, so an edge
        # without a type never yields "Data transformation: None".
        flow_type = edge.type or "ETL"
        external_id = edge.job_id or str(
            uuid.uuid5(uuid.NAMESPACE_URL, f"{self.datasource_id}/{dataflow_name}")
        )

        return {
            "external_id": external_id,
            "title": dataflow_name,
            "description": f"Data transformation: {flow_type}",
            "dataflow_type": flow_type.upper(),
        }

    def export(self) -> str:
        """Export to Alation JSON format.

        Delegates to ``self.to_json`` from the base class, which presumably
        serializes ``_to_dict()`` (not visible here; confirm in ``.base``).
        """
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the graph to an Alation bulk-import dictionary."""
        tables: List[Dict[str, Any]] = []
        columns: List[Dict[str, Any]] = []
        for node in self.graph.nodes:
            tables.append(self._create_table_object(node))
            columns.extend(self._create_column_objects(node))

        lineage_objects = [self._create_lineage_object(edge) for edge in self.graph.edges]

        # One dataflow per distinct name; the first edge with a name wins.
        dataflows: List[Dict[str, Any]] = []
        seen_dataflows = set()
        for edge in self.graph.edges:
            dataflow_name = self._dataflow_name(edge)
            if dataflow_name not in seen_dataflows:
                seen_dataflows.add(dataflow_name)
                dataflows.append(self._create_dataflow(edge))

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Alation Bulk API",
                "version": "1.0",
            },
            "datasource": {
                "id": self.datasource_id,
                "title": self.datasource_name,
                "ds_type": "custom",
            },
            "schemas": self._extract_schemas(),
            "tables": tables,
            "columns": columns,
            "lineage": lineage_objects,
            "dataflows": dataflows,
            "summary": {
                "totalTables": len(tables),
                "totalColumns": len(columns),
                "totalLineageEdges": len(lineage_objects),
                "totalDataflows": len(dataflows),
                # Sorted so the output is stable across runs (set iteration
                # order of str values depends on hash randomization).
                "schemas": sorted({t.get("schema_name", self._DEFAULT_SCHEMA) for t in tables}),
            },
        }

    def _extract_schemas(self) -> List[Dict[str, Any]]:
        """Extract unique schemas from nodes, in order of first appearance.

        ``db_name`` is set from ``node.database``. When several nodes in the
        same schema carry a database, the last one processed wins.
        """
        schemas: Dict[str, Dict[str, Any]] = {}
        for node in self.graph.nodes:
            schema_name = node.schema or self._DEFAULT_SCHEMA
            if schema_name not in schemas:
                schemas[schema_name] = {
                    "key": f"{self.datasource_id}.{schema_name}",
                    "schema_name": schema_name,
                    "ds_id": self.datasource_id,
                    "description": f"Schema: {schema_name}",
                }
            if node.database:
                schemas[schema_name]["db_name"] = node.database

        return list(schemas.values())
|
|
|