import datetime
import gzip
import json
import mimetypes
import random
import sys
import traceback
import urllib.parse
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List
from uuid import UUID

import django.utils.timezone
import pytz
import requests
from captcha.image import ImageCaptcha
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.exceptions import ValidationError
from django.http import FileResponse, HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404, redirect, render, resolve_url
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.html import escape
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import last_modified
from django.views.generic import TemplateView
from telegram.bot import Bot
from telegram.error import BadRequest as TelegramBadRequest
from telegram.error import Unauthorized as TelegramUnauthorized

from ..mwt import MWT
from . import forms, models

imageCaptchaFonts = [
    str(Path('webproj/fonts/Roboto/Roboto-Regular.ttf').absolute()),
    str(Path('webproj/fonts/Fondamento/Fondamento-Regular.ttf').absolute()),
    str(Path('webproj/fonts/Josefin_Sans/static/JosefinSans-Medium.ttf').absolute()),
    str(Path('webproj/fonts/Lora/static/Lora-Medium.ttf').absolute()),
    str(Path('webproj/fonts/Quicksand/static/Quicksand-Regular.ttf').absolute()),
    str(Path('webproj/fonts/Work_Sans/static/WorkSans-Medium.ttf').absolute()),
]
imageCaptcha = ImageCaptcha(300, 150, fonts=imageCaptchaFonts, font_sizes=[36, 40, 44, 48, 54, 60, 66, 72])
for truefont in imageCaptcha.truefonts:
    pass

# Create your views here.
# Truncation limits applied to outgoing text messages and photo captions.
_MESSAGE_LIMIT = 4000
_CAPTION_LIMIT = 1000


def _telegram_user_from_request(request):
    """Return the TelegramUser matching the ``user_hash`` cookie, or None.

    A missing cookie, an unknown value, or a value the ORM rejects as
    malformed all yield None, so a tampered cookie is treated as "not logged
    in" instead of raising out of the view.
    """
    user_hash = request.COOKIES.get('user_hash', None)
    if user_hash is None:
        return None
    try:
        return models.TelegramUser.objects.filter(uuid=user_hash).first()
    except (ValueError, ValidationError):
        return None


def _random_secret(length, alphabet):
    """Build a string of at least ``length`` characters drawn from ``alphabet``."""
    secret = ''
    while len(secret) < length:
        secret += random.choice(alphabet)
    return secret


def _fill_template(template, limit, **placeholders):
    """Substitute ``{KEY}`` markers in ``template``, strip it and cut to ``limit``.

    Placeholders are replaced one after another in keyword order; values must
    already be strings.
    """
    text = template
    for key, value in placeholders.items():
        text = text.replace('{' + key + '}', value)
    return text.strip()[:limit]


def _is_cas_banned(user_id):
    """Ask the CAS API about ``user_id``; return the truthiness of its ``ok`` field.

    Network failures, timeouts and non-JSON answers are treated as "not
    banned" so that an outage of the external service cannot break the
    webhook.
    """
    try:
        response = requests.get(
            'https://api.cas.chat/check',
            params={'user_id': user_id},
            timeout=10,
        )
        return bool(response.json().get('ok', False))
    except (requests.RequestException, ValueError):
        traceback.print_exc()
        return False


def _kick_member(bot, chat_id, chat_type, user_id):
    """Kick ``user_id`` from the chat; in supergroups, unban right afterwards."""
    bot.kick_chat_member(chat_id=chat_id, user_id=user_id)
    if chat_type == 'supergroup':
        bot.unban_chat_member(chat_id=chat_id, user_id=user_id)


class RequireTelegramLogin:
    """View decorator: redirect to ``home`` unless the cookie maps to a user.

    The wrapped callable receives the resolved user as ``tg_user``.
    """

    def __init__(self, func):
        self.func = func

    def __call__(self, request, *args, **kwargs):
        telegram_user = _telegram_user_from_request(request)
        if telegram_user is None:
            return redirect('home')
        return self.func(request=request, tg_user=telegram_user, *args, **kwargs)


class UseTelegramLogin:
    """View decorator: pass the logged-in user (or None) as ``tg_user``."""

    def __init__(self, func):
        self.func = func

    def __call__(self, request, *args, **kwargs):
        telegram_user = _telegram_user_from_request(request)
        return self.func(request=request, tg_user=telegram_user, *args, **kwargs)


