furmeet-bot/webproj/bot/views.py

427 lines
17 KiB
Python

import datetime
import gzip
import json
import mimetypes
import random
import sys
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List
from uuid import UUID
import pytz
from captcha.image import ImageCaptcha
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import FileResponse, HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404, redirect, render, resolve_url
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.html import escape
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import last_modified
from django.views.generic import TemplateView
from telegram.bot import Bot
from ..mwt import MWT
from . import models
from . import forms
# Font files used to render CAPTCHA images. The paths are relative, so they are
# resolved against the working directory of the process at import time.
_CAPTCHA_FONT_FILES = (
    'webproj/fonts/Roboto/Roboto-Regular.ttf',
    'webproj/fonts/Fondamento/Fondamento-Regular.ttf',
    'webproj/fonts/Josefin_Sans/static/JosefinSans-Medium.ttf',
    'webproj/fonts/Lora/static/Lora-Medium.ttf',
    'webproj/fonts/Quicksand/static/Quicksand-Regular.ttf',
    'webproj/fonts/Work_Sans/static/WorkSans-Medium.ttf',
)
# Absolute string paths, as handed to ImageCaptcha below.
imageCaptchaFonts = [str(Path(font_file).absolute()) for font_file in _CAPTCHA_FONT_FILES]
# Module-wide CAPTCHA generator shared by every request.
imageCaptcha = ImageCaptcha(
    300,
    150,
    fonts=imageCaptchaFonts,
    font_sizes=[36, 40, 44, 48, 54, 60, 66, 72],
)
# Walk the generator's fonts once at import time.
# NOTE(review): presumably this forces the fonts to be loaded eagerly - confirm
# against the captcha library.
for truefont in imageCaptcha.truefonts:
    pass
# Create your views here.
def _telegram_user_from_cookie(request):
    """Return the TelegramUser identified by the ``user_hash`` cookie, or None.

    None is returned when the cookie is absent, when no user matches its
    value, or when the ORM rejects the value as malformed.
    """
    user_hash = request.COOKIES.get('user_hash', None)
    if user_hash is None:
        # No cookie: skip the query altogether. Filtering on uuid=None would be
        # an IS NULL lookup, which must never identify a visitor.
        return None
    # Function-scope import so this block does not rely on the import header.
    from django.core.exceptions import ValidationError
    try:
        return models.TelegramUser.objects.filter(uuid=user_hash).first()
    except (ValueError, ValidationError):
        # NOTE(review): presumably ``uuid`` is a UUIDField, which refuses to
        # build a lookup from a non-UUID string; a tampered cookie is then
        # treated as "not logged in" instead of crashing the request.
        return None


class RequireTelegramLogin:
    """Decorator: run the wrapped callable only for a logged-in Telegram user.

    The wrapped callable receives the user as the ``tg_user`` keyword
    argument. Visitors without a valid ``user_hash`` cookie are redirected
    to the ``home`` URL instead.
    """

    def __init__(self, func):
        self.func = func

    def __call__(self, request, *args, **kwargs):
        telegram_user = _telegram_user_from_cookie(request)
        if telegram_user is None:
            return redirect('home')
        return self.func(request=request, tg_user=telegram_user, *args, **kwargs)


class UseTelegramLogin:
    """Decorator: pass the logged-in Telegram user, if any, as ``tg_user``.

    Unlike RequireTelegramLogin the wrapped callable always runs;
    ``tg_user`` is None for anonymous visitors.
    """

    def __init__(self, func):
        self.func = func

    def __call__(self, request, *args, **kwargs):
        telegram_user = _telegram_user_from_cookie(request)
        return self.func(request=request, tg_user=telegram_user, *args, **kwargs)
@method_decorator(UseTelegramLogin, name='dispatch')
class SoonView(TemplateView):
    """Render ``soon.haml``; ``tg_user`` is supplied by UseTelegramLogin (None when anonymous)."""

    template_name = "soon.haml"
@method_decorator(UseTelegramLogin, name='dispatch')
class HomeView(TemplateView):
    """Render ``index.haml``; ``tg_user`` is supplied by UseTelegramLogin (None when anonymous)."""

    template_name = "index.haml"
