"""Telegram webhook logic: user/group bookkeeping and update handling.

The public entry point is :func:`run_telebot`, which receives one decoded
Telegram update and reacts to it (login links, CAS screening, CAPTCHA
challenges, join/leave messages and canned replies).
"""
import datetime
import random
import traceback
from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional, Tuple

import requests
import django.utils.timezone
from captcha.image import ImageCaptcha
from django.conf import settings
from django.urls import reverse
from telegram.bot import Bot
from telegram.error import BadRequest as TelegramBadRequest
from telegram.error import Unauthorized as TelegramUnauthorized
from telegram.parsemode import ParseMode
from telegram import ChatMember

from ..mwt import MWT
from . import models

imageCaptchaFonts = [
    str(Path('webproj/fonts/Roboto/Roboto-Regular.ttf').absolute()),
    str(Path('webproj/fonts/Fondamento/Fondamento-Regular.ttf').absolute()),
    str(Path('webproj/fonts/Josefin_Sans/static/JosefinSans-Medium.ttf').absolute()),
    str(Path('webproj/fonts/Lora/static/Lora-Medium.ttf').absolute()),
    str(Path('webproj/fonts/Quicksand/static/Quicksand-Regular.ttf').absolute()),
    str(Path('webproj/fonts/Work_Sans/static/WorkSans-Medium.ttf').absolute()),
]

imageCaptcha = ImageCaptcha(300, 150, fonts=imageCaptchaFonts, font_sizes=[
    36, 40, 44, 48, 54, 60, 66, 72])

# Iterating `truefonts` at import time has no visible effect here;
# presumably it forces the captcha library to load the fonts eagerly so the
# first request does not pay for it -- TODO confirm against the library.
for truefont in imageCaptcha.truefonts:
    pass

# Caption/text length caps applied to rendered templates before sending.
_TEXT_LIMIT = 4000
_CAPTION_LIMIT = 1000

# Seconds to wait for the CAS service before giving up on the lookup.
_CAS_TIMEOUT = 10

# NOTE(review): the literals of the original clean-up chain
# (`.replace(<x>, ' ')` followed by four `.replace(<y>, '')`) were not
# readable in the reviewed copy of this file. Because every cleaned text is
# sent with ParseMode.HTML, and Telegram's HTML mode rejects `&nbsp;`,
# `<p>` and `<br>`, they are reconstructed as below. Confirm against the
# real source before merging.
_HTML_SPACE_ENTITY = '&nbsp;'
_UNSUPPORTED_HTML_TAGS = ('<p>', '</p>', '<br>', '<br/>', '<br />')


def getTelegramUser(obj: Dict[str, Any]) -> models.TelegramUser:
    """Create or refresh the ``TelegramUser`` row for a Telegram user dict.

    ``obj`` must contain ``id`` and ``first_name``; ``last_name`` and
    ``username`` are optional and stored as ``None`` when missing.
    The row is saved on every call and returned.
    """
    telegramUser = models.TelegramUser.objects.filter(
        telegram_id=obj['id']).first()
    if telegramUser is None:
        telegramUser = models.TelegramUser()
    telegramUser.telegram_id = obj['id']
    telegramUser.telegram_first_name = obj['first_name']
    telegramUser.telegram_last_name = obj.get('last_name', None)
    telegramUser.telegram_username = obj.get('username', None)
    telegramUser.save()
    return telegramUser


def getTelegramGroup(obj: Dict[str, Any]) -> models.TelegramGroup:
    """Create or refresh the ``TelegramGroup`` row for a Telegram chat dict.

    ``obj`` must contain ``id`` and ``title``. The row is saved on every
    call and returned.
    """
    telegramGroup = models.TelegramGroup.objects.filter(
        telegram_id=obj['id']).first()
    if telegramGroup is None:
        telegramGroup = models.TelegramGroup()
    telegramGroup.telegram_id = obj['id']
    telegramGroup.telegram_title = obj['title']
    telegramGroup.save()
    return telegramGroup


