286 lines
10 KiB
Python
286 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
import datetime
|
|
import urllib.parse
|
|
import uuid
|
|
from typing import List, Optional
|
|
|
|
import requests # type: ignore
|
|
from auth import (generate_twitch_login_link, parse_token_with_permissions,
|
|
reuseable_oauth)
|
|
from bms import (DifficultyWrapperForOsu, DifficultyWrapperForRhythmGame,
|
|
RhythmGameMode, get_crossgame_beatmaps)
|
|
from constants import (JWT_SECRET_KEY, QUEUE_MAX_ITEM, QUEUE_MAX_MINE,
|
|
TWITCH_CLIENT_ID, config)
|
|
from fastapi import APIRouter, Depends, FastAPI, Form, HTTPException, status
|
|
from jose import jwt
|
|
from models import (ConfigModel, DetailModel, QueuedSongModel, QueueItem, SongModel,
|
|
TokenPayloadModel, TokenUserWithPermissionsModel)
|
|
from route_operations import queue_read, queue_write
|
|
|
|
# The application object every route below is registered on.
# Title and description are read from the 'app' section of `config`, with
# built-in fallbacks when an option is absent.
router = FastAPI(
    title=config['app'].get('title', 'LiveQ'),
    description=config['app'].get(
        'description',
        'A multi-game queue for live streamers',
    ),
    debug=False,
)
|
|
|
|
|
|
# Publish the server configuration.
# Every section of `config` except 'paths' is copied into a mapping of
# option name -> whitespace-stripped value, and the result is wrapped in a
# ConfigModel (one keyword argument per section).
@router.get('/config', response_model=ConfigModel, summary='Server configuration', status_code=200, responses={})
async def cfg() -> ConfigModel:
    public_sections = {
        section: {
            option: value.strip()
            for option, value in config[section].items()
        }
        for section in config.sections()
        if section != 'paths'  # the 'paths' section is never sent to clients
    }
    return ConfigModel(**public_sections)  # type: ignore
|
|
|
|
|
|
# Report who the caller is.
# The bearer token supplied through `reuseable_oauth` is handed to
# parse_token_with_permissions() and its result is returned unchanged.
@router.get('/user', response_model=TokenUserWithPermissionsModel, summary='User currently logged in', status_code=200, responses={401: {}})
async def get_current_user(token: str = Depends(reuseable_oauth)) -> TokenUserWithPermissionsModel:
    return parse_token_with_permissions(token)
|
|
|
|
|
|
# Start the Twitch login flow.
# The redirect is produced by raising an HTTPException that carries a 302
# status and a Location header built by generate_twitch_login_link().
@router.get('/login/twitch', summary='Redirects to Twitch OAuth2', status_code=302)
async def login_twitch():
    redirect = HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers={'Location': generate_twitch_login_link()},
    )
    raise redirect
|
|
|
|
|
|
# Trade a Twitch access token for this service's own JWT.
#
# Inputs are form fields: access_token (required), scope, state, token_type.
# Outcomes:
#   400 - token_type is not 'bearer', state is empty, or scope is not empty
#   401 - Twitch refuses the token, the validated token carries no user id,
#         or the user lookup fails / comes back with no user
#   200 - detail holds 'bearer ' followed by the signed JWT
# Network failures (including the timeout below) propagate as exceptions.
@router.post('/callback/twitch', response_model=DetailModel, summary='Twitch token to JWT', status_code=200, responses={400: {}, 401: {}})
async def callback_twitch_js(access_token: str = Form(), scope: str = Form(''), state: str = Form(''), token_type: str = Form(''),):
    if token_type != 'bearer' or state == '' or scope != '':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
        )
    # `requests` waits indefinitely unless given a timeout; bound both
    # upstream calls so a stalled Twitch endpoint cannot hang this handler.
    # NOTE(review): these are blocking calls inside an `async def` handler.
    twitch_timeout = 10  # seconds
    r1 = requests.get('https://id.twitch.tv/oauth2/validate',
                      headers=dict(Authorization=f'OAuth {access_token}'),
                      timeout=twitch_timeout)
    if r1.status_code != 200:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    # A token can validate without a user attached (NOTE(review): e.g. app
    # access tokens, per Twitch docs - confirm). Reject it instead of
    # crashing on a missing key.
    user_id = r1.json().get('user_id')
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    userurl = f'https://api.twitch.tv/helix/users?id={user_id}'
    r2 = requests.get(
        userurl,
        headers={'Authorization': f'Bearer {access_token}',
                 'Client-Id': TWITCH_CLIENT_ID,
                 },
        timeout=twitch_timeout)
    if r2.status_code != 200:
        print(r2.status_code)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    # An empty or missing 'data' list means no such user was returned.
    users = r2.json().get('data') or []
    if not users:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
    usr = users[0]
    # The success payload is also delivered through HTTPException, so the
    # response body has the same {"detail": ...} shape as the error cases.
    raise HTTPException(
        status_code=status.HTTP_200_OK,
        detail='bearer '+jwt.encode(
            dict(
                platform='twitch',
                id=usr['id'],
                login=usr['login'],
                display=usr['display_name'],
                avatar=usr['profile_image_url'],
                # Timestamp format kept as-is: whatever reads this token
                # presumably parses a naive ISO string - verify before changing.
                expiration=(datetime.datetime.utcnow() +
                            datetime.timedelta(days=1)).isoformat()
            ),
            JWT_SECRET_KEY)
    )