@method_decorator(UseTelegramLogin, name='dispatch')
class TermsView(TemplateView):
    """Render ``terms.haml``; ``tg_user`` is supplied by UseTelegramLogin (None when anonymous)."""

    template_name = "terms.haml"
@method_decorator(UseTelegramLogin, name='dispatch')
class PrivacyView(TemplateView):
    """Render ``privacy.haml``; ``tg_user`` is supplied by UseTelegramLogin (None when anonymous)."""

    template_name = "privacy.haml"
@method_decorator(UseTelegramLogin, name='dispatch')
class PythonLicensesView(TemplateView):
    """Render ``python_licenses.haml``; ``tg_user`` is supplied by UseTelegramLogin (None when anonymous)."""

    template_name = "python_licenses.haml"
def getTelegramUser(obj: Dict[str, Any]) -> models.TelegramUser:
    """Create or refresh the TelegramUser row described by ``obj``.

    ``obj`` must contain ``id`` and ``first_name``; ``last_name`` and
    ``username`` are optional and stored as None when absent. The row is
    looked up by ``telegram_id``, saved on every call, and returned.
    """
    known = models.TelegramUser.objects.filter(telegram_id=obj['id']).first()
    user = models.TelegramUser() if known is None else known
    user.telegram_id = obj['id']
    user.telegram_first_name = obj['first_name']
    user.telegram_last_name = obj.get('last_name')
    user.telegram_username = obj.get('username')
    user.save()
    return user
def getTelegramGroup(obj: Dict[str, Any]) -> models.TelegramGroup:
    """Create or refresh the TelegramGroup row described by ``obj``.

    ``obj`` must contain ``id`` and ``title``. The row is looked up by
    ``telegram_id``, saved on every call, and returned.
    """
    known = models.TelegramGroup.objects.filter(telegram_id=obj['id']).first()
    group = models.TelegramGroup() if known is None else known
    group.telegram_id = obj['id']
    group.telegram_title = obj['title']
    group.save()
    return group
# Bot instance handed to get_admins() through module state: WebHookView sets it
# right before calling get_admins() and resets it to None afterwards.
LAST_BOT: Bot = None


def get_bot_self_uncached(token: str) -> int:
    """Ask Telegram who the bot behind ``token`` is and sync local records.

    The bot's own account is stored/refreshed as a TelegramUser. If a
    TelegramUserBots row exists for ``token``, its name, username and
    Telegram id are refreshed as well.

    Returns the bot's Telegram user id.
    """
    tg_me = Bot(token).get_me()
    getTelegramUser(tg_me.__dict__)
    tg_bt = models.TelegramUserBots.objects.filter(token=token).first()
    if tg_bt is not None:
        # Display name is "first last"; the last name may be missing.
        # (Previously the first name was used twice.)
        tg_bt.name = f'{tg_me.first_name} {tg_me.last_name or ""}'.strip()
        tg_bt.telegram_username = tg_me.username
        tg_bt.telegram_id = tg_me.id
        tg_bt.save()
    return tg_me.id


# Same lookup wrapped in MWT(timeout=60).
# NOTE(review): presumably MWT memoizes results for 60 seconds - confirm in the mwt module.
get_bot_self = MWT(timeout=60)(get_bot_self_uncached)
@MWT(timeout=60)
def get_admins(chat_id: int) -> List[int]:
    """Return the Telegram ids of the administrators of chat ``chat_id``.

    The administrators are fetched through the module-global ``LAST_BOT``,
    which the caller must set beforehand. Each administrator is also
    stored/refreshed as a TelegramUser.
    """
    admin_ids = []
    for admin in LAST_BOT.get_chat_administrators(chat_id):
        admin_ids.append(getTelegramUser(admin.user.__dict__).telegram_id)
    return admin_ids
def setupWebHook(token):
    """Point the webhook of the bot behind ``token`` at this site's ``/telegram/<token>/`` URL."""
    target = f'https://{settings.FIRST_DOMAIN}/telegram/{token}/'
    telegram_bot = Bot(token)
    telegram_bot.set_webhook(url=target)
