Autoforge / app.py
hvoss-techfak's picture
reverted
74517ab
import json
import string
import uuid
import os
import logging
import zipfile
import importlib
import wandb
from contextlib import redirect_stdout, redirect_stderr
import spaces
USE_WANDB = "WANDB_API_KEY" in os.environ
if USE_WANDB:
wandb.login(key=os.environ["WANDB_API_KEY"])
else:
print("Warning: WANDB_API_KEY not set. Skipping wandb logging.")
import gradio as gr
import pandas as pd
import time
import sys
from datetime import datetime
import re
# --- Configuration ---
DEFAULT_MATERIALS_CSV = "default_materials.csv"
GRADIO_OUTPUT_BASE_DIR = "output"
os.makedirs(GRADIO_OUTPUT_BASE_DIR, exist_ok=True)
REQUIRED_SCRIPT_COLS = ["Brand", " Name", " TD", " Color"]
DISPLAY_COL_MAP = {
"Brand": "Brand",
" Name": "Name",
" TD": "TD",
" Color": "Color (Hex)",
}
def exc_text(exc: BaseException) -> str:
txt = str(exc).strip()
if txt:
return txt
if exc.args:
return " ".join(str(a) for a in exc.args).strip()
return exc.__class__.__name__
def ensure_required_cols(df, *, in_display_space):
target_cols = (
DISPLAY_COL_MAP if in_display_space else {k: k for k in REQUIRED_SCRIPT_COLS}
)
df_fixed = df.copy()
for col_script, col_display in target_cols.items():
if col_display not in df_fixed.columns:
if "TD" in col_display:
default = 0.0
elif "Color" in col_display:
default = "#000000"
elif "Owned" in col_display:
default = "false"
else:
default = ""
df_fixed[col_display] = default
return df_fixed[list(target_cols.values())]
def rgba_to_hex(col: str) -> str:
if not isinstance(col, str):
return col
col = col.strip()
if col.startswith("#"):
return col.upper()
m = re.match(
r"rgba?\(\s*([\d.]+)\s*,\s*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*[\d.]+)?\s*\)",
col,
)
if not m:
return col
r, g, b = (int(float(x)) for x in m.groups()[:3])
return "#{:02X}{:02X}{:02X}".format(r, g, b)
def zip_dir_no_compress(src_dir: str, dest_zip: str) -> str:
t0 = time.time()
with zipfile.ZipFile(dest_zip, "w",
compression=zipfile.ZIP_STORED,
allowZip64=True) as zf:
for root, _, files in os.walk(src_dir):
for fname in files:
fpath = os.path.join(root, fname)
zf.write(fpath, os.path.relpath(fpath, src_dir))
print(f"Zipping finished in {time.time() - t0:.1f}s")
return dest_zip
def get_script_args_info(exclude_args=None):
if exclude_args is None:
exclude_args = []
all_args_info = [
{
"name": "--iterations",
"type": "number",
"default": 4000,
"help": "Number of optimization iterations",
},
{
"name": "--layer_height",
"type": "number",
"default": 0.04,
"step": 0.01,
"help": "Layer thickness in mm",
},
{
"name": "--max_layers",
"type": "number",
"default": 75,
"precision": 0,
"help": "Maximum number of layers",
},
{
"name": "--learning_rate",
"type": "number",
"default": 0.015,
"step": 0.001,
"help": "Learning rate for optimization",
},
{
"name": "--background_height",
"type": "number",
"default": 0.4,
"step": 0.01,
"help": "Height of the background in mm",
},
{
"name": "--background_color",
"type": "colorpicker",
"default": "#000000",
"help": "Background color",
},
{
"name": "--stl_output_size",
"type": "number",
"default": 100,
"precision": 0,
"help": "Size of the longest dimension of the output STL file in mm",
},
{
"name": "--nozzle_diameter",
"type": "number",
"default": 0.4,
"step": 0.1,
"help": "Diameter of the printer nozzle in mm",
},
{
"name": "--pruning_max_colors",
"type": "number",
"default": 100,
"precision": 0,
"help": "Max number of colors allowed after pruning",
},
{
"name": "--pruning_max_swaps",
"type": "number",
"default": 50,
"precision": 0,
"help": "Max number of swaps allowed after pruning",
},
{
"name": "--pruning_max_layer",
"type": "number",
"default": 75,
"precision": 0,
"help": "Max number of layers allowed after pruning",
},
{
"name": "--warmup_fraction",
"type": "slider",
"default": 1.0,
"min": 0.0,
"max": 1.0,
"step": 0.01,
"help": "Fraction of iterations for keeping the tau at the initial value",
},
{
"name": "--learning_rate_warmup_fraction",
"type": "slider",
"default": 0.01,
"min": 0.0,
"max": 1.0,
"step": 0.01,
"help": "Fraction of iterations that the learning rate is increasing (warmup)",
},
{
"name": "--early_stopping",
"type": "number",
"default": 5000,
"precision": 0,
"help": "Number of steps without improvement before stopping",
},
{
"name": "--fast_pruning_percent",
"type": "slider",
"default": 0.05,
"min": 0.0,
"max": 1.0,
"step": 0.01,
"help": "Percentage of increment search for fast pruning.",
},
{
"name": "--random_seed",
"type": "number",
"default": 0,
"precision": 0,
"help": "Specify the random seed, or use 0 for automatic generation",
},
{
"name": "--num_init_rounds",
"type": "number",
"default": 8,
"precision": 0,
"help": "Number of rounds to choose the starting height map from.",
},
]
return [arg for arg in all_args_info if arg["name"] not in exclude_args]
# initial data that will be used if no CSV exists
initial_filament_data = {
"Brand": ["Generic", "Generic", "Generic", "Generic", "Generic", "Generic"],
" Name": ["PLA Black", "PLA Grey", "PLA White", "PLA Red", "PLA Green", "PLA Blue"],
" TD": [5.0, 5.0, 5.0, 5.0, 5.0],
" Color": ["#000000", "#808080", "#FFFFFF", "#FF0000", "#00FF00", "#0000FF"],
" Owned": ["true", "true", "true", "true", "true", "true"],
}
def normalize_filament_df(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
df.columns = [c.strip() for c in df.columns]
rename_map = {
"Name": " Name",
"TD": " TD",
"Color": " Color",
"Owned": " Owned",
}
for src, dst in rename_map.items():
if src in df.columns and dst not in df.columns:
df.rename(columns={src: dst}, inplace=True)
if " TD" in df.columns:
df[" TD"] = pd.to_numeric(df[" TD"], errors="coerce").fillna(0.0)
else:
df[" TD"] = 0.0
if " Color" in df.columns:
df[" Color"] = df[" Color"].astype(str)
else:
df[" Color"] = "#000000"
if " Owned" not in df.columns:
df[" Owned"] = "false"
else:
df[" Owned"] = df[" Owned"].astype(str)
if "Brand" not in df.columns:
df["Brand"] = ""
ordered_cols = ["Brand", " Name", " TD", " Color", " Owned"]
df = df[[c for c in ordered_cols if c in df.columns]]
return df
# load CSV if present
if os.path.exists(DEFAULT_MATERIALS_CSV):
try:
loaded_df = pd.read_csv(DEFAULT_MATERIALS_CSV, index_col=False)
loaded_df = normalize_filament_df(loaded_df)
initial_df = loaded_df.copy()
initial_filament_data = {
"Brand": initial_df["Brand"].tolist(),
" Name": initial_df[" Name"].tolist(),
" TD": initial_df[" TD"].tolist(),
" Color": initial_df[" Color"].tolist(),
}
if " Owned" in initial_df.columns:
initial_filament_data[" Owned"] = initial_df[" Owned"].astype(str).tolist()
else:
initial_filament_data[" Owned"] = ["false"] * len(initial_df)
except Exception as e:
print(f"Warning: Could not load {DEFAULT_MATERIALS_CSV}: {e}. Using default.")
initial_df = pd.DataFrame(initial_filament_data)
else:
initial_df = pd.DataFrame(initial_filament_data)
initial_df.to_csv(DEFAULT_MATERIALS_CSV, index=False)
def run_autoforge_process(cmd, log_path):
from joblib import parallel_backend
cli_args = cmd[1:]
autoforge_main = importlib.import_module("autoforge.__main__")
exit_code = 0
with open(log_path, "w", buffering=1, encoding="utf-8") as log_f, \
redirect_stdout(log_f), redirect_stderr(log_f), parallel_backend("threading", n_jobs=-1):
try:
sys.argv = ["autoforge"] + cli_args
autoforge_main.main()
except SystemExit as e:
exit_code = e.code
except Exception as e:
log_f.write(f"\nERROR: {e}\n")
exit_code = -1
return exit_code
def create_empty_error_outputs(log_message=""):
return (
log_message,
None,
gr.update(visible=False, interactive=False),
)
def load_filaments_from_json_upload(file_obj):
if file_obj is None:
current_script_df = filament_df_state.value
if current_script_df is not None and not current_script_df.empty:
return current_script_df.rename(
columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"}
)
return initial_df.copy().rename(
columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"}
)
try:
with open(file_obj.name, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict) and "Filaments" in data:
data = data["Filaments"]
df_loaded = pd.DataFrame(data)
df_loaded.columns = [c.strip() for c in df_loaded.columns]
rename_map = {
"Name": " Name",
"Transmissivity": " TD",
"Color": " Color",
"Owned": " Owned",
}
df_loaded.rename(
columns={k: v for k, v in rename_map.items() if k in df_loaded.columns},
inplace=True,
)
df_loaded = normalize_filament_df(df_loaded)
filament_df_state.value = df_loaded.copy()
return df_loaded.rename(
columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"}
)
except Exception as e:
gr.Error(f"Error loading JSON: {e}")
return filament_table.value
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# [Autoforge](https://github.com/hvoss-techfak/AutoForge) Web UI")
filament_df_state = gr.State(initial_df.copy())
current_run_output_dir = gr.State(None)
with gr.Tabs():
with gr.TabItem("Filament Management"):
gr.Markdown(
'Manage your filament list here. This list will be used by Autoforge during the optimization process.'
)
gr.Markdown(
'If you have Hueforge, you can export your filaments under "Filaments -> Export" in the Hueforge software. Please make sure to select "CSV" instead of "JSON" during the export dialog.'
)
gr.Markdown(
'If you want to load your personal library of Hueforge filaments, you can also simply paste this path into your explorer address bar: %APPDATA%\\HueForge\\Filaments\\ and import your "personal_library.json" using the "Load Filaments Json" button.'
)
gr.Markdown(
'To remove a filament simply right-click on any of the fields and select "Delete Row"'
)
gr.Markdown(
'Hint: If you have an AMS 3d printer try giving it your entire filament library and then set "pruning_max_colors" under "Autoforge Parameters" in the second tab to your number of AMS slots.'
' Autoforge will automatically select the best matching colors for your image.'
)
with gr.Row():
load_csv_button = gr.UploadButton(
"Load Filaments CSV", file_types=[".csv"]
)
load_json_button = gr.UploadButton(
"Load Filaments JSON", file_types=[".json"]
)
save_csv_button = gr.Button("Save Current Filaments to CSV")
filament_table = gr.DataFrame(
value=ensure_required_cols(
initial_df.copy().rename(
columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"}
),
in_display_space=True,
),
headers=["Brand", "Name", "TD", "Color (Hex)"],
datatype=["str", "str", "number", "str"],
interactive=True,
label="Filaments",
)
gr.Markdown("## Add New Filament")
with gr.Row():
new_brand = gr.Textbox(label="Brand")
new_name = gr.Textbox(label="Name")
with gr.Row():
new_td = gr.Number(
label="TD (Transmission/Opacity)",
value=1.0,
minimum=0,
maximum=100,
step=0.1,
)
new_color_hex = gr.ColorPicker(label="Color", value="#FF0000")
add_filament_button = gr.Button("Add Filament to Table")
download_csv_trigger = gr.File(
label="Download Filament CSV", visible=False, interactive=False
)
def update_filament_df_state_from_table(display_df):
display_df = ensure_required_cols(display_df, in_display_space=True)
if "Color (Hex)" in display_df.columns:
display_df["Color (Hex)"] = display_df["Color (Hex)"].apply(
rgba_to_hex
)
script_df = display_df.rename(
columns={"Name": " Name", "TD": " TD", "Color (Hex)": " Color"}
)
script_df = ensure_required_cols(script_df, in_display_space=False)
filament_df_state.value = script_df
def add_filament_to_table(current_display_df, brand, name, td, color_hex):
if not brand or not name:
gr.Warning("Brand and Name cannot be empty.")
return current_display_df
color_hex = rgba_to_hex(color_hex)
new_row = pd.DataFrame(
[{"Brand": brand, "Name": name, "TD": td, "Color (Hex)": color_hex}]
)
updated_display_df = pd.concat(
[current_display_df, new_row], ignore_index=True
)
update_filament_df_state_from_table(updated_display_df)
return updated_display_df
def load_filaments_from_csv_upload(file_obj):
if file_obj is None:
current_script_df = filament_df_state.value
if current_script_df is not None and not current_script_df.empty:
return current_script_df.rename(
columns={
" Name": "Name",
" TD": "TD",
" Color": "Color (Hex)",
}
)
return initial_df.copy().rename(
columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"}
)
try:
loaded_script_df = pd.read_csv(file_obj.name, index_col=False)
loaded_script_df = normalize_filament_df(loaded_script_df)
expected_cols = ["Brand", " Name", " TD", " Color"]
if not all(col in loaded_script_df.columns for col in expected_cols):
gr.Error(
f"CSV must contain columns: {', '.join(expected_cols)}. Found: {loaded_script_df.columns.tolist()}"
)
current_script_df = filament_df_state.value
if (
current_script_df is not None
and not current_script_df.empty
):
return current_script_df.rename(
columns={
" Name": "Name",
" TD": "TD",
" Color": "Color (Hex)",
}
)
return initial_df.copy().rename(
columns={
" Name": "Name",
" TD": "TD",
" Color": "Color (Hex)",
}
)
filament_df_state.value = loaded_script_df.copy()
return loaded_script_df.rename(
columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"}
)
except Exception as e:
gr.Error(f"Error loading CSV: {e}")
current_script_df = filament_df_state.value
if current_script_df is not None and not current_script_df.empty:
return current_script_df.rename(
columns={
" Name": "Name",
" TD": "TD",
" Color": "Color (Hex)",
}
)
return initial_df.copy().rename(
columns={" Name": "Name", " TD": "TD", " Color": "Color (Hex)"}
)
def save_filaments_to_file_for_download(current_script_df_from_state):
if (
current_script_df_from_state is None
or current_script_df_from_state.empty
):
gr.Warning("Filament table is empty. Nothing to save.")
return None
df_to_save = current_script_df_from_state.copy()
required_cols = ["Brand", " Name", " TD", " Color"]
if not all(col in df_to_save.columns for col in required_cols):
gr.Error(
f"Cannot save. DataFrame missing required script columns. Expected: {required_cols}. Found: {df_to_save.columns.tolist()}"
)
return None
temp_dir = os.path.join(GRADIO_OUTPUT_BASE_DIR, "_temp_downloads")
os.makedirs(temp_dir, exist_ok=True)
temp_filament_csv_path = os.path.join(
temp_dir,
f"filaments_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
)
try:
df_to_save.to_csv(temp_filament_csv_path, index=False)
gr.Info("Filaments prepared for download.")
return gr.File(
value=temp_filament_csv_path,
label="Download Filament CSV",
interactive=True,
visible=True,
)
except Exception as e:
gr.Error(f"Error saving CSV for download: {e}")
return None
filament_table.change(
update_filament_df_state_from_table,
inputs=[filament_table],
outputs=None,
queue=False,
)
add_filament_button.click(
add_filament_to_table,
inputs=[filament_table, new_brand, new_name, new_td, new_color_hex],
outputs=[filament_table],
)
load_csv_button.upload(
load_filaments_from_csv_upload,
inputs=[load_csv_button],
outputs=[filament_table],
)
load_json_button.upload(
load_filaments_from_json_upload,
inputs=[load_json_button],
outputs=[filament_table],
)
save_csv_button.click(
save_filaments_to_file_for_download,
inputs=[filament_df_state],
outputs=[download_csv_trigger],
)
with gr.TabItem("Run Autoforge"):
accordion_params_dict = {}
accordion_params_ordered_names = []
gr.Markdown(
'Here you can upload an image, adjust the parameters and run the Autoforge process. The filaments from the "Filament Management" Tab are automatically used.'
)
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Input Image (Required)")
input_image_component = gr.Image(
type="pil",
image_mode="RGBA",
label="Upload Image",
sources=["upload"],
interactive=True,
)
with gr.Column(scale=2):
gr.Markdown("### Preview")
with gr.Accordion("Progress & Output", open=True):
final_image_preview = gr.Image(
label="Model Preview",
type="filepath",
interactive=False,
)
with gr.Row():
with gr.Accordion("Autoforge Parameters", open=False):
args_for_accordion = get_script_args_info(
exclude_args=["--input_image"]
)
for arg in args_for_accordion:
label, info, default_val = (
f"{arg['name']}",
arg["help"],
arg.get("default"),
)
if arg["type"] == "number":
accordion_params_dict[arg["name"]] = gr.Number(
label=label,
value=default_val,
info=info,
minimum=arg.get("min"),
maximum=arg.get("max"),
step=arg.get(
"step",
0.001 if isinstance(default_val, float) else 1,
),
precision=arg.get("precision", None),
)
elif arg["type"] == "slider":
accordion_params_dict[arg["name"]] = gr.Slider(
label=label,
value=default_val,
info=info,
minimum=arg.get("min", 0),
maximum=arg.get("max", 1),
step=arg.get("step", 0.01),
)
elif arg["type"] == "checkbox":
accordion_params_dict[arg["name"]] = gr.Checkbox(
label=label, value=default_val, info=info
)
elif arg["type"] == "colorpicker":
accordion_params_dict[arg["name"]] = gr.ColorPicker(
label=label, value=default_val, info=info
)
else:
accordion_params_dict[arg["name"]] = gr.Textbox(
label=label, value=str(default_val), info=info
)
accordion_params_ordered_names.append(arg["name"])
run_button = gr.Button(
"Run Autoforge Process",
variant="primary",
elem_id="run_button_full_width",
)
progress_output = gr.Textbox(
label="Console Output",
lines=15,
autoscroll=True,
show_copy_button=False,
)
with gr.Row():
download_results = gr.File(
label="Download Results (zip)",
file_count="single",
interactive=True,
visible=False,
)
@spaces.GPU(duration=150)
def execute_autoforge_script(
current_filaments_df_state_val, input_image, *accordion_param_values
):
log_output = []
if input_image is None:
gr.Error("Input Image is required! Please upload an image.")
return create_empty_error_outputs("Error: Input Image is required!")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + str(uuid.uuid4())
run_output_dir_val = os.path.join(GRADIO_OUTPUT_BASE_DIR, f"run_{timestamp}")
os.makedirs(run_output_dir_val, exist_ok=True)
current_run_output_dir.value = run_output_dir_val
if (
current_filaments_df_state_val is None
or current_filaments_df_state_val.empty
):
gr.Error("Filament table is empty. Please add filaments.")
return create_empty_error_outputs("Error: Filament table is empty.")
temp_filament_csv = os.path.join(run_output_dir_val, "materials.csv")
df_to_save = current_filaments_df_state_val.copy()
required_cols = ["Brand", " Name", " TD", " Color"]
missing_cols = [col for col in required_cols if col not in df_to_save.columns]
if missing_cols:
err_msg = (
f"Error: Filament data is missing columns: {', '.join(missing_cols)}."
)
gr.Error(err_msg)
return create_empty_error_outputs(err_msg)
try:
df_to_save.to_csv(temp_filament_csv, index=False)
except Exception as e:
err_msg = f"Error saving temporary filament CSV: {e}"
gr.Error(err_msg)
return create_empty_error_outputs(err_msg)
command = ["autoforge"]
command.extend(["--csv_file", temp_filament_csv])
command.extend(["--output_folder", run_output_dir_val])
command.extend(["--disable_visualization_for_gradio", "1"])
try:
script_input_image_path = os.path.join(
run_output_dir_val, "input_image.png"
)
input_image.save(script_input_image_path, format="PNG")
command.extend(["--input_image", script_input_image_path])
except Exception as e:
err_msg = f"Error handling input image: {e}"
gr.Error(err_msg)
return create_empty_error_outputs(err_msg)
param_dict = dict(zip(accordion_params_ordered_names, accordion_param_values))
for arg_name, arg_widget_val in param_dict.items():
if arg_widget_val is None or arg_widget_val == "":
arg_info_list = [
item for item in get_script_args_info() if item["name"] == arg_name
]
if (
arg_info_list
and arg_info_list[0]["type"] == "checkbox"
and arg_widget_val is False
):
continue
else:
continue
if arg_name == "--background_color":
arg_widget_val = rgba_to_hex(arg_widget_val)
if isinstance(arg_widget_val, bool):
if arg_widget_val:
command.append(arg_name)
else:
command.extend([arg_name, str(arg_widget_val)])
log_output = [
"Starting Autoforge process at ",
f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
f"Output directory: {run_output_dir_val}\n",
f"Command: {' '.join(command)}\n\n",
]
yield create_empty_error_outputs("".join(log_output))
log_file = os.path.join(run_output_dir_val, "autoforge_live.log")
open(log_file, "w", encoding="utf-8").close()
import threading
class Worker(threading.Thread):
def __init__(self, cmd, log_path):
super().__init__(daemon=True)
self.cmd, self.log_path = cmd, log_path
self.returncode = None
self.exc = None
def run(self):
try:
self.returncode = run_autoforge_process(self.cmd, self.log_path)
except Exception as e:
self.exc = e
with open(self.log_path, "a", encoding="utf-8") as lf:
lf.write(
"\nERROR: {}. This usually means there was no GPU or the process took too long.\n".format(
exc_text(e)
)
)
self.returncode = -1
try:
worker = Worker(command, log_file)
worker.start()
preview_mtime = 0
last_push = 0
file_pos = 0
def _maybe_new_preview():
nonlocal preview_mtime
src = os.path.join(run_output_dir_val, "vis_temp.png")
if not os.path.exists(src):
return gr.update()
mtime = os.path.getmtime(src)
if mtime <= preview_mtime:
return gr.update()
preview_mtime = mtime
return src
while worker.is_alive() or file_pos < os.path.getsize(log_file):
with open(log_file, "r", encoding="utf-8") as lf:
lf.seek(file_pos)
new_txt = lf.read()
file_pos = lf.tell()
log_output.append(new_txt)
now = time.time()
if now - last_push >= 1.0:
current_preview = _maybe_new_preview()
yield (
"".join(log_output),
current_preview,
gr.update(),
)
last_push = now
time.sleep(0.05)
worker.join()
except RuntimeError as e:
log_output.append(repr(e))
gr.Error(str(e))
with open(log_file, "r", encoding="utf-8") as lf:
lf.seek(file_pos)
new_txt = lf.read()
file_pos = lf.tell()
log_output.append(new_txt)
yield (
"".join(log_output),
gr.update(),
gr.update(),
)
return create_empty_error_outputs(str(e))
if getattr(worker, "exc", None) is not None:
err_msg = f"GPU run failed: {worker.exc}"
log_output.append(f"\n{err_msg}\n")
gr.Error(err_msg)
yield (
"".join(log_output),
gr.update(),
gr.update(),
)
return
with open(log_file, "r", encoding="utf-8") as lf:
lf.seek(file_pos)
log_output.append(lf.read())
return_code = worker.returncode
files_to_offer = [
p
for p in [
os.path.join(run_output_dir_val, "final_model.png"),
os.path.join(run_output_dir_val, "final_model.stl"),
os.path.join(run_output_dir_val, "swap_instructions.txt"),
os.path.join(run_output_dir_val, "project_file.hfp"),
]
if os.path.exists(p)
]
png_path = os.path.join(run_output_dir_val, "final_model.png")
out_png = png_path if os.path.exists(png_path) else None
if return_code != 0:
err_msg = (
f"Autoforge exited with code {return_code}\n"
"See the console output above for details."
)
log_output.append(f"\n{err_msg}\n")
gr.Error(err_msg)
yield (
"".join(log_output),
out_png if out_png else gr.update(),
gr.update(),
)
return
log_output.append("\nAutoforge process completed successfully!")
zip_path = None
if files_to_offer:
zip_path = os.path.join(run_output_dir_val, "autoforge_results.zip")
log_output.append(f"\nZipping results to {os.path.basename(zip_path)}...")
try:
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_STORED) as zf:
for f in files_to_offer:
zf.write(f, os.path.basename(f))
log_output.append(" done.")
except Exception as e:
log_output.append(f"\nError creating zip file: {e}")
zip_path = None
if USE_WANDB:
run = None
try:
run = wandb.init(
project="autoforge",
name=f"run_{timestamp}",
notes="Autoforge Web UI run",
tags=["autoforge", "gradio"],
)
wlogs = {"input_image": wandb.Image(script_input_image_path)}
if out_png:
wlogs["output_image"] = wandb.Image(out_png)
material_csv = pd.read_csv(temp_filament_csv)
table = wandb.Table(dataframe=material_csv)
wlogs["materials"] = table
from wandb import Html
log_text = "".join(log_output).replace("\r", "\n")
def clean_log_strict(text: str) -> str:
allowed = set(string.printable) | {"\n", "\t"}
return "".join(ch for ch in text if ch in allowed)
log_text_cleaned = clean_log_strict(log_text)
wlogs["log"] = Html(f"<pre>{log_text_cleaned}</pre>")
wandb.log(wlogs)
except Exception as e:
print(e)
finally:
if run is not None:
run.finish()
yield (
"".join(log_output),
out_png,
gr.update(
value=zip_path,
visible=bool(zip_path),
interactive=bool(zip_path),
),
)
run_inputs = [filament_df_state, input_image_component] + [
accordion_params_dict[name] for name in accordion_params_ordered_names
]
run_outputs = [
progress_output,
final_image_preview,
download_results,
]
run_button.click(execute_autoforge_script, inputs=run_inputs, outputs=run_outputs)
css = """ #run_button_full_width { width: 100%; } """
if __name__ == "__main__":
if not os.path.exists(DEFAULT_MATERIALS_CSV):
print(f"Creating default filament file: {DEFAULT_MATERIALS_CSV}")
try:
initial_df.to_csv(DEFAULT_MATERIALS_CSV, index=False)
except Exception as e:
print(f"Could not write default {DEFAULT_MATERIALS_CSV}: {e}")
print("To run the UI, execute: python app.py")
demo.queue(default_concurrency_limit=1).launch(share=False)