Spaces:
Running
on
Zero
Running
on
Zero
| import json | |
| import string | |
| import uuid | |
| import os | |
| import logging | |
| import zipfile | |
| import importlib | |
| import wandb | |
| from contextlib import redirect_stdout, redirect_stderr | |
| import spaces | |
| USE_WANDB = "WANDB_API_KEY" in os.environ | |
| if USE_WANDB: | |
| wandb.login(key=os.environ["WANDB_API_KEY"]) | |
| else: | |
| print("Warning: WANDB_API_KEY not set. Skipping wandb logging.") | |
| import gradio as gr | |
| import pandas as pd | |
| import time | |
| import sys | |
| from datetime import datetime | |
| import re | |
| # --- Configuration --- | |
| DEFAULT_MATERIALS_CSV = "default_materials.csv" | |
| GRADIO_OUTPUT_BASE_DIR = "output" | |
| os.makedirs(GRADIO_OUTPUT_BASE_DIR, exist_ok=True) | |
| REQUIRED_SCRIPT_COLS = ["Brand", " Name", " TD", " Color"] | |
| DISPLAY_COL_MAP = { | |
| "Brand": "Brand", | |
| " Name": "Name", | |
| " TD": "TD", | |
| " Color": "Color (Hex)", | |
| } | |
| def exc_text(exc: BaseException) -> str: | |
| txt = str(exc).strip() | |
| if txt: | |
| return txt | |
| if exc.args: | |
| return " ".join(str(a) for a in exc.args).strip() | |
| return exc.__class__.__name__ | |
| def ensure_required_cols(df, *, in_display_space): | |
| target_cols = ( | |
| DISPLAY_COL_MAP if in_display_space else {k: k for k in REQUIRED_SCRIPT_COLS} | |
| ) | |
| df_fixed = df.copy() | |
| for col_script, col_display in target_cols.items(): | |
| if col_display not in df_fixed.columns: | |
| if "TD" in col_display: | |
| default = 0.0 | |
| elif "Color" in col_display: | |
| default = "#000000" | |
| elif "Owned" in col_display: | |
| default = "false" | |
| else: | |
| default = "" | |
| df_fixed[col_display] = default | |
| return df_fixed[list(target_cols.values())] | |
| def rgba_to_hex(col: str) -> str: | |
| if not isinstance(col, str): | |
| return col | |
| col = col.strip() | |
| if col.startswith("#"): | |
| return col.upper() | |
| m = re.match( | |
| r"rgba?\(\s*([\d.]+)\s*,\s*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*[\d.]+)?\s*\)", | |
| col, | |
| ) | |
| if not m: | |
| return col | |
| r, g, b = (int(float(x)) for x in m.groups()[:3]) | |
| return "#{:02X}{:02X}{:02X}".format(r, g, b) | |
| def zip_dir_no_compress(src_dir: str, dest_zip: str) -> str: | |
| t0 = time.time() | |
| with zipfile.ZipFile(dest_zip, "w", | |
| compression=zipfile.ZIP_STORED, | |
| allowZip64=True) as zf: | |
| for root, _, files in os.walk(src_dir): | |
| for fname in files: | |
| fpath = os.path.join(root, fname) | |
| zf.write(fpath, os.path.relpath(fpath, src_dir)) | |
| print(f"Zipping finished in {time.time() - t0:.1f}s") | |
| return dest_zip | |
| def get_script_args_info(exclude_args=None): | |
| if exclude_args is None: | |
| exclude_args = [] | |
| all_args_info = [ | |
| { | |
| "name": "--iterations", | |
| "type": "number", | |
| "default": 4000, | |
| "help": "Number of optimization iterations", | |
| }, | |
| { | |
| "name": "--layer_height", | |
| "type": "number", | |
| "default": 0.04, | |
| "step": 0.01, | |
| "help": "Layer thickness in mm", | |
| }, | |
| { | |
| "name": "--max_layers", | |
| "type": "number", | |
| "default": 75, | |
| "precision": 0, | |
| "help": "Maximum number of layers", | |
| }, | |
| { | |
| "name": "--learning_rate", | |
| "type": "number", | |
| "default": 0.015, | |
| "step": 0.001, | |
| "help": "Learning rate for optimization", | |
| }, | |
| { | |
| "name": "--background_height", | |
| "type": "number", | |
| "default": 0.4, | |
| "step": 0.01, | |
| "help": "Height of the background in mm", | |
| }, | |
| { | |
| "name": "--background_color", | |
| "type": "colorpicker", | |
| "default": "#000000", | |
| "help": "Background color", | |
| }, | |
| { | |
| "name": "--stl_output_size", | |
| "type": "number", | |
| "default": 100, | |
| "precision": 0, | |
| "help": "Size of the longest dimension of the output STL file in mm", | |
| }, | |
| { | |
| "name": "--nozzle_diameter", | |
| "type": "number", | |
| "default": 0.4, | |
| "step": 0.1, | |
| "help": "Diameter of the printer nozzle in mm", | |
| }, | |
| { | |
| "name": "--pruning_max_colors", | |
| "type": "number", | |
| "default": 100, | |
| "precision": 0, | |
| "help": "Max number of colors allowed after pruning", | |
| }, | |
| { | |
| "name": "--pruning_max_swaps", | |
| "type": "number", | |
| "default": 50, | |
| "precision": 0, | |
| "help": "Max number of swaps allowed after pruning", | |
| }, | |
| { | |
| "name": "--pruning_max_layer", | |
| "type": "number", | |
| "default": 75, | |
| "precision": 0, | |
| "help": "Max number of layers allowed after pruning", | |
| }, | |
| { | |
| "name": "--warmup_fraction", | |
| "type": "slider", | |
| "default": 1.0, | |
| "min": 0.0, | |
| "max": 1.0, | |
| "step": 0.01, | |
| "help": "Fraction of iterations for keeping the tau at the initial value", | |
| }, | |
| { | |
| "name": "--learning_rate_warmup_fraction", | |
| "type": "slider", | |
| "default": 0.01, | |
| "min": 0.0, | |
| "max": 1.0, | |
| "step": 0.01, | |
| "help": "Fraction of iterations that the learning rate is increasing (warmup)", | |
| }, | |
| { | |
| "name": "--early_stopping", | |
| "type": "number", | |
| "default": 5000, | |
| "precision": 0, | |
| "help": "Number of steps without improvement before stopping", | |
| }, | |
| { | |
| "name": "--fast_pruning_percent", | |
| "type": "slider", | |
| "default": 0.05, | |
| "min": 0.0, | |
| "max": 1.0, | |
| "step": 0.01, | |
| "help": "Percentage of increment search for fast pruning.", | |
| }, | |
| { | |
| "name": "--random_seed", | |
| "type": "number", | |
| "default": 0, | |
| "precision": 0, | |
| "help": "Specify the random seed, or use 0 for automatic generation", | |
| }, | |
| { | |
| "name": "--num_init_rounds", | |
| "type": "number", | |
| "default": 8, | |
| "precision": 0, | |
| "help": "Number of rounds to choose the starting height map from.", | |
| }, | |
| ] | |
| return [arg for arg in all_args_info if arg["name"] not in exclude_args] | |
| # initial data that will be used if no CSV exists | |
| initial_filament_data = { | |
| "Brand": ["Generic", "Generic", "Generic", "Generic", "Generic", "Generic"], | |
| " Name": ["PLA Black", "PLA Grey", "PLA White", "PLA Red", "PLA Green", "PLA Blue"], | |
| " TD": [5.0, 5.0, 5.0, 5.0, 5.0], | |
| " Color": ["#000000", "#808080", "#FFFFFF", "#FF0000", "#00FF00", "#0000FF"], | |
| " Owned": ["true", "true", "true", "true", "true", "true"], | |
| } | |
| def normalize_filament_df(df: pd.DataFrame) -> pd.DataFrame: | |
| df = df.copy() | |
| df.columns = [c.strip() for c in df.columns] | |
| rename_map = { | |
| "Name": " Name", | |
| "TD": " TD", | |
| "Color": " Color", | |
| "Owned": " Owned", | |
| } | |
| for src, dst in rename_map.items(): | |
| if src in df.columns and dst not in df.columns: | |
| df.rename(columns={src: dst}, inplace=True) | |
| if " TD" in df.columns: | |
| df[" TD"] = pd.to_numeric(df[" TD"], errors="coerce").fillna(0.0) | |
| else: | |
| df[" TD"] = 0.0 | |
| if " Color" in df.columns: | |
| df[" Color"] = df[" Color"].astype(str) | |
| else: | |
| df[" Color"] = "#000000" | |
| if " Owned" not in df.columns: | |
| df[" Owned"] = "false" | |
| else: | |
| df[" Owned"] = df[" Owned"].astype(str) | |
| if "Brand" not in df.columns: | |
| df["Brand"] = "" | |
| ordered_cols = ["Brand", " Name", " TD", " Color", " Owned"] | |
| df = df[[c for c in ordered_cols if c in df.columns]] | |
| return df | |
| # load CSV if present | |
| if os.path.exists(DEFAULT_MATERIALS_CSV): | |
| try: | |
| loaded_df = pd.read_csv(DEFAULT_MATERIALS_CSV, index_col=False) | |
| loaded_df = normalize_filament_df(loaded_df) | |
| initial_df = loaded_df.copy() | |
| initial_filament_data = { | |
| "Brand": initial_df["Brand"].tolist(), | |
| " Name": initial_df[" Name"].tolist(), | |
| " TD": initial_df[" TD"].tolist(), | |
| " Color": initial_df[" Color"].tolist(), | |
| } | |
| if " Owned" in initial_df.columns: | |
| initial_filament_data[" Owned"] = initial_df[" Owned"].astype(str).tolist() | |
| else: | |
| initial_filament_data[" Owned"] = ["false"] * len(initial_df) | |
| except Exception as e: | |
| print(f"Warning: Could not load {DEFAULT_MATERIALS_CSV}: {e}. Using default.") | |
| initial_df = pd.DataFrame(initial_filament_data) | |
| else: | |
| initial_df = pd.DataFrame(initial_filament_data) | |
| initial_df.to_csv(DEFAULT_MATERIALS_CSV, index=False) | |
| def run_autoforge_process(cmd, log_path): | |
| from joblib import parallel_backend | |
| cli_args = cmd[1:] | |
| autoforge_main = importlib.import_module("autoforge.__main__") | |
| exit_code = 0 | |
| with open(log_path, "w", buffering=1, encoding="utf-8") as log_f, \ | |
| redirect_stdout(log_f), redirect_stderr(log_f), parallel_backend("threading", n_jobs=-1): | |
| try: | |
| sys.argv = ["autoforge"] + cli_args | |
| autoforge_main.main() | |
| except SystemExit as e: | |
| exit_code = e.code | |
| except Exception as e: | |
| log_f.write(f"\nERROR: {e}\n") | |
| exit_code = -1 | |
| return exit_code | |
| def create_empty_error_outputs(log_message=""): | |
| return ( | |
| log_message, | |
| None, | |
| gr.update(visible=False, interactive=False), | |
| ) | |
| def load_filaments_from_json_upload(file_obj): | |
| if file_obj is None: | |
| current_script_df = filament_df_state.value | |
| if current_script_df is not None and not current_script_df.empty: | |
| return current_script_df.rename( | |
| columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"} | |
| ) | |
| return initial_df.copy().rename( | |
| columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"} | |
| ) | |
| try: | |
| with open(file_obj.name, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| if isinstance(data, dict) and "Filaments" in data: | |
| data = data["Filaments"] | |
| df_loaded = pd.DataFrame(data) | |
| df_loaded.columns = [c.strip() for c in df_loaded.columns] | |
| rename_map = { | |
| "Name": " Name", | |
| "Transmissivity": " TD", | |
| "Color": " Color", | |
| "Owned": " Owned", | |
| } | |
| df_loaded.rename( | |
| columns={k: v for k, v in rename_map.items() if k in df_loaded.columns}, | |
| inplace=True, | |
| ) | |
| df_loaded = normalize_filament_df(df_loaded) | |
| filament_df_state.value = df_loaded.copy() | |
| return df_loaded.rename( | |
| columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"} | |
| ) | |
| except Exception as e: | |
| gr.Error(f"Error loading JSON: {e}") | |
| return filament_table.value | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# [Autoforge](https://github.com/hvoss-techfak/AutoForge) Web UI") | |
| filament_df_state = gr.State(initial_df.copy()) | |
| current_run_output_dir = gr.State(None) | |
| with gr.Tabs(): | |
| with gr.TabItem("Filament Management"): | |
| gr.Markdown( | |
| 'Manage your filament list here. This list will be used by Autoforge during the optimization process.' | |
| ) | |
| gr.Markdown( | |
| 'If you have Hueforge, you can export your filaments under "Filaments -> Export" in the Hueforge software. Please make sure to select "CSV" instead of "JSON" during the export dialog.' | |
| ) | |
| gr.Markdown( | |
| 'If you want to load your personal library of Hueforge filaments, you can also simply paste this path into your explorer address bar: %APPDATA%\\HueForge\\Filaments\\ and import your "personal_library.json" using the "Load Filaments Json" button.' | |
| ) | |
| gr.Markdown( | |
| 'To remove a filament simply right-click on any of the fields and select "Delete Row"' | |
| ) | |
| gr.Markdown( | |
| 'Hint: If you have an AMS 3d printer try giving it your entire filament library and then set "pruning_max_colors" under "Autoforge Parameters" in the second tab to your number of AMS slots.' | |
| ' Autoforge will automatically select the best matching colors for your image.' | |
| ) | |
| with gr.Row(): | |
| load_csv_button = gr.UploadButton( | |
| "Load Filaments CSV", file_types=[".csv"] | |
| ) | |
| load_json_button = gr.UploadButton( | |
| "Load Filaments JSON", file_types=[".json"] | |
| ) | |
| save_csv_button = gr.Button("Save Current Filaments to CSV") | |
| filament_table = gr.DataFrame( | |
| value=ensure_required_cols( | |
| initial_df.copy().rename( | |
| columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"} | |
| ), | |
| in_display_space=True, | |
| ), | |
| headers=["Brand", "Name", "TD", "Color (Hex)"], | |
| datatype=["str", "str", "number", "str"], | |
| interactive=True, | |
| label="Filaments", | |
| ) | |
| gr.Markdown("## Add New Filament") | |
| with gr.Row(): | |
| new_brand = gr.Textbox(label="Brand") | |
| new_name = gr.Textbox(label="Name") | |
| with gr.Row(): | |
| new_td = gr.Number( | |
| label="TD (Transmission/Opacity)", | |
| value=1.0, | |
| minimum=0, | |
| maximum=100, | |
| step=0.1, | |
| ) | |
| new_color_hex = gr.ColorPicker(label="Color", value="#FF0000") | |
| add_filament_button = gr.Button("Add Filament to Table") | |
| download_csv_trigger = gr.File( | |
| label="Download Filament CSV", visible=False, interactive=False | |
| ) | |
| def update_filament_df_state_from_table(display_df): | |
| display_df = ensure_required_cols(display_df, in_display_space=True) | |
| if "Color (Hex)" in display_df.columns: | |
| display_df["Color (Hex)"] = display_df["Color (Hex)"].apply( | |
| rgba_to_hex | |
| ) | |
| script_df = display_df.rename( | |
| columns={"Name": " Name", "TD": " TD", "Color (Hex)": " Color"} | |
| ) | |
| script_df = ensure_required_cols(script_df, in_display_space=False) | |
| filament_df_state.value = script_df | |
| def add_filament_to_table(current_display_df, brand, name, td, color_hex): | |
| if not brand or not name: | |
| gr.Warning("Brand and Name cannot be empty.") | |
| return current_display_df | |
| color_hex = rgba_to_hex(color_hex) | |
| new_row = pd.DataFrame( | |
| [{"Brand": brand, "Name": name, "TD": td, "Color (Hex)": color_hex}] | |
| ) | |
| updated_display_df = pd.concat( | |
| [current_display_df, new_row], ignore_index=True | |
| ) | |
| update_filament_df_state_from_table(updated_display_df) | |
| return updated_display_df | |
| def load_filaments_from_csv_upload(file_obj): | |
| if file_obj is None: | |
| current_script_df = filament_df_state.value | |
| if current_script_df is not None and not current_script_df.empty: | |
| return current_script_df.rename( | |
| columns={ | |
| " Name": "Name", | |
| " TD": "TD", | |
| " Color": "Color (Hex)", | |
| } | |
| ) | |
| return initial_df.copy().rename( | |
| columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"} | |
| ) | |
| try: | |
| loaded_script_df = pd.read_csv(file_obj.name, index_col=False) | |
| loaded_script_df = normalize_filament_df(loaded_script_df) | |
| expected_cols = ["Brand", " Name", " TD", " Color"] | |
| if not all(col in loaded_script_df.columns for col in expected_cols): | |
| gr.Error( | |
| f"CSV must contain columns: {', '.join(expected_cols)}. Found: {loaded_script_df.columns.tolist()}" | |
| ) | |
| current_script_df = filament_df_state.value | |
| if ( | |
| current_script_df is not None | |
| and not current_script_df.empty | |
| ): | |
| return current_script_df.rename( | |
| columns={ | |
| " Name": "Name", | |
| " TD": "TD", | |
| " Color": "Color (Hex)", | |
| } | |
| ) | |
| return initial_df.copy().rename( | |
| columns={ | |
| " Name": "Name", | |
| " TD": "TD", | |
| " Color": "Color (Hex)", | |
| } | |
| ) | |
| filament_df_state.value = loaded_script_df.copy() | |
| return loaded_script_df.rename( | |
| columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"} | |
| ) | |
| except Exception as e: | |
| gr.Error(f"Error loading CSV: {e}") | |
| current_script_df = filament_df_state.value | |
| if current_script_df is not None and not current_script_df.empty: | |
| return current_script_df.rename( | |
| columns={ | |
| " Name": "Name", | |
| " TD": "TD", | |
| " Color": "Color (Hex)", | |
| } | |
| ) | |
| return initial_df.copy().rename( | |
| columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"} | |
| ) | |
| def save_filaments_to_file_for_download(current_script_df_from_state): | |
| if ( | |
| current_script_df_from_state is None | |
| or current_script_df_from_state.empty | |
| ): | |
| gr.Warning("Filament table is empty. Nothing to save.") | |
| return None | |
| df_to_save = current_script_df_from_state.copy() | |
| required_cols = ["Brand", " Name", " TD", " Color"] | |
| if not all(col in df_to_save.columns for col in required_cols): | |
| gr.Error( | |
| f"Cannot save. DataFrame missing required script columns. Expected: {required_cols}. Found: {df_to_save.columns.tolist()}" | |
| ) | |
| return None | |
| temp_dir = os.path.join(GRADIO_OUTPUT_BASE_DIR, "_temp_downloads") | |
| os.makedirs(temp_dir, exist_ok=True) | |
| temp_filament_csv_path = os.path.join( | |
| temp_dir, | |
| f"filaments_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", | |
| ) | |
| try: | |
| df_to_save.to_csv(temp_filament_csv_path, index=False) | |
| gr.Info("Filaments prepared for download.") | |
| return gr.File( | |
| value=temp_filament_csv_path, | |
| label="Download Filament CSV", | |
| interactive=True, | |
| visible=True, | |
| ) | |
| except Exception as e: | |
| gr.Error(f"Error saving CSV for download: {e}") | |
| return None | |
| filament_table.change( | |
| update_filament_df_state_from_table, | |
| inputs=[filament_table], | |
| outputs=None, | |
| queue=False, | |
| ) | |
| add_filament_button.click( | |
| add_filament_to_table, | |
| inputs=[filament_table, new_brand, new_name, new_td, new_color_hex], | |
| outputs=[filament_table], | |
| ) | |
| load_csv_button.upload( | |
| load_filaments_from_csv_upload, | |
| inputs=[load_csv_button], | |
| outputs=[filament_table], | |
| ) | |
| load_json_button.upload( | |
| load_filaments_from_json_upload, | |
| inputs=[load_json_button], | |
| outputs=[filament_table], | |
| ) | |
| save_csv_button.click( | |
| save_filaments_to_file_for_download, | |
| inputs=[filament_df_state], | |
| outputs=[download_csv_trigger], | |
| ) | |
| with gr.TabItem("Run Autoforge"): | |
| accordion_params_dict = {} | |
| accordion_params_ordered_names = [] | |
| gr.Markdown( | |
| 'Here you can upload an image, adjust the parameters and run the Autoforge process. The filaments from the "Filament Management" Tab are automatically used.' | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Input Image (Required)") | |
| input_image_component = gr.Image( | |
| type="pil", | |
| image_mode="RGBA", | |
| label="Upload Image", | |
| sources=["upload"], | |
| interactive=True, | |
| ) | |
| with gr.Column(scale=2): | |
| gr.Markdown("### Preview") | |
| with gr.Accordion("Progress & Output", open=True): | |
| final_image_preview = gr.Image( | |
| label="Model Preview", | |
| type="filepath", | |
| interactive=False, | |
| ) | |
| with gr.Row(): | |
| with gr.Accordion("Autoforge Parameters", open=False): | |
| args_for_accordion = get_script_args_info( | |
| exclude_args=["--input_image"] | |
| ) | |
| for arg in args_for_accordion: | |
| label, info, default_val = ( | |
| f"{arg['name']}", | |
| arg["help"], | |
| arg.get("default"), | |
| ) | |
| if arg["type"] == "number": | |
| accordion_params_dict[arg["name"]] = gr.Number( | |
| label=label, | |
| value=default_val, | |
| info=info, | |
| minimum=arg.get("min"), | |
| maximum=arg.get("max"), | |
| step=arg.get( | |
| "step", | |
| 0.001 if isinstance(default_val, float) else 1, | |
| ), | |
| precision=arg.get("precision", None), | |
| ) | |
| elif arg["type"] == "slider": | |
| accordion_params_dict[arg["name"]] = gr.Slider( | |
| label=label, | |
| value=default_val, | |
| info=info, | |
| minimum=arg.get("min", 0), | |
| maximum=arg.get("max", 1), | |
| step=arg.get("step", 0.01), | |
| ) | |
| elif arg["type"] == "checkbox": | |
| accordion_params_dict[arg["name"]] = gr.Checkbox( | |
| label=label, value=default_val, info=info | |
| ) | |
| elif arg["type"] == "colorpicker": | |
| accordion_params_dict[arg["name"]] = gr.ColorPicker( | |
| label=label, value=default_val, info=info | |
| ) | |
| else: | |
| accordion_params_dict[arg["name"]] = gr.Textbox( | |
| label=label, value=str(default_val), info=info | |
| ) | |
| accordion_params_ordered_names.append(arg["name"]) | |
| run_button = gr.Button( | |
| "Run Autoforge Process", | |
| variant="primary", | |
| elem_id="run_button_full_width", | |
| ) | |
| progress_output = gr.Textbox( | |
| label="Console Output", | |
| lines=15, | |
| autoscroll=True, | |
| show_copy_button=False, | |
| ) | |
| with gr.Row(): | |
| download_results = gr.File( | |
| label="Download Results (zip)", | |
| file_count="single", | |
| interactive=True, | |
| visible=False, | |
| ) | |
| def execute_autoforge_script( | |
| current_filaments_df_state_val, input_image, *accordion_param_values | |
| ): | |
| log_output = [] | |
| if input_image is None: | |
| gr.Error("Input Image is required! Please upload an image.") | |
| return create_empty_error_outputs("Error: Input Image is required!") | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + str(uuid.uuid4()) | |
| run_output_dir_val = os.path.join(GRADIO_OUTPUT_BASE_DIR, f"run_{timestamp}") | |
| os.makedirs(run_output_dir_val, exist_ok=True) | |
| current_run_output_dir.value = run_output_dir_val | |
| if ( | |
| current_filaments_df_state_val is None | |
| or current_filaments_df_state_val.empty | |
| ): | |
| gr.Error("Filament table is empty. Please add filaments.") | |
| return create_empty_error_outputs("Error: Filament table is empty.") | |
| temp_filament_csv = os.path.join(run_output_dir_val, "materials.csv") | |
| df_to_save = current_filaments_df_state_val.copy() | |
| required_cols = ["Brand", " Name", " TD", " Color"] | |
| missing_cols = [col for col in required_cols if col not in df_to_save.columns] | |
| if missing_cols: | |
| err_msg = ( | |
| f"Error: Filament data is missing columns: {', '.join(missing_cols)}." | |
| ) | |
| gr.Error(err_msg) | |
| return create_empty_error_outputs(err_msg) | |
| try: | |
| df_to_save.to_csv(temp_filament_csv, index=False) | |
| except Exception as e: | |
| err_msg = f"Error saving temporary filament CSV: {e}" | |
| gr.Error(err_msg) | |
| return create_empty_error_outputs(err_msg) | |
| command = ["autoforge"] | |
| command.extend(["--csv_file", temp_filament_csv]) | |
| command.extend(["--output_folder", run_output_dir_val]) | |
| command.extend(["--disable_visualization_for_gradio", "1"]) | |
| try: | |
| script_input_image_path = os.path.join( | |
| run_output_dir_val, "input_image.png" | |
| ) | |
| input_image.save(script_input_image_path, format="PNG") | |
| command.extend(["--input_image", script_input_image_path]) | |
| except Exception as e: | |
| err_msg = f"Error handling input image: {e}" | |
| gr.Error(err_msg) | |
| return create_empty_error_outputs(err_msg) | |
| param_dict = dict(zip(accordion_params_ordered_names, accordion_param_values)) | |
| for arg_name, arg_widget_val in param_dict.items(): | |
| if arg_widget_val is None or arg_widget_val == "": | |
| arg_info_list = [ | |
| item for item in get_script_args_info() if item["name"] == arg_name | |
| ] | |
| if ( | |
| arg_info_list | |
| and arg_info_list[0]["type"] == "checkbox" | |
| and arg_widget_val is False | |
| ): | |
| continue | |
| else: | |
| continue | |
| if arg_name == "--background_color": | |
| arg_widget_val = rgba_to_hex(arg_widget_val) | |
| if isinstance(arg_widget_val, bool): | |
| if arg_widget_val: | |
| command.append(arg_name) | |
| else: | |
| command.extend([arg_name, str(arg_widget_val)]) | |
| log_output = [ | |
| "Starting Autoforge process at ", | |
| f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n", | |
| f"Output directory: {run_output_dir_val}\n", | |
| f"Command: {' '.join(command)}\n\n", | |
| ] | |
| yield create_empty_error_outputs("".join(log_output)) | |
| log_file = os.path.join(run_output_dir_val, "autoforge_live.log") | |
| open(log_file, "w", encoding="utf-8").close() | |
| import threading | |
| class Worker(threading.Thread): | |
| def __init__(self, cmd, log_path): | |
| super().__init__(daemon=True) | |
| self.cmd, self.log_path = cmd, log_path | |
| self.returncode = None | |
| self.exc = None | |
| def run(self): | |
| try: | |
| self.returncode = run_autoforge_process(self.cmd, self.log_path) | |
| except Exception as e: | |
| self.exc = e | |
| with open(self.log_path, "a", encoding="utf-8") as lf: | |
| lf.write( | |
| "\nERROR: {}. This usually means there was no GPU or the process took too long.\n".format( | |
| exc_text(e) | |
| ) | |
| ) | |
| self.returncode = -1 | |
| try: | |
| worker = Worker(command, log_file) | |
| worker.start() | |
| preview_mtime = 0 | |
| last_push = 0 | |
| file_pos = 0 | |
| def _maybe_new_preview(): | |
| nonlocal preview_mtime | |
| src = os.path.join(run_output_dir_val, "vis_temp.png") | |
| if not os.path.exists(src): | |
| return gr.update() | |
| mtime = os.path.getmtime(src) | |
| if mtime <= preview_mtime: | |
| return gr.update() | |
| preview_mtime = mtime | |
| return src | |
| while worker.is_alive() or file_pos < os.path.getsize(log_file): | |
| with open(log_file, "r", encoding="utf-8") as lf: | |
| lf.seek(file_pos) | |
| new_txt = lf.read() | |
| file_pos = lf.tell() | |
| log_output.append(new_txt) | |
| now = time.time() | |
| if now - last_push >= 1.0: | |
| current_preview = _maybe_new_preview() | |
| yield ( | |
| "".join(log_output), | |
| current_preview, | |
| gr.update(), | |
| ) | |
| last_push = now | |
| time.sleep(0.05) | |
| worker.join() | |
| except RuntimeError as e: | |
| log_output.append(repr(e)) | |
| gr.Error(str(e)) | |
| with open(log_file, "r", encoding="utf-8") as lf: | |
| lf.seek(file_pos) | |
| new_txt = lf.read() | |
| file_pos = lf.tell() | |
| log_output.append(new_txt) | |
| yield ( | |
| "".join(log_output), | |
| gr.update(), | |
| gr.update(), | |
| ) | |
| return create_empty_error_outputs(str(e)) | |
| if getattr(worker, "exc", None) is not None: | |
| err_msg = f"GPU run failed: {worker.exc}" | |
| log_output.append(f"\n{err_msg}\n") | |
| gr.Error(err_msg) | |
| yield ( | |
| "".join(log_output), | |
| gr.update(), | |
| gr.update(), | |
| ) | |
| return | |
| with open(log_file, "r", encoding="utf-8") as lf: | |
| lf.seek(file_pos) | |
| log_output.append(lf.read()) | |
| return_code = worker.returncode | |
| files_to_offer = [ | |
| p | |
| for p in [ | |
| os.path.join(run_output_dir_val, "final_model.png"), | |
| os.path.join(run_output_dir_val, "final_model.stl"), | |
| os.path.join(run_output_dir_val, "swap_instructions.txt"), | |
| os.path.join(run_output_dir_val, "project_file.hfp"), | |
| ] | |
| if os.path.exists(p) | |
| ] | |
| png_path = os.path.join(run_output_dir_val, "final_model.png") | |
| out_png = png_path if os.path.exists(png_path) else None | |
| if return_code != 0: | |
| err_msg = ( | |
| f"Autoforge exited with code {return_code}\n" | |
| "See the console output above for details." | |
| ) | |
| log_output.append(f"\n{err_msg}\n") | |
| gr.Error(err_msg) | |
| yield ( | |
| "".join(log_output), | |
| out_png if out_png else gr.update(), | |
| gr.update(), | |
| ) | |
| return | |
| log_output.append("\nAutoforge process completed successfully!") | |
| zip_path = None | |
| if files_to_offer: | |
| zip_path = os.path.join(run_output_dir_val, "autoforge_results.zip") | |
| log_output.append(f"\nZipping results to {os.path.basename(zip_path)}...") | |
| try: | |
| with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_STORED) as zf: | |
| for f in files_to_offer: | |
| zf.write(f, os.path.basename(f)) | |
| log_output.append(" done.") | |
| except Exception as e: | |
| log_output.append(f"\nError creating zip file: {e}") | |
| zip_path = None | |
| if USE_WANDB: | |
| run = None | |
| try: | |
| run = wandb.init( | |
| project="autoforge", | |
| name=f"run_{timestamp}", | |
| notes="Autoforge Web UI run", | |
| tags=["autoforge", "gradio"], | |
| ) | |
| wlogs = {"input_image": wandb.Image(script_input_image_path)} | |
| if out_png: | |
| wlogs["output_image"] = wandb.Image(out_png) | |
| material_csv = pd.read_csv(temp_filament_csv) | |
| table = wandb.Table(dataframe=material_csv) | |
| wlogs["materials"] = table | |
| from wandb import Html | |
| log_text = "".join(log_output).replace("\r", "\n") | |
| def clean_log_strict(text: str) -> str: | |
| allowed = set(string.printable) | {"\n", "\t"} | |
| return "".join(ch for ch in text if ch in allowed) | |
| log_text_cleaned = clean_log_strict(log_text) | |
| wlogs["log"] = Html(f"<pre>{log_text_cleaned}</pre>") | |
| wandb.log(wlogs) | |
| except Exception as e: | |
| print(e) | |
| finally: | |
| if run is not None: | |
| run.finish() | |
| yield ( | |
| "".join(log_output), | |
| out_png, | |
| gr.update( | |
| value=zip_path, | |
| visible=bool(zip_path), | |
| interactive=bool(zip_path), | |
| ), | |
| ) | |
| run_inputs = [filament_df_state, input_image_component] + [ | |
| accordion_params_dict[name] for name in accordion_params_ordered_names | |
| ] | |
| run_outputs = [ | |
| progress_output, | |
| final_image_preview, | |
| download_results, | |
| ] | |
| run_button.click(execute_autoforge_script, inputs=run_inputs, outputs=run_outputs) | |
| css = """ #run_button_full_width { width: 100%; } """ | |
| if __name__ == "__main__": | |
| if not os.path.exists(DEFAULT_MATERIALS_CSV): | |
| print(f"Creating default filament file: {DEFAULT_MATERIALS_CSV}") | |
| try: | |
| initial_df.to_csv(DEFAULT_MATERIALS_CSV, index=False) | |
| except Exception as e: | |
| print(f"Could not write default {DEFAULT_MATERIALS_CSV}: {e}") | |
| print("To run the UI, execute: python app.py") | |
| demo.queue(default_concurrency_limit=1).launch(share=False) | |