class WebHookSetupView(View):
    """(Re)register the webhook of the main bot configured in the settings."""

    def get(self, request):
        """Register the webhook for ``settings.TELEGRAM_TOKEN`` and answer with an empty response."""
        main_token = settings.TELEGRAM_TOKEN
        setupWebHook(main_token)
        return HttpResponse()
@method_decorator(csrf_exempt, name='dispatch')
class WebHookView(View):
    """Webhook endpoint that receives Telegram updates as JSON POST bodies."""

    def post(self, request, telegram):
        """Process one update addressed to the bot whose token is ``telegram``.

        Updates sent by bots (or carrying no sender) are ignored. Tokens that
        are neither registered nor the main bot get their webhook removed,
        channels are left, and private/group chats are delegated to helpers.
        Always answers with an empty response.
        """
        update = json.loads(request.body)
        message = update.get('message', {})
        sender = message.get('from', {})
        chat = message.get('chat', {})
        # A missing sender counts as a bot, so such updates are skipped.
        sender_is_bot: bool = sender.get('is_bot', True)
        text: str = message.get('text', '')
        chat_type: str = chat.get('type', '')
        chat_id: int = chat.get('id', 0)
        new_members: List[Dict[str, Any]] = message.get('new_chat_members', [])
        if not sender_is_bot:
            registered_bot = models.TelegramUserBots.objects.filter(token=telegram).first()
            if registered_bot is None and telegram != settings.TELEGRAM_TOKEN:
                # Unknown token: stop Telegram from calling this endpoint for it.
                Bot(telegram).set_webhook(url='')
            elif chat_type == 'channel':
                Bot(telegram).leave_chat(chat_id=chat_id)
            else:
                my_id = get_bot_self(telegram)
                # Result is unused; .get() raises if the bot's own user row is missing.
                models.TelegramUser.objects.get(telegram_id=my_id)
                bot = Bot(telegram)
                telegram_user = getTelegramUser(sender)
                if chat_type == 'private':
                    self._answer_private_chat(request, bot, chat_id, text, telegram_user, registered_bot)
                elif chat_type in ('group', 'supergroup'):
                    self._sync_group(bot, my_id, chat_id, chat, message, new_members)
        # NOTE(review): source indentation was lost; this debug dump is assumed
        # to run for every update - confirm.
        print(json.dumps(update, indent=4))
        return HttpResponse()

    def _answer_private_chat(self, request, bot, chat_id, text, telegram_user, registered_bot):
        """Reply to a private message; commands are only served by the main bot."""
        if registered_bot is not None:
            # User-registered (custom) bots only point people at the main bot.
            bot.send_message(
                chat_id=chat_id,
                text=f'I am a custom bot configured at https://{settings.FIRST_DOMAIN}.\nCheck @{settings.BOT_NAME} for more details.',
            )
            return
        if text == '/login':
            login_hash = models.LoginHash(telegram_user=telegram_user)
            login_hash.save()
            login_url = request.build_absolute_uri(reverse("login", args=(login_hash.pk, login_hash.uuid)))
            bot.send_message(
                chat_id=chat_id,
                text=f'Do NOT share this link with anyone else:\n{login_url}',
                disable_web_page_preview=True,
            )
        elif text in ('/start', '/help'):
            bot.send_message(
                chat_id=chat_id,
                text=(
                    'I am a bot that is able to provide the following services to your group:\n'
                    ' - Anti-Spam through CAS (https://cas.chat)\n'
                    ' - Anti-Spam through CAPTCHA\n'
                    ' - Join/Leave messages\n'
                    ' - Canned responses\n'
                    '\n'
                    f'I can only be managed from https://{settings.FIRST_DOMAIN}\n'
                    ' - Here, you can only /login or ask me for /help.\n'
                    '\n'
                    'You can run your custom-branded bot for free*.\n'
                    ' * might change in the future'
                ),
            )
        elif text == '/previewCAPTCHA':
            # Seven random decimal digits.
            secret = ''.join(str(random.randint(0, 9)) for _ in range(7))
            captcha_image = imageCaptcha.generate(secret)
            bot.send_photo(
                chat_id=chat_id,
                photo=captcha_image,
                caption=f'Sample CAPTCHA for secret "{secret}"',
            )
        else:
            bot.send_message(
                chat_id=chat_id,
                text='Sorry, I could not understand you.\nTry /help.',
            )

    def _sync_group(self, bot, my_id, chat_id, chat, message, new_members):
        """Refresh the stored group, its admins, and any joining/leaving members."""
        global LAST_BOT
        # get_admins() only takes the chat id, so the bot is handed over
        # through the module-global LAST_BOT for the duration of the call.
        LAST_BOT = bot
        admin_ids = get_admins(chat_id)
        LAST_BOT = None
        group = getTelegramGroup(chat)
        group.admins.set(models.TelegramUser.objects.filter(telegram_id__in=admin_ids).all())
        group.save()
        print(admin_ids)
        print(my_id in admin_ids)
        # NOTE(review): source indentation was lost; the member bookkeeping
        # below is assumed to belong to the group branch - confirm.
        left_member = message.get('left_chat_member', None)
        if left_member is not None:
            getTelegramUser(left_member)
        for new_member in new_members:
            getTelegramUser(new_member)
