tecuts committed
Commit b136fd6 · verified · 1 Parent(s): f2561e7

Update app.py

Files changed (1)
  1. app.py +69 -63
app.py CHANGED
@@ -177,14 +177,15 @@ available_tools = [
     }
 ]

+
 # --- Streaming Response Generator ---
 async def generate_streaming_response(messages: List[Dict], use_search: bool, temperature: float):
     """Generate streaming response with optional search"""

     try:
-        # Initial LLM call with streaming
+        # --- Stage 1: Initial call to see if the model wants to use a tool ---
         llm_kwargs = {
-            "model": "unsloth/Qwen3-30B-A3B-GGUF",
+            "model": "unsloth/Qwen3-30B-A3B-GGUF",
             "temperature": temperature,
             "messages": messages,
             "max_tokens": 2000,
@@ -195,100 +196,105 @@ async def generate_streaming_response(messages: List[Dict], use_search: bool, te
             llm_kwargs["tools"] = available_tools
             llm_kwargs["tool_choice"] = "auto"

-        source_links = []
-        response_content = ""
-        tool_calls_data = []
-
-        # First streaming call
         stream = client.chat.completions.create(**llm_kwargs)

+        response_content = ""
+        tool_calls_data = []
+        source_links = []
+
+        # Accumulate the response from the first stream
         for chunk in stream:
             delta = chunk.choices[0].delta

-            # Handle content streaming
             if delta.content:
                 content_chunk = delta.content
                 response_content += content_chunk
-                yield f"data: {json.dumps({'type': 'content', 'data': content_chunk})}\n\n"
+                # Don't yield content yet, wait to see if a tool is called

-            # Handle tool calls
+            # This logic for accumulating tool calls is complex but correct
             if delta.tool_calls:
                 for tool_call in delta.tool_calls:
                     if len(tool_calls_data) <= tool_call.index:
-                        tool_calls_data.extend([{"id": "", "function": {"name": "", "arguments": ""}}
-                            for _ in range(tool_call.index + 1 - len(tool_calls_data))])
-
+                        tool_calls_data.extend([{"id": "", "function": {"name": "", "arguments": ""}} for _ in range(tool_call.index + 1 - len(tool_calls_data))])
                     if tool_call.id:
                         tool_calls_data[tool_call.index]["id"] = tool_call.id
                     if tool_call.function.name:
                         tool_calls_data[tool_call.index]["function"]["name"] = tool_call.function.name
                     if tool_call.function.arguments:
                         tool_calls_data[tool_call.index]["function"]["arguments"] += tool_call.function.arguments
-
-        # Process tool calls if any
-        if tool_calls_data and any(tc["function"]["name"] for tc in tool_calls_data):
+
+        # --- Stage 2: Decide what to do based on the model's response ---
+
+        # If the model returned tool calls, execute them
+        if tool_calls_data:
             yield f"data: {json.dumps({'type': 'status', 'data': 'Searching...'})}\n\n"
-
-            # Execute searches concurrently for speed
-            search_tasks = []
+
+            # 1. Append the assistant's request to use a tool to the message history
+            messages.append({
+                "role": "assistant",
+                "content": response_content or None,  # Can be empty
+                "tool_calls": tool_calls_data
+            })
+
+            # Execute all tool calls concurrently
+            search_tasks = {}
             for tool_call in tool_calls_data:
-                if tool_call["function"]["name"] == "google_search":
+                if tool_call["function"]["name"] == "google_search":
                     try:
                         args = json.loads(tool_call["function"]["arguments"])
                         query = args.get("query", "").strip()
                         if query:
-                            search_tasks.append(google_search_tool_async(query))
+                            # Map tool_call_id to the task
+                            search_tasks[tool_call["id"]] = google_search_tool_async(query)
                     except json.JSONDecodeError:
                         continue

-            # Run searches concurrently
-            if search_tasks:
-                search_results_list = await asyncio.gather(*search_tasks, return_exceptions=True)
-
-                # Combine all search results
-                all_results = []
-                for results in search_results_list:
-                    if isinstance(results, list):
-                        all_results.extend(results)
-                        for result in results:
-                            source_links.append({
-                                "title": result["source_title"],
-                                "url": result["url"],
-                                "domain": result["domain"]
-                            })
+            search_results_by_id = await asyncio.gather(*search_tasks.values(), return_exceptions=True)
+            tool_ids = list(search_tasks.keys())
+
+
+            # 2. Append the results of EACH tool call to the message history
+            for i, results in enumerate(search_results_by_id):
+                tool_call_id = tool_ids[i]
+                if isinstance(results, list):
+                    search_context = format_search_results_compact(results)
+                    # Gather source links to send to the client
+                    for result in results:
+                        source_links.append({"title": result["source_title"], "url": result["url"], "domain": result["domain"]})
+                else:  # Handle search error
+                    search_context = "Error performing search."

-                # Format search results
-                if all_results:
-                    search_context = format_search_results_compact(all_results)
-
-                    # Create new message with search context
-                    search_messages = messages + [{
-                        "role": "system",
-                        "content": f"{search_context}\n\nPlease provide a comprehensive response based on the search results above."
-                    }]
-
-                    yield f"data: {json.dumps({'type': 'status', 'data': 'Generating response...'})}\n\n"
-
-                    # Generate final response with search context
-                    final_stream = client.chat.completions.create(
-                        model="unsloth/Qwen3-30B-A3B-GGUF",
-                        temperature=temperature,
-                        messages=search_messages,
-                        max_tokens=2000,
-                        stream=True
-                    )
-
-                    for chunk in final_stream:
-                        if chunk.choices[0].delta.content:
-                            content = chunk.choices[0].delta.content
-                            yield f"data: {json.dumps({'type': 'content', 'data': content})}\n\n"
+                messages.append({
+                    "role": "tool",
+                    "tool_call_id": tool_call_id,
+                    "content": search_context
+                })
+
+            # 3. Make the SECOND call to the LLM with the complete context
+            yield f"data: {json.dumps({'type': 'status', 'data': 'Generating response...'})}\n\n"
+            final_stream = client.chat.completions.create(
+                model="unsloth/Qwen3-30B-A3B-GGUF",
+                temperature=temperature,
+                messages=messages,  # Send the fully updated message history
+                max_tokens=2000,
+                stream=True
+            )
+
+            for chunk in final_stream:
+                if chunk.choices[0].delta.content:
+                    content = chunk.choices[0].delta.content
+                    yield f"data: {json.dumps({'type': 'content', 'data': content})}\n\n"
+
+        # If no tool calls were made, just stream the initial response
+        else:
+            yield f"data: {json.dumps({'type': 'content', 'data': response_content})}\n\n"

-        # Send sources and completion
+        # --- Stage 3: Finalize the stream ---
         if source_links:
             yield f"data: {json.dumps({'type': 'sources', 'data': source_links})}\n\n"

         yield f"data: {json.dumps({'type': 'done', 'data': {'search_used': bool(source_links)}})}\n\n"
-
+
     except Exception as e:
         logger.error(f"Streaming error: {e}")
         yield f"data: {json.dumps({'type': 'error', 'data': str(e)})}\n\n"