"""Sort ZIP archives into per-category folders, moving them concurrently.

Archive names are expected to look like ``cjsyun-xxx_aaa_xx.zip``; the
middle underscore-separated segment (``aaa``) becomes the name of the
folder, created inside the processed directory, that the archive is moved
into. Archives whose name does not yield a usable segment go to
``Uncategorized``.
"""

import argparse
import os
import re
import shutil
import sys
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

try:
    from tqdm import tqdm  # type: ignore

    _USE_TQDM = True
except Exception:
    # tqdm is optional; any import problem falls back to plain print output.
    _USE_TQDM = False

INVALID_WIN_CHARS = '<>:"/\\|?*'
ARCHIVE_PREFIX = 'cjsyun-'
DEFAULT_CATEGORY = 'Uncategorized'


def sanitize_folder_name(name: str) -> str:
    """Return *name* with each character from INVALID_WIN_CHARS replaced by '-'.

    Leading and trailing whitespace is stripped from the result.
    """
    sanitized = ''.join('-' if ch in INVALID_WIN_CHARS else ch for ch in name)
    return sanitized.strip()


def extract_category(zip_name: str) -> str:
    """Derive the category folder name from an archive file name.

    The optional ``cjsyun-`` prefix and a trailing ``.zip`` (any case) are
    removed, the rest is split on ``_``, and the second segment is sanitized
    and returned. ``'Uncategorized'`` is returned when there are fewer than
    three segments, or when the sanitized segment is empty or consists only
    of dots.
    """
    # Expected pattern: cjsyun-xxx_aaa_xx.zip
    base = zip_name.removeprefix(ARCHIVE_PREFIX)
    if base.lower().endswith('.zip'):
        base = base[:-4]

    parts = base.split('_')
    if len(parts) < 3:
        return DEFAULT_CATEGORY

    category = sanitize_folder_name(parts[1])
    # '' and '.' would resolve to the source directory itself and '..' to its
    # parent, so such a segment must never be used as a folder name.
    if not category.strip('.'):
        return DEFAULT_CATEGORY
    return category


def resolve_target_path(dest_dir: Path, file_name: str) -> Path:
    """Return a path inside *dest_dir* for *file_name* that does not exist yet.

    If ``dest_dir / file_name`` is free it is returned as is; otherwise
    ``stem(1).ext``, ``stem(2).ext``, ... are tried in order and the first
    non-existing candidate is returned.

    NOTE: the existence check and the caller's later use of the path are not
    atomic; another thread or process can create the path in between.
    """
    target = dest_dir / file_name
    if not target.exists():
        return target

    stem = target.stem
    suffix = target.suffix
    i = 1
    while True:
        candidate = dest_dir / f"{stem}({i}){suffix}"
        if not candidate.exists():
            return candidate
        i += 1


def process_file(file_path: Path, base_dir: Path, dry_run: bool = False) -> tuple[str, Path, Path]:
    """Move *file_path* into its category folder under *base_dir*.

    Args:
        file_path: Archive to classify and move.
        base_dir: Directory under which category folders are created.
        dry_run: When true, nothing is created or moved; the destination is
            only computed.

    Returns:
        A ``(category, source_path, target_path)`` tuple.

    Raises:
        OSError: If the folder cannot be created or the move fails.
    """
    category = extract_category(file_path.name)
    dest_dir = base_dir / category

    if not dry_run:
        dest_dir.mkdir(parents=True, exist_ok=True)

    target = resolve_target_path(dest_dir, file_path.name)
    if not dry_run:
        shutil.move(str(file_path), str(target))
    return category, file_path, target


def main():
    """Parse command-line options and classify every ZIP file in the directory."""
    parser = argparse.ArgumentParser(description='并发分类并移动 ZIP 文件到类型文件夹。')
    parser.add_argument(
        '--dir',
        '--from-dir',
        dest='from_dir',
        default=None,
        help='要处理的目录(默认是脚本所在目录)',
    )
    parser.add_argument(
        '--workers',
        type=int,
        default=max(4, (os.cpu_count() or 4) + 4),
        help='线程数(默认:CPU核数+4,至少4)',
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='试运行,仅显示计划移动,不实际移动',
    )
    args = parser.parse_args()

    base_dir = Path(args.from_dir).resolve() if args.from_dir else Path(__file__).resolve().parent
    if not base_dir.is_dir():
        print(f'目录无效:{base_dir}')
        sys.exit(1)

    # Only regular files: a directory that happens to be named "*.zip" must
    # not be picked up and moved.
    zip_files = sorted(p for p in base_dir.glob('*.zip') if p.is_file())
    if not zip_files:
        print(f'未在目录中找到 zip 文件:{base_dir}')
        sys.exit(0)

    print(f'发现 {len(zip_files)} 个 ZIP 文件,开始并发处理...')

    moved_counter: Counter[str] = Counter()
    failed = 0
    # ThreadPoolExecutor rejects max_workers <= 0 with ValueError.
    workers = max(1, args.workers)

    pbar = tqdm(total=len(zip_files), ncols=80, desc='处理进度') if _USE_TQDM else None
    try:
        with ThreadPoolExecutor(max_workers=workers) as executor:
            # Remember which file each future belongs to so that a failure
            # can be reported together with the file name.
            pending = {
                executor.submit(process_file, f, base_dir, args.dry_run): f
                for f in zip_files
            }
            for fut in as_completed(pending):
                src = pending[fut]
                try:
                    category, _, _ = fut.result()
                except Exception as e:
                    failed += 1
                    message = f'错误:{src.name}: {e!r}'
                    if pbar is not None:
                        pbar.write(message)
                    else:
                        print(message)
                else:
                    moved_counter[category] += 1
                    if pbar is None:
                        processed = sum(moved_counter.values())
                        percent = processed * 100 // len(zip_files)
                        print(f'[{percent:3d}%] {src.name} -> {category}/')
                finally:
                    # Advance the bar for failures too so it reaches 100%.
                    if pbar is not None:
                        pbar.update(1)
    finally:
        if pbar is not None:
            pbar.close()

    print('\n分类汇总:')
    for cat, cnt in sorted(moved_counter.items()):
        print(f'- {cat}: {cnt} 个')

    if failed:
        print(f'\n有 {failed} 个文件处理失败。')

    if args.dry_run:
        print('\n试运行完成(未实际移动文件)。取消 --dry-run 以执行移动。')
    else:
        print('\n完成:已将 ZIP 文件移动到对应类型的文件夹。')


if __name__ == '__main__':
    main()