@method_decorator(UseTelegramLogin, 'dispatch')
class SoonView(TemplateView):
    template_name = "soon.haml"


@method_decorator(UseTelegramLogin, 'dispatch')
class HomeView(TemplateView):
    template_name = "index.haml"


@method_decorator(UseTelegramLogin, 'dispatch')
class TermsView(TemplateView):
    template_name = "terms.haml"


@method_decorator(UseTelegramLogin, 'dispatch')
class PrivacyView(TemplateView):
    template_name = "privacy.haml"


@method_decorator(UseTelegramLogin, 'dispatch')
class PythonLicensesView(TemplateView):
    template_name = "python_licenses.haml"


def getTelegramUser(obj: Dict[str, Any]) -> models.TelegramUser:
    """Fetch or create the TelegramUser for ``obj['id']`` and refresh its names.

    ``obj`` must contain ``id`` and ``first_name``; ``last_name`` and
    ``username`` are optional.
    """
    # NOTE(review): nesting reconstructed from whitespace-collapsed source --
    # confirm the name fields are meant to be refreshed for existing users too.
    telegramUser = models.TelegramUser.objects.filter(telegram_id=obj['id']).first()
    if telegramUser is None:
        telegramUser = models.TelegramUser()
        telegramUser.telegram_id = obj['id']
    telegramUser.telegram_first_name = obj['first_name']
    telegramUser.telegram_last_name = obj.get('last_name', None)
    telegramUser.telegram_username = obj.get('username', None)
    telegramUser.save()
    return telegramUser


def getTelegramGroup(obj: Dict[str, Any]) -> models.TelegramGroup:
    """Fetch or create the TelegramGroup for ``obj['id']`` and refresh its title."""
    # NOTE(review): nesting reconstructed from whitespace-collapsed source --
    # confirm the title is meant to be refreshed for existing groups too.
    telegramGroup = models.TelegramGroup.objects.filter(telegram_id=obj['id']).first()
    if telegramGroup is None:
        telegramGroup = models.TelegramGroup()
        telegramGroup.telegram_id = obj['id']
    telegramGroup.telegram_title = obj['title']
    telegramGroup.save()
    return telegramGroup


# Bot used by get_admins(); set by the webhook right before the call and
# cleared afterwards, so the bot itself is not an argument of get_admins().
LAST_BOT: Bot = None


def get_bot_self_uncached(token: str) -> int:
    """Query Telegram for the bot behind ``token`` and return its user id.

    Also stores the bot as a TelegramUser and, when a TelegramUserBots row
    exists for the token, refreshes its name, username and id.
    """
    tg_me = Bot(token).get_me()
    getTelegramUser(tg_me.__dict__)
    tg_bt = models.TelegramUserBots.objects.filter(token=token).first()
    if tg_bt is not None:
        # Fixed: the display name used to repeat first_name instead of
        # appending last_name.
        tg_bt.name = f'{tg_me.first_name} {tg_me.last_name or ""}'.strip()
        tg_bt.telegram_username = tg_me.username
        tg_bt.telegram_id = tg_me.id
        tg_bt.save()
    return tg_me.id


get_bot_self = MWT(timeout=60)(get_bot_self_uncached)


@MWT(timeout=60)
def get_admins(chat_id: int) -> List[int]:
    """Return the Telegram ids of the chat's administrators.

    Relies on the module-level LAST_BOT being set by the caller; every admin
    is also stored/refreshed as a TelegramUser.
    """
    return [
        getTelegramUser(admin.user.__dict__).telegram_id
        for admin in LAST_BOT.get_chat_administrators(chat_id)
    ]


def setupWebHook(token):
    """Point the bot's webhook at ``https://<FIRST_DOMAIN>/telegram/<token>/``."""
    bot_webhook_url = f'https://{settings.FIRST_DOMAIN}/telegram/{token}/'
    Bot(token).set_webhook(url=bot_webhook_url)


class WebHookSetupView(View):
    """GET endpoint that (re)registers the webhook of the main bot."""

    def get(self, request):
        setupWebHook(settings.TELEGRAM_TOKEN)
        return HttpResponse()


