84 lines
2.9 KiB
Python
84 lines
2.9 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
class JsonStore:
    """Application state held in memory and persisted to one JSON file.

    The state lives in ``self.state`` (a plain dict) and is written to
    ``<data_dir>/state.json``. Every public method holds ``self.lock`` (a
    re-entrant lock), which is what lets the mutators call :meth:`save`
    while they still hold it.
    """

    # Upper bounds applied when the corresponding lists are stored.
    MAX_EVENTS = 500
    MAX_PLANS = 200
    MAX_QUEUE = 500

    def __init__(self, data_dir: str):
        """Set up default state and merge in whatever is already on disk.

        Args:
            data_dir: Directory that holds (or will hold) ``state.json``.
                It does not need to exist yet; it is created on first save.
        """
        self.path = Path(data_dir) / "state.json"
        self.lock = threading.RLock()
        self.state: dict[str, Any] = {
            "events": [],
            "items": [],
            "plans": [],
            "organizer": {"queue": [], "updated_at": None},
            "library": None,
            "settings": {},
            "updated_at": time.time(),
        }
        self.load()

    def load(self) -> None:
        """Merge the state file into ``self.state``.

        Top-level keys found in the file replace the in-memory ones (a
        shallow ``dict.update``); keys missing from the file keep their
        current values. A missing file is not an error. A file that cannot
        be decoded as UTF-8, is not valid JSON, or whose top-level value is
        not an object is moved aside and an error event is recorded.
        """
        with self.lock:
            try:
                loaded = json.loads(self.path.read_text(encoding="utf-8"))
            except (FileNotFoundError, NotADirectoryError):
                # Nothing persisted yet: keep the current in-memory state.
                return
            except ValueError:
                # json.JSONDecodeError and UnicodeDecodeError both derive
                # from ValueError.
                self._quarantine_corrupt_file()
                return
            if not isinstance(loaded, dict):
                # Valid JSON, but dict.update() cannot merge it.
                self._quarantine_corrupt_file()
                return
            self.state.update(loaded)

    def _quarantine_corrupt_file(self) -> None:
        """Rename the unreadable state file and log an error event.

        Called with ``self.lock`` held.
        """
        backup = self.path.with_suffix(f".corrupt-{int(time.time())}.json")
        self.path.replace(backup)
        self.state.setdefault("events", []).insert(0, {
            "time": time.time(),
            "level": "error",
            "message": f"Recovered from corrupt state file: {backup.name}",
        })

    def save(self) -> None:
        """Stamp ``updated_at`` and write the state to disk.

        The JSON is written to a uniquely named sibling ``.tmp`` file which
        is then renamed over ``state.json``, so readers never observe a
        half-written file.

        Raises:
            TypeError: If the state holds something ``json.dumps`` rejects.
            OSError: If the directory or file cannot be written.
        """
        with self.lock:
            self.state["updated_at"] = time.time()
            # Serialise first so a failure here never touches the disk.
            payload = json.dumps(self.state, indent=2, sort_keys=True)
            # The data directory may not exist on a first run.
            self.path.parent.mkdir(parents=True, exist_ok=True)
            tmp = self.path.with_name(f"{self.path.name}.{uuid.uuid4().hex}.tmp")
            try:
                tmp.write_text(payload, encoding="utf-8")
                tmp.replace(self.path)
            except OSError:
                # Do not leave an orphan temp file behind.
                try:
                    tmp.unlink()
                except OSError:
                    pass  # best effort; the original error is re-raised
                raise

    def add_event(self, level: str, message: str, **fields: Any) -> None:
        """Prepend an event, cap the list at ``MAX_EVENTS`` and save.

        Args:
            level: Value stored under the event's ``"level"`` key.
            message: Value stored under the event's ``"message"`` key.
            **fields: Extra keys merged into the event; a ``time`` keyword
                replaces the automatic timestamp.
        """
        with self.lock:
            event = {"time": time.time(), "level": level, "message": message, **fields}
            events = self.state.setdefault("events", [])
            events.insert(0, event)
            # Newest first, so trimming the tail drops the oldest events.
            self.state["events"] = events[: self.MAX_EVENTS]
            self.save()

    def upsert_item(self, item: dict[str, Any]) -> None:
        """Merge ``item`` into the stored item with the same key, or append it.

        An item's key is its ``"destination"`` value, falling back to
        ``"source"`` when the destination is missing or falsy. On a match
        the stored item is replaced by ``{**existing, **item}``.

        NOTE: items that have neither field all share the key ``None`` and
        therefore merge into one another.
        """
        with self.lock:
            items = self.state.setdefault("items", [])
            key = item.get("destination") or item.get("source")
            for idx, existing in enumerate(items):
                if (existing.get("destination") or existing.get("source")) == key:
                    items[idx] = {**existing, **item}
                    break
            else:
                # No stored item carries this key yet.
                items.append(item)
            self.save()

    def set_plans(self, plans: list[dict[str, Any]]) -> None:
        """Replace the plans with the first ``MAX_PLANS`` entries and save."""
        with self.lock:
            self.state["plans"] = plans[: self.MAX_PLANS]
            self.save()

    def set_organizer_queue(self, queue: list[dict[str, Any]]) -> None:
        """Replace the organizer queue (first ``MAX_QUEUE`` entries) and save.

        The organizer's own ``updated_at`` is refreshed as well.
        """
        with self.lock:
            self.state["organizer"] = {
                "queue": queue[: self.MAX_QUEUE],
                "updated_at": time.time(),
            }
            self.save()

    def set_library(self, library: dict[str, Any]) -> None:
        """Store ``library`` under the ``"library"`` key and save."""
        with self.lock:
            self.state["library"] = library
            self.save()

    def snapshot(self) -> dict[str, Any]:
        """Return an independent copy of the state.

        The copy is made with a JSON round trip, so it holds only
        JSON-representable values and shares no objects with ``self.state``.
        """
        with self.lock:
            return json.loads(json.dumps(self.state))