From f95770720f6541a3f47e68fbe4dbcdc4360fa9e1 Mon Sep 17 00:00:00 2001 From: Kelsi Date: Mon, 23 Feb 2026 20:06:41 -0800 Subject: [PATCH] Fix security, bugs, and UX in asset pipeline GUI - Fix zip slip vulnerability: validate extracted paths stay within target - Fix redundant mkdir before rmtree in rebuild_override() - Add build/asset_extract and Windows .ps1 fallback to extractor search - Preserve pack list selection across refreshes - Add Cancel Extraction button with process.terminate() - Run override rebuild in background thread to avoid UI freeze - Fix locale combobox state to readonly - Add asset_pipeline/ to .gitignore - Make script executable --- .gitignore | 3 ++ tools/asset_pipeline_gui.py | 67 +++++++++++++++++++++++++++---------- 2 files changed, 52 insertions(+), 18 deletions(-) mode change 100644 => 100755 tools/asset_pipeline_gui.py diff --git a/.gitignore b/.gitignore index 101f6f98..f95c18ed 100644 --- a/.gitignore +++ b/.gitignore @@ -89,6 +89,9 @@ Data/expansions/*/overlay/ Data/hd/ ingest/ +# Asset pipeline state and texture packs +asset_pipeline/ + # Local texture dumps / extracted art should never be committed assets/textures/ node_modules/ diff --git a/tools/asset_pipeline_gui.py b/tools/asset_pipeline_gui.py old mode 100644 new mode 100755 index 194ff7bd..a3a3c3f7 --- a/tools/asset_pipeline_gui.py +++ b/tools/asset_pipeline_gui.py @@ -137,7 +137,11 @@ class PipelineManager: target.mkdir(parents=True, exist_ok=False) with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(target) + for member in zf.infolist(): + member_path = (target / member.filename).resolve() + if not str(member_path).startswith(str(target.resolve()) + "/") and member_path != target.resolve(): + raise ValueError(f"Zip slip detected: {member.filename!r} escapes target directory") + zf.extract(member, target) data_root = self.find_data_root(target) info = PackInfo( @@ -202,8 +206,6 @@ class PipelineManager: def rebuild_override(self) -> dict[str, int]: out_dir = Path(self.state.output_data_dir) override_dir = out_dir / "override" - override_dir.mkdir(parents=True, exist_ok=True) - if override_dir.exists(): shutil.rmtree(override_dir) override_dir.mkdir(parents=True, exist_ok=True) @@ -243,15 +245,20 @@ class PipelineManager: if path.exists() and path.is_file(): return [str(path)] - ext = ".exe" if platform.system().lower().startswith("win") else "" + is_win = platform.system().lower().startswith("win") + ext = ".exe" if is_win else "" for candidate in [ ROOT_DIR / "build" / "bin" / f"asset_extract{ext}", + ROOT_DIR / "build" / f"asset_extract{ext}", ROOT_DIR / "bin" / f"asset_extract{ext}", ]: if candidate.exists(): return [str(candidate)] - if platform.system().lower().startswith("win"): + if is_win: + ps_script = ROOT_DIR / "extract_assets.ps1" + if ps_script.exists(): + return ["powershell", "-ExecutionPolicy", "Bypass", "-File", str(ps_script)] return None shell_script = ROOT_DIR / "extract_assets.sh" @@ -333,6 +340,7 @@ class AssetPipelineGUI: self.log_queue: queue.Queue[str] = queue.Queue() self.proc_thread: threading.Thread | None = None + self.proc_process: subprocess.Popen | None = None self.proc_running = False self.root.title("WoWee Asset Pipeline") @@ -403,7 +411,7 @@ class AssetPipelineGUI: frame, textvariable=self.var_locale, values=["auto", "enUS", "enGB", "deDE", "frFR", "esES", "esMX", "ruRU", "koKR", "zhCN", "zhTW"], - state="normal", + state="readonly", width=12, ) loc_combo.grid(row=3, column=3, sticky="w", pady=6) @@ -424,6 +432,8 @@ class AssetPipelineGUI: buttons.grid(row=6, column=0, columnspan=4, sticky="w", pady=12) ttk.Button(buttons, text="Save Configuration", command=self.save_config).pack(side="left", padx=(0, 8)) ttk.Button(buttons, text="Run Extraction", command=self.run_extraction).pack(side="left", padx=(0, 8)) + self.cancel_btn = ttk.Button(buttons, text="Cancel Extraction", command=self.cancel_extraction, state="disabled") + self.cancel_btn.pack(side="left", padx=(0, 8)) ttk.Button(buttons, text="Refresh State", command=self.refresh_state_view).pack(side="left") tip = ( @@ -533,6 +543,7 @@ class AssetPipelineGUI: return self.manager.state.packs[idx] def refresh_pack_list(self) -> None: + prev_sel = self.pack_list.curselection() active = self.manager.state.active_pack_ids self.pack_list.delete(0, tk.END) for pack in self.manager.state.packs: @@ -540,6 +551,11 @@ class AssetPipelineGUI: if pack.pack_id in active: marker = f"[active #{active.index(pack.pack_id) + 1}] " self.pack_list.insert(tk.END, f"{marker}{pack.name}") + # Restore previous selection if still valid. + for idx in prev_sel: + if 0 <= idx < self.pack_list.size(): + self.pack_list.selection_set(idx) + self.pack_list.see(idx) self._refresh_pack_detail() def _refresh_pack_detail(self) -> None: @@ -638,18 +654,22 @@ class AssetPipelineGUI: self.status_var.set(f"Uninstalled pack: {pack.name}") def rebuild_override(self) -> None: - try: - report = self.manager.rebuild_override() - except Exception as exc: # pylint: disable=broad-except - messagebox.showerror("Override rebuild failed", str(exc)) - return - self.refresh_state_view() - self.status_var.set( - f"Override rebuilt: {report['copied']} files copied, {report['replaced']} replaced" - ) - self._append_log( - f"[{self.manager.now_str()}] Override rebuild complete: {report['copied']} copied, {report['replaced']} replaced" - ) + self.status_var.set("Rebuilding override...") + self.log_queue.put(f"[{self.manager.now_str()}] Starting override rebuild...") + + def worker() -> None: + try: + report = self.manager.rebuild_override() + msg = f"Override rebuilt: {report['copied']} files copied, {report['replaced']} replaced" + self.log_queue.put(f"[{self.manager.now_str()}] Override rebuild complete: {report['copied']} copied, {report['replaced']} replaced") + self.root.after(0, lambda: self.status_var.set(msg)) + except Exception as exc: # pylint: disable=broad-except + self.log_queue.put(f"[{self.manager.now_str()}] Override rebuild failed: {exc}") + self.root.after(0, lambda: self.status_var.set("Override rebuild failed")) + finally: + self.root.after(0, self.refresh_state_view) + + threading.Thread(target=worker, daemon=True).start() def clear_logs(self) -> None: self.log_text.configure(state="normal") @@ -671,6 +691,12 @@ class AssetPipelineGUI: self._append_log(line) self.root.after(120, self._poll_logs) + def cancel_extraction(self) -> None: + if self.proc_process is not None: + self.proc_process.terminate() + self.log_queue.put(f"[{self.manager.now_str()}] Extraction cancelled by user") + self.status_var.set("Extraction cancelled") + def run_extraction(self) -> None: if self.proc_running: messagebox.showinfo("Extraction running", "An extraction is already running.") @@ -684,6 +710,8 @@ class AssetPipelineGUI: messagebox.showerror("Cannot run extraction", str(exc)) return + self.cancel_btn.configure(state="normal") + def worker() -> None: self.proc_running = True started = self.manager.now_str() @@ -700,6 +728,7 @@ class AssetPipelineGUI: text=True, bufsize=1, ) + self.proc_process = process assert process.stdout is not None for line in process.stdout: self.log_queue.put(line.rstrip()) @@ -710,12 +739,14 @@ class AssetPipelineGUI: except Exception as exc: # pylint: disable=broad-except self.log_queue.put(f"Extraction error: {exc}") finally: + self.proc_process = None self.manager.state.last_extract_at = self.manager.now_str() self.manager.state.last_extract_ok = ok self.manager.state.last_extract_command = " ".join(cmd) self.manager.save_state() self.proc_running = False self.root.after(0, self.refresh_state_view) + self.root.after(0, lambda: self.cancel_btn.configure(state="disabled")) self.root.after( 0, lambda: self.status_var.set("Extraction complete" if ok else "Extraction failed") )