@method_decorator(csrf_exempt, 'dispatch')
class WebHookView(View):
    """Receives Telegram updates for the main bot and for user-registered bots."""

    def post(self, request, telegram):
        """Handle one update; ``telegram`` is the bot token taken from the URL.

        Updates sent by bots (or without a ``message.from``) are ignored.
        Always answers with an empty 200 response.
        """
        update = json.loads(request.body)
        message = update.get('message', {})
        sender = message.get('from', {})
        chat = message.get('chat', {})
        chatIsFromBot: bool = sender.get('is_bot', True)
        chatType: str = chat.get('type', '')
        chatId: int = chat.get('id', 0)
        try:
            if not chatIsFromBot:
                registeredBot = models.TelegramUserBots.objects.filter(token=telegram).first()
                if registeredBot is None and telegram != settings.TELEGRAM_TOKEN:
                    # Unknown token: detach the webhook from this bot.
                    Bot(telegram).set_webhook(url='')
                elif chatType == 'channel':
                    Bot(telegram).leave_chat(chat_id=chatId)
                else:
                    myId = get_bot_self(telegram)
                    bot = Bot(telegram)
                    telegramUser = getTelegramUser(sender)
                    if chatType == 'private':
                        self._handle_private_chat(request, bot, message, telegramUser, registeredBot)
                    elif chatType in ('group', 'supergroup'):
                        self._handle_group_chat(bot, telegram, myId, message, telegramUser)
        except TelegramUnauthorized:
            pass
        print(json.dumps(update, indent=4))
        return HttpResponse()

    def _handle_private_chat(self, request, bot, message, telegramUser, registeredBot):
        """Answer a private message: commands on the main bot, a notice on custom bots."""
        chatId: int = message.get('chat', {}).get('id', 0)
        chatMessage: str = message.get('text', '')
        if registeredBot is not None:
            bot.send_message(
                chat_id=chatId,
                text=f'I am a custom bot configured at https://{settings.FIRST_DOMAIN}.\nCheck @{settings.BOT_NAME} for more details.',
            )
            return
        if chatMessage == '/login':
            loginHash = models.LoginHash(telegram_user=telegramUser)
            loginHash.save()
            login_url = request.build_absolute_uri(
                reverse("login", args=(loginHash.pk, loginHash.uuid))
            )
            bot.send_message(
                chat_id=chatId,
                text=f'Do NOT share this link with anyone else:\n{login_url}',
                disable_web_page_preview=True,
            )
        elif chatMessage in ('/start', '/help'):
            bot.send_message(
                chat_id=chatId,
                text=(
                    'I am a bot that is able to provide the following services to your group:\n'
                    ' - Anti-Spam through CAS (https://cas.chat)\n'
                    ' - Anti-Spam through CAPTCHA\n'
                    ' - Join/Leave messages\n'
                    ' - Canned responses\n'
                    '\n'
                    f'I can only be managed from https://{settings.FIRST_DOMAIN}\n'
                    ' - Here, you can only /login or ask me for /help.\n'
                    '\n'
                    'You can run your custom-branded bot for free*.\n'
                    ' * might change in the future'
                ),
            )
        elif chatMessage == '/previewCAPTCHA':
            secret = _random_secret(7, '0123456789')
            bot.send_photo(
                chat_id=chatId,
                photo=imageCaptcha.generate(secret),
                caption=f'Sample CAPTCHA for secret "{secret}"',
            )
        else:
            bot.send_message(
                chat_id=chatId,
                text='Sorry, I could not understand you.\nTry /help.',
            )

    def _handle_group_chat(self, bot, token, myId, message, telegramUser):
        """Sync group/admin records, then run the anti-spam, CAPTCHA and greeting logic."""
        global LAST_BOT
        chat = message.get('chat', {})
        chatId: int = chat.get('id', 0)
        chatType: str = chat.get('type', '')
        LAST_BOT = bot
        try:
            chatAdminsTelegramIds = get_admins(chatId)
        finally:
            # Reset even when the admin lookup raises.
            LAST_BOT = None
        telegramGroup = getTelegramGroup(chat)
        telegramGroup.admins.set(
            models.TelegramUser.objects.filter(telegram_id__in=chatAdminsTelegramIds).all()
        )
        telegramGroup.save()
        preferences = models.GroupPreferences.objects.filter(group=telegramGroup).first()
        # The bot only acts where it is an administrator and preferences exist.
        if myId not in chatAdminsTelegramIds or preferences is None:
            return

        newChatMembers: List[Dict[str, Any]] = message.get('new_chat_members', [])
        if len(newChatMembers) > 0:
            for newChatMember in newChatMembers:
                self._screen_new_member(
                    bot, token, chatId, chatType, preferences, telegramGroup,
                    getTelegramUser(newChatMember),
                )
            return

        pendingCaptcha = models.PendingCaptchaUser.objects.filter(
            group=telegramGroup, user=telegramUser
        ).first()
        if pendingCaptcha is not None:
            self._handle_captcha_answer(
                bot, chatId, chatType, message, preferences, telegramGroup,
                telegramUser, pendingCaptcha,
            )
        else:
            self._handle_regular_message(
                bot, myId, chatId, message, preferences, telegramGroup,
            )

    def _send_hello(self, bot, chatId, preferences, telegramGroup, member):
        """Send the group's join message for ``member`` unless it renders empty."""
        join_message = _fill_template(
            preferences.join_message, _MESSAGE_LIMIT,
            NAME=member.name, GROUP=telegramGroup.telegram_title,
        )
        if len(join_message) > 0:
            bot.send_message(chat_id=chatId, text=join_message)

    def _screen_new_member(self, bot, token, chatId, chatType, preferences, telegramGroup, telegramMember):
        """Apply the CAS check, then either start a CAPTCHA or greet the member."""
        if preferences.combot_anti_spam_use and _is_cas_banned(telegramMember.telegram_id):
            _kick_member(bot, chatId, chatType, telegramMember.telegram_id)
            banned_message = _fill_template(
                preferences.combot_anti_spam_notification, _MESSAGE_LIMIT,
                NAME=telegramMember.name, GROUP=telegramGroup.telegram_title,
            )
            if len(banned_message):
                bot.send_message(chat_id=chatId, text=banned_message)
            return
        if not preferences.captcha_use:
            self._send_hello(bot, chatId, preferences, telegramGroup, telegramMember)
            return

        secret = _random_secret(preferences.captcha_digits, preferences.captcha_chars)
        # Only one pending CAPTCHA per (group, user).
        models.PendingCaptchaUser.objects.filter(group=telegramGroup, user=telegramMember).delete()
        pcu = models.PendingCaptchaUser(
            bot_token=token,
            user=telegramMember,
            group=telegramGroup,
            captcha_answer=secret,
            attempts_left=preferences.captcha_attempts,
            lifetime=preferences.captcha_timeout,
            captcha_message_id=0,
        )
        pcu.save()
        captcha_message = _fill_template(
            preferences.captcha_first_message, _CAPTION_LIMIT,
            NAME=telegramMember.name,
            GROUP=telegramGroup.telegram_title,
            TIMEOUT=str(preferences.captcha_timeout),
            ATTEMPTS=str(preferences.captcha_attempts),
        )
        if len(captcha_message) <= 0:
            captcha_message = (
                f'Hello, {telegramMember.name}!\n'
                f'To prevent automated spam on {telegramGroup.telegram_title}, we require every new user to solve at CAPTCHA\n'
                f'You have {preferences.captcha_timeout} seconds or {preferences.captcha_attempts} failed attempts before getting kicked.'
            )
        sent_message = bot.send_photo(
            chat_id=chatId,
            photo=imageCaptcha.generate(secret),
            caption=captcha_message,
        )
        # Remember the CAPTCHA message so it can be deleted later.
        pcu.captcha_message_id = sent_message.message_id
        pcu.save()

    def _handle_captcha_answer(self, bot, chatId, chatType, message, preferences,
                               telegramGroup, telegramUser, pendingCaptcha):
        """Treat a message from a user with a pending CAPTCHA as an answer attempt."""
        chatMessageId: int = message.get('message_id', 0)
        chatMessage: str = message.get('text', '')
        leftMemberUpdate = message.get('left_chat_member', None)

        if not preferences.captcha_leave_mess:
            # Best-effort clean-up of the CAPTCHA picture and of the attempt.
            for message_id in (pendingCaptcha.captcha_message_id, chatMessageId):
                try:
                    bot.delete_message(chat_id=chatId, message_id=message_id)
                except Exception:
                    pass

        if chatMessage == pendingCaptcha.captcha_answer:
            pendingCaptcha.delete()
            self._send_hello(bot, chatId, preferences, telegramGroup, telegramUser)
            return

        pendingCaptcha.attempts_left -= 1
        td: datetime.timedelta = (
            pendingCaptcha.created + datetime.timedelta(seconds=pendingCaptcha.lifetime)
        ) - django.utils.timezone.now()
        sec_remain = int(td.total_seconds())
        # Fixed: the original tested `leftMemberUpdate is not None`, which
        # kicked on the first wrong answer and offered a retry only to users
        # who had just left. A retry is offered while attempts and time remain
        # and the update is not the user leaving.
        if pendingCaptcha.attempts_left > 0 and sec_remain > 0 and leftMemberUpdate is None:
            secret = _random_secret(preferences.captcha_digits, preferences.captcha_chars)
            captcha_message = _fill_template(
                preferences.captcha_retry_message, _CAPTION_LIMIT,
                NAME=telegramUser.name,
                GROUP=telegramGroup.telegram_title,
                TIMEOUT=str(sec_remain),
                ATTEMPTS=str(pendingCaptcha.attempts_left),
            )
            if len(captcha_message) <= 0:
                captcha_message = (
                    f'Oops... that was not the answer, {telegramUser.name}.\n'
                    "Let's try solving the CAPTCHA again!\n"
                    f'You have {sec_remain} seconds or {pendingCaptcha.attempts_left} failed attempts before getting kicked.'
                )
            sent_message = bot.send_photo(
                chat_id=chatId,
                photo=imageCaptcha.generate(secret),
                caption=captcha_message,
            )
            pendingCaptcha.captcha_answer = secret
            pendingCaptcha.captcha_message_id = sent_message.message_id
            pendingCaptcha.save()
        else:
            try:
                _kick_member(bot, chatId, chatType, telegramUser.telegram_id)
            except Exception:
                # Kicking may fail (e.g. the user is already gone); the
                # pending record is dropped regardless.
                traceback.print_exc()
            pendingCaptcha.delete()

    def _handle_regular_message(self, bot, myId, chatId, message, preferences, telegramGroup):
        """Send matching canned replies and, for leave updates, the leave message."""
        chatMessageId: int = message.get('message_id', 0)
        chatMessage: str = message.get('text', '')
        fromId: int = message.get('from', {}).get('id', 0)
        leftMemberUpdate = message.get('left_chat_member', None)

        for canned_message in preferences.canned_messages.all():
            if canned_message.listen in chatMessage:
                try:
                    bot.send_message(
                        chat_id=chatId,
                        reply_to_message_id=chatMessageId,
                        text=canned_message.reply_with,
                    )
                except TelegramBadRequest:
                    traceback.print_exc()

        # NOTE(review): nesting reconstructed from whitespace-collapsed source;
        # the leave message is placed in the "no pending CAPTCHA" branch --
        # confirm it should not also fire for users with a pending CAPTCHA.
        if leftMemberUpdate is not None and fromId != myId:
            telegramMember = getTelegramUser(leftMemberUpdate)
            leave_message = _fill_template(
                preferences.leave_message, _MESSAGE_LIMIT,
                NAME=telegramMember.name, GROUP=telegramGroup.telegram_title,
            )
            if len(leave_message) > 0:
                bot.send_message(chat_id=chatId, text=leave_message)


