import os import sys import shutil import argparse from concurrent.futures import ThreadPoolExecutor, as_completed def extract_version_model(filename: str): """Extract version and model from a filename like: cjsyun-1.21.1_Fabric_xxx.zip -> ("1.21.1", "fabric") Returns None if the filename does not match expected pattern. """ base = os.path.splitext(os.path.basename(filename))[0] if not base.startswith("cjsyun-"): return None rest = base[len("cjsyun-"):] parts = rest.split("_") if len(parts) < 2: return None version_raw = parts[0].strip() model_raw = parts[1].strip() # Keep alphanumeric and dashes in model; lowercase for normalization model = "".join(ch for ch in model_raw if ch.isalnum() or ch == "-").lower() version = version_raw if not version or not model: return None return version, model def compute_unique_path(dest_dir: str, filename: str) -> str: """Return a unique destination path inside dest_dir for filename.""" base_path = os.path.join(dest_dir, filename) if not os.path.exists(base_path): return base_path name, ext = os.path.splitext(filename) i = 1 while True: candidate = os.path.join(dest_dir, f"{name}-{i}{ext}") if not os.path.exists(candidate): return candidate i += 1 def move_or_copy(src: str, dst: str, do_move: bool) -> None: os.makedirs(os.path.dirname(dst), exist_ok=True) if do_move: shutil.move(src, dst) else: shutil.copy2(src, dst) def make_progress_updater(total: int, label: str = "进度"): bar_len = 30 def update(done_count: int): percent = (done_count / total * 100) if total else 100 filled = int(bar_len * done_count / total) if total else bar_len bar = "#" * filled + "-" * (bar_len - filled) print(f"\r{label} [{bar}] {done_count}/{total} ({percent:.1f}%)", end="", flush=True) def done(): print() # newline at end return update, done def gather_zip_files(root: str): return [f for f in os.listdir(root) if f.lower().endswith(".zip")] def main(): parser = argparse.ArgumentParser(description="批量重命名同目录下的ZIP文件并放入新文件夹(多线程与进度条)") parser.add_argument("--out", default="renamed_zips", help="输出文件夹名称(默认:renamed_zips)") parser.add_argument("--workers", type=int, default=max(4, (os.cpu_count() or 2) * 2), help="线程数(默认:CPU*2,至少4)") parser.add_argument("--move", action="store_true", help="将文件移动到新文件夹(默认复制)") args = parser.parse_args() script_dir = os.path.dirname(os.path.abspath(__file__)) os.chdir(script_dir) out_dir = os.path.join(script_dir, args.out) os.makedirs(out_dir, exist_ok=True) zip_files = gather_zip_files(script_dir) if not zip_files: print("未在脚本同目录下找到任何 .zip 文件。") return 0 # Prepare tasks tasks = [] skipped = [] for name in zip_files: info = extract_version_model(name) if not info: skipped.append(name) continue version, model = info new_name = f"cjsyun-{model}-{version}.zip" dest_path = compute_unique_path(out_dir, new_name) src_path = os.path.join(script_dir, name) tasks.append((src_path, dest_path)) if not tasks: print("没有符合重命名规则的 .zip 文件。") if skipped: print("以下文件被跳过:") for s in skipped: print(f" - {s}") return 0 update, done = make_progress_updater(len(tasks)) done_count = 0 print(f"开始处理,共 {len(tasks)} 个文件,输出至文件夹:{args.out}") with ThreadPoolExecutor(max_workers=args.workers) as executor: future_to_task = {executor.submit(move_or_copy, src, dst, args.move): (src, dst) for src, dst in tasks} for future in as_completed(future_to_task): # Consume exceptions to keep progress going try: future.result() except Exception as e: src, dst = future_to_task[future] print(f"\n处理失败:{os.path.basename(src)} -> {os.path.basename(dst)} | 错误: {e}") finally: done_count += 1 update(done_count) done() # Summary print("处理完成。重命名结果示例:") preview = 0 for _, dest in tasks: print(f" - {os.path.basename(dest)}") preview += 1 if preview >= 5: break if skipped: print("以下文件未匹配规则而被跳过:") for s in skipped[:10]: print(f" - {s}") if len(skipped) > 10: print(f" ... 以及另外 {len(skipped) - 10} 个") print(f"输出文件夹位置:{out_dir}") print("提示:如需移动而非复制,请添加 --move 选项。") return 0 if __name__ == "__main__": sys.exit(main())