from __future__ import annotations import json import os import subprocess from pathlib import Path from .cache import get_json, remove_json, set_json def _allowed_roots(config: dict) -> list[Path]: roots = [Path(drive["path"]).resolve() for drive in config.get("drives", [])] roots.append(Path(config["paths"]["downloads"]).resolve()) return roots def assert_allowed_path(config: dict, path: str) -> Path: target = Path(path).resolve() for root in _allowed_roots(config): try: target.relative_to(root) return target except ValueError: continue raise ValueError("path is outside configured media and downloads roots") def media_probe(config: dict, path: str) -> dict: target = assert_allowed_path(config, path) stat = target.stat() cache_key = f"{target}:{stat.st_size}:{int(stat.st_mtime)}" cached = get_json(config, "ffprobe", cache_key) if cached is not None: return cached command = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", str(target), ] completed = subprocess.run(command, capture_output=True, text=True, timeout=60) if completed.returncode != 0: return {"path": str(target), "status": "failed", "stderr": completed.stderr[-4000:]} payload = json.loads(completed.stdout or "{}") streams = payload.get("streams", []) result = { "path": str(target), "cache_key": cache_key, "status": "ok", "format": payload.get("format", {}), "audio": [stream for stream in streams if stream.get("codec_type") == "audio"], "subtitles": [stream for stream in streams if stream.get("codec_type") == "subtitle"], "video": [stream for stream in streams if stream.get("codec_type") == "video"], } set_json(config, "ffprobe", cache_key, result) return result def _stream_type_positions(probe: dict) -> dict[int, tuple[str, int]]: positions = {"audio": 0, "subtitle": 0, "video": 0} result = {} for stream in probe.get("video", []) + probe.get("audio", []) + probe.get("subtitles", []): codec_type = stream.get("codec_type") if codec_type not in positions: continue result[int(stream["index"])] = (codec_type, positions[codec_type]) positions[codec_type] += 1 return result def edit_track(config: dict, path: str, action: str, stream_index: int) -> dict: target = assert_allowed_path(config, path) probe = media_probe(config, str(target)) positions = _stream_type_positions(probe) if stream_index not in positions: raise ValueError("stream index was not found") codec_type, type_index = positions[stream_index] if codec_type not in {"audio", "subtitle"}: raise ValueError("only audio and subtitle streams can be edited here") tmp = target.with_suffix(target.suffix + ".tracksorting") if action == "remove": command = ["ffmpeg", "-hide_banner", "-y", "-i", str(target), "-map", "0", "-map", f"-0:{stream_index}", "-c", "copy", str(tmp)] elif action == "set-default": spec = "a" if codec_type == "audio" else "s" command = [ "ffmpeg", "-hide_banner", "-y", "-i", str(target), "-map", "0", "-c", "copy", f"-disposition:{spec}", "0", f"-disposition:{spec}:{type_index}", "default", str(tmp), ] else: raise ValueError("unsupported track action") if config["app"].get("dry_run"): return {"status": "dry-run", "path": str(target), "action": action, "stream_index": stream_index, "command": command} completed = subprocess.run(command, capture_output=True, text=True, timeout=60 * 60) if completed.returncode != 0: try: tmp.unlink() except FileNotFoundError: pass return {"status": "failed", "returncode": completed.returncode, "stderr": completed.stderr[-4000:], "command": command} os.replace(tmp, target) remove_json(config, "ffprobe", probe.get("cache_key", "")) return {"status": "updated", "path": str(target), "action": action, "stream_index": stream_index}