class LoginView(View):
    """Exchanges a one-time login link for the ``user_hash`` session cookie."""

    def get(self, request, tuid, hsh):
        lh = models.LoginHash.objects.filter(id=tuid).first()
        if lh is None or str(lh.uuid) != hsh:
            return HttpResponse('Access Denied', status=401)
        response = redirect('control_panel')
        # Four weeks, passed as max_age (seconds) rather than as an integer
        # `expires`, which is not a documented form for that parameter.
        # NOTE(review): consider httponly/secure/samesite flags for this cookie.
        response.set_cookie('user_hash', f'{lh.telegram_user.uuid}', max_age=4 * 7 * 24 * 3600)
        # The login link is single-use.
        lh.delete()
        return response


class LogoutView(View):
    """Drops the ``user_hash`` cookie and redirects to the control panel."""

    def get(self, request):
        response = redirect('control_panel')
        response.delete_cookie('user_hash')
        return response


@method_decorator(RequireTelegramLogin, 'dispatch')
class ControlPanelView(TemplateView):
    template_name = 'control_panel.haml'


@method_decorator(RequireTelegramLogin, 'dispatch')
class AddTelegramBotView(View):
    """Registers (or re-registers) a custom bot token for the logged-in user."""

    def post(self, request, tg_user):
        token = request.POST.get('token', '')
        bot_self = get_bot_self(token=token)
        userBot = models.TelegramUserBots.objects.filter(token=token).first()
        # NOTE(review): nesting reconstructed from whitespace-collapsed source --
        # confirm an existing row is meant to be re-assigned to the submitter.
        if userBot is None:
            userBot = models.TelegramUserBots()
        userBot.owner = tg_user
        userBot.telegram_id = bot_self
        userBot.token = token
        userBot.save()
        # Now that the row exists, the uncached call fills in name/username.
        get_bot_self_uncached(token=token)
        setupWebHook(token)
        return redirect('control_panel')


