from __future__ import annotations import json import threading import time import uuid from pathlib import Path from typing import Any class JsonStore: def __init__(self, data_dir: str): self.path = Path(data_dir) / "state.json" self.lock = threading.RLock() self.state: dict[str, Any] = { "events": [], "items": [], "plans": [], "organizer": {"queue": [], "updated_at": None}, "library": None, "settings": {}, "updated_at": time.time(), } self.load() def load(self) -> None: with self.lock: if self.path.exists(): try: self.state.update(json.loads(self.path.read_text())) except json.JSONDecodeError: backup = self.path.with_suffix(f".corrupt-{int(time.time())}.json") self.path.replace(backup) self.state.setdefault("events", []).insert(0, { "time": time.time(), "level": "error", "message": f"Recovered from corrupt state file: {backup.name}", }) def save(self) -> None: with self.lock: self.state["updated_at"] = time.time() tmp = self.path.with_name(f"{self.path.name}.{uuid.uuid4().hex}.tmp") tmp.write_text(json.dumps(self.state, indent=2, sort_keys=True)) tmp.replace(self.path) def add_event(self, level: str, message: str, **fields: Any) -> None: with self.lock: event = {"time": time.time(), "level": level, "message": message, **fields} self.state.setdefault("events", []).insert(0, event) self.state["events"] = self.state["events"][:500] self.save() def upsert_item(self, item: dict[str, Any]) -> None: with self.lock: items = self.state.setdefault("items", []) key = item.get("destination") or item.get("source") for idx, existing in enumerate(items): if (existing.get("destination") or existing.get("source")) == key: items[idx] = {**existing, **item} break else: items.append(item) self.save() def set_plans(self, plans: list[dict[str, Any]]) -> None: with self.lock: self.state["plans"] = plans[:200] self.save() def set_organizer_queue(self, queue: list[dict[str, Any]]) -> None: with self.lock: self.state["organizer"] = {"queue": queue[:500], "updated_at": time.time()} self.save() def set_library(self, library: dict[str, Any]) -> None: with self.lock: self.state["library"] = library self.save() def snapshot(self) -> dict[str, Any]: with self.lock: return json.loads(json.dumps(self.state))