|
|
|
|
|
|
# Point clients at the cached song list.
# get_crossgame_beatmaps() runs first and its result is discarded -
# presumably so the cache file behind the redirect exists (TODO confirm).
@router.get('/songs', summary='Songs', response_model=List[SongModel], status_code=200, responses={302: {}})
def songs():
    get_crossgame_beatmaps()
    redirect = HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers={'Location': '/localcache/cached_min.json'},
    )
    raise redirect
|
|
|
|
|
|
# List the members of RhythmGameMode in both directions:
#   'n2v' maps member name -> value (as a string)
#   'v2n' maps value (as a string) -> member name
@router.get('/games', summary='Games', response_model=dict[str, dict[str, str]], status_code=200, responses={})
def games_enum() -> dict[str, dict[str, str]]:
    name_to_value: dict[str, str] = {}
    value_to_name: dict[str, str] = {}
    for mode in RhythmGameMode:
        value_text = str(mode.value)
        name_to_value[mode.name] = value_text
        value_to_name[value_text] = mode.name
    return {'n2v': name_to_value, 'v2n': value_to_name}
|
|
|
|
|
|
# Point clients at the stored queue file.
# A fresh random hex string is appended as the `v` query parameter on every
# call, so each redirect target URL is unique.
@router.get('/queue', summary='Queue', response_model=List[QueuedSongModel], status_code=200, responses={302: {}})
def queue():
    unique_tag = uuid.uuid4().hex
    target = f'/localqueue/queue.json?v={unique_tag}'
    raise HTTPException(
        status_code=status.HTTP_302_FOUND,
        headers={'Location': target},
    )
|
|
|
|
|
|
# Append a beatmap to the queue on behalf of the logged-in user.
# Outcomes, in the order they are checked:
#   404 not_found        - no known beatmap has the requested id
#   406 already_in_queue - an entry with that id is already stored
#   406 queue_full       - a size limit is reached and the caller is not admin
#   201 done_add         - the entry was appended and the queue written back
# NOTE(review): the original indentation was lost in the pasted source. This
# version assumes the duplicate check applies to every stored entry and that
# the per-user count only covers entries not yet played - confirm against
# the original file.
@router.post('/queue/add', summary='Add to Queue', response_model=DetailModel, status_code=201, responses={404: {}, 406: {}})
def queue_add(beatmap: int, token: str = Depends(reuseable_oauth)):
    auth = parse_token_with_permissions(token)
    token_data: TokenPayloadModel = auth.user
    is_streamer = auth.permissions.is_admin

    crossgame_beatmaps: List[DifficultyWrapperForRhythmGame] = get_crossgame_beatmaps()
    beatmap_: Optional[DifficultyWrapperForOsu] = next(  # type: ignore
        (candidate for candidate in crossgame_beatmaps
         if candidate is not None and candidate.id == beatmap),
        None)
    if beatmap_ is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    entries = queue_read()
    if any(entry.id == beatmap_.id for entry in entries):
        raise HTTPException(
            status_code=status.HTTP_406_NOT_ACCEPTABLE,
            detail="already_in_queue",
        )

    # Only entries still waiting to be played count towards the limits.
    pending = [entry for entry in entries if not entry.played]
    queue_size = len(pending)
    queue_mine = sum(
        1 for entry in pending
        if entry.requester_platform == token_data.platform
        and entry.requester_uid == token_data.id
    )
    limit_reached = queue_size >= QUEUE_MAX_ITEM or queue_mine >= QUEUE_MAX_MINE
    if limit_reached and not is_streamer:
        raise HTTPException(
            status_code=status.HTTP_406_NOT_ACCEPTABLE,
            detail="queue_full",
        )

    new_entry = QueueItem(
        id=beatmap_.id,
        sid=beatmap_.sid,
        tid=beatmap_.tid,
        played=False,
        requester_platform=token_data.platform,
        requester_uid=token_data.id,
        requester_display=token_data.display,
    )
    entries.append(new_entry)
    queue_write(entries)
    # Success is reported through HTTPException as well, giving {"detail": ...}.
    raise HTTPException(
        status_code=status.HTTP_201_CREATED,
        detail="done_add",
    )