class LoginView(View):
    """Turn a one-time login link into a ``user_hash`` login cookie."""

    def get(self, request, tuid, hsh):
        """Log in through LoginHash ``tuid`` when ``hsh`` matches its uuid.

        On success the cookie is set, the LoginHash is deleted (so the link
        works only once) and the visitor is redirected to the control panel.
        Otherwise a 401 "Access Denied" response is returned.
        """
        login_hash = models.LoginHash.objects.filter(id=tuid).first()
        if login_hash is not None and str(login_hash.uuid) == hsh:
            four_weeks = 4 * 7 * 24 * 3600
            response = redirect('control_panel')
            response.set_cookie('user_hash', str(login_hash.telegram_user.uuid), expires=four_weeks)
            login_hash.delete()
            return response
        return HttpResponse('Access Denied', status=401)
class LogoutView(View):
    """Log the visitor out by dropping the ``user_hash`` cookie."""

    def get(self, request):
        """Delete the login cookie and redirect to the control panel URL."""
        farewell = redirect('control_panel')
        farewell.delete_cookie('user_hash')
        return farewell
@method_decorator(RequireTelegramLogin, name='dispatch')
class ControlPanelView(TemplateView):
    """Render ``control_panel.haml`` for the logged-in user (passed on as ``tg_user``)."""

    template_name = 'control_panel.haml'
@method_decorator(RequireTelegramLogin, name='dispatch')
class AddTelegramBotView(View):
    """Register a user-supplied bot token and hook the bot up to this site."""

    def post(self, request, tg_user):
        """Store the bot for the POSTed ``token``, refresh its details, set its webhook.

        Redirects to the control panel afterwards.
        """
        token = request.POST.get('token', '')
        bot_telegram_id = get_bot_self(token=token)
        user_bot = models.TelegramUserBots.objects.filter(token=token).first()
        if user_bot is None:
            user_bot = models.TelegramUserBots()
        # NOTE(review): source indentation was lost; following the pattern of
        # getTelegramUser, only the constructor is assumed to sit inside the
        # ``if`` above - confirm.
        user_bot.owner = tg_user
        user_bot.telegram_id = bot_telegram_id
        user_bot.token = token
        user_bot.save()
        # Now that the row exists, the uncached lookup fills in name/username.
        get_bot_self_uncached(token=token)
        setupWebHook(token)
        return redirect('control_panel')
@method_decorator(RequireTelegramLogin, name='dispatch')
class DeleteAccountView(View):
    """Delete the logged-in user's account after a typed confirmation."""

    # Exact phrase the user has to type to confirm the deletion.
    _CONFIRMATION_PHRASE = 'Yes, delete my account and all its data'

    def post(self, request, tg_user):
        """Delete ``tg_user`` when the POSTed ``confirmation`` matches the phrase.

        On a match the user is deleted and the visitor is redirected to the
        logout URL. Otherwise nothing is changed and a page comparing both
        strings is returned.
        """
        confirmation = request.POST.get('confirmation', '')
        if confirmation == self._CONFIRMATION_PHRASE:
            tg_user.delete()
            return redirect('logout')
        # The submitted text is untrusted and ends up in an HTML response, so
        # it is escaped before being echoed back.
        return HttpResponse(
            'Strings do not match:<br>\n'
            f'{escape(confirmation)}<br>\n'
            f'{self._CONFIRMATION_PHRASE}<br>\n'
            '<br>\n'
            'Nothing was done.'
        )
