furmeet-bot/webproj/bot/views.py

777 lines
39 KiB
Python
Raw Normal View History

2020-07-08 05:32:26 +00:00
import datetime
import gzip
import json
import mimetypes
2020-07-09 20:39:12 +00:00
import random
2020-07-08 05:32:26 +00:00
import sys
2020-07-12 01:01:41 +00:00
import traceback
2020-07-11 08:28:57 +00:00
import urllib.parse
2020-07-09 20:39:12 +00:00
from io import BytesIO
2020-07-08 05:32:26 +00:00
from pathlib import Path
from typing import Any, Dict, List
from uuid import UUID
import pytz
2020-07-11 03:34:48 +00:00
import requests
2020-07-08 05:32:26 +00:00
2020-07-11 03:34:48 +00:00
import django.utils.timezone
2020-07-09 20:39:12 +00:00
from captcha.image import ImageCaptcha
2020-07-08 05:32:26 +00:00
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
2020-07-11 08:28:57 +00:00
from django.core.exceptions import ValidationError
2020-07-08 05:32:26 +00:00
from django.http import FileResponse, HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404, redirect, render, resolve_url
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.html import escape
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import last_modified
2020-08-28 05:14:33 +00:00
from django.views.generic.base import TemplateResponseMixin, ContextMixin
2020-07-09 20:39:12 +00:00
from telegram.bot import Bot
2020-07-12 02:03:59 +00:00
from telegram.error import BadRequest as TelegramBadRequest
2020-07-11 23:43:27 +00:00
from telegram.error import Unauthorized as TelegramUnauthorized
2020-07-12 02:03:59 +00:00
from telegram.parsemode import ParseMode
2020-07-09 20:39:12 +00:00
2020-07-10 04:57:06 +00:00
from ..mwt import MWT
2020-07-11 03:34:48 +00:00
from . import forms, models
2020-07-09 20:39:12 +00:00
imageCaptchaFonts = [
str(Path('webproj/fonts/Roboto/Roboto-Regular.ttf').absolute()),
str(Path('webproj/fonts/Fondamento/Fondamento-Regular.ttf').absolute()),
str(Path('webproj/fonts/Josefin_Sans/static/JosefinSans-Medium.ttf').absolute()),
str(Path('webproj/fonts/Lora/static/Lora-Medium.ttf').absolute()),
str(Path('webproj/fonts/Quicksand/static/Quicksand-Regular.ttf').absolute()),
str(Path('webproj/fonts/Work_Sans/static/WorkSans-Medium.ttf').absolute()),
]
2020-08-28 05:14:33 +00:00
imageCaptcha = ImageCaptcha(300, 150, fonts=imageCaptchaFonts, font_sizes=[
36, 40, 44, 48, 54, 60, 66, 72])
2020-07-09 20:39:12 +00:00
for truefont in imageCaptcha.truefonts:
pass
2020-07-08 01:03:31 +00:00
# Create your views here.
2020-07-08 05:32:26 +00:00
2020-08-28 05:14:33 +00:00
class TemplateView(TemplateResponseMixin, ContextMixin, View):
def get(self, request, *args, **kwargs):
context = self.get_context_data(**kwargs)
return self.render_to_response(context)
2020-07-10 04:57:06 +00:00
class RequireTelegramLogin:
def __init__(self, func):
self.func = func
def __call__(self, request, *args, **kwargs):
telegram_user_hash = request.COOKIES.get('user_hash', None)
2020-08-28 05:14:33 +00:00
telegram_user = models.TelegramUser.objects.filter(
uuid=telegram_user_hash).first()
2020-07-10 04:57:06 +00:00
if telegram_user_hash is None or telegram_user is None:
return redirect('home')
return self.func(request=request, tg_user=telegram_user, *args, **kwargs)
class UseTelegramLogin:
def __init__(self, func):
self.func = func
def __call__(self, request, *args, **kwargs):
telegram_user_hash = request.COOKIES.get('user_hash', None)
2020-08-28 05:14:33 +00:00
telegram_user = models.TelegramUser.objects.filter(
uuid=telegram_user_hash).first()
2020-07-10 04:57:06 +00:00
return self.func(request=request, tg_user=telegram_user, *args, **kwargs)
@method_decorator(UseTelegramLogin, 'dispatch')
2020-07-08 05:32:26 +00:00
class SoonView(TemplateView):
2020-07-09 05:32:49 +00:00
template_name = "soon.haml"
2020-07-10 04:57:06 +00:00
@method_decorator(UseTelegramLogin, 'dispatch')
2020-07-09 05:32:49 +00:00
class HomeView(TemplateView):
template_name = "index.haml"
2020-07-10 04:57:06 +00:00
@method_decorator(UseTelegramLogin, 'dispatch')
2020-07-09 05:32:49 +00:00
class TermsView(TemplateView):
template_name = "terms.haml"
2020-07-10 04:57:06 +00:00
@method_decorator(UseTelegramLogin, 'dispatch')
2020-07-09 05:32:49 +00:00
class PrivacyView(TemplateView):
template_name = "privacy.haml"
2020-07-10 04:57:06 +00:00
@method_decorator(UseTelegramLogin, 'dispatch')
2020-07-09 07:25:20 +00:00
class PythonLicensesView(TemplateView):
template_name = "python_licenses.haml"
2020-07-10 04:57:06 +00:00
def getTelegramUser(obj: Dict[str, Any]) -> models.TelegramUser:
2020-08-28 05:14:33 +00:00
telegramUser = models.TelegramUser.objects.filter(
telegram_id=obj['id']).first()
2020-07-09 20:39:12 +00:00
if telegramUser is None:
telegramUser = models.TelegramUser()
2020-07-10 04:57:06 +00:00
telegramUser.telegram_id = obj['id']
2020-07-09 20:39:12 +00:00
telegramUser.telegram_first_name = obj['first_name']
telegramUser.telegram_last_name = obj.get('last_name', None)
telegramUser.telegram_username = obj.get('username', None)
telegramUser.save()
return telegramUser
2020-07-10 04:57:06 +00:00
def getTelegramGroup(obj: Dict[str, Any]) -> models.TelegramGroup:
2020-08-28 05:14:33 +00:00
telegramGroup = models.TelegramGroup.objects.filter(
telegram_id=obj['id']).first()
2020-07-09 20:39:12 +00:00
if telegramGroup is None:
telegramGroup = models.TelegramGroup()
2020-07-10 04:57:06 +00:00
telegramGroup.telegram_id = obj['id']
2020-07-09 20:39:12 +00:00
telegramGroup.telegram_title = obj['title']
telegramGroup.save()
return telegramGroup
2020-07-10 04:57:06 +00:00
LAST_BOT: Bot = None
def get_bot_self_uncached(token: str) -> int:
tg_me = Bot(token).get_me()
getTelegramUser(tg_me.__dict__)
tg_bt = models.TelegramUserBots.objects.filter(token=token).first()
if tg_bt is not None:
tg_bt.name = f'{tg_me.first_name} {tg_me.first_name or ""}'.strip()
tg_bt.telegram_username = tg_me.username
tg_bt.telegram_id = tg_me.id
tg_bt.save()
return tg_me.id
get_bot_self = MWT(timeout=60)(get_bot_self_uncached)
@MWT(timeout=60)
def get_admins(chat_id: int) -> List[int]:
return [getTelegramUser(admin.user.__dict__).telegram_id
for admin in LAST_BOT.get_chat_administrators(chat_id)]
def setupWebHook(token):
bot_webhook_url = f'https://{settings.FIRST_DOMAIN}/telegram/{token}/'
Bot(token).set_webhook(url=bot_webhook_url)
class WebHookSetupView(View):
def get(self, request):
setupWebHook(settings.TELEGRAM_TOKEN)
return HttpResponse()
2020-07-09 05:32:49 +00:00
@method_decorator(csrf_exempt, 'dispatch')
class WebHookView(View):
def post(self, request, telegram):
2020-07-10 04:57:06 +00:00
global LAST_BOT
2020-07-09 05:32:49 +00:00
update = json.loads(request.body)
2020-08-28 05:14:33 +00:00
chatIsFromBot: bool = update.get('message', {}).get(
'from', {}).get('is_bot', True)
2020-07-11 03:34:48 +00:00
chatMessageId: id = update.get('message', {}).get('message_id', 0)
2020-07-09 07:25:20 +00:00
chatMessage: str = update.get('message', {}).get('text', '')
2020-08-28 05:14:33 +00:00
chatType: str = update.get('message', {}).get(
'chat', {}).get('type', '')
2020-07-09 07:25:20 +00:00
chatId: int = update.get('message', {}).get('chat', {}).get('id', 0)
fromId: int = update.get('message', {}).get('from', {}).get('id', 0)
2020-08-28 05:14:33 +00:00
newChatMembers: List[Dict[str, Any]] = update.get(
'message', {}).get('new_chat_members', [])
2020-07-11 23:43:27 +00:00
try:
if not chatIsFromBot:
2020-08-28 05:14:33 +00:00
registeredBot = models.TelegramUserBots.objects.filter(
token=telegram).first()
2020-07-11 23:43:27 +00:00
if registeredBot is None and telegram != settings.TELEGRAM_TOKEN:
Bot(telegram).set_webhook(url='')
elif chatType == 'channel':
Bot(telegram).leave_chat(chat_id=chatId)
else:
myId = get_bot_self(telegram)
botUser = models.TelegramUser.objects.get(telegram_id=myId)
bot = Bot(telegram)
2020-08-28 05:14:33 +00:00
telegramUser = getTelegramUser(
update.get('message', {}).get('from', {}))
2020-07-11 23:43:27 +00:00
if chatType == 'private':
if registeredBot is None:
if chatMessage == '/login':
2020-08-28 05:14:33 +00:00
loginHash = models.LoginHash(
telegram_user=telegramUser)
2020-07-11 23:43:27 +00:00
loginHash.save()
bot.send_message(
chat_id=chatId,
text=f'Do NOT share this link with anyone else:\n{request.build_absolute_uri(reverse("login", args=(loginHash.pk, loginHash.uuid)))}',
disable_web_page_preview=True
)
elif chatMessage in ('/start', '/help'):
bot.send_message(
chat_id=chatId,
text='I am a bot that is able to provide the following services to your group:\n' +
' - Anti-Spam through CAS (https://cas.chat)\n' +
' - Anti-Spam through CAPTCHA\n' +
' - Join/Leave messages\n' +
' - Canned responses\n' +
'\n' +
f'I can only be managed from https://{settings.FIRST_DOMAIN}\n' +
' - Here, you can only /login or ask me for /help.\n'
'\n' +
'You can run your custom-branded bot for free*.\n' +
' * might change in the future' +
''
)
elif chatMessage == '/previewCAPTCHA':
secret = ''
while len(secret) < 7:
secret += str(random.randint(0, 9))
imbytes = imageCaptcha.generate(secret)
bot.send_photo(
chat_id=chatId,
photo=imbytes,
caption=f'Sample CAPTCHA for secret "{secret}"'
)
else:
bot.send_message(
chat_id=chatId,
text=f'Sorry, I could not understand you.\nTry /help.'
)
2020-07-09 20:39:12 +00:00
else:
bot.send_message(
chat_id=chatId,
2020-07-11 23:43:27 +00:00
text=f'I am a custom bot configured at https://{settings.FIRST_DOMAIN}.\nCheck @{settings.BOT_NAME} for more details.',
2020-07-09 20:39:12 +00:00
)
2020-07-11 23:43:27 +00:00
elif chatType in ('group', 'supergroup'):
LAST_BOT = bot
chatAdminsTelegramIds = get_admins(chatId)
LAST_BOT = None
2020-08-28 05:14:33 +00:00
telegramGroup = getTelegramGroup(
update.get('message', {}).get('chat', {}))
telegramGroup.admins.set(models.TelegramUser.objects.filter(
telegram_id__in=chatAdminsTelegramIds).all())
2020-07-11 23:43:27 +00:00
telegramGroup.save()
2020-08-28 05:14:33 +00:00
leftMemberUpdate = update.get(
'message', {}).get('left_chat_member', None)
telegramGroupPreferences = models.GroupPreferences.objects.filter(
group=telegramGroup).first()
2020-07-11 23:43:27 +00:00
if myId in chatAdminsTelegramIds:
if telegramGroupPreferences is not None:
def send_hello(member):
join_message = (telegramGroupPreferences.join_message
.replace('{NAME}', member.name)
.replace('{GROUP}', telegramGroup.telegram_title)
2020-07-12 02:49:05 +00:00
.replace('&nbsp;', ' ')
2020-07-12 02:03:59 +00:00
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
2020-07-11 23:43:27 +00:00
.strip())[:4000]
if len(join_message) > 0:
bot.send_message(
2020-07-11 03:34:48 +00:00
chat_id=chatId,
2020-07-11 23:43:27 +00:00
text=join_message,
2020-07-12 02:03:59 +00:00
parse_mode=ParseMode.HTML,
2020-07-11 03:34:48 +00:00
)
2020-07-11 23:43:27 +00:00
if len(newChatMembers) > 0:
for newChatMember in newChatMembers:
2020-08-28 05:14:33 +00:00
telegramMember = getTelegramUser(
newChatMember)
2020-07-11 23:43:27 +00:00
if telegramGroupPreferences.combot_anti_spam_use:
2020-08-28 05:14:33 +00:00
response = requests.get(
f'https://api.cas.chat/check?user_id={telegramMember.telegram_id}')
2020-07-11 23:43:27 +00:00
jsonResponse = None if response is None else response.json()
2020-08-28 05:14:33 +00:00
banned = None if jsonResponse is None else jsonResponse.get(
'ok', False)
2020-07-11 23:43:27 +00:00
if banned:
bot.kick_chat_member(
chat_id=chatId,
user_id=telegramMember.telegram_id,
)
2020-07-12 00:41:08 +00:00
if chatType == 'supergroup':
bot.unban_chat_member(
chat_id=chatId,
user_id=telegramMember.telegram_id,
)
2020-07-11 23:43:27 +00:00
banned_message = (telegramGroupPreferences.combot_anti_spam_notification
.replace('{NAME}', telegramMember.name)
.replace('{GROUP}', telegramGroup.telegram_title)
2020-07-12 02:49:05 +00:00
.replace('&nbsp;', ' ')
2020-07-12 02:03:59 +00:00
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
2020-07-11 23:43:27 +00:00
.strip())[:4000]
if len(banned_message):
bot.send_message(
chat_id=chatId,
text=banned_message,
2020-07-12 02:03:59 +00:00
parse_mode=ParseMode.HTML,
2020-07-11 23:43:27 +00:00
)
continue
if telegramGroupPreferences.captcha_use:
2020-07-11 03:34:48 +00:00
secret = ''
while len(secret) < telegramGroupPreferences.captcha_digits:
2020-08-28 05:14:33 +00:00
secret += random.choice(
telegramGroupPreferences.captcha_chars)
models.PendingCaptchaUser.objects.filter(
group=telegramGroup, user=telegramMember).delete()
2020-07-11 23:43:27 +00:00
pcu = models.PendingCaptchaUser(
bot_token=telegram,
user=telegramMember,
group=telegramGroup,
captcha_answer=secret,
attempts_left=telegramGroupPreferences.captcha_attempts,
lifetime=telegramGroupPreferences.captcha_timeout,
captcha_message_id=0,
)
pcu.save()
2020-08-28 05:14:33 +00:00
imbytes = imageCaptcha.generate(
secret)
2020-07-11 23:43:27 +00:00
captcha_message = (telegramGroupPreferences.captcha_first_message
.replace('{NAME}', telegramMember.name)
2020-07-11 03:34:48 +00:00
.replace('{GROUP}', telegramGroup.telegram_title)
2020-07-11 23:43:27 +00:00
.replace('{TIMEOUT}', str(telegramGroupPreferences.captcha_timeout))
.replace('{ATTEMPTS}', str(telegramGroupPreferences.captcha_attempts))
2020-07-12 02:49:05 +00:00
.replace('&nbsp;', ' ')
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
2020-07-11 03:34:48 +00:00
.strip())[:1000]
if len(captcha_message) <= 0:
2020-07-11 23:43:27 +00:00
captcha_message = (f'Hello, {telegramMember.name}!\n' +
f'To prevent automated spam on {telegramGroup.telegram_title}, we require every new user to solve at CAPTCHA\n' +
f'You have {telegramGroupPreferences.captcha_timeout} seconds or {telegramGroupPreferences.captcha_attempts} failed attempts before getting kicked.' +
2020-07-11 03:34:48 +00:00
'')
sent_message = bot.send_photo(
chat_id=chatId,
photo=imbytes,
caption=captcha_message,
2020-07-12 02:49:05 +00:00
parse_mode=ParseMode.HTML,
2020-07-11 03:34:48 +00:00
)
2020-07-11 23:43:27 +00:00
pcu.captcha_message_id = sent_message.message_id
pcu.save()
2020-07-11 03:34:48 +00:00
else:
2020-07-11 23:43:27 +00:00
send_hello(telegramMember)
2020-07-11 03:34:48 +00:00
else:
2020-08-28 05:14:33 +00:00
pendingCaptcha = models.PendingCaptchaUser.objects.filter(
group=telegramGroup, user=telegramUser).first()
2020-07-11 23:43:27 +00:00
if pendingCaptcha is not None:
if not telegramGroupPreferences.captcha_leave_mess:
try:
bot.delete_message(
chat_id=chatId,
message_id=pendingCaptcha.captcha_message_id,
)
except:
pass
try:
bot.delete_message(
chat_id=chatId,
message_id=chatMessageId,
)
except:
pass
if leftMemberUpdate is not None:
pass
elif chatMessage == pendingCaptcha.captcha_answer:
2020-07-11 23:43:27 +00:00
pendingCaptcha.delete()
send_hello(telegramUser)
else:
pendingCaptcha.attempts_left -= 1
2020-08-28 05:14:33 +00:00
td: datetime.timedelta = (pendingCaptcha.created + datetime.timedelta(
seconds=pendingCaptcha.lifetime)) - django.utils.timezone.now()
sec_remain = int(
td.total_seconds())
if pendingCaptcha.attempts_left > 0 and sec_remain > 0:
2020-07-11 23:43:27 +00:00
secret = ''
while len(secret) < telegramGroupPreferences.captcha_digits:
2020-08-28 05:14:33 +00:00
secret += random.choice(
telegramGroupPreferences.captcha_chars)
imbytes = imageCaptcha.generate(
secret)
2020-07-11 23:43:27 +00:00
captcha_message = (telegramGroupPreferences.captcha_retry_message
.replace('{NAME}', telegramUser.name)
.replace('{GROUP}', telegramGroup.telegram_title)
.replace('{TIMEOUT}', str(sec_remain))
.replace('{ATTEMPTS}', str(pendingCaptcha.attempts_left))
2020-07-12 02:49:05 +00:00
.replace('&nbsp;', ' ')
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
2020-07-11 23:43:27 +00:00
.strip())[:1000]
if len(captcha_message) <= 0:
captcha_message = (f'Oops... that was not the answer, {telegramUser.name}.\n' +
f'Let\'s try solving the CAPTCHA again!\n' +
f'You have {sec_remain} seconds or {pendingCaptcha.attempts_left} failed attempts before getting kicked.' +
'')
sent_message = bot.send_photo(
chat_id=chatId,
photo=imbytes,
caption=captcha_message,
2020-07-12 02:49:05 +00:00
parse_mode=ParseMode.HTML,
2020-07-11 23:43:27 +00:00
)
pendingCaptcha.captcha_answer = secret
pendingCaptcha.captcha_message_id = sent_message.message_id
pendingCaptcha.save()
else:
2020-07-12 00:41:08 +00:00
try:
bot.kick_chat_member(
chat_id=chatId,
user_id=telegramUser.telegram_id,
)
if chatType == 'supergroup':
bot.unban_chat_member(
chat_id=chatId,
user_id=telegramUser.telegram_id,
)
except:
traceback.print_exc()
2020-07-11 23:43:27 +00:00
pendingCaptcha.delete()
else:
for canned_message in telegramGroupPreferences.canned_messages.all():
if canned_message.listen in chatMessage:
2020-07-12 00:41:08 +00:00
try:
bot.send_message(
chat_id=chatId,
reply_to_message_id=chatMessageId,
2020-07-12 02:03:59 +00:00
text=canned_message.reply_with
2020-07-12 02:49:05 +00:00
.replace('&nbsp;', ' ')
2020-07-12 02:03:59 +00:00
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', ''),
parse_mode=ParseMode.HTML,
2020-07-12 00:41:08 +00:00
)
except TelegramBadRequest:
traceback.print_exc()
2020-07-11 23:43:27 +00:00
if leftMemberUpdate is not None:
if fromId != myId:
2020-08-28 05:14:33 +00:00
telegramMember = getTelegramUser(
leftMemberUpdate)
2020-07-11 23:43:27 +00:00
leave_message = (telegramGroupPreferences.leave_message
.replace('{NAME}', telegramMember.name)
.replace('{GROUP}', telegramGroup.telegram_title)
2020-07-12 02:49:05 +00:00
.replace('&nbsp;', ' ')
2020-07-12 02:03:59 +00:00
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
2020-07-11 23:43:27 +00:00
.strip())[:4000]
if len(leave_message) > 0:
bot.send_message(
chat_id=chatId,
text=leave_message,
2020-07-12 02:03:59 +00:00
parse_mode=ParseMode.HTML,
2020-07-11 23:43:27 +00:00
)
except TelegramUnauthorized:
pass
2020-07-09 07:25:20 +00:00
print(json.dumps(update, indent=4))
2020-07-09 05:32:49 +00:00
return HttpResponse()
2020-07-09 20:39:12 +00:00
class LoginView(View):
def get(self, request, tuid, hsh):
lh = models.LoginHash.objects.filter(id=tuid).first()
if lh is None or str(lh.uuid) != hsh:
hr = HttpResponse('Access Denied')
hr.status_code = 401
return hr
response = redirect('control_panel')
2020-08-28 05:14:33 +00:00
response.set_cookie(
'user_hash', f'{lh.telegram_user.uuid}', expires=4*7*24*3600)
2020-07-09 20:39:12 +00:00
lh.delete()
return response
class LogoutView(View):
def get(self, request):
response = redirect('control_panel')
response.delete_cookie('user_hash')
return response
2020-07-10 04:57:06 +00:00
@method_decorator(RequireTelegramLogin, 'dispatch')
class ControlPanelView(TemplateView):
template_name = 'control_panel.haml'
2020-07-09 20:39:12 +00:00
2020-07-10 04:57:06 +00:00
@method_decorator(RequireTelegramLogin, 'dispatch')
class AddTelegramBotView(View):
def post(self, request, tg_user):
token = request.POST.get('token', '')
bot_self = get_bot_self(token=token)
userBot = models.TelegramUserBots.objects.filter(token=token).first()
if userBot is None:
userBot = models.TelegramUserBots()
userBot.owner = tg_user
userBot.telegram_id = bot_self
userBot.token = token
userBot.save()
get_bot_self_uncached(token=token)
setupWebHook(token)
return redirect('control_panel')
2020-07-09 20:39:12 +00:00
@method_decorator(RequireTelegramLogin, 'dispatch')
2020-07-10 04:57:06 +00:00
class DeleteAccountView(View):
def post(self, request, tg_user):
confirmation = request.POST.get('confirmation', '')
if confirmation == 'Yes, delete my account and all its data':
tg_user.delete()
return redirect('logout')
else:
return HttpResponse('Strings do not match:<br>\n' +
f'{confirmation}<br>\n' +
'Yes, delete my account and all its data<br>\n' +
'<br>\n' +
'Nothing was done.')
@method_decorator(RequireTelegramLogin, 'dispatch')
class DeleteBot(TemplateView):
template_name = 'delete_item.haml'
def get_context_data(self, **kwargs):
kwargs['display_template'] = 'delete_item_bot.haml'
kwargs['previous'] = resolve_url('control_panel')
kwargs['item'] = self._findFromContext(**kwargs)
return super().get_context_data(**kwargs)
def _findFromContext(self, **kwargs):
return get_object_or_404(kwargs['tg_user'].bots, id=kwargs['bot_id'])
def post(self, request, **kwargs):
self._findFromContext(**kwargs).delete()
return redirect('control_panel')
2020-08-28 05:14:33 +00:00
@ method_decorator(RequireTelegramLogin, 'dispatch')
2020-07-10 04:57:06 +00:00
class ManageGroupGenericView(TemplateView):
2020-07-11 08:28:57 +00:00
def get(self, request, **kwargs):
2020-08-28 05:14:33 +00:00
return super().get(request, **{'request_': request, **kwargs})
2020-07-11 08:28:57 +00:00
2020-07-10 04:57:06 +00:00
def get_context_data(self, **kwargs):
2020-08-28 05:14:33 +00:00
kwargs['group_preferences'] = self._findGroupPreferencesFromContext(
**kwargs)
2020-07-10 04:57:06 +00:00
kwargs['group'] = self._findGroupFromContext(**kwargs)
kwargs = self._putForms(**kwargs)
2020-07-10 04:57:06 +00:00
return super().get_context_data(**kwargs)
def _findGroupFromContext(self, **kwargs):
return get_object_or_404(kwargs['tg_user'].admins, id=kwargs['group_id'])
def _findGroupPreferencesFromContext(self, **kwargs):
group = self._findGroupFromContext(**kwargs)
if getattr(group, 'preferences', None) is None:
groupPreferences = models.GroupPreferences()
groupPreferences.group = group
groupPreferences.save()
group = self._findGroupFromContext(**kwargs)
return group.preferences
def _putForms(self, **kwargs):
2020-07-11 08:28:57 +00:00
request = kwargs['request_']
if request.GET.get('form_validation_failed', 'n') == 'y':
form = kwargs['form']
form = form.__class__(request.GET, instance=form.instance)
kwargs['form'] = form
try:
form.full_clean()
except ValidationError as e:
raise
# raise Exception
return kwargs
2020-07-10 04:57:06 +00:00
def post(self, request, **kwargs):
2020-07-11 08:28:57 +00:00
kwargs = self.get_context_data(**kwargs, request_=request)
if (form := kwargs.get('form', None)) is not None:
if request.POST.get('do', '') != 'delete':
form = form.__class__(request.POST, instance=form.instance)
2020-07-11 08:28:57 +00:00
try:
form.full_clean()
form.save()
except (ValueError, ValidationError) as e:
postcopy = request.POST.copy()
postcopy['csrfmiddlewaretoken'] = ''
postcopy['do'] = ''
del postcopy['csrfmiddlewaretoken']
del postcopy['do']
return HttpResponseRedirect(f'{request.path}?form_validation_failed=y&{urllib.parse.urlencode(postcopy)}')
else:
form.instance.delete()
return HttpResponseRedirect(kwargs.get('redirect_on_submit', None) or request.path)
2020-07-10 04:57:06 +00:00
class ManageGroupView(ManageGroupGenericView):
template_name = 'manage_group.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('control_panel')
return super().get_context_data(**kwargs)
class ManageGroupAntiSpamView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
2020-07-10 04:57:06 +00:00
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
2020-07-10 04:57:06 +00:00
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Anti-Spam'
kwargs['form'] = forms.AntiSpam(instance=kwargs['group_preferences'])
2020-07-11 08:28:57 +00:00
return super()._putForms(**kwargs)
2020-07-10 04:57:06 +00:00
2020-07-10 04:57:06 +00:00
class ManageGroupCAPTCHAView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
2020-07-10 04:57:06 +00:00
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
2020-07-10 04:57:06 +00:00
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'CAPTCHA'
kwargs['form'] = forms.CAPTCHA(instance=kwargs['group_preferences'])
2020-07-11 08:28:57 +00:00
return super()._putForms(**kwargs)
2020-07-10 04:57:06 +00:00
class ManageGroupJoinLeaveGreetingsView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
2020-07-10 04:57:06 +00:00
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
2020-07-10 04:57:06 +00:00
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Join/Leave Greetings'
kwargs['form'] = forms.Greetings(instance=kwargs['group_preferences'])
2020-07-11 08:28:57 +00:00
return super()._putForms(**kwargs)
2020-07-10 04:57:06 +00:00
2020-07-10 04:57:06 +00:00
class ManageGroupCannedMessageView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
2020-07-10 04:57:06 +00:00
def get_context_data(self, **kwargs):
2020-08-28 05:14:33 +00:00
kwargs['previous'] = resolve_url(
'manage_group_cannedmessages', kwargs['group_id'])
2020-07-10 04:57:06 +00:00
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Canned Message'
kwargs['canned_message'] = (
2020-08-28 05:14:33 +00:00
kwargs['group_preferences'].canned_messages.filter(
id=kwargs['cannedmessage_id']).first()
or
2020-08-28 05:14:33 +00:00
models.GroupCannedMessage(
group_preferences=kwargs['group_preferences'])
)
kwargs['form'] = forms.CannedMessage(instance=kwargs['canned_message'])
kwargs['deletion_allowed'] = not kwargs['canned_message']._state.adding
kwargs['redirect_on_submit'] = kwargs['previous']
2020-07-11 08:28:57 +00:00
return super()._putForms(**kwargs)
2020-07-10 04:57:06 +00:00
class ManageGroupCannedMessagesView(ManageGroupGenericView):
template_name = 'manage_group_cannedmessages.haml'
2020-07-10 04:57:06 +00:00
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
2020-07-10 04:57:06 +00:00
return super().get_context_data(**kwargs)
2020-08-28 05:14:33 +00:00
class ManageGroupPlannedMessagesView(ManageGroupGenericView):
template_name = 'manage_group_plannedmessages.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
return super().get_context_data(**kwargs)
class ManageGroupPlannedMessagesTimezoneView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group_plannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Planned Messages\' Timezone'
kwargs['form'] = forms.PlannedMessageTimezone(instance=kwargs['group_preferences'])
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
class ManageGroupPlannedMessageView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group_plannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Canned Message'
kwargs['planned_message'] = (
kwargs['group_preferences'].planned_messages.filter(
id=kwargs['plannedmessage_id']).first()
or
models.GroupPlannedMessage(
group_preferences=kwargs['group_preferences'])
)
kwargs['form'] = forms.PlannedMessage(instance=kwargs['planned_message'])
kwargs['deletion_allowed'] = not kwargs['planned_message']._state.adding
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
class ManageGroupPlannedDispatchView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group_plannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Canned Message'
kwargs['planned_dispatch'] = (
kwargs['group_preferences'].planned_message_dispatches.filter(
id=kwargs['planneddispatch_id']).first()
or
models.GroupPlannedDispatch(
group_preferences=kwargs['group_preferences'])
)
kwargs['form'] = forms.PlannedDispatch(instance=kwargs['planned_dispatch'])
kwargs['deletion_allowed'] = not kwargs['planned_dispatch']._state.adding
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)
class ManageGroupPlannedEnablednessView(ManageGroupGenericView):
template_name = 'manage_group_form.haml'
def get_context_data(self, **kwargs):
kwargs['previous'] = resolve_url('manage_group_plannedmessages', kwargs['group_id'])
return super().get_context_data(**kwargs)
def _putForms(self, **kwargs):
kwargs['page_name'] = 'Planned Messages Configuration'
kwargs['form'] = forms.PlannedMessageConfiguration(instance=kwargs['group_preferences'])
bot_choices = [
(None, f'@{settings.BOT_NAME} (the shared bot)'),
]
bots_considered = set([None])
for bot in ([kwargs['group_preferences'].planned_message_issuer] + list(kwargs['tg_user'].bots.all())):
if bot not in bots_considered:
bot_choices.append((bot.id, f'@{bot.telegram_username} ({bot.telegram_id}) by {bot.owner.name}'))
bots_considered.add(bot)
kwargs['form'].fields['planned_message_issuer'].choices = bot_choices
kwargs['redirect_on_submit'] = kwargs['previous']
return super()._putForms(**kwargs)