from __future__ import annotations import time from collections import defaultdict from pathlib import Path def empty_snapshot(root: Path, error: str | None = None) -> dict: return { "path": str(root), "generated_at": time.time(), "current": [], "bundles": [], "loose": [], "recent": [], "counts": { "current": 0, "recent": 0, "media": 0, "subtitles": 0, "incomplete": 0, }, "total_size": 0, "error": error, } def downloads_snapshot(config: dict, state: dict) -> dict: root = Path(config["paths"]["downloads"]) app = config.get("app", {}) media_extensions = set(app.get("media_extensions", [])) subtitle_extensions = set(app.get("subtitle_extensions", [])) incomplete = set(app.get("incomplete_suffixes", [])) current = [] media_files = [] subtitle_files = [] total_size = 0 try: root.mkdir(parents=True, exist_ok=True) paths = root.rglob("*") for path in paths: if not path.is_file(): continue try: stat = path.stat() except OSError: continue suffix = path.suffix.lower() total_size += stat.st_size item = { "name": path.name, "path": str(path), "relative_path": str(path.relative_to(root)), "folder": str(path.parent.relative_to(root)) if path.parent != root else "", "size": stat.st_size, "modified": stat.st_mtime, "extension": suffix or "none", "is_media": suffix in media_extensions, "is_subtitle": suffix in subtitle_extensions, "is_incomplete": suffix in incomplete, } current.append(item) if item["is_media"]: media_files.append(item) elif item["is_subtitle"]: subtitle_files.append(item) except OSError as exc: return empty_snapshot(root, str(exc)) subtitles_by_folder = defaultdict(list) for subtitle in subtitle_files: subtitles_by_folder[subtitle["folder"]].append(subtitle) parent = Path(subtitle["folder"]) if parent.name.lower() in {"subs", "subtitles"}: subtitles_by_folder[str(parent.parent) if str(parent.parent) != "." else ""].append(subtitle) bundles = [] bundled_subtitle_paths = set() for media in media_files: folder_subtitles = subtitles_by_folder.get(media["folder"], []) stem_matches = [ subtitle for subtitle in subtitle_files if subtitle["name"].lower().startswith(Path(media["name"]).stem.lower()) ] seen = set() subtitles = [] for subtitle in folder_subtitles + stem_matches: if subtitle["path"] in seen: continue seen.add(subtitle["path"]) bundled_subtitle_paths.add(subtitle["path"]) subtitles.append(subtitle) bundles.append({ "media": media, "subtitles": sorted(subtitles, key=lambda item: item["name"].lower()), "sidecars": [ item for item in current if item["folder"] == media["folder"] and not item["is_media"] and not item["is_subtitle"] ][:20], "size": media["size"] + sum(item["size"] for item in subtitles), }) loose = [ item for item in current if not item["is_media"] and item["path"] not in bundled_subtitle_paths ] recent = [] for item in state.get("items", []): source = item.get("source", "") status = item.get("status") if source.startswith(str(root)) and status in {"moved", "planned"}: recent.append({ "source": source, "destination": item.get("destination"), "title": item.get("title"), "type": item.get("type"), "status": status, "drive": item.get("drive"), "updated_at": item.get("updated_at"), }) return { "path": str(root), "generated_at": time.time(), "current": sorted(current, key=lambda item: item["modified"], reverse=True), "bundles": sorted(bundles, key=lambda item: item["media"]["modified"], reverse=True), "loose": sorted(loose, key=lambda item: item["modified"], reverse=True), "recent": sorted(recent, key=lambda item: item.get("updated_at") or 0, reverse=True)[:200], "counts": { "current": len(current), "recent": len(recent), "media": sum(1 for item in current if item["is_media"]), "subtitles": sum(1 for item in current if item["is_subtitle"]), "incomplete": sum(1 for item in current if item["is_incomplete"]), }, "total_size": total_size, }