@method_decorator(RequireTelegramLogin, 'dispatch')
class DeleteAccountView(View):
    """Deletes the logged-in user after an exact confirmation phrase."""

    def post(self, request, tg_user):
        confirmation = request.POST.get('confirmation', '')
        if confirmation == 'Yes, delete my account and all its data':
            tg_user.delete()
            return redirect('logout')
        # The submitted text is user-controlled and the response is HTML, so
        # it is escaped before being echoed back.
        # NOTE(review): the source had raw line breaks inside these literals
        # (invalid Python); they are reconstructed as `<br>` -- confirm.
        return HttpResponse(
            'Strings do not match:<br>\n'
            f'{escape(confirmation)}<br>\n'
            'Yes, delete my account and all its data<br>\n'
            '<br>\n'
            'Nothing was done.'
        )


@method_decorator(RequireTelegramLogin, 'dispatch')
class DeleteBot(TemplateView):
    """Confirmation page (GET) and deletion (POST) for one of the user's bots."""

    template_name = 'delete_item.haml'

    def get_context_data(self, **kwargs):
        kwargs['display_template'] = 'delete_item_bot.haml'
        kwargs['previous'] = resolve_url('control_panel')
        kwargs['item'] = self._findFromContext(**kwargs)
        return super().get_context_data(**kwargs)

    def _findFromContext(self, **kwargs):
        """Return the bot ``bot_id`` owned by ``tg_user`` or raise 404."""
        return get_object_or_404(kwargs['tg_user'].bots, id=kwargs['bot_id'])

    def post(self, request, **kwargs):
        self._findFromContext(**kwargs).delete()
        return redirect('control_panel')


