File size: 6,833 Bytes
0510038
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""
Base classes for lineage export functionality.
"""

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from abc import ABC, abstractmethod
import json
from datetime import datetime, timezone


@dataclass
class LineageNode:
    """A single vertex of the lineage graph (a table, view, model, ...).

    Only ``id``, ``name`` and ``type`` are required; every other attribute
    defaults to ``None`` and is left out of the serialized form when unset.
    """
    id: str
    name: str
    type: str  # table, view, model, source, destination, etc.
    category: Optional[str] = None  # raw, staging, marts, reporting, etc.
    database: Optional[str] = None
    schema: Optional[str] = None
    description: Optional[str] = None
    columns: Optional[List[Dict[str, Any]]] = None
    metadata: Optional[Dict[str, Any]] = None
    tags: Optional[List[str]] = None
    owner: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the node to a plain dictionary.

        Returns:
            A dict holding every attribute whose value is not ``None``.
            Falsy-but-set values (``""``, ``[]``, ``{}``) are kept.
        """
        # The tuple fixes both the set of exported keys and their order.
        exported_keys = (
            'id',
            'name',
            'type',
            'category',
            'database',
            'schema',
            'description',
            'columns',
            'metadata',
            'tags',
            'owner',
        )
        payload: Dict[str, Any] = {}
        for key in exported_keys:
            value = getattr(self, key)
            if value is None:
                continue
            payload[key] = value
        return payload


@dataclass
class LineageEdge:
    """A directed relationship between two nodes of the lineage graph.

    ``source`` and ``target`` hold node ids; ``type`` names the kind of
    relationship. The remaining attributes are optional and are left out of
    the serialized form when unset.
    """
    source: str  # source node id
    target: str  # target node id
    type: str  # transform, reference, ingest, export, etc.
    job_id: Optional[str] = None
    job_name: Optional[str] = None
    transformation: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the edge to a plain dictionary.

        Returns:
            A dict holding every attribute whose value is not ``None``.
            Falsy-but-set values (``""``, ``{}``) are kept.
        """
        # The tuple fixes both the set of exported keys and their order.
        exported_keys = (
            'source',
            'target',
            'type',
            'job_id',
            'job_name',
            'transformation',
            'metadata',
        )
        payload: Dict[str, Any] = {}
        for key in exported_keys:
            value = getattr(self, key)
            if value is None:
                continue
            payload[key] = value
        return payload


def _utc_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix."""
    return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')


