from __future__ import annotations import json from urllib.error import HTTPError, URLError from urllib.parse import urlencode from urllib.request import Request, urlopen from .cache import get_json, set_json TMDB_BASE = "https://api.themoviedb.org/3" TMDB_TTL_SECONDS = 7 * 24 * 60 * 60 def _auth(config: dict) -> tuple[dict[str, str], str | None]: meta = config.get("metadata", {}) token = meta.get("tmdb_bearer_token") or "" api_key = meta.get("tmdb_api_key") or "" headers = {"Accept": "application/json"} if token: headers["Authorization"] = f"Bearer {token}" return headers, api_key or None def tmdb_available(config: dict) -> bool: meta = config.get("metadata", {}) if not meta.get("tmdb_enabled", True): return False return bool(meta.get("tmdb_bearer_token") or meta.get("tmdb_api_key")) def poster_url(config: dict, path: str | None) -> str | None: if not path: return None return f"{config.get('metadata', {}).get('tmdb_image_base', 'https://image.tmdb.org/t/p/w342')}{path}" def tmdb_get(config: dict, endpoint: str, params: dict | None = None) -> dict: headers, api_key = _auth(config) query = dict(params or {}) query.setdefault("language", config.get("metadata", {}).get("tmdb_language", "en-US")) if api_key: query["api_key"] = api_key url = f"{TMDB_BASE}{endpoint}?{urlencode(query)}" cache_key = f"{endpoint}?{urlencode(sorted((key, value) for key, value in query.items() if key != 'api_key'))}" cached = get_json(config, "tmdb", cache_key, TMDB_TTL_SECONDS) if cached is not None: return cached timeout = int(config.get("app", {}).get("organization_metadata_timeout_seconds", 3)) with urlopen(Request(url, headers=headers), timeout=timeout) as response: payload = json.loads(response.read().decode()) set_json(config, "tmdb", cache_key, payload) return payload def test_tmdb(config: dict) -> dict: meta = config.get("metadata", {}) if not meta.get("tmdb_enabled", True): return {"ok": False, "status": "disabled", "message": "TMDb is disabled in settings."} headers, api_key = _auth(config) if not api_key and "Authorization" not in headers: return {"ok": False, "status": "missing-credentials", "message": "No TMDb API key or bearer token is configured."} params = {"language": meta.get("tmdb_language", "en-US")} if api_key: params["api_key"] = api_key url = f"{TMDB_BASE}/configuration?{urlencode(params)}" timeout = int(config.get("app", {}).get("organization_metadata_timeout_seconds", 3)) try: with urlopen(Request(url, headers=headers), timeout=timeout) as response: payload = json.loads(response.read().decode()) images = payload.get("images") or {} secure_base = images.get("secure_base_url") or images.get("base_url") return { "ok": True, "status": "connected", "message": "TMDb accepted the configured credentials.", "image_base": secure_base, "poster_sizes": images.get("poster_sizes") or [], } except HTTPError as exc: return {"ok": False, "status": f"http-{exc.code}", "message": f"TMDb returned HTTP {exc.code}."} except (TimeoutError, URLError) as exc: return {"ok": False, "status": "network-error", "message": str(exc)} except Exception as exc: return {"ok": False, "status": "error", "message": str(exc)} def first_result(config: dict, media_type: str, title: str, year: int | None = None) -> dict | None: if not tmdb_available(config) or not title: return None params = {"query": title} if year and media_type == "movie": params["year"] = year elif year: params["first_air_date_year"] = year try: payload = tmdb_get(config, f"/search/{media_type}", params) except Exception: return None results = payload.get("results") or [] return results[0] if results else None def movie_metadata(config: dict, title: str, year: int | None = None) -> dict: result = first_result(config, "movie", title, year) if not result: return {"title": title, "source": "filename"} return { "source": "tmdb", "tmdb_id": result.get("id"), "title": result.get("title") or title, "overview": result.get("overview") or "", "poster": poster_url(config, result.get("poster_path")), "backdrop": poster_url(config, result.get("backdrop_path")), "release_date": result.get("release_date"), "vote_average": result.get("vote_average"), } def series_metadata(config: dict, title: str, seasons: set[int]) -> dict: result = first_result(config, "tv", title) if not result: return {"title": title, "source": "filename", "seasons": {}} metadata = { "source": "tmdb", "tmdb_id": result.get("id"), "title": result.get("name") or title, "overview": result.get("overview") or "", "poster": poster_url(config, result.get("poster_path")), "backdrop": poster_url(config, result.get("backdrop_path")), "first_air_date": result.get("first_air_date"), "vote_average": result.get("vote_average"), "seasons": {}, } for season in sorted(seasons): try: payload = tmdb_get(config, f"/tv/{result.get('id')}/season/{season}") except Exception: continue metadata["seasons"][str(season)] = { "name": payload.get("name"), "air_date": payload.get("air_date"), "episode_count": len(payload.get("episodes") or []), "episodes": [ { "season": season, "episode": episode.get("episode_number"), "title": episode.get("name"), "overview": episode.get("overview") or "", "air_date": episode.get("air_date"), "still": poster_url(config, episode.get("still_path")), } for episode in payload.get("episodes") or [] ], } return metadata