def get_bot_self_uncached(token: str) -> int:
    """Return the Telegram id of the bot that owns ``token``.

    Calls ``getMe``, stores the bot as a ``TelegramUser`` and, when a
    ``TelegramUserBots`` row exists for the token, refreshes its name,
    username and id.
    """
    tg_me = Bot(token).get_me()
    getTelegramUser(tg_me.__dict__)
    tg_bt = models.TelegramUserBots.objects.filter(token=token).first()
    if tg_bt is not None:
        # Fix: the display name used `first_name` twice instead of
        # "<first name> <last name>".
        tg_bt.name = f'{tg_me.first_name} {tg_me.last_name or ""}'.strip()
        tg_bt.telegram_username = tg_me.username
        tg_bt.telegram_id = tg_me.id
        tg_bt.save()
    return tg_me.id


# Same lookup wrapped in MWT(timeout=60); presumably a 60-second memoizer
# (see ..mwt) -- confirm there.
get_bot_self = MWT(timeout=60)(get_bot_self_uncached)


@MWT(timeout=60)
def get_admins(bot: Bot, chat_id: int) -> Tuple[List[int], Optional[int]]:
    """Return ``(admin_ids, owner_id)`` for a chat.

    Every administrator is also stored through :func:`getTelegramUser`.
    ``owner_id`` is the id of the member whose status is ``'creator'``, or
    ``None`` when the response contains no such member.
    """
    url = '{}/getChatAdministrators'.format(bot.base_url)
    result = bot._request.post(url, {'chat_id': chat_id}, timeout=None)
    chat_owner = None
    chat_admins: List[ChatMember] = []
    for obj in result:
        member = ChatMember.de_json(obj, bot)
        chat_admins.append(member)
        if obj['status'] == 'creator':
            chat_owner = int(member.user.id)
    admin_ids = [
        int(getTelegramUser(admin.user.__dict__).telegram_id)
        for admin in chat_admins
    ]
    return admin_ids, chat_owner


def ensure_group_owner_is_set_correctly(group: models.TelegramGroup, owner_id: int) -> None:
    """Point ``group.owner`` at the stored user with ``owner_id``.

    Does nothing when no such user is stored or the owner is already
    correct.
    """
    owner = models.TelegramUser.objects.filter(telegram_id=owner_id).first()
    if owner is not None:
        if group.owner != owner:
            group.owner = owner
            group.save()
            owner.save()


class _GroupContext(NamedTuple):
    """Values shared by all group-chat helpers while handling one update."""

    bot: Bot
    token: str
    chat_id: int
    chat_type: str
    group: models.TelegramGroup
    preferences: models.GroupPreferences


def _strip_unsupported_html(text: str) -> str:
    """Remove markup that cannot be sent with ``ParseMode.HTML``."""
    text = text.replace(_HTML_SPACE_ENTITY, ' ')
    for tag in _UNSUPPORTED_HTML_TAGS:
        text = text.replace(tag, '')
    return text


def _render_template(template: str, limit: int, **placeholders: str) -> str:
    """Fill ``{KEY}`` placeholders, clean the markup, strip and truncate.

    Placeholders are substituted in keyword order, exactly like the chained
    ``.replace`` calls this helper replaces.
    """
    # NOTE(review): values such as member names are inserted unescaped into
    # text that is later parsed as HTML; check whether `.name` is already
    # escaped before adding escaping here.
    for key, value in placeholders.items():
        template = template.replace('{' + key + '}', value)
    return _strip_unsupported_html(template).strip()[:limit]


def _random_secret(alphabet, length: int) -> str:
    """Return a secret of at least ``length`` characters drawn from ``alphabet``."""
    secret = ''
    # Loop on the resulting length so multi-character alphabet entries
    # behave as before.
    while len(secret) < length:
        secret += random.choice(alphabet)
    return secret


def _is_cas_banned(user_id: int) -> bool:
    """Return True when api.cas.chat answers ``ok`` for ``user_id``.

    Network failures, time-outs and malformed answers are logged and
    treated as "not banned", so the remaining screening for the new member
    still runs.
    """
    try:
        response = requests.get(
            'https://api.cas.chat/check',
            params={'user_id': user_id},
            timeout=_CAS_TIMEOUT,
        )
        return bool(response.json().get('ok', False))
    except (requests.RequestException, ValueError):
        traceback.print_exc()
        return False


def _delete_message_quietly(bot: Bot, chat_id: int, message_id: int) -> None:
    """Delete a message, ignoring any failure (best-effort clean-up)."""
    try:
        bot.delete_message(chat_id=chat_id, message_id=message_id)
    except Exception:
        # Deliberately ignored: failing to tidy up must not abort handling.
        pass


