|
|
|
|
|
""" |
|
|
Lineage Graph Extractor - Integration Example |
|
|
|
|
|
This script demonstrates how to use the Lineage Graph Extractor agent |
|
|
programmatically with the Anthropic API. |
|
|
|
|
|
Usage: |
|
|
python integration_example.py |
|
|
""" |
|
|
|
|
|
import os |
|
|
from anthropic import Anthropic |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
# Load environment variables (e.g. ANTHROPIC_API_KEY) from a local .env file.
load_dotenv()
|
|
|
|
|
def load_agent_config():
    """Load the agent system prompt from ``memories/agent.md``.

    The path is resolved relative to this script's directory so the
    configuration is found regardless of the current working directory.

    Returns:
        The full text of the agent configuration file.

    Raises:
        FileNotFoundError: If ``memories/agent.md`` does not exist.
    """
    config_path = os.path.join(os.path.dirname(__file__), "memories", "agent.md")

    # Read as UTF-8 explicitly: the default encoding is platform-dependent
    # and the config may contain non-ASCII characters (e.g. Mermaid arrows).
    with open(config_path, "r", encoding="utf-8") as f:
        return f.read()
|
|
|
|
|
def extract_lineage(client, system_prompt, user_message, *,
                    model="claude-3-5-sonnet-20241022", max_tokens=4000):
    """Send a lineage extraction request to the agent.

    Args:
        client: Anthropic client instance.
        system_prompt: Agent system prompt.
        user_message: User's lineage extraction request.
        model: Model identifier to use (keyword-only; defaults to the
            model this example was written against).
        max_tokens: Maximum tokens in the response (keyword-only).

    Returns:
        Agent's response text (the first content block of the reply).
    """
    response = client.messages.create(
        model=model,
        max_tokens=max_tokens,
        system=system_prompt,
        messages=[{
            "role": "user",
            "content": user_message
        }]
    )

    # The SDK returns a list of content blocks; this example expects a
    # single text block and returns its text.
    return response.content[0].text
|
|
|
|
|
# Sample pipeline metadata used by Example 2. Kept at module level so the
# example payload is easy to find and tweak without digging through main().
_SAMPLE_METADATA = """
{
  "tables": [
    {
      "name": "raw_orders",
      "type": "source",
      "description": "Raw order data from API"
    },
    {
      "name": "raw_customers",
      "type": "source",
      "description": "Raw customer data from database"
    },
    {
      "name": "stg_orders",
      "type": "staging",
      "description": "Cleaned and standardized orders",
      "depends_on": ["raw_orders"]
    },
    {
      "name": "stg_customers",
      "type": "staging",
      "description": "Cleaned and standardized customers",
      "depends_on": ["raw_customers"]
    },
    {
      "name": "fct_orders",
      "type": "fact",
      "description": "Order facts with customer data",
      "depends_on": ["stg_orders", "stg_customers"]
    }
  ]
}
"""


def _print_banner(title):
    """Print a section banner: a ruled line, the title, another ruled line."""
    print("=" * 60)
    print(title)
    print("=" * 60)


def main():
    """Run the example lineage-extraction workflows.

    Requires ANTHROPIC_API_KEY in the environment (or a .env file).
    Example 3 runs only when GOOGLE_CLOUD_PROJECT is also set.
    """
    # Fail fast with a helpful message rather than letting the SDK raise.
    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        print("Error: ANTHROPIC_API_KEY not found in environment variables.")
        print("Please set it in your .env file.")
        return

    client = Anthropic(api_key=api_key)

    print("Loading agent configuration...")
    system_prompt = load_agent_config()
    print("✓ Agent configuration loaded\n")

    # Example 1: smoke-test the agent connection with a trivial prompt.
    _print_banner("Example 1: Testing agent connection")
    response = extract_lineage(
        client,
        system_prompt,
        "Hello! What can you help me with?"
    )
    print(response)
    print()

    # Example 2: extract lineage from an inline metadata payload.
    _print_banner("Example 2: Extract lineage from sample metadata")
    response = extract_lineage(
        client,
        system_prompt,
        f"Extract lineage from this metadata and create a Mermaid diagram:\n\n{_SAMPLE_METADATA}"
    )
    print(response)
    print()

    # Example 3: optional BigQuery extraction. Read the project id once
    # (the original read the variable twice) and skip cleanly when unset.
    project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
    if project_id:
        _print_banner("Example 3: BigQuery lineage extraction")
        response = extract_lineage(
            client,
            system_prompt,
            f"Extract lineage from BigQuery project: {project_id}, dataset: analytics"
        )
        print(response)
    else:
        print("Skipping BigQuery example (GOOGLE_CLOUD_PROJECT not set)")

    print("\n" + "=" * 60)
    print("Examples complete!")
    print("=" * 60)
|
|
|
|
|
# Run the examples only when executed as a script, so the module can be
# imported (e.g. for reuse of extract_lineage) without side effects.
if __name__ == "__main__":
    main()
|
|
|
|
|
|