#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""FastAPI application exposing configuration, Twitch login, songs and the queue.

Most handlers deliver their outcome (including success and redirects) by
raising ``HTTPException``; FastAPI's default handler turns that into the
given status code, headers and a ``{"detail": ...}`` JSON body.
"""

import asyncio
import datetime
import logging
import urllib.parse
import uuid
from typing import List, Optional

import requests  # type: ignore
from auth import (generate_twitch_login_link, parse_token_with_permissions,
                  reuseable_oauth)
from bms import (DifficultyWrapperForOsu, DifficultyWrapperForRhythmGame,
                 RhythmGameMode, get_crossgame_beatmaps)
from constants import (JWT_SECRET_KEY, QUEUE_MAX_ITEM, QUEUE_MAX_MINE,
                       TWITCH_CLIENT_ID, config)
from fastapi import APIRouter, Depends, FastAPI, Form, HTTPException, status
from jose import jwt
from models import (ConfigModel, DetailModel, QueuedSongModel, QueueItem,
                    SongModel, TokenPayloadModel,
                    TokenUserWithPermissionsModel)
from route_operations import queue_read, queue_write

logger = logging.getLogger(__name__)

# Upper bound, in seconds, for each outbound call to the Twitch API so that a
# stalled upstream cannot hold a request open forever.
_TWITCH_TIMEOUT_SECONDS = 10

router = FastAPI(
    debug=False,
    title=config['app'].get('title', 'LiveQ'),
    description=config['app'].get(
        'description', 'A multi-game queue for live streamers')
)


async def _twitch_get(url: str, **kwargs) -> requests.Response:
    """GET *url* from Twitch without blocking the event loop.

    ``requests`` is synchronous, so the call runs in a worker thread.
    Transport failures (DNS, connection, timeout) are reported to the client
    as 502 instead of surfacing as an unhandled server error.
    """
    try:
        return await asyncio.to_thread(
            requests.get, url, timeout=_TWITCH_TIMEOUT_SECONDS, **kwargs)
    except requests.RequestException as err:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail='twitch_unreachable',
        ) from err


def _require_beatmap(beatmap: int) -> DifficultyWrapperForOsu:
    """Return the known beatmap whose id equals *beatmap*, or raise 404."""
    found: Optional[DifficultyWrapperForOsu] = next(
        (candidate for candidate in get_crossgame_beatmaps()
         if candidate is not None and candidate.id == beatmap),  # type: ignore
        None)
    if found is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )
    return found


def _require_queue_item(entries: List[QueueItem], beatmap_id: int) -> QueueItem:
    """Return the first queue entry whose id equals *beatmap_id*, or raise 404."""
    found: Optional[QueueItem] = next(
        (entry for entry in entries if entry.id == beatmap_id), None)
    if found is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )
    return found


def _require_streamer(token: str) -> None:
    """Raise 401 unless *token* carries the admin permission."""
    if not parse_token_with_permissions(token).permissions.is_admin:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="only_streamer_can_mark_played",
        )


def _is_requested_by(entry: QueueItem, user: TokenPayloadModel) -> bool:
    """Tell whether *entry* was requested by *user* (same platform and id)."""
    return (entry.requester_platform == user.platform
            and entry.requester_uid == user.id)


@router.get('/config',
            response_model=ConfigModel,
            summary='Server configurarion',
            status_code=200,
            responses={})
async def cfg() -> ConfigModel:
    """Return every configuration section except ``paths``, values stripped."""
    return ConfigModel(**{  # type: ignore
        section: {
            key: value.strip()
            for key, value in config[section].items()
        }
        for section in config.sections()
        if section != 'paths'
    })


@router.get('/user',
            response_model=TokenUserWithPermissionsModel,
            summary='User currently logged in',
            status_code=200,
            responses={401: {}})
async def get_current_user(token: str = Depends(reuseable_oauth)) -> TokenUserWithPermissionsModel:
    """Return the user and permissions parsed from the bearer token."""
    return parse_token_with_permissions(token)


@router.get('/login/twitch',
            summary='Redirects to Twitch OAuth2',
            status_code=302)
async def login_twitch():
    """Redirect (302) to the Twitch login link."""
    raise HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers=dict(Location=generate_twitch_login_link())
    )


@router.post('/callback/twitch',
             response_model=DetailModel,
             summary='Twitch token to JWT',
             status_code=200,
             responses={400: {}, 401: {}})
async def callback_twitch_js(access_token: str = Form(),
                             scope: str = Form(''),
                             state: str = Form(''),
                             token_type: str = Form(''),):
    """Exchange a Twitch access token for this server's own JWT.

    The token is validated against Twitch, the user's profile is fetched,
    and a JWT with a one-day ``expiration`` claim is returned in ``detail``
    prefixed with ``bearer ``.

    Responds 400 for malformed form data, 401 when Twitch rejects the token
    or no user can be resolved from it, and 502 when Twitch is unreachable.
    """
    if token_type != 'bearer' or state == '' or scope != '':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
        )
    validation = await _twitch_get(
        'https://id.twitch.tv/oauth2/validate',
        headers=dict(Authorization=f'OAuth {access_token}'))
    if validation.status_code != 200:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    # A valid token that is not bound to a user carries no user id; treat it
    # as unauthorized rather than failing with a KeyError.
    user_id = validation.json().get('user_id')
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    # Passing the id through ``params`` lets requests URL-encode it.
    lookup = await _twitch_get(
        'https://api.twitch.tv/helix/users',
        params={'id': user_id},
        headers={'Authorization': f'Bearer {access_token}',
                 'Client-Id': TWITCH_CLIENT_ID,
                 })
    if lookup.status_code != 200:
        logger.warning('Twitch user lookup failed with status %s',
                       lookup.status_code)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    users = lookup.json().get('data') or []
    if not users:
        # Twitch answered 200 but returned no matching user.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    usr = users[0]
    # Naive UTC timestamp, i.e. the same serialized form the deprecated
    # ``datetime.utcnow()`` produced, so token consumers see no change.
    now_utc = datetime.datetime.now(
        datetime.timezone.utc).replace(tzinfo=None)
    raise HTTPException(
        status_code=status.HTTP_200_OK,
        detail='bearer '+jwt.encode(
            dict(
                platform='twitch',
                id=usr['id'],
                login=usr['login'],
                display=usr['display_name'],
                avatar=usr['profile_image_url'],
                expiration=(now_utc +
                            datetime.timedelta(days=1)).isoformat()
            ),
            JWT_SECRET_KEY)
    )


@router.get('/songs',
            summary='Songs',
            response_model=List[SongModel],
            status_code=200,
            responses={302: {}})
def songs():
    """Redirect (302) to the cached song list file."""
    # Called for its side effect only; the result is discarded.
    # NOTE(review): presumably this (re)builds the cache file — confirm.
    get_crossgame_beatmaps()
    raise HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers=dict(Location='/localcache/cached_min.json')
    )


@router.get('/games',
            summary='Games',
            response_model=dict[str, dict[str, str]],
            status_code=200,
            responses={})
def games_enum() -> dict[str, dict[str, str]]:
    """Return name-to-value (``n2v``) and value-to-name (``v2n``) game maps."""
    return {
        'n2v': {mode.name: str(mode.value) for mode in RhythmGameMode},
        'v2n': {str(mode.value): mode.name for mode in RhythmGameMode},
    }


@router.get('/queue',
            summary='Queue',
            response_model=List[QueuedSongModel],
            status_code=200,
            responses={302: {}})
def queue():
    """Redirect (302) to the queue file, with a random ``v`` query value."""
    raise HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers=dict(Location=f'/localqueue/queue.json?v={uuid.uuid4().hex}')
    )


@router.post('/queue/add',
             summary='Add to Queue',
             response_model=DetailModel,
             status_code=201,
             responses={404: {}, 406: {}})
def queue_add(beatmap: int, token: str = Depends(reuseable_oauth)):
    """Append a beatmap to the queue on behalf of the logged-in user.

    Responds 404 for an unknown beatmap, 406 ``already_in_queue`` for a
    duplicate, 406 ``queue_full`` when a limit is reached (admins are
    exempt), and 201 ``done_add`` on success.
    """
    token_with_permissions = parse_token_with_permissions(token)
    requester: TokenPayloadModel = token_with_permissions.user
    is_streamer = token_with_permissions.permissions.is_admin
    wanted = _require_beatmap(beatmap)
    entries = queue_read()
    pending_total = 0
    pending_mine = 0
    for entry in entries:
        # Only entries not yet played count towards the limits.
        if not entry.played:
            pending_total += 1
            if _is_requested_by(entry, requester):
                pending_mine += 1
        # NOTE(review): the source's indentation was lost; the duplicate
        # check is applied to every entry because the other queue endpoints
        # look entries up by id alone — confirm against the intended rule.
        if entry.id == wanted.id:
            raise HTTPException(
                status_code=status.HTTP_406_NOT_ACCEPTABLE,
                detail="already_in_queue",
            )
    limit_reached = (pending_total >= QUEUE_MAX_ITEM
                     or pending_mine >= QUEUE_MAX_MINE)
    if limit_reached and not is_streamer:
        raise HTTPException(
            status_code=status.HTTP_406_NOT_ACCEPTABLE,
            detail="queue_full",
        )
    entries.append(QueueItem(
        id=wanted.id,
        sid=wanted.sid,
        tid=wanted.tid,
        played=False,
        requester_platform=requester.platform,
        requester_uid=requester.id,
        requester_display=requester.display,
    ))
    queue_write(entries)
    raise HTTPException(
        status_code=status.HTTP_201_CREATED,
        detail="done_add",
    )


@router.post('/queue/delete',
             summary='Remove from Queue',
             response_model=DetailModel,
             status_code=202,
             responses={404: {}, 401: {}})
def queue_del(beatmap: int, token: str = Depends(reuseable_oauth)):
    """Remove a beatmap from the queue.

    Non-admin users may only remove their own entries that have not been
    played. Responds 404 when the beatmap or its queue entry is missing,
    401 when removal is not permitted, and 202 ``done_delete`` on success.
    """
    token_with_permissions = parse_token_with_permissions(token)
    requester: TokenPayloadModel = token_with_permissions.user
    is_streamer = token_with_permissions.permissions.is_admin
    wanted = _require_beatmap(beatmap)
    entries = queue_read()
    entry = _require_queue_item(entries, wanted.id)
    if not is_streamer and not _is_requested_by(entry, requester):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="cannot_remove_others",
        )
    if not is_streamer and entry.played:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="cannot_remove_played",
        )
    entries.remove(entry)
    queue_write(entries)
    raise HTTPException(
        status_code=status.HTTP_202_ACCEPTED,
        detail="done_delete",
    )


@router.post('/queue/unplay',
             summary='Mark to replay',
             response_model=DetailModel,
             status_code=202,
             responses={404: {}, 401: {}})
def queue_unplay(beatmap: int, token: str = Depends(reuseable_oauth)):
    """Mark a queued beatmap as not played (admin only).

    Responds 401 for non-admins, 404 when the beatmap or its queue entry is
    missing, and 202 ``done_mark_unplayed`` on success.
    """
    # Permission is checked before any lookup, as in the other admin route.
    _require_streamer(token)
    wanted = _require_beatmap(beatmap)
    entries = queue_read()
    entry = _require_queue_item(entries, wanted.id)
    entry.played = False
    queue_write(entries)
    raise HTTPException(
        status_code=status.HTTP_202_ACCEPTED,
        detail="done_mark_unplayed",
    )


@router.post('/queue/play',
             summary='Mark as played',
             response_model=DetailModel,
             status_code=202,
             responses={404: {}, 401: {}})
def queue_play(beatmap: int, token: str = Depends(reuseable_oauth)):
    """Mark a queued beatmap as played (admin only).

    Responds 401 for non-admins, 404 when the beatmap or its queue entry is
    missing, and 202 ``done_mark_played`` on success.
    """
    # Permission is checked before any lookup, as in the other admin route.
    _require_streamer(token)
    wanted = _require_beatmap(beatmap)
    entries = queue_read()
    entry = _require_queue_item(entries, wanted.id)
    entry.played = True
    queue_write(entries)
    raise HTTPException(
        status_code=status.HTTP_202_ACCEPTED,
        detail="done_mark_played",
    )