@method_decorator(RequireTelegramLogin, name='dispatch')
class DeleteBot(TemplateView):
    """Confirmation page (GET) and deletion (POST) for one of the user's bots."""

    template_name = 'delete_item.haml'

    def get_context_data(self, **kwargs):
        """Add the bot, its display template and the back link to the context."""
        kwargs.update(
            display_template='delete_item_bot.haml',
            previous=resolve_url('control_panel'),
        )
        kwargs['item'] = self._findFromContext(**kwargs)
        return super().get_context_data(**kwargs)

    def _findFromContext(self, **kwargs):
        """Return the bot ``bot_id`` among ``tg_user``'s bots, or raise a 404."""
        owned_bots = kwargs['tg_user'].bots
        return get_object_or_404(owned_bots, id=kwargs['bot_id'])

    def post(self, request, **kwargs):
        """Delete the bot and go back to the control panel."""
        doomed_bot = self._findFromContext(**kwargs)
        doomed_bot.delete()
        return redirect('control_panel')
class ManageGroupGenericView(TemplateView):
    """Base class for the pages that manage one Telegram group.

    Subclasses override ``_putForms`` to add a ``form`` (and related keys)
    to the context; ``post`` then saves or deletes that form's instance.
    """

    def get_context_data(self, **kwargs):
        """Add ``group_preferences``, ``group`` and the subclass' forms to the context."""
        kwargs['group_preferences'] = self._findGroupPreferencesFromContext(**kwargs)
        kwargs['group'] = self._findGroupFromContext(**kwargs)
        kwargs = self._putForms(**kwargs)
        return super().get_context_data(**kwargs)

    def _findGroupFromContext(self, **kwargs):
        """Return group ``group_id`` among those reachable via ``tg_user.admins``, or raise a 404."""
        return get_object_or_404(kwargs['tg_user'].admins, id=kwargs['group_id'])

    def _findGroupPreferencesFromContext(self, **kwargs):
        """Return the group's preferences, creating them on first use."""
        group = self._findGroupFromContext(**kwargs)
        if getattr(group, 'preferences', None) is None:
            groupPreferences = models.GroupPreferences()
            groupPreferences.group = group
            groupPreferences.save()
            # Re-fetch so that ``group.preferences`` resolves to the new row.
            group = self._findGroupFromContext(**kwargs)
        return group.preferences

    def _putForms(self, **kwargs):
        """Hook for subclasses: return ``kwargs`` extended with form-related keys."""
        return kwargs

    def post(self, request, **kwargs):
        """Save the submitted form, or delete its instance when ``do=delete``.

        Invalid submissions re-render the page with the bound form (and its
        errors) instead of failing inside ``form.save()``. Every other
        outcome redirects to ``redirect_on_submit`` from the context, or back
        to the current path when that key is unset.
        """
        context = self.get_context_data(**kwargs)
        if (form := context.get('form', None)) is not None:
            if request.POST.get('do', '') != 'delete':
                bound_form = form.__class__(request.POST, instance=form.instance)
                if not bound_form.is_valid():
                    # ModelForm.save() raises ValueError on invalid data, so
                    # show the errors to the user rather than crashing.
                    context['form'] = bound_form
                    return self.render_to_response(context)
                bound_form.save()
            elif not form.instance._state.adding:
                # Only rows that exist in the database can be deleted.
                form.instance.delete()
        return HttpResponseRedirect(context.get('redirect_on_submit', None) or request.path)
@method_decorator(RequireTelegramLogin, name='dispatch')
class ManageGroupView(ManageGroupGenericView):
    """Render ``manage_group.haml`` for one group, with a back link to the control panel."""

    template_name = 'manage_group.haml'

    def get_context_data(self, **kwargs):
        """Set ``previous`` to the control panel URL before building the context."""
        kwargs.update(previous=resolve_url('control_panel'))
        return super().get_context_data(**kwargs)