def _kick(ctx: _GroupContext, user_id: int) -> None:
    """Remove a user from the chat; in supergroups lift the ban afterwards."""
    ctx.bot.kick_chat_member(chat_id=ctx.chat_id, user_id=user_id)
    if ctx.chat_type == 'supergroup':
        ctx.bot.unban_chat_member(chat_id=ctx.chat_id, user_id=user_id)


def _send_hello(ctx: _GroupContext, member: models.TelegramUser) -> None:
    """Send the configured join message for ``member`` unless it is empty."""
    join_message = _render_template(
        ctx.preferences.join_message, _TEXT_LIMIT,
        NAME=member.name,
        GROUP=ctx.group.telegram_title,
    )
    if join_message:
        ctx.bot.send_message(
            chat_id=ctx.chat_id,
            text=join_message,
            parse_mode=ParseMode.HTML,
        )


def _screen_new_member(ctx: _GroupContext, new_member: Dict[str, Any], service_message_id: int) -> None:
    """Apply CAS screening, then a CAPTCHA challenge or greeting, to a joiner."""
    preferences = ctx.preferences
    telegramMember = getTelegramUser(new_member)

    if preferences.combot_anti_spam_use and _is_cas_banned(telegramMember.telegram_id):
        _kick(ctx, telegramMember.telegram_id)
        banned_message = _render_template(
            preferences.combot_anti_spam_notification, _TEXT_LIMIT,
            NAME=telegramMember.name,
            GROUP=ctx.group.telegram_title,
        )
        if banned_message:
            ctx.bot.send_message(
                chat_id=ctx.chat_id,
                text=banned_message,
                parse_mode=ParseMode.HTML,
            )
        return

    if not preferences.captcha_use:
        _send_hello(ctx, telegramMember)
        return

    secret = _random_secret(preferences.captcha_chars, preferences.captcha_digits)
    # Only one pending challenge per (group, user).
    models.PendingCaptchaUser.objects.filter(
        group=ctx.group, user=telegramMember).delete()
    pcu = models.PendingCaptchaUser(
        bot_token=ctx.token,
        user=telegramMember,
        group=ctx.group,
        captcha_answer=secret,
        attempts_left=preferences.captcha_attempts,
        lifetime=preferences.captcha_timeout,
        captcha_message_id=0,
    )
    pcu.save()
    imbytes = imageCaptcha.generate(secret)
    captcha_message = _render_template(
        preferences.captcha_first_message, _CAPTION_LIMIT,
        NAME=telegramMember.name,
        GROUP=ctx.group.telegram_title,
        TIMEOUT=str(preferences.captcha_timeout),
        ATTEMPTS=str(preferences.captcha_attempts),
    )
    if not captcha_message:
        captcha_message = (
            f'Hello, {telegramMember.name}!\n'
            f'To prevent automated spam on {ctx.group.telegram_title}, we require every new user to solve at CAPTCHA\n'
            f'You have {preferences.captcha_timeout} seconds or {preferences.captcha_attempts} failed attempts before getting kicked.'
        )
    sent_message = ctx.bot.send_photo(
        chat_id=ctx.chat_id,
        photo=imbytes,
        caption=captcha_message,
        parse_mode=ParseMode.HTML,
    )
    # Remember the challenge message so it can be deleted later.
    pcu.captcha_message_id = sent_message.message_id
    pcu.save()
    if not preferences.captcha_leave_mess:
        _delete_message_quietly(ctx.bot, ctx.chat_id, service_message_id)


