import os
import re
import json
import torch
import requests
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread
import gradio as gr

MODEL_NAME = os.getenv("MODEL_ID")
TOKEN = os.getenv("TOKEN")
MCP_URL = "https://beyoru-clone-tools.hf.space/gradio_api/mcp/"
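# MCP_URL is the Streamable HTTP (SSE) endpoint exposed by the clone-tools
# Space; every tool invocation below is a JSON-RPC 2.0 POST to this URL.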
| print("Loading model...") | |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True, token=TOKEN) | |
| model = AutoModelForCausalLM.from_pretrained( | |
| MODEL_NAME, | |
| torch_dtype=torch.bfloat16, | |
| device_map="auto", | |
| trust_remote_code=True, | |
| token=TOKEN | |
| ) | |
| print("Model loaded.") | |

# Define MCP tools schema
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "clone_tools_Web_Search",
            "description": "Run a DuckDuckGo-backed search across text, news, images, videos, or books.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "The search query"},
                    "max_results": {"type": "number", "description": "Number of results to return (1-20)", "default": 5},
                    "search_type": {"type": "string", "enum": ["text", "news", "images", "videos", "books"], "default": "text"}
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "clone_tools_Web_Fetch",
            "description": "Fetch a webpage and return clean Markdown, raw HTML, or a list of links.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "The absolute URL to fetch"},
                    "max_chars": {"type": "number", "description": "Maximum characters to return (0 = no limit)", "default": 0},
                    "mode": {"type": "string", "enum": ["markdown", "html", "url_scraper"], "default": "markdown"}
                },
                "required": ["url"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "clone_tools_Code_Interpreter",
            "description": "Execute Python code and return the output.",
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {"type": "string", "description": "Python source code to run"}
                },
                "required": ["code"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "clone_tools_Generate_Image",
            "description": "Generate an image from a text prompt via Hugging Face inference.",
            "parameters": {
                "type": "object",
                "properties": {
                    "prompt": {"type": "string", "description": "Text description of the image to generate"},
                    "model_id": {"type": "string", "default": "black-forest-labs/FLUX.1-dev"},
                    "steps": {"type": "number", "default": 30},
                    "width": {"type": "number", "default": 1024},
                    "height": {"type": "number", "default": 1024}
                },
                "required": ["prompt"]
            }
        }
    }
]
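# Each schema above mirrors a tool hosted on the MCP server. A call is one
# JSON-RPC 2.0 request, for example:
#   {"jsonrpc": "2.0", "id": 1, "method": "tools/call",
#    "params": {"name": "clone_tools_Web_Search",
#               "arguments": {"query": "latest AI news", "max_results": 5}}}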

def call_mcp_tool(tool_name, parameters, timeout=60):
    """
    Call an MCP tool via Streamable HTTP (SSE).
    Extracts JSON payloads from 'data:' events and returns a parsed dict.
    """
    try:
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": parameters
            }
        }
        response = requests.post(
            MCP_URL,
            json=payload,
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream"
            },
            timeout=timeout,
            stream=False
        )
        if response.status_code != 200:
            return {"error": f"HTTP {response.status_code}: {response.text}"}
        # Parse SSE chunks: each payload arrives on a line prefixed with "data:"
        data_events = []
        for line in response.text.splitlines():
            line = line.strip()
            if line.startswith("data:"):
                # Strip only the leading prefix, not other occurrences of "data:"
                json_str = line[len("data:"):].strip()
                try:
                    data_events.append(json.loads(json_str))
                except json.JSONDecodeError:
                    pass  # skip invalid chunks
        if not data_events:
            return {"error": "No valid JSON data events found in SSE response"}
        # Use the final event (most tools return a single event)
        final_result = data_events[-1]
        # Extract the text content from the JSON-RPC result, if present
        if "result" in final_result:
            result = final_result["result"]
            if isinstance(result, dict) and "content" in result:
                content = result["content"]
                if isinstance(content, list) and len(content) > 0:
                    if content[0].get("type") == "text":
                        return {"output": content[0].get("text", "")}
            return result
        return final_result
    except requests.exceptions.Timeout:
        return {"error": "Request timeout"}
    except Exception as e:
        return {"error": f"MCP call failed: {e}"}

def process_tool_calls(tool_calls):
    """Execute each parsed tool call via MCP and format the results."""
    results = []
    for tool_call in tool_calls:
        if isinstance(tool_call, dict):
            func_name = tool_call.get("name")
            func_args = tool_call.get("arguments", {})
            # Arguments may arrive as a JSON string; decode them when possible
            if isinstance(func_args, str):
                try:
                    func_args = json.loads(func_args)
                except json.JSONDecodeError:
                    pass
            result = call_mcp_tool(func_name, func_args)
            # Format the result for display
            if "error" in result:
                result_text = f"❌ Error: {result['error']}"
            elif "output" in result:
                result_text = result["output"]
            else:
                result_text = json.dumps(result, ensure_ascii=False, indent=2)
            results.append({
                "tool_call_id": tool_call.get("id", "call_0"),
                "role": "tool",
                "name": func_name,
                "content": result_text
            })
    return results
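
# Each dict returned above follows the OpenAI-style tool message shape
# ({"role": "tool", "tool_call_id": ..., "name": ..., "content": ...}); the
# chat loop appends these to the conversation before the next generation round.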

def playground(
    message,
    history,
    system_prompt,
    enable_tools,
    max_new_tokens,
    temperature,
    repetition_penalty,
    top_k,
    top_p,
    max_tool_iterations
):
    if not isinstance(message, str) or not message.strip():
        yield ""
        return

    # Build the conversation from the system prompt and prior turns
    conversation = []
    if system_prompt and system_prompt.strip():
        conversation.append({"role": "system", "content": system_prompt.strip()})
    for turn in history:
        if isinstance(turn, dict):  # messages-style history
            conversation.append({"role": turn.get("role"), "content": turn.get("content", "")})
        else:  # tuple-style history: (user_msg, bot_msg)
            user_msg, bot_msg = turn
            conversation.append({"role": "user", "content": user_msg})
            if bot_msg:
                conversation.append({"role": "assistant", "content": bot_msg})
    conversation.append({"role": "user", "content": message})

    # Tool-calling loop: generate, run any requested tools, feed results back
    iteration = 0
    generated_text = ""
    while iteration < max_tool_iterations:
        iteration += 1

        # Apply the chat template, advertising the tool schemas only when enabled
        prompt = tokenizer.apply_chat_template(
            conversation,
            tools=TOOLS if enable_tools else None,
            tokenize=False,
            add_generation_prompt=True
        )

        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
        streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
        generation_kwargs = dict(
            **inputs,
            streamer=streamer,
            max_new_tokens=int(max_new_tokens),
            temperature=float(temperature),
            top_k=int(top_k) if top_k > 0 else None,
            top_p=float(top_p),
            repetition_penalty=float(repetition_penalty),
            do_sample=temperature > 0,
            pad_token_id=tokenizer.eos_token_id
        )

        # Generate on a background thread so tokens can be streamed to the UI
        thread = Thread(target=model.generate, kwargs=generation_kwargs)
        thread.start()
        current_output = ""
        for new_text in streamer:
            current_output += new_text
            generated_text = current_output
            yield generated_text
        thread.join()

        # Look for a tool call (Qwen-style templates wrap it in <tool_call> tags)
        tool_calls = None
        if enable_tools and "<tool_call>" in current_output:
            tool_match = re.search(r"<tool_call>(.*?)</tool_call>", current_output, re.DOTALL)
            if tool_match:
                try:
                    tool_calls = json.loads(tool_match.group(1))
                except json.JSONDecodeError:
                    pass
        if not tool_calls:
            break  # no tool calls: the streamed text is the final response

        # Execute the requested tools
        generated_text += "\n\n🔧 **Executing tools...**\n"
        yield generated_text
        tool_calls = tool_calls if isinstance(tool_calls, list) else [tool_calls]
        tool_results = process_tool_calls(tool_calls)

        # Record the assistant turn (with its tool calls), then the tool results
        conversation.append({
            "role": "assistant",
            "content": current_output,
            "tool_calls": tool_calls
        })
        for result in tool_results:
            conversation.append(result)
            generated_text += f"\n✓ {result['name']}: {result['content'][:200]}...\n"
            yield generated_text
        generated_text += "\n**Processing results...**\n\n"
        yield generated_text

        # Reset the display buffer for the next generation round
        generated_text = ""

with gr.Blocks(fill_height=True, fill_width=True) as app:
    with gr.Sidebar():
        gr.Markdown("## Playground with MCP Tools")
        gr.HTML("""
            Runs <b><a href="https://huggingface.co/beyoru/Qwen3-0.9B-A0.6B" target="_blank">
            beyoru/Qwen3-0.9B-A0.6B</a></b> with <b>MCP Tools Integration</b>.<br><br>
            <b>Support me at:</b><br><br>
            <a href="https://www.buymeacoffee.com/ductransa0g" target="_blank">
                <img src="https://cdn.buymeacoffee.com/buttons/v2/default-yellow.png" alt="Buy Me A Coffee" width="150px">
            </a>
        """)
        gr.Markdown("---")
        gr.Markdown("## Tools Settings")
        enable_tools = gr.Checkbox(
            label="Enable MCP Tools",
            value=True,
            info="Allow model to call external tools (search, code, images)"
        )
        max_tool_iterations = gr.Slider(
            1, 5, value=3, step=1,
            label="Max Tool Iterations",
            info="Maximum number of tool calling rounds"
        )
        gr.Markdown("---")
        gr.Markdown("## System Prompt")
        system_prompt = gr.Textbox(
            label="System Prompt",
            placeholder="Enter custom system instructions...",
            lines=4,
            value="You are a helpful AI assistant with access to tools for web search, code execution, and image generation. Use tools when needed to provide accurate and helpful responses.",
            info="AI role and behavior"
        )
        gr.Markdown("---")
        gr.Markdown("## Generation Parameters")
        max_new_tokens = gr.Slider(32, 4096, value=2048, step=32, label="Max New Tokens")
        temperature = gr.Slider(0.1, 2.0, value=0.6, step=0.1, label="Temperature")
        repetition_penalty = gr.Slider(0.1, 2.0, value=1.0, step=0.1, label="Repetition Penalty")
        top_k = gr.Slider(0, 100, value=20, step=1, label="Top K (0 = off)")
        top_p = gr.Slider(0.0, 1.0, value=0.95, step=0.05, label="Top P")

    gr.ChatInterface(
        fn=playground,
        additional_inputs=[
            system_prompt,
            enable_tools,
            max_new_tokens,
            temperature,
            repetition_penalty,
            top_k,
            top_p,
            max_tool_iterations
        ],
        chatbot=gr.Chatbot(
            label="Qwen3-0.9B-A0.6B with MCP Tools",
            show_copy_button=True,
            allow_tags=["think"],
        ),
        examples=[
            ["Search for the latest news about AI"],
            ["Calculate the fibonacci sequence up to 10 using code"],
            ["Generate an image of a cute robot"],
            ["What's the weather like today?"]
        ],
        cache_examples=False,
        show_api=False
    )
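
# Bind to all interfaces so the app is reachable inside the Spaces container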
app.launch(server_name="0.0.0.0", pwa=True)