122 lines
3.9 KiB
Python
122 lines
3.9 KiB
Python
from __future__ import annotations
|
|
|
|
import shutil
|
|
import subprocess
|
|
import time
|
|
from pathlib import Path
|
|
|
|
|
|
def duplicate_finder(config: dict, library: dict | None) -> dict:
|
|
duplicates = []
|
|
collections = (library or {}).get("collections") or {}
|
|
for collection in list(collections.get("movies", [])) + list(collections.get("series", [])):
|
|
files = collection.get("files") or []
|
|
if len(files) < 2:
|
|
continue
|
|
total_size = sum(int(item.get("size") or 0) for item in files)
|
|
duplicates.append({
|
|
"key": collection.get("key"),
|
|
"title": collection.get("metadata", {}).get("title") or collection.get("title"),
|
|
"library": collection.get("library"),
|
|
"count": len(files),
|
|
"total_size": total_size,
|
|
"files": sorted(files, key=lambda item: (item.get("size") or 0), reverse=True)[:20],
|
|
})
|
|
return {
|
|
"count": len(duplicates),
|
|
"duplicates": sorted(duplicates, key=lambda item: item["total_size"], reverse=True)[:100],
|
|
"generated_at": time.time(),
|
|
}
|
|
|
|
|
|
def subtitle_audit(config: dict, library: dict | None) -> dict:
|
|
media_extensions = set(config["app"].get("media_extensions", []))
|
|
subtitle_extensions = config["app"].get("subtitle_extensions", [])
|
|
missing = []
|
|
present = 0
|
|
unknown = 0
|
|
for item in (library or {}).get("items", []):
|
|
path = Path(item["path"])
|
|
if path.suffix.lower() not in media_extensions:
|
|
continue
|
|
if item.get("has_subtitles") is True:
|
|
present += 1
|
|
elif "has_subtitles" not in item:
|
|
unknown += 1
|
|
else:
|
|
missing.append({
|
|
"name": item["name"],
|
|
"path": str(path),
|
|
"drive": item.get("drive"),
|
|
"expected": [f"{path.stem}{ext}" for ext in subtitle_extensions[:3]],
|
|
})
|
|
return {
|
|
"checked": present + len(missing) + unknown,
|
|
"with_subtitles": present,
|
|
"unknown_count": unknown,
|
|
"missing_count": len(missing),
|
|
"missing": missing[:500],
|
|
"generated_at": time.time(),
|
|
}
|
|
|
|
|
|
def transcode_plan(config: dict, library: dict | None) -> dict:
|
|
targets = []
|
|
for item in (library or {}).get("items", []):
|
|
path = Path(item["path"])
|
|
if path.suffix.lower() == ".mp4":
|
|
continue
|
|
output = path.with_suffix(".mp4")
|
|
command = [
|
|
"ffmpeg",
|
|
"-hide_banner",
|
|
"-y",
|
|
"-i",
|
|
str(path),
|
|
"-map",
|
|
"0",
|
|
"-c:v",
|
|
"libx264",
|
|
"-preset",
|
|
"veryfast",
|
|
"-crf",
|
|
"20",
|
|
"-c:a",
|
|
"aac",
|
|
"-c:s",
|
|
"mov_text",
|
|
str(output),
|
|
]
|
|
targets.append({
|
|
"name": item["name"],
|
|
"source": str(path),
|
|
"output": str(output),
|
|
"drive": item.get("drive"),
|
|
"command": command,
|
|
})
|
|
return {
|
|
"ffmpeg_available": shutil.which("ffmpeg") is not None,
|
|
"count": len(targets),
|
|
"targets": targets[:100],
|
|
"generated_at": time.time(),
|
|
}
|
|
|
|
|
|
def run_next_transcode(config: dict, library: dict | None) -> dict:
|
|
plan = transcode_plan(config, library)
|
|
if not plan["targets"]:
|
|
return {**plan, "status": "empty"}
|
|
if not plan["ffmpeg_available"]:
|
|
return {**plan, "status": "ffmpeg-unavailable"}
|
|
if config["app"].get("dry_run"):
|
|
return {**plan, "status": "dry-run"}
|
|
target = plan["targets"][0]
|
|
completed = subprocess.run(target["command"], capture_output=True, text=True, timeout=60 * 60)
|
|
return {
|
|
**plan,
|
|
"status": "completed" if completed.returncode == 0 else "failed",
|
|
"ran": target,
|
|
"returncode": completed.returncode,
|
|
"stderr": completed.stderr[-4000:],
|
|
}
|