|
|
from langchain_core.tools import tool |
|
|
from youtube_search import YoutubeSearch |
|
|
import requests |
|
|
import re |
|
|
import ast |
|
|
import operator |
|
|
from typing import List, Optional |
|
|
|
|
|
# Whitelist of AST operator node types and the functions that implement them.
# Any operator not listed here is rejected by safe_eval().
SAFE_OPERATORS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}

# Largest absolute value accepted for the right-hand operand of ``**``.
MAX_EXPONENT = 100

# Largest bit length accepted for an integer base of ``**``.  Together with
# MAX_EXPONENT this bounds the size of any single power result, so nested
# powers such as ``((9**100)**100)**100`` cannot build enormous integers.
MAX_POW_BASE_BITS = 1024


def safe_eval(formula: str, variables: dict) -> float:
    """Safely evaluate a math formula with only basic arithmetic operations.

    The formula is parsed with ``ast.parse(..., mode='eval')`` and the tree is
    walked by hand; only numeric constants, names found in ``variables``, and
    the operators listed in ``SAFE_OPERATORS`` are accepted.

    Args:
        formula: Expression text, e.g. ``"(A * 256 + B) / 4"``.
        variables: Mapping of variable name to numeric value.

    Returns:
        The numeric result (an ``int`` when every operation yields an int,
        otherwise a ``float``).

    Raises:
        ValueError: On invalid syntax, an unknown variable, an unsupported
            node/operator/constant, an exponent or power base that is too
            large, division by zero, numeric overflow, or a non-real result.
    """
    try:
        tree = ast.parse(formula, mode='eval')
    except SyntaxError as e:
        raise ValueError(f"Invalid formula syntax: {e}") from e

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)

        # ast.Constant covers every literal on Python 3.8+.  The legacy
        # ast.Num check was dropped: it is deprecated and touching the
        # attribute fails on interpreters where it has been removed.
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int, so it must be excluded explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise ValueError(f"Unsupported constant type: {type(value).__name__}")
            return value

        if isinstance(node, ast.Name):
            if node.id in variables:
                return variables[node.id]
            raise ValueError(f"Unknown variable: {node.id}")

        if isinstance(node, ast.BinOp):
            func = SAFE_OPERATORS.get(type(node.op))
            if func is None:
                raise ValueError(f"Unsupported operator: {type(node.op).__name__}")

            left = _eval(node.left)
            right = _eval(node.right)

            if isinstance(node.op, ast.Pow):
                if abs(right) > MAX_EXPONENT:
                    raise ValueError(f"Exponent too large (max {MAX_EXPONENT})")
                # Capping only the exponent is not enough: a huge integer base
                # raised to a small power is just as expensive.
                if isinstance(left, int) and left.bit_length() > MAX_POW_BASE_BITS:
                    raise ValueError(f"Power base too large (max {MAX_POW_BASE_BITS} bits)")

            if isinstance(node.op, (ast.Div, ast.FloorDiv, ast.Mod)) and right == 0:
                raise ValueError("Division by zero")

            try:
                result = func(left, right)
            except ZeroDivisionError as e:
                # e.g. ``0 ** -1``, which the explicit check above does not cover.
                raise ValueError("Division by zero") from e
            except OverflowError as e:
                # e.g. ``1e200 ** 2`` or an int too large to divide as a float.
                raise ValueError(f"Result out of range: {e}") from e

            # A negative base with a fractional exponent yields a complex number.
            if isinstance(result, complex):
                raise ValueError("Result is not a real number")
            return result

        if isinstance(node, ast.UnaryOp):
            func = SAFE_OPERATORS.get(type(node.op))
            if func is None:
                raise ValueError(f"Unsupported operator: {type(node.op).__name__}")
            return func(_eval(node.operand))

        raise ValueError(f"Unsupported expression: {type(node).__name__}")

    return _eval(tree)
