site_liveq/liveq/routes.py

286 lines
10 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import datetime
import urllib.parse
import uuid
from typing import List, Optional
import requests # type: ignore
from auth import (generate_twitch_login_link, parse_token_with_permissions,
reuseable_oauth)
from bms import (DifficultyWrapperForOsu, DifficultyWrapperForRhythmGame,
RhythmGameMode, get_crossgame_beatmaps)
from constants import (JWT_SECRET_KEY, QUEUE_MAX_ITEM, QUEUE_MAX_MINE,
TWITCH_CLIENT_ID, config)
from fastapi import APIRouter, Depends, FastAPI, Form, HTTPException, status
from jose import jwt
from models import (ConfigModel, DetailModel, QueuedSongModel, QueueItem, SongModel,
TokenPayloadModel, TokenUserWithPermissionsModel)
from route_operations import queue_read, queue_write
# The application object. Despite its name, ``router`` is a full FastAPI
# application (not an APIRouter); the route decorators in this module
# register their handlers directly on it.
router = FastAPI(
    debug=False,
    # Title and description are read from the 'app' section of ``config``;
    # the second argument of each ``.get`` is the fallback that is used
    # when the key is absent.
    title=config['app'].get('title', 'LiveQ'),
    description=config['app'].get(
        'description', 'A multi-game queue for live streamers')
)
@router.get(
    '/config',
    response_model=ConfigModel,
    summary='Server configuration',
    status_code=200,
    responses={},
)
async def cfg() -> ConfigModel:
    """Return the server configuration without its ``paths`` section.

    Every section of ``config`` except ``'paths'`` is copied into a
    ``ConfigModel``; each value has surrounding whitespace stripped.

    Returns:
        A ``ConfigModel`` built from ``{section: {key: stripped_value}}``.
    """
    exposed_sections = {
        section: {
            key: value.strip()
            for key, value in config[section].items()
        }
        for section in config.sections()
        # The 'paths' section is deliberately left out of the response.
        if section != 'paths'
    }
    return ConfigModel(**exposed_sections)  # type: ignore
@router.get(
    '/user',
    response_model=TokenUserWithPermissionsModel,
    summary='User currently logged in',
    status_code=200,
    responses={401: {}},
)
async def get_current_user(token: str = Depends(reuseable_oauth)) -> TokenUserWithPermissionsModel:
    """Describe the caller identified by the bearer token.

    Args:
        token: Token supplied by the ``reuseable_oauth`` dependency.

    Returns:
        Whatever ``parse_token_with_permissions`` produces for the token
        (the user together with their permissions).
    """
    return parse_token_with_permissions(token)
@router.get(
    '/login/twitch',
    summary='Redirects to Twitch OAuth2',
    status_code=302,
)
async def login_twitch():
    """Send the browser to the Twitch OAuth2 login page.

    The redirect is produced by raising an ``HTTPException`` with status
    302 whose ``Location`` header is the link returned by
    ``generate_twitch_login_link``.

    Raises:
        HTTPException: always, with status 302.
    """
    redirect_headers = {'Location': generate_twitch_login_link()}
    raise HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers=redirect_headers,
    )
@router.post(
    '/callback/twitch',
    response_model=DetailModel,
    summary='Twitch token to JWT',
    status_code=200,
    responses={400: {}, 401: {}},
)
async def callback_twitch_js(access_token: str = Form(), scope: str = Form(''), state: str = Form(''), token_type: str = Form(''),):
    """Exchange a Twitch access token for a JWT issued by this server.

    The token is validated against Twitch, the owning user's profile is
    fetched, and a JWT signed with ``JWT_SECRET_KEY`` is handed back in the
    ``detail`` field as ``'bearer <jwt>'``.

    Args:
        access_token: Twitch OAuth access token posted by the client.
        scope: Must be empty, otherwise the request is rejected.
        state: Must be non-empty, otherwise the request is rejected.
        token_type: Must be exactly ``'bearer'``.

    Raises:
        HTTPException: 400 when the form fields fail the checks above;
            401 when Twitch rejects the token, the token is not tied to a
            user, or the user lookup fails or comes back empty;
            200 on success (the response is delivered by raising, with the
            token in ``detail``).
    """
    # Seconds to wait on Twitch before giving up; without a timeout a
    # stalled connection would block this handler indefinitely.
    twitch_timeout_s = 10
    # NOTE(review): ``requests`` is synchronous, so the event loop is blocked
    # while these two calls run; the timeout bounds that stall.

    if token_type != 'bearer' or state == '' or scope != '':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    validation = requests.get(
        'https://id.twitch.tv/oauth2/validate',
        headers={'Authorization': f'OAuth {access_token}'},
        timeout=twitch_timeout_s,
    )
    if validation.status_code != 200:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    # A token that validates but carries no user id cannot identify a
    # requester, so it is rejected instead of failing with a KeyError.
    user_id = validation.json().get('user_id')
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    lookup = requests.get(
        'https://api.twitch.tv/helix/users',
        # Let requests build and escape the query string.
        params={'id': user_id},
        headers={
            'Authorization': f'Bearer {access_token}',
            'Client-Id': TWITCH_CLIENT_ID,
        },
        timeout=twitch_timeout_s,
    )
    if lookup.status_code != 200:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    # An empty result means the user could not be resolved.
    users = lookup.json().get('data') or []
    if not users:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    usr = users[0]

    # The expiry stays a naive UTC ISO string, one day ahead, exactly as
    # before: the code that reads this claim back is not shown here, so its
    # format must not change.
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(days=1)
    claims = dict(
        platform='twitch',
        id=usr['id'],
        login=usr['login'],
        display=usr['display_name'],
        avatar=usr['profile_image_url'],
        expiration=expires_at.isoformat(),
    )
    raise HTTPException(
        status_code=status.HTTP_200_OK,
        detail='bearer ' + jwt.encode(claims, JWT_SECRET_KEY),
    )
