157 lines
5.0 KiB
Python
157 lines
5.0 KiB
Python
import os
|
||
import sys
|
||
import shutil
|
||
import argparse
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
||
|
||
def extract_version_model(filename: str):
    """Parse the version and model out of a ``cjsyun-`` archive name.

    The name (directory and extension removed) must look like
    ``cjsyun-<version>_<model>[_<anything>...]``. For example
    ``cjsyun-1.21.1_Fabric_xxx.zip`` yields ``("1.21.1", "fabric")``.

    Returns:
        A ``(version, model)`` tuple, or ``None`` when the name does not
        start with ``cjsyun-``, has no ``_`` separated model field, or
        either field ends up empty.
    """
    prefix = "cjsyun-"
    stem, _ext = os.path.splitext(os.path.basename(filename))
    if not stem.startswith(prefix):
        return None

    fields = stem[len(prefix):].split("_")
    if len(fields) < 2:
        return None

    version = fields[0].strip()

    # Normalize the model: only letters, digits and dashes survive, lowercased.
    kept = [c for c in fields[1].strip() if c == "-" or c.isalnum()]
    model = "".join(kept).lower()

    if version and model:
        return version, model
    return None
|
||
|
||
|
||
def compute_unique_path(dest_dir: str, filename: str) -> str:
    """Build a path under ``dest_dir`` that does not exist on disk yet.

    ``filename`` is used unchanged when it is free. Otherwise ``-1``,
    ``-2``, ... is inserted before the extension until an unused name is
    found.
    """
    stem, ext = os.path.splitext(filename)
    candidate = os.path.join(dest_dir, filename)
    counter = 0
    while os.path.exists(candidate):
        counter += 1
        candidate = os.path.join(dest_dir, f"{stem}-{counter}{ext}")
    return candidate
|
||
|
||
|
||
def move_or_copy(src: str, dst: str, do_move: bool) -> None:
    """Transfer ``src`` to ``dst``, creating the destination folder if needed.

    Args:
        src: Path of the existing source file.
        dst: Full destination path (including the file name).
        do_move: When true the file is moved with ``shutil.move``;
            otherwise it is copied with ``shutil.copy2``.
    """
    parent = os.path.dirname(dst)
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError; in that case the current directory is the target.
    if parent:
        os.makedirs(parent, exist_ok=True)

    if do_move:
        shutil.move(src, dst)
    else:
        shutil.copy2(src, dst)
|
||
|
||
|
||
def make_progress_updater(total: int, label: str = "进度"):
    """Create the two callbacks that drive a single-line text progress bar.

    Returns:
        ``(update, done)``. ``update(done_count)`` redraws the bar on the
        current line (carriage return, no newline); ``done()`` prints the
        final newline. A ``total`` of zero is drawn as a full bar at 100%.
    """
    width = 30

    def update(done_count: int):
        if total:
            percent = done_count / total * 100
            filled = int(width * done_count / total)
        else:
            percent = 100
            filled = width
        bar = "#" * filled + "-" * (width - filled)
        line = f"\r{label} [{bar}] {done_count}/{total} ({percent:.1f}%)"
        print(line, end="", flush=True)

    def done():
        # Move off the progress line so later output starts on a fresh line.
        print()

    return update, done
|
||
|
||
|
||
def gather_zip_files(root: str):
    """List the ``.zip`` files located directly inside ``root``.

    The extension check is case-insensitive. Only regular files are
    returned: a directory whose name happens to end in ``.zip`` is ignored
    so it is never handed to the copy/move step. Names are sorted so the
    result does not depend on the directory listing order.

    Returns:
        A sorted list of file names (not full paths).
    """
    with os.scandir(root) as entries:
        return sorted(
            entry.name
            for entry in entries
            if entry.is_file() and entry.name.lower().endswith(".zip")
        )
