"""Agent tools: fetch a web page as markdown, transcribe audio, read markdown files."""

from typing import Dict

from transformers import pipeline
from markdown_it import MarkdownIt
from smolagents.tools import Tool
import torchcodec


class VisitWebpageTool(Tool):
    """Fetch a web page, convert it to markdown and save it to a local file.

    The tool returns a short report containing the heading outline of the
    page rather than the page itself; the saved file is meant to be read
    afterwards with the ``read_mddoc`` tool.
    """

    name = "visit_webpage"
    description = (
        "Visits a web page at the given url and reads its content as a markdown string and store it to a file"
    )
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        },
    }
    output_type = "string"

    def __init__(
        self, file_name: str = "web_content.md", user_agent: str = "agent-course"
    ):
        """Store the output file name and the User-Agent header used for requests."""
        super().__init__()
        self.file_name = file_name
        self.headers = {"User-Agent": user_agent}

    def _inspect(self, doc: str) -> str:
        """Return the heading outline of *doc*, one heading per line.

        Each heading is indented by one space per level below ``h1``.
        """
        tokens = MarkdownIt().parse(doc)
        outline = []
        for index, token in enumerate(tokens):
            if token.type != "heading_open":
                continue
            depth = int(token.tag[-1]) - 1
            # The parser emits the heading text as the inline token that
            # directly follows ``heading_open``.
            title = tokens[index + 1].content
            outline.append(" " * depth + title + "\n")
        return "".join(outline)

    def forward(self, url: str) -> str:
        """Download *url*, save it as markdown and describe what was saved.

        Args:
            url: Address of the page to fetch.

        Returns:
            A message naming the output file and listing the page's section
            tree, or a human-readable error message if the download failed.

        Raises:
            ImportError: If ``markdownify`` or ``requests`` is not installed.
        """
        try:
            import re

            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from e

        try:
            # Send a GET request to the URL with a 20-second timeout
            response = requests.get(url, timeout=20, headers=self.headers)
            response.raise_for_status()  # Raise an exception for bad status codes

            # Convert the HTML content to Markdown
            markdown_content = markdownify(response.text).strip()

            # Remove multiple line breaks
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)

            # Explicit UTF-8 so pages with non-ASCII text can be written
            # regardless of the platform's default encoding.
            with open(self.file_name, "w", encoding="utf-8") as f:
                f.write(markdown_content)

            try:
                content_summary = self._inspect(markdown_content)
            except Exception:
                # The outline is a convenience; the file itself was saved.
                return f"Web page content saved in {self.file_name}."
            return (
                f"Web page content saved in '{self.file_name}'. "
                f"The content has the following section tree:\n {content_summary}. "
                f"To read the full website content you can call 'read_mddoc('{self.file_name}')'"
            )
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"


