|
|
""" |
|
|
Base classes for lineage export functionality. |
|
|
""" |
|
|
|
|
|
from dataclasses import dataclass, field |
|
|
from typing import List, Dict, Optional, Any |
|
|
from abc import ABC, abstractmethod |
|
|
import json |
|
|
from datetime import datetime, timezone |
|
|
|
|
|
|
|
|
@dataclass
class LineageNode:
    """A single vertex (dataset, table, job, ...) of a lineage graph.

    Only ``id``, ``name`` and ``type`` are required; every other attribute is
    optional and defaults to ``None``.
    """

    # Identifier that edges refer to via their ``source`` / ``target`` values.
    id: str
    # Human-readable display name.
    name: str
    # Node kind as a free-form string; not validated here.
    type: str
    category: Optional[str] = None
    database: Optional[str] = None
    schema: Optional[str] = None
    description: Optional[str] = None
    # Column descriptors; the layout of each dict is not constrained by this class.
    columns: Optional[List[Dict[str, Any]]] = None
    # Arbitrary extra key/value data carried through serialization unchanged.
    metadata: Optional[Dict[str, Any]] = None
    tags: Optional[List[str]] = None
    owner: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this node into a plain dictionary.

        Attributes whose value is ``None`` are left out entirely; other falsy
        values (empty strings, lists, dicts) are kept. Keys are emitted in
        field-declaration order and values are not copied.
        """
        attribute_names = (
            'id', 'name', 'type', 'category', 'database', 'schema',
            'description', 'columns', 'metadata', 'tags', 'owner',
        )
        serialized: Dict[str, Any] = {}
        for attr in attribute_names:
            value = getattr(self, attr)
            if value is None:
                continue
            serialized[attr] = value
        return serialized
|
|
|
|
|
|
|
|
@dataclass
class LineageEdge:
    """A directed relationship from one lineage node to another.

    ``source`` and ``target`` hold node ids; the remaining attributes
    optionally describe the job / transformation responsible for the link.
    """

    # Id of the node the relationship starts from.
    source: str
    # Id of the node the relationship points to.
    target: str
    # Relationship kind as a free-form string; not validated here.
    type: str
    job_id: Optional[str] = None
    job_name: Optional[str] = None
    transformation: Optional[str] = None
    # Arbitrary extra key/value data carried through serialization unchanged.
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this edge into a plain dictionary.

        Attributes equal to ``None`` are dropped; all other values (including
        empty ones) are included, in field-declaration order, without copying.
        """
        attribute_names = (
            'source', 'target', 'type', 'job_id',
            'job_name', 'transformation', 'metadata',
        )
        serialized: Dict[str, Any] = {}
        for attr in attribute_names:
            value = getattr(self, attr)
            if value is None:
                continue
            serialized[attr] = value
        return serialized