|
|
|
|
|
|
# Remove a beatmap's entry from the queue.
# Outcomes, in the order they are checked:
#   404 not_found             - unknown beatmap id, or no queue entry for it
#   401 cannot_remove_others  - entry belongs to someone else, caller not admin
#   401 cannot_remove_played  - entry already played, caller not admin
#   202 done_delete           - entry removed and the queue written back
@router.post('/queue/delete', summary='Remove from Queue', response_model=DetailModel, status_code=202, responses={404: {}, 401: {}})
def queue_del(beatmap: int, token: str = Depends(reuseable_oauth)):
    auth = parse_token_with_permissions(token)
    token_data: TokenPayloadModel = auth.user
    is_streamer = auth.permissions.is_admin

    crossgame_beatmaps: List[DifficultyWrapperForRhythmGame] = get_crossgame_beatmaps()
    beatmap_: Optional[DifficultyWrapperForOsu] = next(  # type: ignore
        (candidate for candidate in crossgame_beatmaps
         if candidate is not None and candidate.id == beatmap),
        None)
    if beatmap_ is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    entries = queue_read()
    # First stored entry for this beatmap, if any.
    item: Optional[QueueItem] = next(
        (entry for entry in entries if entry.id == beatmap_.id),
        None)
    if item is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    is_owner = (
        item.requester_platform == token_data.platform
        and item.requester_uid == token_data.id
    )
    if not (is_owner or is_streamer):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="cannot_remove_others",
        )
    if item.played and not is_streamer:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="cannot_remove_played",
        )

    entries.remove(item)
    queue_write(entries)
    # Success is reported through HTTPException as well, giving {"detail": ...}.
    raise HTTPException(
        status_code=status.HTTP_202_ACCEPTED,
        detail="done_delete",
    )
|
|
|
|
|
|
# Flag a queued beatmap as not played (admin only).
# Outcomes, in the order they are checked:
#   401 only_streamer_can_mark_played - caller lacks the admin permission
#   404 not_found                     - unknown beatmap id, or not in the queue
#   202 done_mark_unplayed            - flag cleared and the queue written back
@router.post('/queue/unplay', summary='Mark to replay', response_model=DetailModel, status_code=202, responses={404: {}, 401: {}})
def queue_unplay(beatmap: int, token: str = Depends(reuseable_oauth)):
    auth = parse_token_with_permissions(token)
    if not auth.permissions.is_admin:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="only_streamer_can_mark_played",
        )

    crossgame_beatmaps: List[DifficultyWrapperForRhythmGame] = get_crossgame_beatmaps()
    beatmap_: Optional[DifficultyWrapperForOsu] = next(  # type: ignore
        (candidate for candidate in crossgame_beatmaps
         if candidate is not None and candidate.id == beatmap),
        None)
    if beatmap_ is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    entries = queue_read()
    # First stored entry for this beatmap, if any.
    item: Optional[QueueItem] = next(
        (entry for entry in entries if entry.id == beatmap_.id),
        None)
    if item is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    item.played = False
    queue_write(entries)
    # Success is reported through HTTPException as well, giving {"detail": ...}.
    raise HTTPException(
        status_code=status.HTTP_202_ACCEPTED,
        detail="done_mark_unplayed",
    )
|
|
|
|
|
|
# Flag a queued beatmap as played (admin only).
# Outcomes, in the order they are checked:
#   401 only_streamer_can_mark_played - caller lacks the admin permission
#   404 not_found                     - unknown beatmap id, or not in the queue
#   202 done_mark_played              - flag set and the queue written back
@router.post('/queue/play', summary='Mark as played', response_model=DetailModel, status_code=202, responses={404: {}, 401: {}})
def queue_play(beatmap: int, token: str = Depends(reuseable_oauth)):
    auth = parse_token_with_permissions(token)
    if not auth.permissions.is_admin:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="only_streamer_can_mark_played",
        )

    crossgame_beatmaps: List[DifficultyWrapperForRhythmGame] = get_crossgame_beatmaps()
    beatmap_: Optional[DifficultyWrapperForOsu] = next(  # type: ignore
        (candidate for candidate in crossgame_beatmaps
         if candidate is not None and candidate.id == beatmap),
        None)
    if beatmap_ is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    entries = queue_read()
    # First stored entry for this beatmap, if any.
    item: Optional[QueueItem] = next(
        (entry for entry in entries if entry.id == beatmap_.id),
        None)
    if item is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="not_found",
        )

    item.played = True
    queue_write(entries)
    # Success is reported through HTTPException as well, giving {"detail": ...}.
    raise HTTPException(
        status_code=status.HTTP_202_ACCEPTED,
        detail="done_mark_played",
    )
|