class ManageGroupGenericView(TemplateView):
    """Base for the group-management pages.

    Subclasses override ``_putForms`` to place a model form under
    ``kwargs['form']``; POST validates and saves (or deletes) its instance.
    """

    def get(self, request, **kwargs):
        # The request is threaded through the context as `request_` so that
        # _putForms can read the query string.
        return super().get(request, request_=request, **kwargs)

    def get_context_data(self, **kwargs):
        kwargs['group_preferences'] = self._findGroupPreferencesFromContext(**kwargs)
        kwargs['group'] = self._findGroupFromContext(**kwargs)
        kwargs = self._putForms(**kwargs)
        return super().get_context_data(**kwargs)

    def _findGroupFromContext(self, **kwargs):
        """Return group ``group_id`` among those ``tg_user`` administers, or 404."""
        return get_object_or_404(kwargs['tg_user'].admins, id=kwargs['group_id'])

    def _findGroupPreferencesFromContext(self, **kwargs):
        """Return the group's preferences, creating them on first access."""
        group = self._findGroupFromContext(**kwargs)
        if getattr(group, 'preferences', None) is None:
            groupPreferences = models.GroupPreferences()
            groupPreferences.group = group
            groupPreferences.save()
            # Re-fetch so the new relation is visible on the instance.
            group = self._findGroupFromContext(**kwargs)
        return group.preferences

    def _putForms(self, **kwargs):
        """Re-bind the form to the query string after a failed submission.

        When ``form_validation_failed=y`` is present, the form is rebuilt
        from ``request.GET`` and cleaned so the page shows its errors.
        """
        request = kwargs['request_']
        form = kwargs.get('form', None)
        if form is not None and request.GET.get('form_validation_failed', 'n') == 'y':
            form = form.__class__(request.GET, instance=form.instance)
            kwargs['form'] = form
            form.full_clean()
        return kwargs

    def post(self, request, **kwargs):
        """Save or delete the form's instance, then redirect.

        On validation failure the submitted values (minus the CSRF token and
        ``do``) are carried in the query string so the form can be re-shown.
        """
        kwargs = self.get_context_data(**kwargs, request_=request)
        if (form := kwargs.get('form', None)) is not None:
            if request.POST.get('do', '') != 'delete':
                form = form.__class__(request.POST, instance=form.instance)
                try:
                    form.full_clean()
                    form.save()
                except (ValueError, ValidationError):
                    postcopy = request.POST.copy()
                    postcopy.pop('csrfmiddlewaretoken', None)
                    postcopy.pop('do', None)
                    return HttpResponseRedirect(
                        f'{request.path}?form_validation_failed=y&{urllib.parse.urlencode(postcopy)}'
                    )
            else:
                form.instance.delete()
        return HttpResponseRedirect(kwargs.get('redirect_on_submit', None) or request.path)


