furmeet-bot/webproj/bot/views.py

537 lines
20 KiB
Python

# -*- encoding: utf-8 -*-
import json
import urllib.parse
from pathlib import Path
from django.conf import settings
from django.core.exceptions import ValidationError
from django.http import Http404, HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404, redirect, resolve_url
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import ContextMixin, TemplateResponseMixin
from telegram.bot import Bot
from . import forms, models
from .telebot import get_bot_self, get_bot_self_uncached, run_telebot
# Create your views here.
class TemplateView(TemplateResponseMixin, ContextMixin, View):
def get(self, request, *_, **kwargs):
context = self.get_context_data(**kwargs)
response = self.render_to_response(context)
return response
class PingPongView(View):
def get(self, request):
with Path('.').joinpath(settings.STATIC_URL.strip('/')).joinpath('ping.txt').open() as f:
return HttpResponse(f.read(), content_type='text/plain')
class BorkedView(View):
def get(self, request):
raise Exception('Borked view was invoked')
class RequireTelegramLogin:
def __init__(self, func):
self.func = func
def __call__(self, request, *args, **kwargs):
telegram_user_hash = request.COOKIES.get('user_hash', None)
telegram_user = models.TelegramUser.objects.filter(
uuid=telegram_user_hash).first()
if telegram_user_hash is None or telegram_user is None:
return redirect('home')
return self.func(request=request, tg_user=telegram_user, *args, **kwargs)
class UseTelegramLogin:
def __init__(self, func):
self.func = func
def __call__(self, request, *args, **kwargs):
telegram_user_hash = request.COOKIES.get('user_hash', None)
telegram_user = models.TelegramUser.objects.filter(
uuid=telegram_user_hash).first()
return self.func(request=request, tg_user=telegram_user, *args, **kwargs)
@method_decorator(UseTelegramLogin, 'dispatch')
class SoonView(TemplateView):
template_name = "soon.haml"
@method_decorator(UseTelegramLogin, 'dispatch')
class HomeView(TemplateView):
template_name = "index.haml"
@method_decorator(UseTelegramLogin, 'dispatch')
class TermsView(TemplateView):
template_name = "terms.haml"
@method_decorator(UseTelegramLogin, 'dispatch')
class PrivacyView(TemplateView):
template_name = "privacy.haml"
@method_decorator(UseTelegramLogin, 'dispatch')
class PythonLicensesView(TemplateView):
template_name = "python_licenses.haml"
def setupWebHook(token):
bot_webhook_url = f'https://{settings.FIRST_DOMAIN}/telegram/{token}/'
Bot(token).set_webhook(url=bot_webhook_url)
class WebHookSetupView(View):
def get(self, request):
setupWebHook(settings.TELEGRAM_TOKEN)
return HttpResponse()
@method_decorator(csrf_exempt, 'dispatch')
class WebHookView(View):
def post(self, request, telegram):
update = json.loads(request.body)
run_telebot(telegram, update, request)
print(json.dumps(update, indent=4))
return HttpResponse()
class LoginView(View):
def get(self, request, tuid, hsh):
lh = models.LoginHash.objects.filter(id=tuid).first()
if lh is None or str(lh.uuid) != hsh:
hr = HttpResponse('Access Denied')
hr.status_code = 401
return hr
response = redirect('control_panel')
response.set_cookie(
'user_hash', f'{lh.telegram_user.uuid}', expires=4*7*24*3600)
lh.delete()
return response
class LogoutView(View):
def get(self, request):
response = redirect('control_panel')
response.delete_cookie('user_hash')
return response
@method_decorator(RequireTelegramLogin, 'dispatch')
class ControlPanelView(TemplateView):
template_name = 'control_panel.haml'
@method_decorator(RequireTelegramLogin, 'dispatch')
class AddTelegramBotView(View):
def post(self, request, tg_user):
token = request.POST.get('token', '')
bot_self = get_bot_self(token=token)
userBot = models.TelegramUserBots.objects.filter(token=token).first()
if userBot is None:
userBot = models.TelegramUserBots()
userBot.owner = tg_user
userBot.telegram_id = bot_self
userBot.token = token
userBot.save()
get_bot_self_uncached(token=token)
setupWebHook(token)
return redirect('control_panel')
@method_decorator(RequireTelegramLogin, 'dispatch')
class DeleteAccountView(View):
def post(self, request, tg_user):
confirmation = request.POST.get('confirmation', '')
if confirmation == 'Yes, delete my account and all its data':
tg_user.delete()
return redirect('logout')
else:
return HttpResponse('Strings do not match:<br>\n' +
f'{confirmation}<br>\n' +
'Yes, delete my account and all its data<br>\n' +
'<br>\n' +
'Nothing was done.')
@method_decorator(RequireTelegramLogin, 'dispatch')
class DeleteBot(TemplateView):
template_name = 'delete_item.haml'
def get_context_data(self, **kwargs):
kwargs['display_template'] = 'delete_item_bot.haml'
kwargs['previous'] = resolve_url('control_panel')
kwargs['item'] = self._findFromContext(**kwargs)
return super().get_context_data(**kwargs)
def _findFromContext(self, **kwargs):
return get_object_or_404(kwargs['tg_user'].bots, id=kwargs['bot_id'])
def post(self, request, **kwargs):
self._findFromContext(**kwargs).delete()
return redirect('control_panel')
@ method_decorator(RequireTelegramLogin, 'dispatch')
class ManageGroupGenericView(TemplateView):
def get(self, request, *args, **kwargs):
return super().get(request, *args, **{'request_': request, **kwargs})
def get_context_data(self, **kwargs):
kwargs['group_preferences'] = self._findGroupPreferencesFromContext(
**kwargs)
kwargs['group'] = self._findGroupFromContext(**kwargs)
kwargs = self._putForms(**kwargs)
return super().get_context_data(**kwargs)
def _findGroupFromContext(self, **kwargs):
return get_object_or_404(kwargs['tg_user'].manages, id=kwargs['group_id'])
def _findGroupPreferencesFromContext(self, **kwargs):
group = self._findGroupFromContext(**kwargs)
if getattr(group, 'preferences', None) is None:
groupPreferences = models.GroupPreferences()
groupPreferences.group = group
groupPreferences.save()
group = self._findGroupFromContext(**kwargs)
return group.preferences
def _putForms(self, **kwargs):
request = kwargs['request_']
if request.GET.get('form_validation_failed', 'n') == 'y':
form = kwargs['form']
form = self._restrain_form(
form.__class__(request.GET, instance=form.instance),
**{k: v for k, v in kwargs.items() if k != 'form'})
kwargs['form'] = form
form.full_clean()
return kwargs
def _restrain_form(self, form, **_):
return form
def post(self, request, **kwargs):
kwargs = self.get_context_data(**{**kwargs, 'request_': request})
if (form := kwargs.get('form', None)) is not None:
if request.POST.get('do', '') != 'delete':
form = self._restrain_form(form.__class__(request.POST, instance=form.instance),
**{k: v for k, v in kwargs.items() if k != 'form'})
try:
form.full_clean()
form.save()
except (ValueError, ValidationError):
postcopy = request.POST.copy()
postcopy['csrfmiddlewaretoken'] = ''
postcopy['do'] = ''
del postcopy['csrfmiddlewaretoken']
del postcopy['do']
return HttpResponseRedirect(f'{request.path}?form_validation_failed=y&{urllib.parse.urlencode(postcopy)}')
else:
form.instance.delete()
return HttpResponseRedirect(kwargs.get('redirect_on_submit', None) or request.path)
class ManageGroupView(ManageGroupGenericView):
template_name = 'manage_group.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('control_panel')
return super().get_context_data(**kwargs)
class ManageGroupAntiSpamView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Anti-Spam'
kwargs['form'] = forms.AntiSpam(instance=kwargs['group_preferences'])
return super()._putForms(**kwargs)
class ManageGroupCAPTCHAView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'CAPTCHA'
kwargs['form'] = forms.CAPTCHA(instance=kwargs['group_preferences'])
return super()._putForms(**kwargs)
class ManageGroupJoinLeaveGreetingsView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Join/Leave Greetings'
kwargs['form'] = forms.Greetings(instance=kwargs['group_preferences'])
return super()._putForms(**kwargs)
class ManageGroupCannedMessageView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url(
'manage_group_cannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Canned Message'
kwargs['canned_message'] = (
kwargs['group_preferences'].canned_messages.filter(
id=kwargs['cannedmessage_id']).first()
or
models.GroupCannedMessage(
group_preferences=kwargs['group_preferences'])
)
kwargs['form'] = forms.CannedMessage(instance=kwargs['canned_message'])
kwargs['deletion_allowed'] = not kwargs['canned_message']._state.adding
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
class ManageGroupCannedMessagesView(ManageGroupGenericView):
template_name = 'manage_group_cannedmessages.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
return super().get_context_data(**kwargs)
class ManageGroupPlannedMessagesView(ManageGroupGenericView):
template_name = 'manage_group_plannedmessages.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
return super().get_context_data(**kwargs)
class ManageGroupPlannedMessagesTimezoneView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url(
'manage_group_plannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Planned Messages\' Timezone'
kwargs['form'] = forms.PlannedMessageTimezone(
instance=kwargs['group_preferences'])
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
class ManageGroupPlannedMessageView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url(
'manage_group_plannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Canned Message'
kwargs['planned_message'] = (
kwargs['group_preferences'].planned_messages.filter(
id=kwargs['plannedmessage_id']).first()
or
models.GroupPlannedMessage(
group_preferences=kwargs['group_preferences'])
)
kwargs['form'] = forms.PlannedMessage(
instance=kwargs['planned_message'])
kwargs['deletion_allowed'] = not kwargs['planned_message']._state.adding
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
class ManageGroupPlannedDispatchView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url(
'manage_group_plannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Canned Message'
kwargs['planned_dispatch'] = (
kwargs['group_preferences'].planned_message_dispatches.filter(
id=kwargs['planneddispatch_id']).first()
or
models.GroupPlannedDispatch(
group_preferences=kwargs['group_preferences'])
)
kwargs['form'] = forms.PlannedDispatch(
instance=kwargs['planned_dispatch'])
kwargs['deletion_allowed'] = not kwargs['planned_dispatch']._state.adding
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
class ManageGroupPlannedEnablednessView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url(
'manage_group_plannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Planned Messages Configuration'
kwargs['bot_choices'] = self.generate_bot_choices(**kwargs)
kwargs['form'] = self._restrain_form(
forms.PlannedMessageConfiguration(
instance=kwargs['group_preferences']),
**{k: v for k, v in kwargs.items() if k != 'form'})
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
def generate_bot_choices(self, **kwargs):
bot_choices = [
(None, f'@{settings.BOT_NAME} (the shared bot)'),
]
bots_considered = set([None])
for bot in ([kwargs['group_preferences'].planned_message_issuer] + list(kwargs['tg_user'].bots.all())):
if bot not in bots_considered:
bot_choices.append(
(bot.id, f'@{bot.telegram_username} ({bot.telegram_id}) by {bot.owner.name}'))
bots_considered.add(bot)
return bot_choices
def post(self, request, **kwargs):
kwargs = self.get_context_data(**{**kwargs, 'request_': request})
bot_choices = kwargs['bot_choices']
result = super().post(request, **kwargs)
group_preferences = models.GroupPreferences.objects.get(
id=kwargs['group_preferences'].id)
if group_preferences.planned_message_issuer is not None:
disallowed_choice = True
for bot_id, _ in bot_choices:
if bot_id == group_preferences.planned_message_issuer.id:
disallowed_choice = False
break
if disallowed_choice:
group_preferences.planned_message_issuer = None
group_preferences.save()
return result
def _restrain_form(self, form, **kwargs):
form.fields['planned_message_issuer'].choices = kwargs['bot_choices']
return super()._restrain_form(form, **kwargs)
class ManageAccessControlView(ManageGroupGenericView):
template_name = 'manage_group_accesscontrol.haml'
class ManageAccessControlUseView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url(
'manage_group_accesscontrol', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Access Control Policy'
kwargs['form'] = forms.AccessControlUse(
instance=kwargs['group_preferences'])
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
class ManageAccessControlGrantView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
MODEL = models.GroupAccessControlGrant
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url(
'manage_group_accesscontrol', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Access Control Grant'
if 'accesscontrolgrant_id' not in kwargs:
kwargs['accesscontrolgrant_id'] = 0
kwargs['access_control_grant'] = (
kwargs['group_preferences'].access_control_grants.filter(
id=kwargs['accesscontrolgrant_id']).first()
or
models.GroupAccessControlGrant(
group_preferences=kwargs['group_preferences'])
)
kwargs['admin_choices'] = self.generate_admin_choices(**kwargs)
kwargs['form'] = self._restrain_form(
forms.AccessControlGrant(instance=kwargs['access_control_grant']),
**{k: v for k, v in kwargs.items() if k != 'form'})
kwargs['deletion_allowed'] = not kwargs['access_control_grant']._state.adding
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
def generate_admin_choices(self, **kwargs):
admin_choices = []
group = kwargs['group']
group_preferences = kwargs['group_preferences']
for admin in group.admins.all():
if admin.as_bot is None:
if models.GroupAccessControlGrant.objects.filter(group_preferences=group_preferences, allows=admin).first() is None:
admin_choices.append(
(admin.id, f'{admin.name} | @{admin.telegram_username or "--unset--"} ({admin.telegram_id})'))
return admin_choices
def _restrain_form(self, form, **kwargs):
form.fields['allows'].choices = kwargs['admin_choices']
return super()._restrain_form(form, **kwargs)
class ManageAccessControlRevokeView(ManageAccessControlGrantView):
template_name = 'manage_group_formdelete.haml'
def generate_admin_choices(self, **kwargs):
admin_choices = []
acg = kwargs['access_control_grant']
if not acg._state.adding:
admin = acg.allows
admin_choices.append(
(admin.id, f'{admin.name} | @{admin.telegram_username or "--unset--"} ({admin.telegram_id})'))
else:
raise Http404
return admin_choices
def _putForms(self, **kwargs):
new_kwargs = super()._putForms(**kwargs)
new_kwargs['page_name'] = 'Access Control Revoke'
return new_kwargs
def _restrain_form(self, form, **kwargs):
new_form = super()._restrain_form(form, **kwargs)
for field in form.fields.items():
field[1].widget.attrs['readonly'] = True
field[1].widget.attrs['disabled'] = True
return new_form