from __future__ import annotations import json import os import threading import time from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from urllib.parse import urlparse from urllib.parse import parse_qs, unquote from .config import load_config, public_config from .downloads import downloads_snapshot from .library import enrich_library_metadata, library_snapshot, normalize_library from .logging_setup import configure_logging from .media_probe import edit_track, media_probe from .metadata import movie_metadata_by_id, search_metadata, series_metadata_by_id, test_tmdb from .organizer import execute_bundle_plan from .releases import fetch_releases from .scanner import Scanner from .store import JsonStore from .storage import drive_stats from .tools import duplicate_finder, run_next_transcode, subtitle_audit, transcode_plan SETTINGS_SCHEMA = { "app": { "name": str, "dry_run": bool, "log_level": str, "scan_interval_seconds": int, "settle_seconds": int, "stable_checks": int, "incomplete_suffixes": list, "media_extensions": list, "subtitle_extensions": list, "extra_keywords": list, "library_scan_max_files": int, "library_scan_timeout_seconds": int, "cache_max_bytes": int, "auto_move_min_confidence": int, "review_min_confidence": int, "organization_metadata_budget_seconds": int, "organization_metadata_timeout_seconds": int, "metadata_parallelism": int, }, "paths": { "downloads": str, "data": str, "logs": str, "cache": str, }, "library": { "movie_folder": str, "series_folder": str, "movie_file": str, "episode_file": str, "subtitle_file": str, "unknown_folder": str, "collision": str, "duplicate": str, "permissions_mode": str, "directory_mode": str, }, "metadata": { "write_nfo": bool, "provider_order": list, "prefer_existing_nfo": bool, "tmdb_api_key": str, "tmdb_bearer_token": str, "tmdb_language": str, "tmdb_image_base": str, "tmdb_enabled": bool, }, "theme": { "default": str, "allow_custom_css": bool, "custom_css_path": str, }, } def deep_merge(base: dict, override: dict) -> dict: for key, value in override.items(): if isinstance(value, dict) and isinstance(base.get(key), dict): deep_merge(base[key], value) else: base[key] = value return base def coerce_value(value, caster): if caster is bool: return bool(value) if caster is int: return int(value) if caster is list: if isinstance(value, list): return [str(item).strip() for item in value if str(item).strip()] return [item.strip() for item in str(value).split(",") if item.strip()] return caster(value) def apply_settings(config: dict, settings: dict) -> dict: if any(key in SETTINGS_SCHEMA["app"] for key in settings): settings = {"app": settings} applied = {} for section, fields in SETTINGS_SCHEMA.items(): values = settings.get(section) if not isinstance(values, dict): continue target = config.setdefault(section, {}) applied_section = applied.setdefault(section, {}) for key, caster in fields.items(): if key not in values: continue target[key] = coerce_value(values[key], caster) applied_section[key] = target[key] if not applied_section: applied.pop(section, None) if isinstance(settings.get("drives"), list): drives = [] for idx, drive in enumerate(settings["drives"]): if not isinstance(drive, dict): continue existing = (config.get("drives") or [{}] * (idx + 1))[idx] if idx < len(config.get("drives", [])) else {} drives.append({ "id": str(drive.get("id", existing.get("id", f"drive{idx + 1}"))), "name": str(drive.get("name", existing.get("name", f"Media Drive {idx + 1}"))), "path": str(drive.get("path", existing.get("path", ""))), "min_free_gb": int(drive.get("min_free_gb", existing.get("min_free_gb", 20))), }) config["drives"] = drives applied["drives"] = drives if isinstance(settings.get("release_providers"), list): providers = [] for provider in settings["release_providers"]: if not isinstance(provider, dict): continue providers.append({ "id": str(provider.get("id", "")), "name": str(provider.get("name", "")), "enabled": bool(provider.get("enabled", False)), "type": str(provider.get("type", "rss")), "url": str(provider.get("url", "")), }) config["release_providers"] = providers applied["release_providers"] = providers return applied CONFIG = load_config() configure_logging(CONFIG["paths"]["logs"], CONFIG["app"].get("log_level", "INFO")) STORE = JsonStore(CONFIG["paths"]["data"]) apply_settings(CONFIG, STORE.snapshot().get("settings", {})) SCANNER = Scanner(CONFIG, STORE) METADATA_REFRESH = {"running": False, "started_at": None, "finished_at": None, "error": None} METADATA_LOCK = threading.Lock() def start_metadata_refresh() -> bool: with METADATA_LOCK: if METADATA_REFRESH["running"]: return False METADATA_REFRESH.update({"running": True, "started_at": time.time(), "finished_at": None, "error": None}) def worker() -> None: try: snap = STORE.snapshot() library = snap.get("library") or {} if not library.get("items"): library = library_snapshot(CONFIG, snap.get("library")) enriched = enrich_library_metadata(CONFIG, library) STORE.set_library(enriched) with METADATA_LOCK: METADATA_REFRESH.update({"running": False, "finished_at": time.time(), "error": None}) except Exception as exc: with METADATA_LOCK: METADATA_REFRESH.update({"running": False, "finished_at": time.time(), "error": str(exc)}) threading.Thread(target=worker, daemon=True).start() return True def public_library_payload(library: dict) -> dict: public = normalize_library(library) public.pop("items", None) return public def find_collection(library: dict, key: str, media_library: str) -> dict | None: collections = library.get("collections") or {} group = "series" if media_library == "tv" else "movies" return next((item for item in collections.get(group, []) if item.get("key") == key), None) def identify_collection(payload: dict) -> dict: media_library = "tv" if payload.get("library") == "tv" else "movie" key = str(payload.get("key") or "") tmdb_id = int(payload.get("tmdb_id") or 0) snap = STORE.snapshot() library = snap.get("library") or {} collection = find_collection(library, key, media_library) if not collection: raise ValueError("library item was not found") if media_library == "tv": seasons = {int(season.get("season") or 0) for season in collection.get("seasons", []) if int(season.get("season") or 0) > 0} metadata = series_metadata_by_id(CONFIG, tmdb_id, seasons) else: metadata = movie_metadata_by_id(CONFIG, tmdb_id) if metadata.get("release_date"): collection["year"] = int(metadata["release_date"][:4]) collection["metadata"] = metadata collection["title"] = metadata.get("title") or collection.get("title") identifications = library.setdefault("identifications", {}) identifications[key] = { "library": media_library, "tmdb_id": tmdb_id, "metadata": metadata, "updated_at": time.time(), } STORE.set_library(library) return collection class Handler(BaseHTTPRequestHandler): server_version = "Sortarr/0.1" def log_message(self, fmt: str, *args) -> None: return def send_json(self, payload, status=HTTPStatus.OK) -> None: body = json.dumps(payload, indent=2).encode() self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_OPTIONS(self) -> None: self.send_response(204) self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "GET,POST,OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type") self.end_headers() def do_GET(self) -> None: parsed_url = urlparse(self.path) path = parsed_url.path try: if path == "/api/health": self.send_json({"ok": True}) elif path == "/api/config": self.send_json(public_config(CONFIG)) elif path == "/api/dashboard": snap = STORE.snapshot() cached_library = snap.get("library") or { "drives": drive_stats(CONFIG), "items": [], "counts": {"movies": 0, "tv": 0, "total": 0}, "extensions": {}, "scanned_files": 0, "truncated": False, "cached": False, } cached_library = normalize_library(cached_library) cached_library.pop("items", None) cached_library.pop("collections", None) public_state = { "events": snap.get("events", [])[:200], "organizer": snap.get("organizer", {"queue": [], "updated_at": None}), "settings": snap.get("settings", {}), "updated_at": snap.get("updated_at"), } self.send_json({ "state": public_state, "library": cached_library, "dry_run": CONFIG["app"].get("dry_run"), }) elif path == "/api/library": library = STORE.snapshot().get("library") or { "drives": drive_stats(CONFIG), "items": [], "counts": {"movies": 0, "tv": 0, "total": 0}, "extensions": {}, "scanned_files": 0, "truncated": False, "cached": False, } self.send_json({"library": public_library_payload(library)}) elif path == "/api/library/metadata/status": self.send_json({"metadata": METADATA_REFRESH}) elif path == "/api/downloads": self.send_json({"downloads": downloads_snapshot(CONFIG, STORE.snapshot())}) elif path == "/api/releases": self.send_json({"releases": fetch_releases(CONFIG, STORE.snapshot().get("library"))}) elif path == "/api/media/probe": params = parse_qs(parsed_url.query) target = unquote((params.get("path") or [""])[0]) self.send_json({"media": media_probe(CONFIG, target)}) elif path == "/api/tools/subtitles": self.send_json({"audit": subtitle_audit(CONFIG, STORE.snapshot().get("library"))}) elif path == "/api/tools/transcoder": self.send_json({"transcoder": transcode_plan(CONFIG, STORE.snapshot().get("library"))}) elif path == "/api/tools/duplicates": self.send_json({"duplicates": duplicate_finder(CONFIG, STORE.snapshot().get("library"))}) elif path == "/api/theme/custom.css": custom = CONFIG.get("theme", {}).get("custom_css_path") if custom and CONFIG.get("theme", {}).get("allow_custom_css", True) and os.path.exists(custom): body = open(custom, "rb").read() self.send_response(200) self.send_header("Content-Type", "text/css") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) else: self.send_response(404) self.end_headers() else: self.send_json({"error": "not found"}, HTTPStatus.NOT_FOUND) except Exception as exc: self.send_json({"error": str(exc)}, HTTPStatus.INTERNAL_SERVER_ERROR) def do_POST(self) -> None: path = urlparse(self.path).path try: if path == "/api/scan": started = SCANNER.request_scan() snap = STORE.snapshot() self.send_json({ "started": started, "status": "started" if started else "already-running", "queue": snap.get("organizer", {}).get("queue", []), }, HTTPStatus.ACCEPTED) elif path == "/api/organizer/approve": length = int(self.headers.get("Content-Length", "0") or "0") body = self.rfile.read(length).decode() if length else "{}" payload = json.loads(body) plan_id = payload.get("id") snap = STORE.snapshot() queue = snap.get("organizer", {}).get("queue", []) plan = next((item for item in queue if item.get("id") == plan_id), None) if not plan: self.send_json({"error": "plan not found"}, HTTPStatus.NOT_FOUND) return result = execute_bundle_plan(CONFIG, plan, force=True) updated = [result if item.get("id") == plan_id else item for item in queue] STORE.set_organizer_queue(updated) STORE.add_event("info", f"approved organizer plan: {result.get('result')}", path=result.get("source"), confidence=result.get("confidence")) self.send_json({"plan": result}) elif path == "/api/organizer/skip": length = int(self.headers.get("Content-Length", "0") or "0") body = self.rfile.read(length).decode() if length else "{}" payload = json.loads(body) plan_id = payload.get("id") snap = STORE.snapshot() queue = snap.get("organizer", {}).get("queue", []) updated = [{**item, "status": "skipped", "result": "skipped"} if item.get("id") == plan_id else item for item in queue] STORE.set_organizer_queue(updated) self.send_json({"ok": True}) elif path == "/api/library/scan": snap = STORE.snapshot() library = library_snapshot(CONFIG, snap.get("library")) STORE.set_library(library) self.send_json({"library": library}) elif path == "/api/library/metadata/refresh": started = start_metadata_refresh() self.send_json({"started": started, "metadata": METADATA_REFRESH}, HTTPStatus.ACCEPTED) elif path == "/api/library/identify/search": length = int(self.headers.get("Content-Length", "0") or "0") body = self.rfile.read(length).decode() if length else "{}" payload = json.loads(body) results = search_metadata(CONFIG, "tv" if payload.get("library") == "tv" else "movie", str(payload.get("query") or ""), payload.get("year")) self.send_json({"results": results}) elif path == "/api/library/identify/apply": length = int(self.headers.get("Content-Length", "0") or "0") body = self.rfile.read(length).decode() if length else "{}" payload = json.loads(body) collection = identify_collection(payload) self.send_json({"collection": collection}) elif path == "/api/tools/transcoder/run-next": result = run_next_transcode(CONFIG, STORE.snapshot().get("library")) STORE.add_event("info", f"transcoder: {result.get('status')}") self.send_json({"transcoder": result}) elif path == "/api/metadata/tmdb/test": self.send_json({"tmdb": test_tmdb(CONFIG)}) elif path == "/api/media/tracks": length = int(self.headers.get("Content-Length", "0") or "0") body = self.rfile.read(length).decode() if length else "{}" payload = json.loads(body) result = edit_track(CONFIG, payload.get("path", ""), payload.get("action", ""), int(payload.get("stream_index", -1))) STORE.add_event("info", f"track edit: {result.get('status')}", path=payload.get("path", "")) self.send_json({"media": result}) elif path == "/api/settings": length = int(self.headers.get("Content-Length", "0") or "0") body = self.rfile.read(length).decode() if length else "{}" updates = json.loads(body) applied = apply_settings(CONFIG, updates) snap = STORE.snapshot() settings = snap.get("settings", {}) deep_merge(settings, applied) STORE.state["settings"] = settings STORE.save() self.send_json({"settings": applied, "config": public_config(CONFIG)}) else: self.send_json({"error": "not found"}, HTTPStatus.NOT_FOUND) except Exception as exc: self.send_json({"error": str(exc)}, HTTPStatus.INTERNAL_SERVER_ERROR) def main() -> None: SCANNER.start() host = os.getenv("SORTARR_HOST", "0.0.0.0") port = int(os.getenv("SORTARR_API_PORT", "8099")) ThreadingHTTPServer((host, port), Handler).serve_forever() if __name__ == "__main__": main()