Files
Sortarr/backend/sortarr/tools.py
2026-05-15 02:41:52 +00:00

122 lines
3.9 KiB
Python

from __future__ import annotations
import shutil
import subprocess
import time
from pathlib import Path
def duplicate_finder(config: dict, library: dict | None) -> dict:
duplicates = []
collections = (library or {}).get("collections") or {}
for collection in list(collections.get("movies", [])) + list(collections.get("series", [])):
files = collection.get("files") or []
if len(files) < 2:
continue
total_size = sum(int(item.get("size") or 0) for item in files)
duplicates.append({
"key": collection.get("key"),
"title": collection.get("metadata", {}).get("title") or collection.get("title"),
"library": collection.get("library"),
"count": len(files),
"total_size": total_size,
"files": sorted(files, key=lambda item: (item.get("size") or 0), reverse=True)[:20],
})
return {
"count": len(duplicates),
"duplicates": sorted(duplicates, key=lambda item: item["total_size"], reverse=True)[:100],
"generated_at": time.time(),
}
def subtitle_audit(config: dict, library: dict | None) -> dict:
media_extensions = set(config["app"].get("media_extensions", []))
subtitle_extensions = config["app"].get("subtitle_extensions", [])
missing = []
present = 0
unknown = 0
for item in (library or {}).get("items", []):
path = Path(item["path"])
if path.suffix.lower() not in media_extensions:
continue
if item.get("has_subtitles") is True:
present += 1
elif "has_subtitles" not in item:
unknown += 1
else:
missing.append({
"name": item["name"],
"path": str(path),
"drive": item.get("drive"),
"expected": [f"{path.stem}{ext}" for ext in subtitle_extensions[:3]],
})
return {
"checked": present + len(missing) + unknown,
"with_subtitles": present,
"unknown_count": unknown,
"missing_count": len(missing),
"missing": missing[:500],
"generated_at": time.time(),
}
def transcode_plan(config: dict, library: dict | None) -> dict:
targets = []
for item in (library or {}).get("items", []):
path = Path(item["path"])
if path.suffix.lower() == ".mp4":
continue
output = path.with_suffix(".mp4")
command = [
"ffmpeg",
"-hide_banner",
"-y",
"-i",
str(path),
"-map",
"0",
"-c:v",
"libx264",
"-preset",
"veryfast",
"-crf",
"20",
"-c:a",
"aac",
"-c:s",
"mov_text",
str(output),
]
targets.append({
"name": item["name"],
"source": str(path),
"output": str(output),
"drive": item.get("drive"),
"command": command,
})
return {
"ffmpeg_available": shutil.which("ffmpeg") is not None,
"count": len(targets),
"targets": targets[:100],
"generated_at": time.time(),
}
def run_next_transcode(config: dict, library: dict | None) -> dict:
plan = transcode_plan(config, library)
if not plan["targets"]:
return {**plan, "status": "empty"}
if not plan["ffmpeg_available"]:
return {**plan, "status": "ffmpeg-unavailable"}
if config["app"].get("dry_run"):
return {**plan, "status": "dry-run"}
target = plan["targets"][0]
completed = subprocess.run(target["command"], capture_output=True, text=True, timeout=60 * 60)
return {
**plan,
"status": "completed" if completed.returncode == 0 else "failed",
"ran": target,
"returncode": completed.returncode,
"stderr": completed.stderr[-4000:],
}