@method_decorator(RequireTelegramLogin, name='dispatch')
class ManageGroupAntiSpamView(ManageGroupGenericView):
    """Form page for a group's Anti-Spam preferences."""

    template_name = 'manage_group_form.haml'

    def get_context_data(self, **kwargs):
        """Set ``previous`` to the group's management page before building the context."""
        kwargs.update(previous=resolve_url('manage_group', kwargs['group_id']))
        return super().get_context_data(**kwargs)

    def _putForms(self, **kwargs):
        """Add the page name and an AntiSpam form bound to the group's preferences."""
        kwargs.update(
            page_name='Anti-Spam',
            form=forms.AntiSpam(instance=kwargs['group_preferences']),
        )
        return kwargs
@method_decorator(RequireTelegramLogin, name='dispatch')
class ManageGroupCAPTCHAView(ManageGroupGenericView):
    """Form page for a group's CAPTCHA preferences."""

    template_name = 'manage_group_form.haml'

    def get_context_data(self, **kwargs):
        """Set ``previous`` to the group's management page before building the context."""
        kwargs.update(previous=resolve_url('manage_group', kwargs['group_id']))
        return super().get_context_data(**kwargs)

    def _putForms(self, **kwargs):
        """Add the page name and a CAPTCHA form bound to the group's preferences."""
        kwargs.update(
            page_name='CAPTCHA',
            form=forms.CAPTCHA(instance=kwargs['group_preferences']),
        )
        return kwargs
@method_decorator(RequireTelegramLogin, name='dispatch')
class ManageGroupJoinLeaveGreetingsView(ManageGroupGenericView):
    """Form page for a group's join/leave greeting preferences."""

    template_name = 'manage_group_form.haml'

    def get_context_data(self, **kwargs):
        """Set ``previous`` to the group's management page before building the context."""
        kwargs.update(previous=resolve_url('manage_group', kwargs['group_id']))
        return super().get_context_data(**kwargs)

    def _putForms(self, **kwargs):
        """Add the page name and a Greetings form bound to the group's preferences."""
        kwargs.update(
            page_name='Join/Leave Greetings',
            form=forms.Greetings(instance=kwargs['group_preferences']),
        )
        return kwargs
@method_decorator(RequireTelegramLogin, name='dispatch')
class ManageGroupCannedMessageView(ManageGroupGenericView):
    """Form page that creates, edits or deletes one canned message of a group."""

    template_name = 'manage_group_form.haml'

    def get_context_data(self, **kwargs):
        """Set ``previous`` to the group's canned-message list before building the context."""
        kwargs.update(previous=resolve_url('manage_group_cannedmessages', kwargs['group_id']))
        return super().get_context_data(**kwargs)

    def _putForms(self, **kwargs):
        """Add the canned message (existing or a fresh unsaved one) and its form."""
        kwargs['page_name'] = 'Canned Message'
        preferences = kwargs['group_preferences']
        canned_message = preferences.canned_messages.filter(id=kwargs['cannedmessage_id']).first()
        if not canned_message:
            # Unknown id: edit a new, not yet saved message instead.
            canned_message = models.GroupCannedMessage(group_preferences=preferences)
        kwargs['canned_message'] = canned_message
        kwargs['form'] = forms.CannedMessage(instance=canned_message)
        # Deletion is only offered for messages that already exist in the database.
        kwargs['deletion_allowed'] = not canned_message._state.adding
        kwargs['redirect_on_submit'] = kwargs['previous']
        return kwargs
@method_decorator(RequireTelegramLogin, name='dispatch')
class ManageGroupCannedMessagesView(ManageGroupGenericView):
    """Render ``manage_group_cannedmessages.haml`` for one group."""

    template_name = 'manage_group_cannedmessages.haml'

    def get_context_data(self, **kwargs):
        """Set ``previous`` to the group's management page before building the context."""
        kwargs.update(previous=resolve_url('manage_group', kwargs['group_id']))
        return super().get_context_data(**kwargs)