site_liveq/liveq/route_operations.py

26 lines
622 B
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import json
from typing import List
from constants import LOCALQUEUE
from models import QueueItem
QUEUE_JSON = 'queue.json'
def queue_read() -> List[QueueItem]:
if not (LOCALQUEUE / QUEUE_JSON).exists():
return []
return [*map(QueueItem.from_json,
json.loads((LOCALQUEUE / QUEUE_JSON).read_text(
encoding='utf-8', errors='ignore')))]
def queue_write(items: List[QueueItem]):
(LOCALQUEUE / QUEUE_JSON).write_text(
json.dumps([*map(QueueItem.to_json, items)], indent=4),
encoding='utf-8'
)