def _handle_captcha_reply(ctx: _GroupContext, telegramUser: models.TelegramUser,
                          pendingCaptcha: models.PendingCaptchaUser,
                          message: Dict[str, Any]) -> None:
    """Process a message sent by a user who still owes a CAPTCHA answer."""
    preferences = ctx.preferences
    chatMessageId: int = message.get('message_id', 0)
    chatMessage: str = message.get('text', '')

    if not preferences.captcha_leave_mess:
        _delete_message_quietly(
            ctx.bot, ctx.chat_id, pendingCaptcha.captcha_message_id)
        _delete_message_quietly(ctx.bot, ctx.chat_id, chatMessageId)

    if message.get('left_chat_member', None) is not None:
        # Member-left updates are not treated as an answer attempt.
        return

    if chatMessage == pendingCaptcha.captcha_answer:
        pendingCaptcha.delete()
        _send_hello(ctx, telegramUser)
        return

    pendingCaptcha.attempts_left -= 1
    expires_at = pendingCaptcha.created + datetime.timedelta(
        seconds=pendingCaptcha.lifetime)
    sec_remain = int(
        (expires_at - django.utils.timezone.now()).total_seconds())

    if pendingCaptcha.attempts_left > 0 and sec_remain > 0:
        secret = _random_secret(
            preferences.captcha_chars, preferences.captcha_digits)
        imbytes = imageCaptcha.generate(secret)
        captcha_message = _render_template(
            preferences.captcha_retry_message, _CAPTION_LIMIT,
            NAME=telegramUser.name,
            GROUP=ctx.group.telegram_title,
            TIMEOUT=str(sec_remain),
            ATTEMPTS=str(pendingCaptcha.attempts_left),
        )
        if not captcha_message:
            captcha_message = (
                f'Oops... that was not the answer, {telegramUser.name}.\n'
                "Let's try solving the CAPTCHA again!\n"
                f'You have {sec_remain} seconds or {pendingCaptcha.attempts_left} failed attempts before getting kicked.'
            )
        sent_message = ctx.bot.send_photo(
            chat_id=ctx.chat_id,
            photo=imbytes,
            caption=captcha_message,
            parse_mode=ParseMode.HTML,
        )
        pendingCaptcha.captcha_answer = secret
        pendingCaptcha.captcha_message_id = sent_message.message_id
        pendingCaptcha.save()
        return

    # Out of attempts or out of time.
    try:
        _kick(ctx, telegramUser.telegram_id)
    except Exception:
        traceback.print_exc()
    pendingCaptcha.delete()


def _handle_regular_message(ctx: _GroupContext, myId: int, message: Dict[str, Any]) -> None:
    """Send matching canned replies and, for a departure, the leave message."""
    chatMessage: str = message.get('text', '')
    for canned_message in ctx.preferences.canned_messages.all():
        if canned_message.listen in chatMessage:
            try:
                ctx.bot.send_message(
                    chat_id=ctx.chat_id,
                    reply_to_message_id=message.get('message_id', 0),
                    text=_strip_unsupported_html(canned_message.reply_with),
                    parse_mode=ParseMode.HTML,
                )
            except TelegramBadRequest:
                traceback.print_exc()

    leftMemberUpdate = message.get('left_chat_member', None)
    fromId: int = message.get('from', {}).get('id', 0)
    if leftMemberUpdate is not None and fromId != myId:
        telegramMember = getTelegramUser(leftMemberUpdate)
        leave_message = _render_template(
            ctx.preferences.leave_message, _TEXT_LIMIT,
            NAME=telegramMember.name,
            GROUP=ctx.group.telegram_title,
        )
        if leave_message:
            ctx.bot.send_message(
                chat_id=ctx.chat_id,
                text=leave_message,
                parse_mode=ParseMode.HTML,
            )


def _handle_group_chat(bot: Bot, token: str, myId: int,
                       telegramUser: models.TelegramUser,
                       message: Dict[str, Any]) -> None:
    """Sync group/admin records, then dispatch a group or supergroup message."""
    chat = message.get('chat', {})
    chatId: int = chat.get('id', 0)

    chatAdminsTelegramIds, chatOwnerId = get_admins(bot, chatId)
    telegramGroup = getTelegramGroup(chat)
    telegramGroup.admins.set(models.TelegramUser.objects.filter(
        telegram_id__in=chatAdminsTelegramIds).all())
    telegramGroup.save()
    if chatOwnerId is not None:
        ensure_group_owner_is_set_correctly(telegramGroup, chatOwnerId)

    telegramGroupPreferences = models.GroupPreferences.objects.filter(
        group=telegramGroup).first()
    # The bot only acts in groups where it is an administrator and for
    # which preferences have been stored.
    if myId not in chatAdminsTelegramIds or telegramGroupPreferences is None:
        return

    ctx = _GroupContext(
        bot=bot,
        token=token,
        chat_id=chatId,
        chat_type=chat.get('type', ''),
        group=telegramGroup,
        preferences=telegramGroupPreferences,
    )

    newChatMembers: List[Dict[str, Any]] = message.get('new_chat_members', [])
    if newChatMembers:
        for newChatMember in newChatMembers:
            _screen_new_member(
                ctx, newChatMember, message.get('message_id', 0))
        return

    pendingCaptcha = models.PendingCaptchaUser.objects.filter(
        group=telegramGroup, user=telegramUser).first()
    if pendingCaptcha is not None:
        _handle_captcha_reply(ctx, telegramUser, pendingCaptcha, message)
    else:
        _handle_regular_message(ctx, myId, message)