|
||
|
||
|
||
def _reserve_unique_path(dest_dir: str, filename: str, reserved: set) -> str:
    """Pick a destination path that is free on disk and not already planned.

    All destinations are chosen before any file is written, so checking the
    disk alone would hand the same path to every source that maps to the
    same new name. ``reserved`` remembers the paths already given out; the
    chosen path is added to it. The ``-1``, ``-2``, ... suffix scheme
    matches ``compute_unique_path``.
    """
    stem, ext = os.path.splitext(filename)
    candidate = os.path.join(dest_dir, filename)
    suffix = 0
    # normcase folds case on Windows so names differing only by case collide.
    while os.path.exists(candidate) or os.path.normcase(candidate) in reserved:
        suffix += 1
        candidate = os.path.join(dest_dir, f"{stem}-{suffix}{ext}")
    reserved.add(os.path.normcase(candidate))
    return candidate


def main():
    """Rename the ``cjsyun-`` ZIP files next to this script into an output folder.

    Every ``.zip`` in the script's directory whose name matches
    ``cjsyun-<version>_<model>...`` is copied (or moved with ``--move``) to
    ``<out>/cjsyun-<model>-<version>.zip`` using a thread pool, with a
    progress bar and a short summary.

    Returns:
        ``0`` when nothing failed (including when there was nothing to do),
        ``1`` when at least one file could not be copied or moved.
    """
    parser = argparse.ArgumentParser(description="批量重命名同目录下的ZIP文件并放入新文件夹(多线程与进度条)")
    parser.add_argument("--out", default="renamed_zips", help="输出文件夹名称(默认:renamed_zips)")
    parser.add_argument("--workers", type=int, default=max(4, (os.cpu_count() or 2) * 2), help="线程数(默认:CPU*2,至少4)")
    parser.add_argument("--move", action="store_true", help="将文件移动到新文件夹(默认复制)")
    args = parser.parse_args()

    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(script_dir)

    out_dir = os.path.join(script_dir, args.out)
    os.makedirs(out_dir, exist_ok=True)

    zip_files = gather_zip_files(script_dir)
    if not zip_files:
        print("未在脚本同目录下找到任何 .zip 文件。")
        return 0

    # Plan every (source, destination) pair up front.
    tasks = []
    skipped = []
    reserved = set()
    for name in zip_files:
        info = extract_version_model(name)
        if not info:
            skipped.append(name)
            continue
        version, model = info
        new_name = f"cjsyun-{model}-{version}.zip"
        # Several sources may map to the same new name; reserving keeps them
        # from overwriting each other once the workers start.
        dest_path = _reserve_unique_path(out_dir, new_name, reserved)
        tasks.append((os.path.join(script_dir, name), dest_path))

    if not tasks:
        print("没有符合重命名规则的 .zip 文件。")
        if skipped:
            print("以下文件被跳过:")
            for s in skipped:
                print(f" - {s}")
        return 0

    update, done = make_progress_updater(len(tasks))

    # ThreadPoolExecutor rejects max_workers <= 0, so clamp user input.
    workers = max(1, args.workers)
    failed = set()

    print(f"开始处理,共 {len(tasks)} 个文件,输出至文件夹:{args.out}")
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_task = {
            executor.submit(move_or_copy, src, dst, args.move): (src, dst)
            for src, dst in tasks
        }
        for done_count, future in enumerate(as_completed(future_to_task), start=1):
            src, dst = future_to_task[future]
            try:
                future.result()
            except Exception as e:
                # Top-level boundary: report the failure and keep going so
                # one bad file does not abort the rest of the batch.
                failed.add(dst)
                print(f"\n处理失败:{os.path.basename(src)} -> {os.path.basename(dst)} | 错误: {e}")
            update(done_count)

    done()

    # Summary: show at most five files that were actually written.
    print("处理完成。重命名结果示例:")
    succeeded = [dst for _, dst in tasks if dst not in failed]
    for dest in succeeded[:5]:
        print(f" - {os.path.basename(dest)}")

    if failed:
        print(f"失败 {len(failed)} 个文件。")

    if skipped:
        print("以下文件未匹配规则而被跳过:")
        for s in skipped[:10]:
            print(f" - {s}")
        if len(skipped) > 10:
            print(f" ... 以及另外 {len(skipped) - 10} 个")

    print(f"输出文件夹位置:{out_dir}")
    if not args.move:
        # The hint is only useful when the files were copied.
        print("提示:如需移动而非复制,请添加 --move 选项。")
    return 1 if failed else 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    raise SystemExit(main())