site_liveq/liveq/auth.py

74 lines
2.4 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import datetime
import urllib.parse
import uuid
from constants import ALGORITHM, JWT_SECRET_KEY, TWITCH_CLIENT_ID, config
from fastapi import HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from jose import jwt
from models import (TokenPayloadModel, TokenUserWithPermissionsModel,
UserPermissionsModel)
from pydantic import ValidationError
def generate_twitch_login_link():
    """Build the Twitch OAuth2 authorization URL for this application.

    The URL asks for ``response_type=token``, redirects back to
    ``<config['app']['root']>/callback/twitch``, requests an empty scope
    list, and carries a new random ``state`` value on every call.

    Returns:
        str: The complete authorization URL.
    """
    query_parts = [
        "response_type=token",
        "force_verify=false",
        f"client_id={TWITCH_CLIENT_ID}",
        # The callback URL is percent-encoded so it travels as one query value.
        f"redirect_uri={urllib.parse.quote(config['app']['root'] + '/callback/twitch')}",
        "scope=",
        f"state={uuid.uuid4().hex}",
    ]
    return "https://id.twitch.tv/oauth2/authorize?" + "&".join(query_parts)
# OAuth2 authorization-code bearer scheme shared by the application.
# generate_twitch_login_link() is called exactly once, when this statement
# runs, so the ``state`` value embedded in the authorization URL stays the
# same for as long as this object is used.
reuseable_oauth = OAuth2AuthorizationCodeBearer(
    scheme_name="JWT",
    tokenUrl='/login/twitch',
    authorizationUrl=generate_twitch_login_link(),
)
def parse_token(token: str) -> 'TokenPayloadModel':
    """Decode a JWT and return its validated, unexpired payload.

    The token is decoded with ``JWT_SECRET_KEY`` / ``ALGORITHM``, loaded into
    ``TokenPayloadModel``, and its ``expiration`` field (an ISO-8601 string)
    is checked against the current time.

    Args:
        token: The encoded JWT string.

    Returns:
        The payload as a ``TokenPayloadModel``.

    Raises:
        HTTPException: 403 ``could_not_validate_credentials`` when the token
            cannot be decoded, the payload does not fit the model, or the
            ``expiration`` value cannot be parsed as an ISO-8601 timestamp;
            401 ``token_expired`` when the expiration lies in the past.
    """
    try:
        payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[ALGORITHM])
        token_data: TokenPayloadModel = TokenPayloadModel(**payload)
    except (jwt.JWTError, ValidationError) as err:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="could_not_validate_credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from err

    try:
        expires_at = datetime.datetime.fromisoformat(token_data.expiration)
    except (ValueError, TypeError) as err:
        # A malformed or non-string expiration is a bad credential, not a
        # server fault, so report it the same way as any other invalid token.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="could_not_validate_credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from err

    # Take "now" in the timestamp's own timezone: naive values are compared
    # with naive local time (as before), offset-aware values with aware time.
    # Mixing the two kinds would raise TypeError.
    now = datetime.datetime.now(tz=expires_at.tzinfo)
    if expires_at < now:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="token_expired",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return token_data
def user_permissions(token_data: 'TokenPayloadModel') -> 'UserPermissionsModel':
    """Compute the permission flags for the user described by a token payload.

    The user is marked as admin when the configured streamer
    (``config['app']['streamer']``, surrounding whitespace ignored) equals
    either ``<platform>:<id>`` or ``<platform>:<login>`` of the token.

    Args:
        token_data: Payload providing ``platform``, ``id`` and ``login``.

    Returns:
        A ``UserPermissionsModel`` with ``is_admin`` set accordingly.
    """
    configured_streamer = config['app']['streamer'].strip()
    identities = (
        f'{token_data.platform}:{token_data.id}',
        f'{token_data.platform}:{token_data.login}',
    )
    return UserPermissionsModel(is_admin=configured_streamer in identities)
def user_with_permissions(token_data: 'TokenPayloadModel') -> 'TokenUserWithPermissionsModel':
    """Pair a token payload with the permissions derived from it.

    Args:
        token_data: The decoded token payload.

    Returns:
        A ``TokenUserWithPermissionsModel`` whose ``user`` is ``token_data``
        and whose ``permissions`` come from ``user_permissions(token_data)``.
    """
    return TokenUserWithPermissionsModel(
        user=token_data,
        permissions=user_permissions(token_data),
    )
def parse_token_with_permissions(token: str) -> 'TokenUserWithPermissionsModel':
    """Parse a JWT and return its payload together with the user's permissions.

    Args:
        token: The encoded JWT string.

    Returns:
        The result of ``user_with_permissions`` applied to the parsed payload.
        Any exception raised by ``parse_token`` propagates unchanged.
    """
    token_data = parse_token(token)
    return user_with_permissions(token_data)