|
|
|
|
|
@tool |
|
|
def hex_to_decimal(hex_value: str) -> str:
    """Convert a hexadecimal value to decimal.

    Args:
        hex_value: A hexadecimal string (e.g., "1A", "FF", "0x2B", "1A F8")

    Returns:
        The decimal equivalent
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # applied to this function exposes it as the tool description.
    #
    # The input is split on any whitespace; each chunk may carry its own
    # "0x" prefix, which is removed only at the start of the chunk.  The
    # chunks are then concatenated (so "1A F8" is read as 0x1AF8).
    chunks = []
    for chunk in hex_value.strip().upper().split():
        if chunk.startswith("0X"):
            chunk = chunk[2:]
        chunks.append(chunk)
    digits = "".join(chunks)

    # Validate explicitly instead of relying on int(): int(s, 16) also accepts
    # a sign and underscores ("-FF", "F_F"), which are not hex byte text.
    if not re.fullmatch(r"[0-9A-F]+", digits):
        return (f"Error: Invalid hexadecimal value '{hex_value.strip()}'. "
                "Expected hexadecimal digits (0-9, A-F) with an optional 0x prefix.")

    decimal_value = int(digits, 16)
    return f"Hex '{digits}' = Decimal {decimal_value}"
|
|
|
|
|
|
|
|
@tool |
|
|
def combine_bytes(byte_a: str, byte_b: str, byte_c: Optional[str] = None, byte_d: Optional[str] = None) -> str:
    """Combine 2-4 hex bytes into a larger value (big-endian, MSB first).

    Args:
        byte_a: First (most significant) byte as hex (e.g., "01")
        byte_b: Second byte as hex (e.g., "F4")
        byte_c: Optional third byte as hex
        byte_d: Optional fourth byte as hex

    Returns:
        The combined decimal value with byte breakdown
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # applied to this function exposes it as the tool description.

    def _clean_byte(raw: str) -> str:
        """Return ``raw`` as 1-2 upper-case hex digits, or raise ValueError."""
        text = raw.strip().upper()
        if text.startswith("0X"):
            text = text[2:]
        # Each operand is shifted in as exactly 8 bits below, so anything
        # wider than one byte (or signed) would corrupt the combined value.
        if not re.fullmatch(r"[0-9A-F]{1,2}", text):
            raise ValueError(f"'{raw}' is not a single hex byte (00-FF)")
        return text

    try:
        bytes_list = [_clean_byte(byte_a), _clean_byte(byte_b)]
        # Optional bytes that are None or empty are skipped; the remaining
        # bytes are packed and labelled consecutively (A, B, C, ...).
        bytes_list.extend(_clean_byte(raw) for raw in (byte_c, byte_d) if raw)

        result = 0
        byte_labels = []
        for i, byte_hex in enumerate(bytes_list):
            value = int(byte_hex, 16)
            result = (result << 8) + value
            label = chr(ord('A') + i)
            byte_labels.append(f"{label}=0x{byte_hex}({value})")

        return (f"Bytes: {', '.join(byte_labels)}\n"
                f"Combined: Decimal {result}")
    except ValueError as e:
        return f"Error: Invalid byte value. {str(e)}"
|
|
|
|
|
|
|
|
@tool |
|
|
def calculate_obd_value(formula: str, byte_a: str, byte_b: Optional[str] = None,
                        byte_c: Optional[str] = None, byte_d: Optional[str] = None) -> str:
    """Calculate an OBD-II PID value using a formula and hex byte inputs.

    Common formulas:
    - Engine RPM: "(A * 256 + B) / 4"
    - Coolant Temp: "A - 40"
    - Throttle Position: "(A * 100) / 255"
    - MAF Rate: "(A * 256 + B) / 100"
    - Timing Advance: "(A - 128) / 2"
    - Fuel Trim: "(A - 128) * 100 / 128"

    Args:
        formula: Math formula using A, B, C, D as variables
        byte_a: First byte as hex
        byte_b: Optional second byte as hex
        byte_c: Optional third byte as hex
        byte_d: Optional fourth byte as hex

    Returns:
        The calculated result
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # applied to this function exposes it as the tool description.
    import math  # local import: only needed for the finiteness check below

    def _clean_hex(raw: str) -> str:
        """Return ``raw`` as upper-case hex digits without a 0x prefix."""
        text = raw.strip().upper()
        if text.startswith("0X"):
            text = text[2:]
        # int(s, 16) would also accept a sign or underscores; reject them here.
        if not re.fullmatch(r"[0-9A-F]+", text):
            raise ValueError(f"Invalid hex value '{raw}'")
        return text

    try:
        values = {}
        inputs = []
        for name, raw in zip("ABCD", (byte_a, byte_b, byte_c, byte_d)):
            # A is mandatory; B-D default to 0 when omitted and are left out
            # of the echoed inputs.
            if name != "A" and not raw:
                values[name] = 0
                continue
            digits = _clean_hex(raw)
            values[name] = int(digits, 16)
            # Echo the prefix-stripped digits so "0x1A" is shown as 0x1A,
            # not 0x0X1A.
            inputs.append(f"{name}=0x{digits}({values[name]})")

        result = safe_eval(formula, values)

        if isinstance(result, float):
            if not math.isfinite(result):
                raise ValueError("Result is not a finite number")
            if result.is_integer():
                # Whole-valued floats are printed without a fractional part.
                result_str = str(int(result))
            else:
                # At most 4 decimals, with trailing zeros trimmed.
                result_str = f"{result:.4f}".rstrip('0').rstrip('.')
        else:
            result_str = str(result)

        return (f"Formula: {formula}\n"
                f"Inputs: {', '.join(inputs)}\n"
                f"Result: {result_str}")
    except ValueError as e:
        return f"Error: {str(e)}"
    except Exception as e:
        # Tool boundary: report any unexpected failure as text instead of raising.
        return f"Error calculating formula: {str(e)}"
|
|
|
|
|
|
|
|
@tool |
|
|
def search_youtube_video(query: str) -> str:
    """Search YouTube for a video tutorial and return the video URL.

    Args:
        query: Search query for YouTube (e.g., "how to fix P0103 MAF sensor")

    Returns:
        The URL of the first matching YouTube video
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # applied to this function exposes it as the tool description.
    try:
        # Only the top hit is requested; to_dict() yields a list of result dicts.
        hits = YoutubeSearch(query, max_results=1).to_dict()
        if not hits:
            return "No video found"
        top_hit = hits[0]
        return f"https://www.youtube.com/watch?v={top_hit['id']}"
    except Exception as err:
        # Tool boundary: any failure is reported as text rather than raised.
        return f"Error searching YouTube: {err}"