@router.get(
    '/songs',
    summary='Songs',
    response_model=List[SongModel],
    status_code=200,
    responses={302: {}},
)
def songs():
    """Redirect the client to the cached song list.

    ``get_crossgame_beatmaps`` is called first and its result discarded
    (presumably so the cached file exists and is current - TODO confirm).

    Raises:
        HTTPException: always, with status 302 and a ``Location`` header
            pointing at ``/localcache/cached_min.json``.
    """
    get_crossgame_beatmaps()
    target = {'Location': '/localcache/cached_min.json'}
    raise HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers=target,
    )
@router.get(
    '/games',
    summary='Games',
    response_model=dict[str, dict[str, str]],
    status_code=200,
    responses={},
)
def games_enum() -> dict[str, dict[str, str]]:
    """List the members of ``RhythmGameMode`` in both lookup directions.

    Returns:
        A dict with two tables: ``'n2v'`` maps each member name to its
        value rendered as a string, and ``'v2n'`` maps that string back to
        the member name.
    """
    name_to_value: dict[str, str] = {}
    value_to_name: dict[str, str] = {}
    for mode in RhythmGameMode:
        value_text = str(mode.value)
        name_to_value[mode.name] = value_text
        value_to_name[value_text] = mode.name
    return {'n2v': name_to_value, 'v2n': value_to_name}
@router.get(
    '/queue',
    summary='Queue',
    response_model=List[QueuedSongModel],
    status_code=200,
    responses={302: {}},
)
def queue():
    """Redirect the client to the queue file.

    A fresh random ``v`` query parameter is attached to every redirect, so
    each response points at a distinct URL.

    Raises:
        HTTPException: always, with status 302.
    """
    nonce = uuid.uuid4().hex
    raise HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers={'Location': f'/localqueue/queue.json?v={nonce}'},
    )
@router.post(
    '/queue/add',
    summary='Add to Queue',
    response_model=DetailModel,
    status_code=201,
    responses={404: {}, 406: {}},
)
def queue_add(beatmap: int, token: str = Depends(reuseable_oauth)):
    """Append a beatmap to the queue on behalf of the caller.

    Args:
        beatmap: Id of the beatmap to enqueue.
        token: Token supplied by the ``reuseable_oauth`` dependency.

    Raises:
        HTTPException: 404 ``not_found`` when no known beatmap has that id;
            406 ``already_in_queue`` when the queue already holds it;
            406 ``queue_full`` when a non-admin caller hits either the
            global or the per-user limit on unplayed entries;
            201 ``done_add`` on success (the response is delivered by
            raising).
    """
    auth = parse_token_with_permissions(token)
    requester: TokenPayloadModel = auth.user
    streamer = auth.permissions.is_admin

    catalogue: List[DifficultyWrapperForRhythmGame] = get_crossgame_beatmaps()
    wanted: Optional[DifficultyWrapperForOsu] = next(
        (entry for entry in catalogue
         if entry is not None and entry.id == beatmap),  # type: ignore
        None,
    )
    if wanted is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    entries = queue_read()
    pending_total = 0
    pending_own = 0
    for entry in entries:
        # Only entries that have not been played count towards the limits.
        if not entry.played:
            pending_total += 1
            if (entry.requester_platform == requester.platform
                    and entry.requester_uid == requester.id):
                pending_own += 1
        # NOTE(review): the duplicate check is applied to every entry,
        # played or not; the delete/play/unplay endpoints look entries up
        # by id alone, which relies on ids being unique - confirm.
        if entry.id == wanted.id:
            raise HTTPException(
                status_code=status.HTTP_406_NOT_ACCEPTABLE,
                detail="already_in_queue",
            )

    # Admins bypass both limits.
    if not streamer and (pending_total >= QUEUE_MAX_ITEM
                         or pending_own >= QUEUE_MAX_MINE):
        raise HTTPException(
            status_code=status.HTTP_406_NOT_ACCEPTABLE,
            detail="queue_full",
        )

    new_entry = QueueItem(
        id=wanted.id,
        sid=wanted.sid,
        tid=wanted.tid,
        played=False,
        requester_platform=requester.platform,
        requester_uid=requester.id,
        requester_display=requester.display,
    )
    entries.append(new_entry)
    queue_write(entries)
    raise HTTPException(
        status_code=status.HTTP_201_CREATED,
        detail="done_add",
    )
