update
This commit is contained in:
157
rename_zip.py
Normal file
157
rename_zip.py
Normal file
@@ -0,0 +1,157 @@
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import argparse
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
|
||||
def extract_version_model(filename: str):
|
||||
"""Extract version and model from a filename like:
|
||||
cjsyun-1.21.1_Fabric_xxx.zip -> ("1.21.1", "fabric")
|
||||
|
||||
Returns None if the filename does not match expected pattern.
|
||||
"""
|
||||
base = os.path.splitext(os.path.basename(filename))[0]
|
||||
if not base.startswith("cjsyun-"):
|
||||
return None
|
||||
|
||||
rest = base[len("cjsyun-"):]
|
||||
parts = rest.split("_")
|
||||
if len(parts) < 2:
|
||||
return None
|
||||
|
||||
version_raw = parts[0].strip()
|
||||
model_raw = parts[1].strip()
|
||||
|
||||
# Keep alphanumeric and dashes in model; lowercase for normalization
|
||||
model = "".join(ch for ch in model_raw if ch.isalnum() or ch == "-").lower()
|
||||
version = version_raw
|
||||
|
||||
if not version or not model:
|
||||
return None
|
||||
return version, model
|
||||
|
||||
|
||||
def compute_unique_path(dest_dir: str, filename: str) -> str:
|
||||
"""Return a unique destination path inside dest_dir for filename."""
|
||||
base_path = os.path.join(dest_dir, filename)
|
||||
if not os.path.exists(base_path):
|
||||
return base_path
|
||||
name, ext = os.path.splitext(filename)
|
||||
i = 1
|
||||
while True:
|
||||
candidate = os.path.join(dest_dir, f"{name}-{i}{ext}")
|
||||
if not os.path.exists(candidate):
|
||||
return candidate
|
||||
i += 1
|
||||
|
||||
|
||||
def move_or_copy(src: str, dst: str, do_move: bool) -> None:
|
||||
os.makedirs(os.path.dirname(dst), exist_ok=True)
|
||||
if do_move:
|
||||
shutil.move(src, dst)
|
||||
else:
|
||||
shutil.copy2(src, dst)
|
||||
|
||||
|
||||
def make_progress_updater(total: int, label: str = "进度"):
|
||||
bar_len = 30
|
||||
|
||||
def update(done_count: int):
|
||||
percent = (done_count / total * 100) if total else 100
|
||||
filled = int(bar_len * done_count / total) if total else bar_len
|
||||
bar = "#" * filled + "-" * (bar_len - filled)
|
||||
print(f"\r{label} [{bar}] {done_count}/{total} ({percent:.1f}%)", end="", flush=True)
|
||||
|
||||
def done():
|
||||
print() # newline at end
|
||||
|
||||
return update, done
|
||||
|
||||
|
||||
def gather_zip_files(root: str):
|
||||
return [f for f in os.listdir(root) if f.lower().endswith(".zip")]
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="批量重命名同目录下的ZIP文件并放入新文件夹(多线程与进度条)")
|
||||
parser.add_argument("--out", default="renamed_zips", help="输出文件夹名称(默认:renamed_zips)")
|
||||
parser.add_argument("--workers", type=int, default=max(4, (os.cpu_count() or 2) * 2), help="线程数(默认:CPU*2,至少4)")
|
||||
parser.add_argument("--move", action="store_true", help="将文件移动到新文件夹(默认复制)")
|
||||
args = parser.parse_args()
|
||||
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
os.chdir(script_dir)
|
||||
|
||||
out_dir = os.path.join(script_dir, args.out)
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
|
||||
zip_files = gather_zip_files(script_dir)
|
||||
if not zip_files:
|
||||
print("未在脚本同目录下找到任何 .zip 文件。")
|
||||
return 0
|
||||
|
||||
# Prepare tasks
|
||||
tasks = []
|
||||
skipped = []
|
||||
for name in zip_files:
|
||||
info = extract_version_model(name)
|
||||
if not info:
|
||||
skipped.append(name)
|
||||
continue
|
||||
version, model = info
|
||||
new_name = f"cjsyun-{model}-{version}.zip"
|
||||
dest_path = compute_unique_path(out_dir, new_name)
|
||||
src_path = os.path.join(script_dir, name)
|
||||
tasks.append((src_path, dest_path))
|
||||
|
||||
if not tasks:
|
||||
print("没有符合重命名规则的 .zip 文件。")
|
||||
if skipped:
|
||||
print("以下文件被跳过:")
|
||||
for s in skipped:
|
||||
print(f" - {s}")
|
||||
return 0
|
||||
|
||||
update, done = make_progress_updater(len(tasks))
|
||||
done_count = 0
|
||||
|
||||
print(f"开始处理,共 {len(tasks)} 个文件,输出至文件夹:{args.out}")
|
||||
with ThreadPoolExecutor(max_workers=args.workers) as executor:
|
||||
future_to_task = {executor.submit(move_or_copy, src, dst, args.move): (src, dst) for src, dst in tasks}
|
||||
for future in as_completed(future_to_task):
|
||||
# Consume exceptions to keep progress going
|
||||
try:
|
||||
future.result()
|
||||
except Exception as e:
|
||||
src, dst = future_to_task[future]
|
||||
print(f"\n处理失败:{os.path.basename(src)} -> {os.path.basename(dst)} | 错误: {e}")
|
||||
finally:
|
||||
done_count += 1
|
||||
update(done_count)
|
||||
|
||||
done()
|
||||
|
||||
# Summary
|
||||
print("处理完成。重命名结果示例:")
|
||||
preview = 0
|
||||
for _, dest in tasks:
|
||||
print(f" - {os.path.basename(dest)}")
|
||||
preview += 1
|
||||
if preview >= 5:
|
||||
break
|
||||
|
||||
if skipped:
|
||||
print("以下文件未匹配规则而被跳过:")
|
||||
for s in skipped[:10]:
|
||||
print(f" - {s}")
|
||||
if len(skipped) > 10:
|
||||
print(f" ... 以及另外 {len(skipped) - 10} 个")
|
||||
|
||||
print(f"输出文件夹位置:{out_dir}")
|
||||
print("提示:如需移动而非复制,请添加 --move 选项。")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user