537 lines
20 KiB
Python
537 lines
20 KiB
Python
# -*- encoding: utf-8 -*-
|
|
import json
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.http import Http404, HttpResponse, HttpResponseRedirect
|
|
from django.shortcuts import get_object_or_404, redirect, resolve_url
|
|
from django.utils.decorators import method_decorator
|
|
from django.views import View
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.generic.base import ContextMixin, TemplateResponseMixin
|
|
from telegram.bot import Bot
|
|
|
|
from . import forms, models
|
|
from .telebot import get_bot_self, get_bot_self_uncached, run_telebot
|
|
|
|
# Create your views here.
|
|
|
|
|
|
class TemplateView(TemplateResponseMixin, ContextMixin, View):
|
|
def get(self, request, *_, **kwargs):
|
|
context = self.get_context_data(**kwargs)
|
|
response = self.render_to_response(context)
|
|
return response
|
|
|
|
|
|
class PingPongView(View):
|
|
def get(self, request):
|
|
with Path('.').joinpath(settings.STATIC_URL.strip('/')).joinpath('ping.txt').open() as f:
|
|
return HttpResponse(f.read(), content_type='text/plain')
|
|
|
|
|
|
class BorkedView(View):
|
|
def get(self, request):
|
|
raise Exception('Borked view was invoked')
|
|
|
|
|
|
class RequireTelegramLogin:
|
|
def __init__(self, func):
|
|
self.func = func
|
|
|
|
def __call__(self, request, *args, **kwargs):
|
|
telegram_user_hash = request.COOKIES.get('user_hash', None)
|
|
telegram_user = models.TelegramUser.objects.filter(
|
|
uuid=telegram_user_hash).first()
|
|
if telegram_user_hash is None or telegram_user is None:
|
|
return redirect('home')
|
|
return self.func(request=request, tg_user=telegram_user, *args, **kwargs)
|
|
|
|
|
|
class UseTelegramLogin:
|
|
def __init__(self, func):
|
|
self.func = func
|
|
|
|
def __call__(self, request, *args, **kwargs):
|
|
telegram_user_hash = request.COOKIES.get('user_hash', None)
|
|
telegram_user = models.TelegramUser.objects.filter(
|
|
uuid=telegram_user_hash).first()
|
|
return self.func(request=request, tg_user=telegram_user, *args, **kwargs)
|
|
|
|
|
|
@method_decorator(UseTelegramLogin, 'dispatch')
|
|
class SoonView(TemplateView):
|
|
template_name = "soon.haml"
|
|
|
|
|
|
@method_decorator(UseTelegramLogin, 'dispatch')
|
|
class HomeView(TemplateView):
|
|
template_name = "index.haml"
|
|
|
|
|
|
@method_decorator(UseTelegramLogin, 'dispatch')
|
|
class TermsView(TemplateView):
|
|
template_name = "terms.haml"
|
|
|
|
|
|
@method_decorator(UseTelegramLogin, 'dispatch')
|
|
class PrivacyView(TemplateView):
|
|
template_name = "privacy.haml"
|
|
|
|
|
|
@method_decorator(UseTelegramLogin, 'dispatch')
|
|
class PythonLicensesView(TemplateView):
|
|
template_name = "python_licenses.haml"
|
|
|
|
|
|
def setupWebHook(token):
|
|
bot_webhook_url = f'https://{settings.FIRST_DOMAIN}/telegram/{token}/'
|
|
Bot(token).set_webhook(url=bot_webhook_url)
|
|
|
|
|
|
class WebHookSetupView(View):
|
|
def get(self, request):
|
|
setupWebHook(settings.TELEGRAM_TOKEN)
|
|
return HttpResponse()
|
|
|
|
|
|
@method_decorator(csrf_exempt, 'dispatch')
|
|
class WebHookView(View):
|
|
def post(self, request, telegram):
|
|
update = json.loads(request.body)
|
|
run_telebot(telegram, update, request)
|
|
print(json.dumps(update, indent=4))
|
|
return HttpResponse()
|
|
|
|
|
|
class LoginView(View):
|
|
def get(self, request, tuid, hsh):
|
|
lh = models.LoginHash.objects.filter(id=tuid).first()
|
|
if lh is None or str(lh.uuid) != hsh:
|
|
hr = HttpResponse('Access Denied')
|
|
hr.status_code = 401
|
|
return hr
|
|
response = redirect('control_panel')
|
|
response.set_cookie(
|
|
'user_hash', f'{lh.telegram_user.uuid}', expires=4*7*24*3600)
|
|
lh.delete()
|
|
return response
|
|
|
|
|
|
class LogoutView(View):
|
|
def get(self, request):
|
|
response = redirect('control_panel')
|
|
response.delete_cookie('user_hash')
|
|
return response
|
|
|
|
|
|
@method_decorator(RequireTelegramLogin, 'dispatch')
|
|
class ControlPanelView(TemplateView):
|
|
template_name = 'control_panel.haml'
|
|
|
|
|
|
@method_decorator(RequireTelegramLogin, 'dispatch')
|
|
class AddTelegramBotView(View):
|
|
def post(self, request, tg_user):
|
|
token = request.POST.get('token', '')
|
|
bot_self = get_bot_self(token=token)
|
|
userBot = models.TelegramUserBots.objects.filter(token=token).first()
|
|
if userBot is None:
|
|
userBot = models.TelegramUserBots()
|
|
userBot.owner = tg_user
|
|
userBot.telegram_id = bot_self
|
|
userBot.token = token
|
|
userBot.save()
|
|
get_bot_self_uncached(token=token)
|
|
setupWebHook(token)
|
|
return redirect('control_panel')
|
|
|
|
|
|
@method_decorator(RequireTelegramLogin, 'dispatch')
|
|
class DeleteAccountView(View):
|
|
def post(self, request, tg_user):
|
|
confirmation = request.POST.get('confirmation', '')
|
|
if confirmation == 'Yes, delete my account and all its data':
|
|
tg_user.delete()
|
|
return redirect('logout')
|
|
else:
|
|
return HttpResponse('Strings do not match:<br>\n' +
|
|
f'{confirmation}<br>\n' +
|
|
'Yes, delete my account and all its data<br>\n' +
|
|
'<br>\n' +
|
|
'Nothing was done.')
|
|
|
|
|
|
@method_decorator(RequireTelegramLogin, 'dispatch')
|
|
class DeleteBot(TemplateView):
|
|
template_name = 'delete_item.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['display_template'] = 'delete_item_bot.haml'
|
|
kwargs['previous'] = resolve_url('control_panel')
|
|
kwargs['item'] = self._findFromContext(**kwargs)
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _findFromContext(self, **kwargs):
|
|
return get_object_or_404(kwargs['tg_user'].bots, id=kwargs['bot_id'])
|
|
|
|
def post(self, request, **kwargs):
|
|
self._findFromContext(**kwargs).delete()
|
|
return redirect('control_panel')
|
|
|
|
|
|
@ method_decorator(RequireTelegramLogin, 'dispatch')
|
|
class ManageGroupGenericView(TemplateView):
|
|
def get(self, request, *args, **kwargs):
|
|
return super().get(request, *args, **{'request_': request, **kwargs})
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['group_preferences'] = self._findGroupPreferencesFromContext(
|
|
**kwargs)
|
|
kwargs['group'] = self._findGroupFromContext(**kwargs)
|
|
kwargs = self._putForms(**kwargs)
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _findGroupFromContext(self, **kwargs):
|
|
return get_object_or_404(kwargs['tg_user'].manages, id=kwargs['group_id'])
|
|
|
|
def _findGroupPreferencesFromContext(self, **kwargs):
|
|
group = self._findGroupFromContext(**kwargs)
|
|
if getattr(group, 'preferences', None) is None:
|
|
groupPreferences = models.GroupPreferences()
|
|
groupPreferences.group = group
|
|
groupPreferences.save()
|
|
group = self._findGroupFromContext(**kwargs)
|
|
return group.preferences
|
|
|
|
def _putForms(self, **kwargs):
|
|
request = kwargs['request_']
|
|
if request.GET.get('form_validation_failed', 'n') == 'y':
|
|
form = kwargs['form']
|
|
form = self._restrain_form(
|
|
form.__class__(request.GET, instance=form.instance),
|
|
**{k: v for k, v in kwargs.items() if k != 'form'})
|
|
kwargs['form'] = form
|
|
form.full_clean()
|
|
return kwargs
|
|
|
|
def _restrain_form(self, form, **_):
|
|
return form
|
|
|
|
def post(self, request, **kwargs):
|
|
kwargs = self.get_context_data(**{**kwargs, 'request_': request})
|
|
if (form := kwargs.get('form', None)) is not None:
|
|
if request.POST.get('do', '') != 'delete':
|
|
form = self._restrain_form(form.__class__(request.POST, instance=form.instance),
|
|
**{k: v for k, v in kwargs.items() if k != 'form'})
|
|
try:
|
|
form.full_clean()
|
|
form.save()
|
|
except (ValueError, ValidationError):
|
|
postcopy = request.POST.copy()
|
|
postcopy['csrfmiddlewaretoken'] = ''
|
|
postcopy['do'] = ''
|
|
del postcopy['csrfmiddlewaretoken']
|
|
del postcopy['do']
|
|
return HttpResponseRedirect(f'{request.path}?form_validation_failed=y&{urllib.parse.urlencode(postcopy)}')
|
|
else:
|
|
form.instance.delete()
|
|
return HttpResponseRedirect(kwargs.get('redirect_on_submit', None) or request.path)
|
|
|
|
|
|
class ManageGroupView(ManageGroupGenericView):
|
|
template_name = 'manage_group.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url('control_panel')
|
|
return super().get_context_data(**kwargs)
|
|
|
|
|
|
class ManageGroupAntiSpamView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Anti-Spam'
|
|
kwargs['form'] = forms.AntiSpam(instance=kwargs['group_preferences'])
|
|
return super()._putForms(**kwargs)
|
|
|
|
|
|
class ManageGroupCAPTCHAView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'CAPTCHA'
|
|
kwargs['form'] = forms.CAPTCHA(instance=kwargs['group_preferences'])
|
|
return super()._putForms(**kwargs)
|
|
|
|
|
|
class ManageGroupJoinLeaveGreetingsView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Join/Leave Greetings'
|
|
kwargs['form'] = forms.Greetings(instance=kwargs['group_preferences'])
|
|
return super()._putForms(**kwargs)
|
|
|
|
|
|
class ManageGroupCannedMessageView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url(
|
|
'manage_group_cannedmessages', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Canned Message'
|
|
kwargs['canned_message'] = (
|
|
kwargs['group_preferences'].canned_messages.filter(
|
|
id=kwargs['cannedmessage_id']).first()
|
|
or
|
|
models.GroupCannedMessage(
|
|
group_preferences=kwargs['group_preferences'])
|
|
)
|
|
kwargs['form'] = forms.CannedMessage(instance=kwargs['canned_message'])
|
|
kwargs['deletion_allowed'] = not kwargs['canned_message']._state.adding
|
|
kwargs['redirect_on_submit'] = kwargs['previous']
|
|
return super()._putForms(**kwargs)
|
|
|
|
|
|
class ManageGroupCannedMessagesView(ManageGroupGenericView):
|
|
template_name = 'manage_group_cannedmessages.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
|
|
class ManageGroupPlannedMessagesView(ManageGroupGenericView):
|
|
template_name = 'manage_group_plannedmessages.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
|
|
class ManageGroupPlannedMessagesTimezoneView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url(
|
|
'manage_group_plannedmessages', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Planned Messages\' Timezone'
|
|
kwargs['form'] = forms.PlannedMessageTimezone(
|
|
instance=kwargs['group_preferences'])
|
|
kwargs['redirect_on_submit'] = kwargs['previous']
|
|
return super()._putForms(**kwargs)
|
|
|
|
|
|
class ManageGroupPlannedMessageView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url(
|
|
'manage_group_plannedmessages', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Canned Message'
|
|
kwargs['planned_message'] = (
|
|
kwargs['group_preferences'].planned_messages.filter(
|
|
id=kwargs['plannedmessage_id']).first()
|
|
or
|
|
models.GroupPlannedMessage(
|
|
group_preferences=kwargs['group_preferences'])
|
|
)
|
|
kwargs['form'] = forms.PlannedMessage(
|
|
instance=kwargs['planned_message'])
|
|
kwargs['deletion_allowed'] = not kwargs['planned_message']._state.adding
|
|
kwargs['redirect_on_submit'] = kwargs['previous']
|
|
return super()._putForms(**kwargs)
|
|
|
|
|
|
class ManageGroupPlannedDispatchView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url(
|
|
'manage_group_plannedmessages', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Canned Message'
|
|
kwargs['planned_dispatch'] = (
|
|
kwargs['group_preferences'].planned_message_dispatches.filter(
|
|
id=kwargs['planneddispatch_id']).first()
|
|
or
|
|
models.GroupPlannedDispatch(
|
|
group_preferences=kwargs['group_preferences'])
|
|
)
|
|
kwargs['form'] = forms.PlannedDispatch(
|
|
instance=kwargs['planned_dispatch'])
|
|
kwargs['deletion_allowed'] = not kwargs['planned_dispatch']._state.adding
|
|
kwargs['redirect_on_submit'] = kwargs['previous']
|
|
return super()._putForms(**kwargs)
|
|
|
|
|
|
class ManageGroupPlannedEnablednessView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url(
|
|
'manage_group_plannedmessages', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Planned Messages Configuration'
|
|
kwargs['bot_choices'] = self.generate_bot_choices(**kwargs)
|
|
kwargs['form'] = self._restrain_form(
|
|
forms.PlannedMessageConfiguration(
|
|
instance=kwargs['group_preferences']),
|
|
**{k: v for k, v in kwargs.items() if k != 'form'})
|
|
kwargs['redirect_on_submit'] = kwargs['previous']
|
|
return super()._putForms(**kwargs)
|
|
|
|
def generate_bot_choices(self, **kwargs):
|
|
bot_choices = [
|
|
(None, f'@{settings.BOT_NAME} (the shared bot)'),
|
|
]
|
|
bots_considered = set([None])
|
|
for bot in ([kwargs['group_preferences'].planned_message_issuer] + list(kwargs['tg_user'].bots.all())):
|
|
if bot not in bots_considered:
|
|
bot_choices.append(
|
|
(bot.id, f'@{bot.telegram_username} ({bot.telegram_id}) by {bot.owner.name}'))
|
|
bots_considered.add(bot)
|
|
return bot_choices
|
|
|
|
def post(self, request, **kwargs):
|
|
kwargs = self.get_context_data(**{**kwargs, 'request_': request})
|
|
bot_choices = kwargs['bot_choices']
|
|
result = super().post(request, **kwargs)
|
|
group_preferences = models.GroupPreferences.objects.get(
|
|
id=kwargs['group_preferences'].id)
|
|
if group_preferences.planned_message_issuer is not None:
|
|
disallowed_choice = True
|
|
for bot_id, _ in bot_choices:
|
|
if bot_id == group_preferences.planned_message_issuer.id:
|
|
disallowed_choice = False
|
|
break
|
|
if disallowed_choice:
|
|
group_preferences.planned_message_issuer = None
|
|
group_preferences.save()
|
|
return result
|
|
|
|
def _restrain_form(self, form, **kwargs):
|
|
form.fields['planned_message_issuer'].choices = kwargs['bot_choices']
|
|
return super()._restrain_form(form, **kwargs)
|
|
|
|
|
|
class ManageAccessControlView(ManageGroupGenericView):
|
|
template_name = 'manage_group_accesscontrol.haml'
|
|
|
|
|
|
class ManageAccessControlUseView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url(
|
|
'manage_group_accesscontrol', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Access Control Policy'
|
|
kwargs['form'] = forms.AccessControlUse(
|
|
instance=kwargs['group_preferences'])
|
|
kwargs['redirect_on_submit'] = kwargs['previous']
|
|
return super()._putForms(**kwargs)
|
|
|
|
|
|
class ManageAccessControlGrantView(ManageGroupGenericView):
|
|
template_name = 'manage_group_form.haml'
|
|
MODEL = models.GroupAccessControlGrant
|
|
|
|
template_name = 'manage_group_form.haml'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
kwargs['previous'] = resolve_url(
|
|
'manage_group_accesscontrol', kwargs['group_id'])
|
|
return super().get_context_data(**kwargs)
|
|
|
|
def _putForms(self, **kwargs):
|
|
kwargs['page_name'] = 'Access Control Grant'
|
|
if 'accesscontrolgrant_id' not in kwargs:
|
|
kwargs['accesscontrolgrant_id'] = 0
|
|
kwargs['access_control_grant'] = (
|
|
kwargs['group_preferences'].access_control_grants.filter(
|
|
id=kwargs['accesscontrolgrant_id']).first()
|
|
or
|
|
models.GroupAccessControlGrant(
|
|
group_preferences=kwargs['group_preferences'])
|
|
)
|
|
kwargs['admin_choices'] = self.generate_admin_choices(**kwargs)
|
|
kwargs['form'] = self._restrain_form(
|
|
forms.AccessControlGrant(instance=kwargs['access_control_grant']),
|
|
**{k: v for k, v in kwargs.items() if k != 'form'})
|
|
kwargs['deletion_allowed'] = not kwargs['access_control_grant']._state.adding
|
|
kwargs['redirect_on_submit'] = kwargs['previous']
|
|
return super()._putForms(**kwargs)
|
|
|
|
def generate_admin_choices(self, **kwargs):
|
|
admin_choices = []
|
|
group = kwargs['group']
|
|
group_preferences = kwargs['group_preferences']
|
|
for admin in group.admins.all():
|
|
if admin.as_bot is None:
|
|
if models.GroupAccessControlGrant.objects.filter(group_preferences=group_preferences, allows=admin).first() is None:
|
|
admin_choices.append(
|
|
(admin.id, f'{admin.name} | @{admin.telegram_username or "--unset--"} ({admin.telegram_id})'))
|
|
return admin_choices
|
|
|
|
def _restrain_form(self, form, **kwargs):
|
|
form.fields['allows'].choices = kwargs['admin_choices']
|
|
return super()._restrain_form(form, **kwargs)
|
|
|
|
|
|
class ManageAccessControlRevokeView(ManageAccessControlGrantView):
|
|
template_name = 'manage_group_formdelete.haml'
|
|
|
|
def generate_admin_choices(self, **kwargs):
|
|
admin_choices = []
|
|
acg = kwargs['access_control_grant']
|
|
if not acg._state.adding:
|
|
admin = acg.allows
|
|
admin_choices.append(
|
|
(admin.id, f'{admin.name} | @{admin.telegram_username or "--unset--"} ({admin.telegram_id})'))
|
|
else:
|
|
raise Http404
|
|
return admin_choices
|
|
|
|
def _putForms(self, **kwargs):
|
|
new_kwargs = super()._putForms(**kwargs)
|
|
new_kwargs['page_name'] = 'Access Control Revoke'
|
|
return new_kwargs
|
|
|
|
def _restrain_form(self, form, **kwargs):
|
|
new_form = super()._restrain_form(form, **kwargs)
|
|
for field in form.fields.items():
|
|
field[1].widget.attrs['readonly'] = True
|
|
field[1].widget.attrs['disabled'] = True
|
|
return new_form
|