@router.post(
    '/queue/delete',
    summary='Remove from Queue',
    response_model=DetailModel,
    status_code=202,
    responses={404: {}, 401: {}},
)
def queue_del(beatmap: int, token: str = Depends(reuseable_oauth)):
    """Remove a beatmap from the queue.

    Non-admin callers may only remove entries they requested themselves,
    and only while those entries are still unplayed.

    Args:
        beatmap: Id of the beatmap to remove.
        token: Token supplied by the ``reuseable_oauth`` dependency.

    Raises:
        HTTPException: 404 ``not_found`` when the beatmap is unknown or not
            queued; 401 ``cannot_remove_others`` / ``cannot_remove_played``
            when a non-admin caller is not allowed to remove the entry;
            202 ``done_delete`` on success (the response is delivered by
            raising).
    """
    auth = parse_token_with_permissions(token)
    requester: TokenPayloadModel = auth.user
    streamer = auth.permissions.is_admin

    catalogue: List[DifficultyWrapperForRhythmGame] = get_crossgame_beatmaps()
    wanted: Optional[DifficultyWrapperForOsu] = next(
        (entry for entry in catalogue
         if entry is not None and entry.id == beatmap),  # type: ignore
        None,
    )
    if wanted is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    entries = queue_read()
    # First queued entry carrying the requested beatmap id, if any.
    target: Optional[QueueItem] = next(
        (entry for entry in entries if entry.id == wanted.id),  # type: ignore
        None,
    )
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    owns_entry = (target.requester_platform == requester.platform
                  and target.requester_uid == requester.id)
    if not (owns_entry or streamer):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="cannot_remove_others",
        )
    if target.played and not streamer:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="cannot_remove_played",
        )

    entries.remove(target)
    queue_write(entries)
    raise HTTPException(
        status_code=status.HTTP_202_ACCEPTED,
        detail="done_delete",
    )
def _set_played_flag(beatmap: int, token: str, *, played: bool, done_detail: str):
    """Set the ``played`` flag of a queued beatmap; shared by play/unplay.

    Args:
        beatmap: Id of the beatmap whose queue entry is updated.
        token: Bearer token of the caller; must belong to an admin.
        played: Value written to the entry's ``played`` attribute.
        done_detail: ``detail`` string reported on success.

    Raises:
        HTTPException: 401 ``only_streamer_can_mark_played`` for non-admin
            callers; 404 ``not_found`` when the beatmap is unknown or not
            queued; 202 with ``done_detail`` on success. This function
            never returns normally.
    """
    auth = parse_token_with_permissions(token)
    if not auth.permissions.is_admin:
        # The same detail string is used by both endpoints, unchanged.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="only_streamer_can_mark_played",
        )

    catalogue: List[DifficultyWrapperForRhythmGame] = get_crossgame_beatmaps()
    wanted: Optional[DifficultyWrapperForOsu] = next(
        (entry for entry in catalogue
         if entry is not None and entry.id == beatmap),  # type: ignore
        None,
    )
    if wanted is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    entries = queue_read()
    # First queued entry carrying the requested beatmap id, if any.
    target: Optional[QueueItem] = next(
        (entry for entry in entries if entry.id == wanted.id),  # type: ignore
        None,
    )
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    target.played = played
    queue_write(entries)
    raise HTTPException(
        status_code=status.HTTP_202_ACCEPTED,
        detail=done_detail,
    )


@router.post(
    '/queue/unplay',
    summary='Mark to replay',
    response_model=DetailModel,
    status_code=202,
    responses={404: {}, 401: {}},
)
def queue_unplay(beatmap: int, token: str = Depends(reuseable_oauth)):
    """Mark a queued beatmap as not played (admin only).

    Args:
        beatmap: Id of the queued beatmap.
        token: Token supplied by the ``reuseable_oauth`` dependency.

    Raises:
        HTTPException: 401 for non-admin callers, 404 ``not_found`` when
            the beatmap is unknown or not queued, 202 ``done_mark_unplayed``
            on success (the response is delivered by raising).
    """
    _set_played_flag(beatmap, token, played=False,
                     done_detail="done_mark_unplayed")


@router.post(
    '/queue/play',
    summary='Mark as played',
    response_model=DetailModel,
    status_code=202,
    responses={404: {}, 401: {}},
)
def queue_play(beatmap: int, token: str = Depends(reuseable_oauth)):
    """Mark a queued beatmap as played (admin only).

    Args:
        beatmap: Id of the queued beatmap.
        token: Token supplied by the ``reuseable_oauth`` dependency.

    Raises:
        HTTPException: 401 for non-admin callers, 404 ``not_found`` when
            the beatmap is unknown or not queued, 202 ``done_mark_played``
            on success (the response is delivered by raising).
    """
    _set_played_flag(beatmap, token, played=True,
                     done_detail="done_mark_played")