@dataclass
class LineageGraph:
    """Represents a complete lineage graph: named, with nodes and edges.

    ``generated_at`` is an ISO-8601 UTC timestamp string ending in ``Z``;
    when not supplied it is set to the time the graph object is created.
    """
    name: str
    # String forward references, matching the 'LineageGraph' annotations below.
    nodes: List["LineageNode"] = field(default_factory=list)
    edges: List["LineageEdge"] = field(default_factory=list)
    metadata: Optional[Dict[str, Any]] = None
    generated_at: str = field(default_factory=_utc_timestamp)

    def add_node(self, node: "LineageNode") -> None:
        """Append ``node`` to the graph (no de-duplication is performed)."""
        self.nodes.append(node)

    def add_edge(self, edge: "LineageEdge") -> None:
        """Append ``edge`` to the graph (endpoints are not validated)."""
        self.edges.append(edge)

    def get_node(self, node_id: str) -> Optional["LineageNode"]:
        """Return the first node whose ``id`` equals ``node_id``, else ``None``."""
        return next((node for node in self.nodes if node.id == node_id), None)

    def get_upstream(self, node_id: str) -> List["LineageNode"]:
        """Return the direct (one-hop) upstream nodes of ``node_id``.

        These are the nodes that are the ``source`` of an edge whose
        ``target`` is ``node_id``. The result follows the order of
        ``self.nodes``; edges pointing at unknown node ids are ignored.
        """
        # A set gives O(1) membership tests while filtering the node list.
        upstream_ids = {e.source for e in self.edges if e.target == node_id}
        return [n for n in self.nodes if n.id in upstream_ids]

    def get_downstream(self, node_id: str) -> List["LineageNode"]:
        """Return the direct (one-hop) downstream nodes of ``node_id``.

        These are the nodes that are the ``target`` of an edge whose
        ``source`` is ``node_id``. The result follows the order of
        ``self.nodes``; edges pointing at unknown node ids are ignored.
        """
        downstream_ids = {e.target for e in self.edges if e.source == node_id}
        return [n for n in self.nodes if n.id in downstream_ids]

    def to_dict(self) -> Dict[str, Any]:
        """Convert the graph to a dictionary.

        Nodes and edges are serialized through their own ``to_dict``.
        Unlike those, the graph-level ``metadata`` key is always present,
        even when its value is ``None``.
        """
        return {
            'name': self.name,
            'generated_at': self.generated_at,
            'nodes': [n.to_dict() for n in self.nodes],
            'edges': [e.to_dict() for e in self.edges],
            'metadata': self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'LineageGraph':
        """Create a LineageGraph from a dictionary.

        Args:
            data: Mapping with optional keys ``name``, ``metadata``,
                ``generated_at``, ``nodes`` and ``edges``. Edge entries may
                use ``from``/``to``/``job`` as aliases for
                ``source``/``target``/``job_name``.

        Returns:
            The populated graph. A missing ``name`` becomes ``'Untitled'``;
            a missing or null ``generated_at`` becomes the current UTC time.
        """
        # Build the fallback lazily, with the same timezone-aware helper as
        # the field default (datetime.utcnow() is deprecated and naive).
        generated_at = data.get('generated_at')
        if generated_at is None:
            generated_at = _utc_timestamp()

        graph = cls(
            name=data.get('name', 'Untitled'),
            metadata=data.get('metadata'),
            generated_at=generated_at,
        )

        # Parse nodes
        for node_data in data.get('nodes', []):
            graph.add_node(LineageNode(
                id=node_data.get('id'),
                name=node_data.get('name'),
                type=node_data.get('type', 'unknown'),
                category=node_data.get('category'),
                database=node_data.get('database'),
                schema=node_data.get('schema'),
                description=node_data.get('description'),
                columns=node_data.get('columns'),
                metadata=node_data.get('metadata'),
                tags=node_data.get('tags'),
                owner=node_data.get('owner'),
            ))

        # Parse edges (accepting the legacy 'from'/'to'/'job' spellings)
        for edge_data in data.get('edges', []):
            graph.add_edge(LineageEdge(
                source=edge_data.get('source') or edge_data.get('from'),
                target=edge_data.get('target') or edge_data.get('to'),
                type=edge_data.get('type', 'transform'),
                job_id=edge_data.get('job_id'),
                job_name=edge_data.get('job_name') or edge_data.get('job'),
                transformation=edge_data.get('transformation'),
                metadata=edge_data.get('metadata'),
            ))

        return graph

    @classmethod
    def from_json(cls, json_str: str) -> 'LineageGraph':
        """Create a LineageGraph from a JSON string.

        The graph may sit at the top level of the document or be nested
        under a ``lineage_graph`` key.

        Raises:
            json.JSONDecodeError: If ``json_str`` is not valid JSON.
        """
        data = json.loads(json_str)
        # Handle nested structure (lineage_graph key)
        if 'lineage_graph' in data:
            data = data['lineage_graph']
        return cls.from_dict(data)


class LineageExporter(ABC):
    """Abstract base class for lineage exporters.

    A concrete exporter wraps one graph and must provide ``format_name``,
    ``file_extension``, ``export`` and ``_to_dict``. ``export_to_file`` and
    ``to_json`` are implemented here on top of those.
    """

    def __init__(self, graph: "LineageGraph"):
        """Store the graph that this exporter will render."""
        self.graph = graph

    @property
    @abstractmethod
    def format_name(self) -> str:
        """Return the name of the export format."""

    @property
    @abstractmethod
    def file_extension(self) -> str:
        """Return the file extension for the export format."""

    @abstractmethod
    def export(self) -> str:
        """Export the lineage graph to the target format, as text."""

    def export_to_file(self, filepath: str) -> None:
        """Write the result of ``export()`` to ``filepath``.

        The file is created or truncated and written as UTF-8, so output
        containing non-ASCII text does not depend on the platform's locale
        encoding.

        Raises:
            OSError: If the file cannot be opened or written.
        """
        # Render first so a failing export() does not leave an empty file.
        content = self.export()
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)

    def to_json(self, indent: int = 2) -> str:
        """Return ``_to_dict()`` serialized as a JSON string.

        Args:
            indent: Indentation width passed to ``json.dumps``.
        """
        return json.dumps(self._to_dict(), indent=indent)

    @abstractmethod
    def _to_dict(self) -> Dict[str, Any]:
        """Convert export to dictionary (for JSON serialization)."""