|
|
|
|
|
|
|
|
@tool |
|
|
def decode_vin(vin: str) -> str:
    """Decode a Vehicle Identification Number (VIN) using the NHTSA API.

    Args:
        vin: A 17-character VIN to decode (e.g., "5TDKRKEC7PS142916")

    Returns:
        A formatted string with important vehicle details like make, model, year, etc.
    """
    # NOTE: the docstring above is kept verbatim because the @tool decorator
    # applied to this function exposes it as the tool description.
    vin = vin.strip().upper()

    # Reject malformed VINs before touching the network.
    if re.match(r'^[A-HJ-NPR-Z0-9]{17}$', vin) is None:
        return "Error: Invalid VIN format. VIN must be exactly 17 characters and contain only letters (excluding I, O, Q) and numbers."

    # Variables reported to the caller, in output order.
    wanted_fields = (
        'Make',
        'Model',
        'Model Year',
        'Vehicle Type',
        'Body Class',
        'Manufacturer Name',
        'Plant City',
        'Plant State',
        'Plant Country',
        'Trim',
        'Engine Number of Cylinders',
        'Displacement (L)',
        'Engine Model',
        'Fuel Type - Primary',
        'Fuel Type - Secondary',
        'Electrification Level',
        'Transmission Style',
        'Drive Type',
        'Number of Doors',
        'Number of Seats',
        'Gross Vehicle Weight Rating From',
        'Error Code',
        'Error Text',
    )

    try:
        response = requests.get(
            f"https://vpic.nhtsa.dot.gov/api/vehicles/decodevin/{vin}?format=json",
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()

        if payload.get('Count') == 0:
            return "Error: No data returned from NHTSA API"

        # Index the result rows by variable name, keeping only rows with a
        # truthy value.
        decoded = {}
        for entry in payload.get('Results', []):
            if entry.get('Value'):
                decoded[entry['Variable']] = entry['Value']

        details = []
        for field in wanted_fields:
            value = decoded.get(field)
            # Skip missing values and the placeholder strings.
            if value and value not in ('null', 'Not Applicable'):
                details.append(f"{field}: {value}")

        if not details:
            return "Error: No meaningful data could be extracted from the VIN"

        # The header ends with "\n", so a blank line separates it from the details.
        return '\n'.join([f"VIN Decoded: {vin}\n"] + details)

    except requests.Timeout:
        return "Error: Request to NHTSA API timed out"
    except requests.RequestException as err:
        return f"Error calling NHTSA API: {err}"
    except Exception as err:
        # Tool boundary: unexpected payload shapes etc. are reported as text.
        return f"Error decoding VIN: {err}"
|
|
|