@method_decorator(RequireTelegramLogin, 'dispatch')
class ManageGroupView(ManageGroupGenericView):
    """Overview page of one group."""

    template_name = 'manage_group.haml'

    def get_context_data(self, **kwargs):
        kwargs['previous'] = resolve_url('control_panel')
        return super().get_context_data(**kwargs)


@method_decorator(RequireTelegramLogin, 'dispatch')
class ManageGroupAntiSpamView(ManageGroupGenericView):
    """Form page for the group's anti-spam preferences."""

    template_name = 'manage_group_form.haml'

    def get_context_data(self, **kwargs):
        kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
        return super().get_context_data(**kwargs)

    def _putForms(self, **kwargs):
        kwargs['page_name'] = 'Anti-Spam'
        kwargs['form'] = forms.AntiSpam(instance=kwargs['group_preferences'])
        return super()._putForms(**kwargs)


@method_decorator(RequireTelegramLogin, 'dispatch')
class ManageGroupCAPTCHAView(ManageGroupGenericView):
    """Form page for the group's CAPTCHA preferences."""

    template_name = 'manage_group_form.haml'

    def get_context_data(self, **kwargs):
        kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
        return super().get_context_data(**kwargs)

    def _putForms(self, **kwargs):
        kwargs['page_name'] = 'CAPTCHA'
        kwargs['form'] = forms.CAPTCHA(instance=kwargs['group_preferences'])
        return super()._putForms(**kwargs)


@method_decorator(RequireTelegramLogin, 'dispatch')
class ManageGroupJoinLeaveGreetingsView(ManageGroupGenericView):
    """Form page for the group's join/leave messages."""

    template_name = 'manage_group_form.haml'

    def get_context_data(self, **kwargs):
        kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
        return super().get_context_data(**kwargs)

    def _putForms(self, **kwargs):
        kwargs['page_name'] = 'Join/Leave Greetings'
        kwargs['form'] = forms.Greetings(instance=kwargs['group_preferences'])
        return super()._putForms(**kwargs)


@method_decorator(RequireTelegramLogin, 'dispatch')
class ManageGroupCannedMessageView(ManageGroupGenericView):
    """Create, edit or delete a single canned message of the group."""

    template_name = 'manage_group_form.haml'

    def get_context_data(self, **kwargs):
        kwargs['previous'] = resolve_url('manage_group_cannedmessages', kwargs['group_id'])
        return super().get_context_data(**kwargs)

    def _putForms(self, **kwargs):
        kwargs['page_name'] = 'Canned Message'
        # Unknown ids fall back to a fresh, unsaved message (the "create" case).
        kwargs['canned_message'] = (
            kwargs['group_preferences'].canned_messages.filter(id=kwargs['cannedmessage_id']).first()
            or models.GroupCannedMessage(group_preferences=kwargs['group_preferences'])
        )
        kwargs['form'] = forms.CannedMessage(instance=kwargs['canned_message'])
        # Only messages that already exist in the database can be deleted.
        kwargs['deletion_allowed'] = not kwargs['canned_message']._state.adding
        kwargs['redirect_on_submit'] = kwargs['previous']
        return super()._putForms(**kwargs)


@method_decorator(RequireTelegramLogin, 'dispatch')
class ManageGroupCannedMessagesView(ManageGroupGenericView):
    """List page of the group's canned messages."""

    template_name = 'manage_group_cannedmessages.haml'

    def get_context_data(self, **kwargs):
        kwargs['previous'] = resolve_url('manage_group', kwargs['group_id'])
        return super().get_context_data(**kwargs)