#!/usr/bin/env python3 # -*- encoding: utf-8 -*- import datetime import urllib.parse import uuid from constants import ALGORITHM, JWT_SECRET_KEY, TWITCH_CLIENT_ID, config from fastapi import HTTPException, status from fastapi.security import OAuth2AuthorizationCodeBearer from jose import jwt from models import (TokenPayloadModel, TokenUserWithPermissionsModel, UserPermissionsModel) from pydantic import ValidationError def generate_twitch_login_link(): return ("https://id.twitch.tv/oauth2/authorize" + "?response_type=token" + "&force_verify=false" + f"&client_id={TWITCH_CLIENT_ID}" + f"&redirect_uri={urllib.parse.quote(config['app']['root']+'/callback/twitch')}" + "&scope=" + f"&state={uuid.uuid4().hex}" + '') reuseable_oauth = OAuth2AuthorizationCodeBearer( authorizationUrl=generate_twitch_login_link(), tokenUrl='/login/twitch', scheme_name="JWT", ) def parse_token(token: str) -> 'TokenPayloadModel': try: payload = jwt.decode( token, JWT_SECRET_KEY, algorithms=[ALGORITHM] ) token_data: TokenPayloadModel = TokenPayloadModel(**payload) if datetime.datetime.fromisoformat(token_data.expiration) < datetime.datetime.now(): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="token_expired", headers={"WWW-Authenticate": "Bearer"}, ) return token_data except (jwt.JWTError, ValidationError): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="could_not_validate_credentials", headers={"WWW-Authenticate": "Bearer"}, ) def user_permissions(token_data: 'TokenPayloadModel') -> 'UserPermissionsModel': is_streamer = config['app']['streamer'].strip() in [ f'{token_data.platform}:{token_data.id}', f'{token_data.platform}:{token_data.login}', ] return UserPermissionsModel(is_admin=is_streamer) def user_with_permissions(token_data: 'TokenPayloadModel') -> 'TokenUserWithPermissionsModel': permissions = user_permissions(token_data) return TokenUserWithPermissionsModel( user=token_data, permissions=permissions, ) def parse_token_with_permissions(token: str) -> 'TokenUserWithPermissionsModel': return user_with_permissions(parse_token(token))