def _handle_private_chat(bot: Bot, request, is_main_bot: bool,
                         telegramUser: models.TelegramUser,
                         chatId: int, chatMessage: str) -> None:
    """Answer a private message; commands exist only on the main bot."""
    if not is_main_bot:
        bot.send_message(
            chat_id=chatId,
            text=f'I am a custom bot configured at https://{settings.FIRST_DOMAIN}.\nCheck @{settings.BOT_NAME} for more details.',
        )
        return

    if chatMessage == '/login':
        loginHash = models.LoginHash(telegram_user=telegramUser)
        loginHash.save()
        login_url = request.build_absolute_uri(
            reverse("login", args=(loginHash.pk, loginHash.uuid)))
        bot.send_message(
            chat_id=chatId,
            text=f'Do NOT share this link with anyone else:\n{login_url}',
            disable_web_page_preview=True,
        )
    elif chatMessage in ('/start', '/help'):
        bot.send_message(
            chat_id=chatId,
            text=(
                'I am a bot that is able to provide the following services to your group:\n'
                ' - Anti-Spam through CAS (https://cas.chat)\n'
                ' - Anti-Spam through CAPTCHA\n'
                ' - Join/Leave messages\n'
                ' - Canned responses\n'
                '\n'
                f'I can only be managed from https://{settings.FIRST_DOMAIN}\n'
                ' - Here, you can only /login or ask me for /help.\n'
                '\n'
                'You can run your custom-branded bot for free*.\n'
                ' * might change in the future'
            ),
        )
    elif chatMessage == '/previewCAPTCHA':
        secret = _random_secret('0123456789', 7)
        imbytes = imageCaptcha.generate(secret)
        bot.send_photo(
            chat_id=chatId,
            photo=imbytes,
            caption=f'Sample CAPTCHA for secret "{secret}"',
        )
    else:
        bot.send_message(
            chat_id=chatId,
            text='Sorry, I could not understand you.\nTry /help.',
        )


def run_telebot(telegram, update, request):
    """Handle one Telegram update for the bot identified by token ``telegram``.

    Args:
        telegram: Bot token the update was delivered for.
        update: Decoded update dict; only its ``message`` key is read.
        request: Object providing ``build_absolute_uri`` (used for the
            ``/login`` link).

    Updates without a ``message``/``from`` or sent by bots are ignored.
    Tokens that are neither registered nor ``settings.TELEGRAM_TOKEN`` get
    their webhook cleared; channels are left. ``TelegramUnauthorized`` is
    swallowed; any other exception propagates to the caller.
    """
    message: Dict[str, Any] = update.get('message', {})
    sender: Dict[str, Any] = message.get('from', {})
    chat: Dict[str, Any] = message.get('chat', {})
    chatType: str = chat.get('type', '')
    chatId: int = chat.get('id', 0)

    # `is_bot` defaults to True so updates lacking a sender are ignored too.
    if sender.get('is_bot', True):
        return

    try:
        registeredBot = models.TelegramUserBots.objects.filter(
            token=telegram).first()
        if registeredBot is None and telegram != settings.TELEGRAM_TOKEN:
            Bot(telegram).set_webhook(url='')
            return
        if chatType == 'channel':
            Bot(telegram).leave_chat(chat_id=chatId)
            return

        myId = get_bot_self(telegram)
        bot = Bot(telegram)
        telegramUser = getTelegramUser(sender)
        if chatType == 'private':
            _handle_private_chat(
                bot, request, registeredBot is None, telegramUser,
                chatId, message.get('text', ''))
        elif chatType in ('group', 'supergroup'):
            _handle_group_chat(bot, telegram, myId, telegramUser, message)
    except TelegramUnauthorized:
        pass