74 lines
2.4 KiB
Python
74 lines
2.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
import datetime
|
|
import urllib.parse
|
|
import uuid
|
|
|
|
from constants import ALGORITHM, JWT_SECRET_KEY, TWITCH_CLIENT_ID, config
|
|
from fastapi import HTTPException, status
|
|
from fastapi.security import OAuth2AuthorizationCodeBearer
|
|
from jose import jwt
|
|
from models import (TokenPayloadModel, TokenUserWithPermissionsModel,
|
|
UserPermissionsModel)
|
|
from pydantic import ValidationError
|
|
|
|
|
|
def generate_twitch_login_link():
    """Build the Twitch OAuth2 authorization URL for this application.

    The link requests ``response_type=token`` with an empty scope, does not
    force re-verification, and redirects back to ``<app root>/callback/twitch``.
    A fresh random ``state`` value is generated on every call.

    Returns:
        The complete ``https://id.twitch.tv/oauth2/authorize`` URL as a string.
    """
    endpoint = "https://id.twitch.tv/oauth2/authorize"
    # Order matters: the query string is emitted exactly in this sequence.
    query_pairs = (
        ("response_type", "token"),
        ("force_verify", "false"),
        ("client_id", TWITCH_CLIENT_ID),
        # quote() keeps "/" unescaped by default; only other reserved
        # characters in the callback URL are percent-encoded.
        ("redirect_uri",
         urllib.parse.quote(config['app']['root'] + '/callback/twitch')),
        ("scope", ""),
        ("state", uuid.uuid4().hex),
    )
    query_string = "&".join(f"{key}={value}" for key, value in query_pairs)
    return endpoint + "?" + query_string
|
|
|
|
|
|
# Shared OAuth2 security scheme, registered under the scheme name "JWT".
# The authorization URL is generated once, when this module is imported, so
# the `state` value embedded in it stays the same for the life of the process.
reuseable_oauth = OAuth2AuthorizationCodeBearer(
    scheme_name="JWT",
    tokenUrl='/login/twitch',
    authorizationUrl=generate_twitch_login_link(),
)
|
|
|
|
|
|
def _invalid_credentials_error() -> HTTPException:
    """Return the 403 error used for any token that cannot be validated."""
    return HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="could_not_validate_credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )


def _token_is_expired(expiration: str) -> bool:
    """Return True when the ISO-8601 *expiration* timestamp is in the past.

    Raises:
        ValueError: *expiration* is not a valid ISO-8601 string.
        TypeError: *expiration* is not a string.
    """
    expires_at = datetime.datetime.fromisoformat(expiration)
    # Compare like with like. ``now(tz=None)`` yields a naive local time
    # (the previous behaviour for naive timestamps), while an offset-aware
    # timestamp gets an aware "now" in the same zone. Mixing the two would
    # raise TypeError on comparison.
    current_time = datetime.datetime.now(tz=expires_at.tzinfo)
    return expires_at < current_time


def parse_token(token: str) -> 'TokenPayloadModel':
    """Decode and validate a JWT, returning its payload model.

    The token is verified with ``JWT_SECRET_KEY`` / ``ALGORITHM``, its claims
    are loaded into ``TokenPayloadModel``, and the model's ``expiration``
    field (an ISO-8601 string) is checked against the current time.

    Args:
        token: The encoded JWT string.

    Returns:
        The validated ``TokenPayloadModel``.

    Raises:
        HTTPException: 403 ``could_not_validate_credentials`` when the token
            cannot be decoded, its payload does not fit the model, or its
            ``expiration`` value cannot be parsed; 401 ``token_expired`` when
            the expiration time has passed.
    """
    try:
        payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[ALGORITHM])
        token_data: TokenPayloadModel = TokenPayloadModel(**payload)
    except (jwt.JWTError, ValidationError) as err:
        raise _invalid_credentials_error() from err

    try:
        expired = _token_is_expired(token_data.expiration)
    except (TypeError, ValueError) as err:
        # A malformed expiration claim means the token is unusable; report it
        # as a credential failure instead of letting it become a 500.
        raise _invalid_credentials_error() from err

    if expired:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="token_expired",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return token_data
|
|
|
|
|
|
def user_permissions(token_data: 'TokenPayloadModel') -> 'UserPermissionsModel':
    """Derive the permission flags for the user described by *token_data*.

    The user is an admin when the configured ``config['app']['streamer']``
    value (surrounding whitespace stripped) equals either
    ``<platform>:<id>`` or ``<platform>:<login>`` built from the token.

    Args:
        token_data: Token payload providing ``platform``, ``id`` and ``login``.

    Returns:
        A ``UserPermissionsModel`` with ``is_admin`` set accordingly.
    """
    configured_streamer = config['app']['streamer'].strip()
    platform = token_data.platform
    # The streamer may be configured by numeric id or by login name.
    identities = (
        f'{platform}:{token_data.id}',
        f'{platform}:{token_data.login}',
    )
    return UserPermissionsModel(is_admin=configured_streamer in identities)
|
|
|
|
|
|
def user_with_permissions(token_data: 'TokenPayloadModel') -> 'TokenUserWithPermissionsModel':
    """Bundle a token payload together with the permissions derived from it.

    Args:
        token_data: The validated token payload.

    Returns:
        A ``TokenUserWithPermissionsModel`` whose ``user`` is *token_data* and
        whose ``permissions`` come from ``user_permissions(token_data)``.
    """
    return TokenUserWithPermissionsModel(
        user=token_data,
        permissions=user_permissions(token_data),
    )
|
|
|
|
|
|
def parse_token_with_permissions(token: str) -> 'TokenUserWithPermissionsModel':
    """Validate *token* and return its payload paired with its permissions.

    Args:
        token: The encoded JWT string.

    Returns:
        The result of ``user_with_permissions`` applied to the parsed token.
        Any exception raised by ``parse_token`` propagates unchanged.
    """
    token_data = parse_token(token)
    return user_with_permissions(token_data)
|