|
|
|
|
|
|
|
|
@dataclass
class LineageGraph:
    """Represents a complete lineage graph.

    A graph is a named collection of ``LineageNode`` objects plus directed
    ``LineageEdge`` objects whose ``source`` / ``target`` fields hold node ids.
    """

    name: str
    nodes: List['LineageNode'] = field(default_factory=list)
    edges: List['LineageEdge'] = field(default_factory=list)
    metadata: Optional[Dict[str, Any]] = None
    # UTC ISO-8601 timestamp with a trailing 'Z' (instead of '+00:00'),
    # captured when the instance is created unless one is supplied.
    generated_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    )

    def add_node(self, node: 'LineageNode') -> None:
        """Append *node* to the graph (no de-duplication is performed)."""
        self.nodes.append(node)

    def add_edge(self, edge: 'LineageEdge') -> None:
        """Append *edge* to the graph (its endpoints are not validated)."""
        self.edges.append(edge)

    def get_node(self, node_id: str) -> Optional['LineageNode']:
        """Return the first node whose ``id`` equals *node_id*, or ``None``."""
        return next((n for n in self.nodes if n.id == node_id), None)

    def get_upstream(self, node_id: str) -> List['LineageNode']:
        """Return the direct (one-hop) upstream nodes of *node_id*.

        These are the nodes that appear as ``source`` of an edge whose
        ``target`` is *node_id*. Results follow the order of ``self.nodes``;
        edge endpoints with no matching node are ignored.
        """
        # Set membership keeps this O(nodes + edges) instead of O(nodes * edges).
        upstream_ids = {e.source for e in self.edges if e.target == node_id}
        return [n for n in self.nodes if n.id in upstream_ids]

    def get_downstream(self, node_id: str) -> List['LineageNode']:
        """Return the direct (one-hop) downstream nodes of *node_id*.

        These are the nodes that appear as ``target`` of an edge whose
        ``source`` is *node_id*. Results follow the order of ``self.nodes``;
        edge endpoints with no matching node are ignored.
        """
        downstream_ids = {e.target for e in self.edges if e.source == node_id}
        return [n for n in self.nodes if n.id in downstream_ids]

    def to_dict(self) -> Dict[str, Any]:
        """Convert the graph to a dictionary.

        Nodes and edges are serialized with their own ``to_dict``; ``metadata``
        is always present (possibly ``None``).
        """
        return {
            'name': self.name,
            'generated_at': self.generated_at,
            'nodes': [n.to_dict() for n in self.nodes],
            'edges': [e.to_dict() for e in self.edges],
            'metadata': self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'LineageGraph':
        """Create a LineageGraph from a dictionary.

        Accepts the shape produced by :meth:`to_dict`. A missing ``name``
        defaults to ``'Untitled'``. A missing or null ``generated_at`` gets a
        fresh UTC timestamp. Missing or null ``nodes`` / ``edges`` are treated
        as empty lists. Edges may use ``from`` / ``to`` / ``job`` as aliases for
        ``source`` / ``target`` / ``job_name``.
        """
        kwargs: Dict[str, Any] = {
            'name': data.get('name', 'Untitled'),
            'metadata': data.get('metadata'),
        }
        generated_at = data.get('generated_at')
        if generated_at is not None:
            kwargs['generated_at'] = generated_at
        # Otherwise the field's timezone-aware default_factory supplies the
        # timestamp, avoiding the deprecated datetime.utcnow().
        graph = cls(**kwargs)

        # `or []` also covers keys that are present but explicitly null.
        for node_data in data.get('nodes') or []:
            graph.add_node(cls._node_from_dict(node_data))
        for edge_data in data.get('edges') or []:
            graph.add_edge(cls._edge_from_dict(edge_data))

        return graph

    @staticmethod
    def _node_from_dict(node_data: Dict[str, Any]) -> 'LineageNode':
        """Build one LineageNode from its serialized form (``type`` defaults to 'unknown')."""
        return LineageNode(
            id=node_data.get('id'),
            name=node_data.get('name'),
            type=node_data.get('type', 'unknown'),
            category=node_data.get('category'),
            database=node_data.get('database'),
            schema=node_data.get('schema'),
            description=node_data.get('description'),
            columns=node_data.get('columns'),
            metadata=node_data.get('metadata'),
            tags=node_data.get('tags'),
            owner=node_data.get('owner'),
        )

    @staticmethod
    def _edge_from_dict(edge_data: Dict[str, Any]) -> 'LineageEdge':
        """Build one LineageEdge, accepting 'from'/'to'/'job' key aliases (``type`` defaults to 'transform')."""
        return LineageEdge(
            source=edge_data.get('source') or edge_data.get('from'),
            target=edge_data.get('target') or edge_data.get('to'),
            type=edge_data.get('type', 'transform'),
            job_id=edge_data.get('job_id'),
            job_name=edge_data.get('job_name') or edge_data.get('job'),
            transformation=edge_data.get('transformation'),
            metadata=edge_data.get('metadata'),
        )

    @classmethod
    def from_json(cls, json_str: str) -> 'LineageGraph':
        """Create a LineageGraph from a JSON string.

        If the top-level JSON object contains a ``lineage_graph`` key, the
        graph is read from that nested object instead.

        Raises:
            json.JSONDecodeError: if *json_str* is not valid JSON.
        """
        data = json.loads(json_str)
        # Only unwrap when the payload is an object; `in` on a list/str would
        # test elements/substrings and the subsequent indexing would fail.
        if isinstance(data, dict) and 'lineage_graph' in data:
            data = data['lineage_graph']
        return cls.from_dict(data)
|
|
|
|
|
|
|
|
class LineageExporter(ABC):
    """Abstract base class for lineage exporters.

    Subclasses implement ``format_name``, ``file_extension``, ``export()`` and
    ``_to_dict()``; writing to a file and JSON serialization are shared here.
    """

    def __init__(self, graph: 'LineageGraph'):
        # Store the graph to be exported.
        self.graph = graph

    @property
    @abstractmethod
    def format_name(self) -> str:
        """Return the name of the export format."""
        pass

    @property
    @abstractmethod
    def file_extension(self) -> str:
        """Return the file extension for the export format."""
        pass

    @abstractmethod
    def export(self) -> str:
        """Export the lineage graph to the target format."""
        pass

    def export_to_file(self, filepath: str, encoding: str = 'utf-8') -> None:
        """Write the result of :meth:`export` to *filepath*, overwriting it.

        Args:
            filepath: Destination path.
            encoding: Text encoding for the file. Defaults to UTF-8 so the
                output does not depend on the platform's locale encoding
                (which can fail or mis-encode non-ASCII names, e.g. on Windows).
        """
        # Render first so an exception in export() leaves any existing file untouched.
        content = self.export()
        with open(filepath, 'w', encoding=encoding) as f:
            f.write(content)

    def to_json(self, indent: int = 2) -> str:
        """Serialize :meth:`_to_dict` to a JSON string with the given *indent*."""
        return json.dumps(self._to_dict(), indent=indent)

    @abstractmethod
    def _to_dict(self) -> Dict[str, Any]:
        """Convert export to dictionary (for JSON serialization)."""
        pass
|
|
|