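"""Gradio chat playground for a local Transformers model with optional MCP
tool calling (web search, web fetch, code execution, image generation)
proxied through a remote MCP server over streamable HTTP/SSE."""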
import json
import os
import re
from threading import Thread

import gradio as gr
import requests
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
# Model ID and auth token come from the Space's environment; MCP_URL points at
# the remote tool server this app calls.
MODEL_NAME = os.getenv('MODEL_ID')
TOKEN = os.getenv('TOKEN')
MCP_URL = "https://beyoru-clone-tools.hf.space/gradio_api/mcp/"
print("Loading model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True, token=TOKEN)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.bfloat16,  # half-precision weights to cut memory use
    device_map="auto",           # place weights on GPU automatically when available
    trust_remote_code=True,
    token=TOKEN
)
print("Model loaded.")
# MCP tool schemas advertised to the model (OpenAI-style function specs)
TOOLS = [
{
"type": "function",
"function": {
"name": "clone_tools_Web_Search",
"description": "Run a DuckDuckGo-backed search across text, news, images, videos, or books.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query"},
"max_results": {"type": "number", "description": "Number of results to return (1-20)", "default": 5},
"search_type": {"type": "string", "enum": ["text", "news", "images", "videos", "books"], "default": "text"}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "clone_tools_Web_Fetch",
"description": "Fetch a webpage and return clean Markdown, raw HTML, or a list of links.",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "The absolute URL to fetch"},
"max_chars": {"type": "number", "description": "Maximum characters to return (0 = no limit)", "default": 0},
"mode": {"type": "string", "enum": ["markdown", "html", "url_scraper"], "default": "markdown"}
},
"required": ["url"]
}
}
},
{
"type": "function",
"function": {
"name": "clone_tools_Code_Interpreter",
"description": "Execute Python code and return the output.",
"parameters": {
"type": "object",
"properties": {
"code": {"type": "string", "description": "Python source code to run"}
},
"required": ["code"]
}
}
},
{
"type": "function",
"function": {
"name": "clone_tools_Generate_Image",
"description": "Generate an image from a text prompt via Hugging Face inference.",
"parameters": {
"type": "object",
"properties": {
"prompt": {"type": "string", "description": "Text description of the image to generate"},
"model_id": {"type": "string", "default": "black-forest-labs/FLUX.1-dev"},
"steps": {"type": "number", "default": 30},
"width": {"type": "number", "default": 1024},
"height": {"type": "number", "default": 1024}
},
"required": ["prompt"]
}
}
}
]
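# With a Qwen-family chat template, the model emits tool calls as JSON wrapped
# in <tool_call> tags, e.g. (illustrative):
#   <tool_call>
#   {"name": "clone_tools_Web_Search", "arguments": {"query": "latest AI news"}}
#   </tool_call>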
def call_mcp_tool(tool_name, parameters, timeout=60):
"""
Call MCP tool via Streamable HTTP (SSE).
Extracts JSON responses from 'data:' events.
Returns parsed JSON dict.
"""
try:
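        # MCP speaks JSON-RPC 2.0; the "tools/call" method invokes a remote
        # tool by name with a dict of arguments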
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": parameters
}
}
response = requests.post(
MCP_URL,
json=payload,
headers={
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream"
},
            timeout=timeout,
            stream=False  # buffer the full SSE body; data events are parsed below
        )
if response.status_code != 200:
return {"error": f"HTTP {response.status_code}: {response.text}"}
        # Parse SSE chunks; slice off only the leading "data:" prefix so any
        # "data:" text inside the JSON payload itself is left untouched
        data_events = []
        for line in response.text.splitlines():
            line = line.strip()
            if line.startswith("data:"):
                json_str = line[len("data:"):].strip()
                try:
                    data_events.append(json.loads(json_str))
                except json.JSONDecodeError:
                    pass  # skip invalid chunks
if not data_events:
return {"error": "No valid JSON data events found in SSE response"}
# Return the final event (most tools return a single event)
final_result = data_events[-1]
# Extract content from result
if "result" in final_result:
result = final_result["result"]
# Extract text content if available
if isinstance(result, dict) and "content" in result:
content = result["content"]
if isinstance(content, list) and len(content) > 0:
if content[0].get("type") == "text":
return {"output": content[0].get("text", "")}
return result
return final_result
except requests.exceptions.Timeout:
return {"error": "Request timeout"}
except Exception as e:
return {"error": f"MCP call failed: {str(e)}"}
def process_tool_calls(tool_calls):
"""Process tool calls and return results"""
results = []
for tool_call in tool_calls:
if isinstance(tool_call, dict):
func_name = tool_call.get("name")
func_args = tool_call.get("arguments", {})
            if isinstance(func_args, str):
                try:
                    func_args = json.loads(func_args)
                except json.JSONDecodeError:
                    pass  # leave arguments as a raw string if not valid JSON
            result = call_mcp_tool(func_name, func_args)
            # Format result for display
            if "error" in result:
                result_text = f"❌ Error: {result['error']}"
            elif "output" in result:
                result_text = result["output"]
            else:
                result_text = json.dumps(result, ensure_ascii=False, indent=2)
results.append({
"tool_call_id": tool_call.get("id", "call_0"),
"role": "tool",
"name": func_name,
"content": result_text
})
return results
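# Each entry returned above is an OpenAI-style tool message:
#   {"tool_call_id": ..., "role": "tool", "name": ..., "content": ...}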
def playground(
message,
history,
system_prompt,
enable_tools,
max_new_tokens,
temperature,
repetition_penalty,
top_k,
top_p,
max_tool_iterations
):
if not isinstance(message, str) or not message.strip():
yield ""
return
# Build conversation
conversation = []
if system_prompt and system_prompt.strip():
conversation.append({"role": "system", "content": system_prompt.strip()})
    # gr.ChatInterface supplies history as (user, assistant) pairs in its
    # default tuple format
    for user_msg, bot_msg in history:
        conversation.append({"role": "user", "content": user_msg})
        if bot_msg:
            conversation.append({"role": "assistant", "content": bot_msg})
conversation.append({"role": "user", "content": message})
# Tool calling loop
iteration = 0
generated_text = ""
while iteration < max_tool_iterations:
iteration += 1
        # Render the prompt, advertising the tool schema only when tools are enabled
        prompt = tokenizer.apply_chat_template(
            conversation,
            tools=TOOLS if enable_tools else None,
            tokenize=False,
            add_generation_prompt=True
        )
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
        generation_kwargs = dict(
            **inputs,
            streamer=streamer,
            max_new_tokens=int(max_new_tokens),
            temperature=float(temperature),
            top_k=int(top_k) if top_k > 0 else None,
            top_p=float(top_p),
            repetition_penalty=float(repetition_penalty),
            do_sample=temperature > 0,  # greedy decoding when temperature is 0
            pad_token_id=tokenizer.eos_token_id
        )
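        # Run generate() on a background thread; TextIteratorStreamer yields
        # decoded tokens in this thread as they are produced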
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
current_output = ""
for new_text in streamer:
current_output += new_text
generated_text = current_output
yield generated_text
thread.join()
        # Check for tool calls: collect every <tool_call>...</tool_call> block
        # the model emitted and parse each one as JSON
        tool_calls = None
        if "<tool_call>" in current_output:
            parsed = []
            for block in re.findall(r'<tool_call>(.*?)</tool_call>', current_output, re.DOTALL):
                try:
                    parsed.append(json.loads(block))
                except json.JSONDecodeError:
                    pass  # skip malformed tool-call JSON
            tool_calls = parsed or None
if not enable_tools or not tool_calls:
# No tool calls, return final response
break
# Process tool calls
generated_text += "\n\n🔧 **Executing tools...**\n"
yield generated_text
        tool_results = process_tool_calls(tool_calls)
        # Add the assistant turn, including its tool calls, to the conversation
        conversation.append({
            "role": "assistant",
            "content": current_output,
            "tool_calls": tool_calls
        })
# Add tool results
for result in tool_results:
conversation.append(result)
generated_text += f"\n✓ {result['name']}: {result['content'][:200]}...\n"
yield generated_text
generated_text += "\n**Processing results...**\n\n"
yield generated_text
# Continue conversation with tool results
# Reset generated_text for next iteration
generated_text = ""
with gr.Blocks(fill_height=True, fill_width=True) as app:
with gr.Sidebar():
gr.Markdown("## Playground with MCP Tools")
gr.HTML("""
Runs <b><a href="https://huggingface.co/beyoru/Qwen3-0.9B-A0.6B" target="_blank">
beyoru/Qwen3-0.9B-A0.6B</a></b> with <b>MCP Tools Integration</b>.<br><br>
<b>Support me at:</b><br><br>
<a href="https://www.buymeacoffee.com/ductransa0g" target="_blank">
<img src="https://cdn.buymeacoffee.com/buttons/v2/default-yellow.png" alt="Buy Me A Coffee" width="150px">
</a>
""")
gr.Markdown("---")
gr.Markdown("## Tools Settings")
enable_tools = gr.Checkbox(
label="Enable MCP Tools",
value=True,
info="Allow model to call external tools (search, code, images)"
)
max_tool_iterations = gr.Slider(
1, 5, value=3, step=1,
label="Max Tool Iterations",
info="Maximum number of tool calling rounds"
)
gr.Markdown("---")
gr.Markdown("## System Prompt")
system_prompt = gr.Textbox(
label="System Prompt",
placeholder="Enter custom system instructions...",
lines=4,
value="You are a helpful AI assistant with access to tools for web search, code execution, and image generation. Use tools when needed to provide accurate and helpful responses.",
info="AI role and behavior"
)
gr.Markdown("---")
gr.Markdown("## Generation Parameters")
max_new_tokens = gr.Slider(32, 4096, value=2048, step=32, label="Max New Tokens")
temperature = gr.Slider(0.1, 2.0, value=0.6, step=0.1, label="Temperature")
repetition_penalty = gr.Slider(0.1, 2.0, value=1.0, step=0.1, label="Repetition Penalty")
top_k = gr.Slider(0, 100, value=20, step=1, label="Top K (0 = off)")
top_p = gr.Slider(0.0, 1.0, value=0.95, step=0.05, label="Top P")
gr.ChatInterface(
fn=playground,
additional_inputs=[
system_prompt,
enable_tools,
max_new_tokens,
temperature,
repetition_penalty,
top_k,
top_p,
max_tool_iterations
],
chatbot=gr.Chatbot(
label="Qwen3-0.9B-A0.6B with MCP Tools",
show_copy_button=True,
allow_tags=["think"],
),
examples=[
["Search for the latest news about AI"],
["Calculate the fibonacci sequence up to 10 using code"],
["Generate an image of a cute robot"],
["What's the weather like today?"]
],
cache_examples=False,
show_api=False
)
app.launch(server_name="0.0.0.0", pwa=True)