class SpeechToTextTool(Tool):
    """Transcribe an audio file with an automatic-speech-recognition pipeline."""

    name = "transcriber"
    description = "This is a tool that transcribes an audio into text. It returns the transcribed text."
    inputs = {
        "audio_file": {
            "type": "string",
            "description": "The path to the audio file to transcribe.",
        },
        "sample_rate": {
            "type": "integer",
            "description": "The sampling rate to use to decode the audio, defaults to 16000",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, model: str = "openai/whisper-small"):
        """Load the speech-recognition pipeline for *model*."""
        super().__init__()
        self.pipe = pipeline("automatic-speech-recognition", model=model)

    def forward(self, audio_file: str, sample_rate: int = 16000) -> str:
        """Transcribe *audio_file* and return the recognised text.

        Args:
            audio_file: Path of the audio file to decode.
            sample_rate: Sampling rate used to decode the audio; ``None`` is
                treated as 16000.

        Returns:
            The transcription, or a message suggesting a lower sample rate
            when the pipeline rejects the decoded audio with a ``ValueError``.

        Raises:
            ValueError: If the error occurred before the number of samples
                could be determined (or that number is zero), in which case
                no sample-rate suggestion can be computed.
        """
        sample_rate = sample_rate if sample_rate is not None else 16000
        audio_length = None  # set once the audio has been decoded
        try:
            with open(audio_file, "rb") as f:
                decoder = torchcodec.decoders.AudioDecoder(f, sample_rate=sample_rate)
                audio_length = decoder.get_all_samples().data.shape[1]
                out = self.pipe(decoder)
            return out["text"]
        except ValueError:
            if not audio_length:
                # Without a sample count the hint below cannot be computed;
                # surface the original error instead of masking it.
                raise
            max_length = 300000
            suggest_sample_rate = int(sample_rate * max_length / audio_length)
            return f"The audio file to transcribe is too long, number of samples {audio_length}. You used a sample_rate of {sample_rate}, try using a smaller sample rate, like {suggest_sample_rate}"


class ReadMdDoc(Tool):
    """Return a whole markdown file, or the section under one of its headings."""

    name = "read_mddoc"
    description = (
        "Read an entire markdown file or a specific section of it."
    )
    inputs = {
        "file_name": {
            "type": "string",
            "description": "The file to read it should have 'md' extension.",
        },
        "section": {
            "type": "string",
            "nullable": True,
            "description": "If you want to read the entire file set this to 'all'. Otherwise you can look for a specific section title.",
        },
        "max_length": {
            "type": "integer",
            "nullable": True,
            "description": "The maximum number of characters to return if the content has more characters it will be truncated. Use 40000 as a default.",
        },
    }
    output_type = "string"

    def __init__(self):
        super().__init__()

    def _truncate_content(self, content: str, max_length: int) -> str:
        """Cut *content* to *max_length* characters, appending a notice if cut."""
        if len(content) <= max_length:
            return content
        return (
            content[:max_length]
            + f"\n..._This content has been truncated to stay below {max_length} characters_...\n Does it have the information you need otherwise increase the max_length."
        )

    def get_token_map(self, tokens):
        """Map each heading title to the token span of its section.

        A section starts at its ``heading_open`` token and ends just before
        the next heading of the same or a higher level, or at the end of the
        document.

        Args:
            tokens: Token list produced by ``MarkdownIt().parse``.

        Returns:
            A dict ``{title: [start, end]}`` in document order, where
            ``tokens[start:end]`` is the section. When several headings share
            a title, the first occurrence is kept.
        """
        total = len(tokens)
        token_map = {}
        open_sections = []  # stack of (span, level) for sections not yet closed
        for index, token in enumerate(tokens):
            if token.type != "heading_open":
                continue
            level = int(token.tag[-1])
            # A heading closes every open section at the same or a deeper level.
            while open_sections and level <= open_sections[-1][1]:
                closed_span, _ = open_sections.pop()
                closed_span[1] = index
            # The end defaults to the end of the document and is overwritten
            # above if a later heading closes this section.
            span = [index, total]
            title = tokens[index + 1].content
            token_map.setdefault(title, span)
            open_sections.append((span, level))
        return token_map

    def forward(
        self, file_name: str, section: str = "all", max_length: int = 40000
    ) -> str:
        """Read *file_name* and return the requested section.

        Args:
            file_name: Path of the markdown file.
            section: Heading title to return, or ``"all"`` for the whole
                document; ``None`` is treated as ``"all"``.
            max_length: Maximum number of characters returned; ``None`` is
                treated as 40000.

        Returns:
            The (possibly truncated) section text, or a message explaining
            that the file or the section could not be found.
        """
        # Both inputs are declared nullable, so None may be passed explicitly.
        section = "all" if section is None else section
        max_length = 40000 if max_length is None else max_length

        try:
            with open(file_name, "r", encoding="utf-8") as f:
                doc = f.read()
        except FileNotFoundError:
            return f"Can't find {file_name}, are you sure the file exists and that you have spelled it crrectly?"

        try:
            tokens = MarkdownIt().parse(doc)
        except Exception:
            return "Error using the markdown parser, are you sure the file is in markdown format?"

        token_map = self.get_token_map(tokens)
        token_map["all"] = [0, len(tokens)]
        if section not in token_map:
            return f"The required Section is not found in the document. The available sections are:\n {list(token_map.keys())}. If you don't see what you are looking for here, you can try returning all the document using setting argument section to 'all'"

        start, end = token_map[section]
        content = "\n".join(t.content for t in tokens[start:end])
        return self._truncate_content(content, max_length)