Spaces:
Runtime error
Runtime error
| import sys | |
| sys.path.append("../../agents") | |
| import os | |
| from gradio_base import WebUI, UIHelper, PORT, HOST, Client | |
| from gradio_config import GradioConfig as gc | |
| from typing import List, Tuple, Any | |
| import gradio as gr | |
| from Agent import Agent | |
| import time | |
| import json | |
| from utils import cos_sim | |
| from design_states import get_desgin_states, get_cot_result, gen_coder_task | |
| from gen_utils import * | |
| import openai | |
| import torch | |
| def get_embedding(sentence,api_key): | |
| openai.api_key = api_key | |
| embedding_model = openai.Embedding | |
| embed = embedding_model.create( | |
| model="text-embedding-ada-002", | |
| input=sentence | |
| ) | |
| embed = embed["data"][0]["embedding"] | |
| embed = torch.tensor(embed,dtype=torch.float32) | |
| if len(embed.shape)==1: | |
| embed = embed.unsqueeze(0) | |
| return embed | |
| class CodeUI(WebUI): | |
| def render_and_register_ui(self): | |
| self.agent_name:list = [self.cache["agents_name"]] if isinstance(self.cache["agents_name"], str) else self.cache['agents_name'] | |
| gc.add_agent(self.agent_name) | |
| def __init__( | |
| self, | |
| client_cmd: list, | |
| socket_host: str = HOST, | |
| socket_port: int = PORT, | |
| bufsize: int = 1024, | |
| ui_name: str = "CodeUI" | |
| ): | |
| super(CodeUI, self).__init__(client_cmd, socket_host, socket_port, bufsize, ui_name) | |
| self.first_recieve_from_client() | |
| self.data_history = list() | |
| self.caller = 0 | |
| def load_sop_fn(self,sop): | |
| return sop.name | |
| def generate_sop_stage_1(self,api_key,proxy,target): | |
| os.environ["API_KEY"] = api_key | |
| # os.environ["PROXY"] = proxy | |
| self.software = "You are a software,aim to write a snake game with python" | |
| self.debate = "Simulate a debate competition" | |
| self.ecological_environment = "Simulate the interactions and competition among different organisms within an ecosystem" | |
| self.software = get_embedding(self.software,api_key) | |
| self.debate = get_embedding(self.debate,api_key) | |
| self.ecological_environment = get_embedding(self.ecological_environment,api_key) | |
| self.embeddings = torch.cat([self.software,self.debate,self.ecological_environment],dim = 0) | |
| self.SOP["config"]["API_KEY"] = api_key | |
| # self.SOP["config"]["PROXY"] = proxy | |
| target_tensor = get_embedding(target,api_key) | |
| sim_scores = cos_sim(target_tensor, self.embeddings)[0] | |
| top_k_score, top_k_idx = torch.topk(sim_scores,k = 1) | |
| if top_k_score > 0.7: | |
| self.index = top_k_idx | |
| else: | |
| self.index = 0 | |
| target_processed = get_cot_result(target) | |
| print("finished!!!!") | |
| return target_processed,self.target_finish_flag.update(visible = True) | |
| def generate_sop_stage_2(self,target): | |
| design_states = get_desgin_states(target,self.index) | |
| root = design_states[0]["state_name"] | |
| self.SOP["root"] = root | |
| return design_states,self.state_finish_flag.update(visible = True) | |
| def generate_sop_stage_3(self,design_states): | |
| agents = get_agents(design_states,self.index) | |
| relations = get_relations(design_states) | |
| self.SOP["relations"] = relations | |
| self.SOP["agents"] = agents | |
| return agents, self.agent_relation_finish_flag.update(visible = True),self.reminder.update(visible = True) | |
| def generate_sop_stage_4(self,agents, need_coder,design_states): | |
| states = gen_states(design_states,self.index) | |
| if "Coder" in need_coder: | |
| agents["coder"] = {"style": "professional", "roles": {}} | |
| for state_name, state in states.items(): | |
| if state_name != "end_state": | |
| agents["coder"]["roles"][state_name] = "coder" | |
| state["roles"].append("coder") | |
| task = gen_coder_task(state["environment_prompt"]) | |
| now_coder = self.coder.copy() | |
| now_coder["task"]["task"] = task | |
| state["agent_states"]["coder"] = now_coder | |
| state["controller"]["max_chat_nums"] = str( | |
| int(state["controller"]["max_chat_nums"])+2) | |
| for name, agent in state["agent_states"].items(): | |
| if name != "coder": | |
| agent["rule"]["rule"] += "\nEvaluate the code of the coder and provide feedback and response as concise as possible.It is best not to exceed 100 words" | |
| agent["task"]["task"] += "\nEvaluate the code of the coder and provide feedback." | |
| self.SOP["states"] = states | |
| # 将字典写入JSON文件 | |
| file_name = 'generated_sop.json' | |
| with open(file_name, "w") as json_file: | |
| json.dump(self.SOP, json_file ,indent=4,ensure_ascii=False) | |
| return file_name | |
| def construct_ui(self): | |
| with gr.Blocks(css=gc.CSS) as demo: | |
| with gr.Tab(label="SOP generation") as tab1: | |
| self.coder = { | |
| "task": { | |
| "task": "" | |
| }, | |
| "rule": {"rule": "1.write code that conforms to standards like PEP8, is modular, easy to read, and maintainable.\n 2.The output strictly follows the following format:<title>{the file name}</title>\n<python>{the target code}</python>\n3.Please carefully modify the code based on feedback from others.\n4.Output the code only."}, | |
| "last": { | |
| "last_prompt": "The output strictly follows the following format:<title>{the file name}</title>\n<python>{the target code}</python>,Output the code only." | |
| } | |
| } | |
| self.SOP = { | |
| "config": { | |
| "API_KEY": "sk-*********", | |
| "MAX_CHAT_HISTORY": "5", | |
| "User_Names": '["User"]', | |
| }, | |
| "root": "state1", | |
| "relations": { | |
| "state1": {"0": "state1", "1": "state2"}, | |
| "state2": {"0": "state2", "1": "end_state"}, | |
| }, | |
| "agents": None, | |
| "states": None, | |
| } | |
| gr.Markdown("""# Generate Agent""") | |
| with gr.Row(): | |
| api_key = gr.Textbox(label="api_key") | |
| proxy = gr.Textbox(label="proxy",visible=False) | |
| with gr.Row(): | |
| requirement = gr.Textbox(value ="A software company aim to write a mine sweeping game",label="requirement") | |
| with gr.Row(): | |
| need_coder = gr.CheckboxGroup(["Coder"],label="Please check this option if your multi-agent system aims to produce code.") | |
| with gr.Row(): | |
| self.target_finish_flag = gr.Label(value = "The process of completing requirement handling is finished.",visible=False) | |
| with gr.Row(): | |
| self.state_finish_flag = gr.Label(value = "The process of determining the state is completed.",visible=False) | |
| with gr.Row(): | |
| self.agent_relation_finish_flag = gr.Label(value = "The process of initializing the agent and relation is completed.",visible=False) | |
| with gr.Row(): | |
| self.reminder = gr.Markdown("""Generating SOP...""",visible=False) | |
| generated_sop = gr.File(label="generated_file") | |
| generate_button = gr.Button(label="Generate") | |
| target_processed = gr.State() | |
| design_states = gr.State() | |
| agents = gr.State() | |
| generate_button.click(self.generate_sop_stage_1,[api_key,proxy,requirement],[target_processed,self.target_finish_flag]).then( | |
| self.generate_sop_stage_2, [target_processed], [design_states,self.state_finish_flag]).then( | |
| self.generate_sop_stage_3, [design_states], [agents,self.agent_relation_finish_flag,self.reminder]).then( | |
| self.generate_sop_stage_4, [agents, need_coder,design_states], [generated_sop]) | |
| with gr.Tab(label="Chat") as tab2: | |
| uploaded_sop = gr.State() | |
| with gr.Row(): | |
| sop = gr.File(label="upload your custmized SOP") | |
| load_sop_btn = gr.Button(value="Load SOP") | |
| load_sop_btn.click(self.load_sop_fn, sop,uploaded_sop) | |
| with gr.Row(): | |
| with gr.Column(): | |
| self.text_api = gr.Textbox( | |
| value = self.cache["api_key"], | |
| placeholder="openai key", | |
| label="Please input valid openai key for gpt-3.5-turbo-16k." | |
| ) | |
| self.radio_mode = gr.Radio( | |
| [Client.SINGLE_MODE], | |
| value=Client.SINGLE_MODE, | |
| interactive=True, | |
| label = Client.MODE_LABEL, | |
| info = Client.MODE_INFO | |
| ) | |
| self.chatbot = gr.Chatbot( | |
| elem_id="chatbot1" | |
| ) | |
| self.btn_next = gr.Button( | |
| value="Next Agent", | |
| visible=False, elem_id="btn" | |
| ) | |
| with gr.Row(): | |
| self.text_requirement = gr.Textbox( | |
| value=self.cache['requirement'], | |
| placeholder="Please enter your content", | |
| scale=9, | |
| ) | |
| self.btn_start = gr.Button( | |
| value="Start!", | |
| scale=1 | |
| ) | |
| self.btn_reset = gr.Button( | |
| value="Restart", | |
| visible=False | |
| ) | |
| with gr.Column(): | |
| self.file = gr.File(visible=False) | |
| self.chat_code_show = gr.Chatbot( | |
| elem_id="chatbot1", | |
| visible=False | |
| ) | |
| self.btn_start.click( | |
| fn=self.btn_send_when_click, | |
| inputs=[self.chatbot, self.text_requirement, self.radio_mode, self.text_api,uploaded_sop], | |
| outputs=[self.chatbot, self.btn_start, self.text_requirement, self.btn_reset] | |
| ).then( | |
| fn=self.btn_send_after_click, | |
| inputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement], | |
| outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
| ) | |
| self.text_requirement.submit( | |
| fn=self.btn_send_when_click, | |
| inputs=[self.chatbot, self.text_requirement, self.text_api,uploaded_sop], | |
| outputs=[self.chatbot, self.btn_start, self.text_requirement, self.btn_reset] | |
| ).then( | |
| fn=self.btn_send_after_click, | |
| inputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement], | |
| outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
| ) | |
| self.btn_reset.click( | |
| fn=self.btn_reset_when_click, | |
| inputs=[], | |
| outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
| ).then( | |
| fn=self.btn_reset_after_click, | |
| inputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement], | |
| outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
| ) | |
| self.file.select( | |
| fn=self.file_when_select, | |
| inputs=[self.file], | |
| outputs=[self.chat_code_show] | |
| ) | |
| self.btn_next.click( | |
| fn = self.btn_next_when_click, | |
| inputs=[], | |
| outputs=[self.btn_next] | |
| ).then( | |
| fn=self.btn_send_after_click, | |
| inputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement], | |
| outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
| ) | |
| self.demo = demo | |
| def handle_message(self, history:list, state, agent_name, token, node_name): | |
| if state % 10 == 0: | |
| self.data_history.append({agent_name: token}) | |
| elif state % 10 == 1: | |
| # Same state. Need to add new bubble in same bubble. | |
| self.data_history[-1][agent_name] += token | |
| elif state % 10 == 2: | |
| # New state. Need to add new bubble. | |
| history.append([None, ""]) | |
| self.data_history.clear() | |
| self.data_history.append({agent_name: token}) | |
| else: | |
| assert False, "Invalid state." | |
| render_data = self.render_bubble(history, self.data_history, node_name, render_node_name=True) | |
| return render_data | |
| def btn_send_when_click(self, chatbot, text_requirement, mode, api, sop): | |
| """ | |
| inputs=[self.chatbot, self.text_requirement, radio, text_api], | |
| outputs=[self.chatbot, self.btn_start, self.text_requirement, self.btn_reset] | |
| """ | |
| chatbot = [[UIHelper.wrap_css(content=text_requirement, name="User"), None]] | |
| yield chatbot,\ | |
| gr.Button.update(visible=True, interactive=False, value="Running"),\ | |
| gr.Textbox.update(visible=True, interactive=False, value=""),\ | |
| gr.Button.update(visible=False, interactive=False) | |
| self.send_start_cmd({'requirement': text_requirement, "mode": mode, "api_key": api ,"uploaded_sop": sop}) | |
| agents,roles_to_names,names_to_roles = Agent.from_config(str(sop)) | |
| agents_name = [] | |
| for i in names_to_roles : | |
| for j in names_to_roles[i]: | |
| agents_name.append(j+"("+names_to_roles[i][j]+")") | |
| self.new_render_and_register_ui(agents_name) | |
| return | |
| def new_render_and_register_ui(self,agent_names): | |
| gc.add_agent(agent_names, 0) | |
| def btn_send_after_click( | |
| self, | |
| file, | |
| history, | |
| show_code, | |
| btn_send, | |
| btn_reset, | |
| text_requirement | |
| ): | |
| """ | |
| outputs=[self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
| """ | |
| if self.caller == 0: | |
| self.data_history = list() | |
| self.caller = 0 | |
| receive_server = self.receive_server | |
| while True: | |
| data_list: List = receive_server.send(None) | |
| for item in data_list: | |
| data = eval(item) | |
| assert isinstance(data, list) | |
| state, agent_name, token, node_name = data | |
| assert isinstance(state, int) | |
| assert state in [10, 11, 12, 99, 98] | |
| if state == 99: | |
| # finish | |
| fs = [self.cache['pwd']+'/output_code/'+_ for _ in os.listdir(self.cache['pwd']+'/output_code')] | |
| yield gr.File.update(value=fs, visible=True, interactive=True),\ | |
| history, \ | |
| gr.Chatbot.update(visible=True),\ | |
| gr.Button.update(visible=True, interactive=True, value="Start"),\ | |
| gr.Button.update(visible=True, interactive=True),\ | |
| gr.Textbox.update(visible=True, interactive=True, placeholder="Please input your requirement", value=""),\ | |
| gr.Button.update(visible=False) | |
| return | |
| elif state == 98: | |
| yield gr.File.update(visible=False),\ | |
| history, \ | |
| gr.Chatbot.update(visible=False),\ | |
| gr.Button.update(visible=True, interactive=False),\ | |
| gr.Button.update(visible=True, interactive=True),\ | |
| gr.Textbox.update(visible=True, interactive=False),\ | |
| gr.Button.update(visible=True, value=f"Next Agent: 🤖{agent_name} | Next Node: ⭕{node_name}") | |
| return | |
| history = self.handle_message(history, state, agent_name, token, node_name) | |
| yield gr.File.update(visible=False),\ | |
| history, \ | |
| gr.Chatbot.update(visible=False),\ | |
| gr.Button.update(visible=True, interactive=False),\ | |
| gr.Button.update(visible=False, interactive=False),\ | |
| gr.Textbox.update(visible=True, interactive=False),\ | |
| gr.Button.update(visible=False) | |
| def btn_reset_when_click(self): | |
| """ | |
| inputs = [] | |
| outputs = [self.file, self.chatbot, self.chat_code_show, self.btn_start, self.btn_reset, self.text_requirement, self.btn_next] | |
| """ | |
| return gr.File.update(visible=False),\ | |
| None, None, gr.Button.update(value="Restarting...", interactive=False),\ | |
| gr.Button.update(value="Restarting...", interactive=False),\ | |
| gr.Textbox.update(value="Restarting", interactive=False),\ | |
| gr.Button.update(visible=False) | |
| def btn_reset_after_click( | |
| self, | |
| file, | |
| chatbot, | |
| show_code, | |
| btn_send, | |
| btn_reset, | |
| text_requirement | |
| ): | |
| self.reset() | |
| self.first_recieve_from_client(reset_mode=True) | |
| return gr.File.update(value=None, visible=False),\ | |
| gr.Chatbot.update(value=None, visible=True),\ | |
| gr.Chatbot.update(value=None, visible=False),\ | |
| gr.Button.update(value="Start", visible=True, interactive=True),\ | |
| gr.Button.update(value="Restart", interactive=False, visible=False),\ | |
| gr.Textbox.update(value=self.cache['requirement'], interactive=True, visible=True),\ | |
| gr.Button.update(visible=False) | |
| def file_when_select(self, file): | |
| CODE_PREFIX = "```python\n{}\n```" | |
| with open(file.name, "r", encoding='utf-8') as f: | |
| contents = f.readlines() | |
| codes = "".join(contents) | |
| return [[CODE_PREFIX.format(codes),None]] | |
| def btn_next_when_click(self): | |
| self.caller = 1 # it will remain the value in self.data_history | |
| self.send_message("nothing") | |
| time.sleep(0.5) | |
| yield gr.Button.update(visible=False) | |
| return | |
| if __name__ == '__main__': | |
| ui = CodeUI(client_cmd=["python3","gradio_backend.py"]) | |
| ui.construct_ui() | |
| ui.run(share=True) | |