205 lines
5.8 KiB
Python
205 lines
5.8 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Initialize test data folders by archive signatures.
|
||
|
|
|
||
|
|
The script scans all files in --input and copies matching archives into:
|
||
|
|
--output/nres/<relative path>
|
||
|
|
--output/rsli/<relative path>
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import shutil
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
MAGIC_NRES = b"NRes"
|
||
|
|
MAGIC_RSLI = b"NL\x00\x01"
|
||
|
|
|
||
|
|
|
||
|
|
def is_relative_to(path: Path, base: Path) -> bool:
|
||
|
|
try:
|
||
|
|
path.relative_to(base)
|
||
|
|
except ValueError:
|
||
|
|
return False
|
||
|
|
return True
|
||
|
|
|
||
|
|
|
||
|
|
def detect_archive_type(path: Path) -> str | None:
|
||
|
|
try:
|
||
|
|
with path.open("rb") as handle:
|
||
|
|
magic = handle.read(4)
|
||
|
|
except OSError as exc:
|
||
|
|
print(f"[warn] cannot read {path}: {exc}", file=sys.stderr)
|
||
|
|
return None
|
||
|
|
|
||
|
|
if magic == MAGIC_NRES:
|
||
|
|
return "nres"
|
||
|
|
if magic == MAGIC_RSLI:
|
||
|
|
return "rsli"
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def scan_archives(input_root: Path, excluded_root: Path | None) -> list[tuple[Path, str]]:
|
||
|
|
found: list[tuple[Path, str]] = []
|
||
|
|
for path in sorted(input_root.rglob("*")):
|
||
|
|
if not path.is_file():
|
||
|
|
continue
|
||
|
|
if excluded_root and is_relative_to(path.resolve(), excluded_root):
|
||
|
|
continue
|
||
|
|
|
||
|
|
archive_type = detect_archive_type(path)
|
||
|
|
if archive_type:
|
||
|
|
found.append((path, archive_type))
|
||
|
|
return found
|
||
|
|
|
||
|
|
|
||
|
|
def confirm_overwrite(path: Path) -> str:
|
||
|
|
prompt = (
|
||
|
|
f"File exists: {path}\n"
|
||
|
|
"Overwrite? [y]es / [n]o / [a]ll / [q]uit (default: n): "
|
||
|
|
)
|
||
|
|
while True:
|
||
|
|
try:
|
||
|
|
answer = input(prompt).strip().lower()
|
||
|
|
except EOFError:
|
||
|
|
return "quit"
|
||
|
|
|
||
|
|
if answer in {"", "n", "no"}:
|
||
|
|
return "no"
|
||
|
|
if answer in {"y", "yes"}:
|
||
|
|
return "yes"
|
||
|
|
if answer in {"a", "all"}:
|
||
|
|
return "all"
|
||
|
|
if answer in {"q", "quit"}:
|
||
|
|
return "quit"
|
||
|
|
print("Please answer with y, n, a, or q.")
|
||
|
|
|
||
|
|
|
||
|
|
def copy_archives(
|
||
|
|
archives: list[tuple[Path, str]],
|
||
|
|
input_root: Path,
|
||
|
|
output_root: Path,
|
||
|
|
force: bool,
|
||
|
|
) -> int:
|
||
|
|
copied = 0
|
||
|
|
skipped = 0
|
||
|
|
overwritten = 0
|
||
|
|
overwrite_all = force
|
||
|
|
|
||
|
|
type_counts = {"nres": 0, "rsli": 0}
|
||
|
|
for _, archive_type in archives:
|
||
|
|
type_counts[archive_type] += 1
|
||
|
|
|
||
|
|
print(
|
||
|
|
f"Found archives: total={len(archives)}, "
|
||
|
|
f"nres={type_counts['nres']}, rsli={type_counts['rsli']}"
|
||
|
|
)
|
||
|
|
|
||
|
|
for source, archive_type in archives:
|
||
|
|
rel_path = source.relative_to(input_root)
|
||
|
|
destination = output_root / archive_type / rel_path
|
||
|
|
destination.parent.mkdir(parents=True, exist_ok=True)
|
||
|
|
|
||
|
|
if destination.exists():
|
||
|
|
if destination.is_dir():
|
||
|
|
print(
|
||
|
|
f"[error] destination is a directory, expected file: {destination}",
|
||
|
|
file=sys.stderr,
|
||
|
|
)
|
||
|
|
return 2
|
||
|
|
|
||
|
|
if not overwrite_all:
|
||
|
|
if not sys.stdin.isatty():
|
||
|
|
print(
|
||
|
|
"[error] destination file exists but stdin is not interactive. "
|
||
|
|
"Use --force to overwrite without prompts.",
|
||
|
|
file=sys.stderr,
|
||
|
|
)
|
||
|
|
return 2
|
||
|
|
|
||
|
|
decision = confirm_overwrite(destination)
|
||
|
|
if decision == "quit":
|
||
|
|
print("Aborted by user.")
|
||
|
|
return 130
|
||
|
|
if decision == "no":
|
||
|
|
skipped += 1
|
||
|
|
continue
|
||
|
|
if decision == "all":
|
||
|
|
overwrite_all = True
|
||
|
|
|
||
|
|
overwritten += 1
|
||
|
|
|
||
|
|
try:
|
||
|
|
shutil.copy2(source, destination)
|
||
|
|
except OSError as exc:
|
||
|
|
print(f"[error] failed to copy {source} -> {destination}: {exc}", file=sys.stderr)
|
||
|
|
return 2
|
||
|
|
copied += 1
|
||
|
|
|
||
|
|
print(
|
||
|
|
f"Done: copied={copied}, overwritten={overwritten}, skipped={skipped}, "
|
||
|
|
f"output={output_root}"
|
||
|
|
)
|
||
|
|
return 0
|
||
|
|
|
||
|
|
|
||
|
|
def build_parser() -> argparse.ArgumentParser:
|
||
|
|
parser = argparse.ArgumentParser(
|
||
|
|
description="Initialize test data by scanning NRes/RsLi signatures."
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--input",
|
||
|
|
required=True,
|
||
|
|
help="Input directory to scan recursively.",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--output",
|
||
|
|
required=True,
|
||
|
|
help="Output root directory (archives go to nres/ and rsli/ subdirs).",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--force",
|
||
|
|
action="store_true",
|
||
|
|
help="Overwrite destination files without confirmation prompts.",
|
||
|
|
)
|
||
|
|
return parser
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
|
||
|
|
args = build_parser().parse_args()
|
||
|
|
|
||
|
|
input_root = Path(args.input)
|
||
|
|
if not input_root.exists():
|
||
|
|
print(f"[error] input directory does not exist: {input_root}", file=sys.stderr)
|
||
|
|
return 2
|
||
|
|
if not input_root.is_dir():
|
||
|
|
print(f"[error] input path is not a directory: {input_root}", file=sys.stderr)
|
||
|
|
return 2
|
||
|
|
|
||
|
|
output_root = Path(args.output)
|
||
|
|
if output_root.exists() and not output_root.is_dir():
|
||
|
|
print(f"[error] output path exists and is not a directory: {output_root}", file=sys.stderr)
|
||
|
|
return 2
|
||
|
|
|
||
|
|
input_resolved = input_root.resolve()
|
||
|
|
output_resolved = output_root.resolve()
|
||
|
|
if input_resolved == output_resolved:
|
||
|
|
print("[error] input and output directories must be different.", file=sys.stderr)
|
||
|
|
return 2
|
||
|
|
|
||
|
|
excluded_root: Path | None = None
|
||
|
|
if is_relative_to(output_resolved, input_resolved):
|
||
|
|
excluded_root = output_resolved
|
||
|
|
print(f"Notice: output is inside input, skipping scan under: {excluded_root}")
|
||
|
|
|
||
|
|
archives = scan_archives(input_root, excluded_root)
|
||
|
|
|
||
|
|
output_root.mkdir(parents=True, exist_ok=True)
|
||
|
|
return copy_archives(archives, input_root, output_root, force=args.force)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
raise SystemExit(main())
|