furmeet-bot/webproj/bot/telebot.py

404 lines
24 KiB
Python

import datetime
import random
import traceback
import html
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import django.utils.timezone
import requests
from django.conf import settings
from django.urls import reverse
from telegram import ChatMember
from telegram.bot import Bot
from telegram.error import BadRequest as TelegramBadRequest
from telegram.error import Unauthorized as TelegramUnauthorized
from telegram.parsemode import ParseMode
from ..captcher.generator import ImageCaptchaSubprocessGenerator
from ..mwt import MWT
from . import models
imageCaptcha = ImageCaptchaSubprocessGenerator()
def getTelegramUser(obj: Dict[str, Any]) -> models.TelegramUser:
telegramUser = models.TelegramUser.objects.filter(
telegram_id=obj['id']).first()
if telegramUser is None:
telegramUser = models.TelegramUser()
telegramUser.telegram_id = obj['id']
telegramUser.telegram_first_name = obj['first_name']
telegramUser.telegram_last_name = obj.get('last_name', None)
telegramUser.telegram_username = obj.get('username', None)
telegramUser.save()
return telegramUser
def getTelegramGroup(obj: Dict[str, Any]) -> models.TelegramGroup:
telegramGroup = models.TelegramGroup.objects.filter(
telegram_id=obj['id']).first()
if telegramGroup is None:
telegramGroup = models.TelegramGroup()
telegramGroup.telegram_id = obj['id']
telegramGroup.telegram_title = obj['title']
telegramGroup.save()
return telegramGroup
def get_bot_self_uncached(token: str) -> int:
tg_me = Bot(token).get_me()
getTelegramUser(tg_me.__dict__)
tg_bt = models.TelegramUserBots.objects.filter(token=token).first()
if tg_bt is not None:
tg_bt.name = f'{tg_me.first_name} {tg_me.first_name or ""}'.strip()
tg_bt.telegram_username = tg_me.username
tg_bt.telegram_id = tg_me.id
tg_bt.save()
return tg_me.id
get_bot_self = MWT(timeout=60)(get_bot_self_uncached)
@MWT(timeout=60)
def get_admins(bot: Bot, chat_id: int) -> Tuple[List[int], Optional[int]]:
url = '{}/getChatAdministrators'.format(bot.base_url)
result = bot._request.post(url, {'chat_id': chat_id}, timeout=None)
lst = list()
try:
lst = result
except BaseException as e:
raise TelegramUnauthorized(e)
chat_owner = None
chat_admins: List[ChatMember] = list()
for obj in lst:
member = ChatMember.de_json(obj, bot)
chat_admins.append(member)
if obj['status'] == 'creator':
chat_owner = int(member.user.id)
return (
[int(getTelegramUser(admin.user.__dict__).telegram_id)
for admin in chat_admins],
chat_owner
)
def ensure_group_owner_is_set_correctly(group: models.TelegramGroup, owner_id: int) -> None:
owner = models.TelegramUser.objects.filter(telegram_id=owner_id).first()
if owner is not None:
if group.owner != owner:
group.owner = owner
group.save()
owner.save()
def run_telebot(telegram, update, request):
chatIsFromBot: bool = update.get('message', {}).get(
'from', {}).get('is_bot', True)
chatMessageId: id = update.get('message', {}).get('message_id', 0)
chatMessage: str = update.get('message', {}).get('text', '')
chatType: str = update.get('message', {}).get(
'chat', {}).get('type', '')
chatId: int = update.get('message', {}).get('chat', {}).get('id', 0)
fromId: int = update.get('message', {}).get('from', {}).get('id', 0)
newChatMembers: List[Dict[str, Any]] = update.get(
'message', {}).get('new_chat_members', [])
try:
if not chatIsFromBot:
registeredBot = models.TelegramUserBots.objects.filter(
token=telegram).first()
if registeredBot is None and telegram != settings.TELEGRAM_TOKEN:
Bot(telegram).set_webhook(url='')
elif chatType == 'channel':
Bot(telegram).leave_chat(chat_id=chatId)
else:
myId = get_bot_self(telegram)
botUser = models.TelegramUser.objects.get(telegram_id=myId)
bot = Bot(telegram)
telegramUser = getTelegramUser(
update.get('message', {}).get('from', {}))
if chatType == 'private':
if registeredBot is None:
if chatMessage == '/login':
loginHash = models.LoginHash(
telegram_user=telegramUser)
loginHash.save()
bot.send_message(
chat_id=chatId,
text=f'Do NOT share this link with anyone else:\n{request.build_absolute_uri(reverse("login", args=(loginHash.pk, loginHash.uuid)))}',
disable_web_page_preview=True
)
elif chatMessage in ('/start', '/help'):
bot.send_message(
chat_id=chatId,
text='I am a bot that is able to provide the following services to your group:\n' +
' - Anti-Spam through CAS (https://cas.chat)\n' +
' - Anti-Spam through CAPTCHA\n' +
' - Join/Leave messages\n' +
' - Canned responses\n' +
'\n' +
f'I can only be managed from https://{settings.FIRST_DOMAIN}\n' +
' - Here, you can only /login or ask me for /help.\n'
'\n' +
'You can run your custom-branded bot for free*.\n' +
' * might change in the future' +
''
)
elif chatMessage == '/previewCAPTCHA':
secret = ''
while len(secret) < 7:
secret += str(random.randint(0, 9))
imbytes = imageCaptcha.generate(secret)
bot.send_photo(
chat_id=chatId,
photo=imbytes,
caption=f'Sample CAPTCHA for secret "{secret}"'
)
else:
bot.send_message(
chat_id=chatId,
text=f'Sorry, I could not understand you.\nTry /help.'
)
else:
bot.send_message(
chat_id=chatId,
text=f'I am a custom bot configured at https://{settings.FIRST_DOMAIN}.\nCheck @{settings.BOT_NAME} for more details.',
)
elif chatType in ('group', 'supergroup'):
chatAdminsTelegramIds, chatOwnerId = get_admins(
bot, chatId)
telegramGroup = getTelegramGroup(
update.get('message', {}).get('chat', {}))
telegramGroup.admins.set(models.TelegramUser.objects.filter(
telegram_id__in=chatAdminsTelegramIds).all())
telegramGroup.save()
if chatOwnerId is not None:
ensure_group_owner_is_set_correctly(
telegramGroup, chatOwnerId)
leftMemberUpdate = update.get(
'message', {}).get('left_chat_member', None)
telegramGroupPreferences = models.GroupPreferences.objects.filter(
group=telegramGroup).first()
if myId in chatAdminsTelegramIds:
if telegramGroupPreferences is not None:
def send_hello(member):
join_message = (telegramGroupPreferences.join_message
.replace('{NAME}', html.escape(member.name))
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
.replace('&nbsp;', ' ')
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
.strip())[:4000]
if len(join_message) > 0:
bot.send_message(
chat_id=chatId,
text=join_message,
parse_mode=ParseMode.HTML,
)
if len(newChatMembers) > 0:
for newChatMember in newChatMembers:
telegramMember = getTelegramUser(
newChatMember)
if telegramGroupPreferences.combot_anti_spam_use:
response = requests.get(
f'https://api.cas.chat/check?user_id={telegramMember.telegram_id}')
jsonResponse = None if response is None else response.json()
banned = None if jsonResponse is None else jsonResponse.get(
'ok', False)
if banned:
bot.kick_chat_member(
chat_id=chatId,
user_id=telegramMember.telegram_id,
)
if chatType == 'supergroup':
bot.unban_chat_member(
chat_id=chatId,
user_id=telegramMember.telegram_id,
)
banned_message = (telegramGroupPreferences.combot_anti_spam_notification
.replace('{NAME}', html.escape(telegramMember.name))
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
.replace('&nbsp;', ' ')
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
.strip())[:4000]
if len(banned_message):
bot.send_message(
chat_id=chatId,
text=banned_message,
parse_mode=ParseMode.HTML,
)
continue
if telegramGroupPreferences.captcha_use:
secret = ''
while len(secret) < telegramGroupPreferences.captcha_digits:
secret += random.choice(
telegramGroupPreferences.captcha_chars)
models.PendingCaptchaUser.objects.filter(
group=telegramGroup, user=telegramMember).delete()
pcu = models.PendingCaptchaUser(
bot_token=telegram,
user=telegramMember,
group=telegramGroup,
captcha_answer=secret,
attempts_left=telegramGroupPreferences.captcha_attempts,
lifetime=telegramGroupPreferences.captcha_timeout,
captcha_message_id=0,
)
pcu.save()
imbytes = imageCaptcha.generate(
secret)
captcha_message = (telegramGroupPreferences.captcha_first_message
.replace('{NAME}', html.escape(telegramMember.name))
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
.replace('{TIMEOUT}', str(telegramGroupPreferences.captcha_timeout))
.replace('{ATTEMPTS}', str(telegramGroupPreferences.captcha_attempts))
.replace('&nbsp;', ' ')
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
.strip())[:1000]
if len(captcha_message) <= 0:
captcha_message = (f'Hello, {telegramMember.name}!\n' +
f'To prevent automated spam on {telegramGroup.telegram_title}, we require every new user to solve at CAPTCHA\n' +
f'You have {telegramGroupPreferences.captcha_timeout} seconds or {telegramGroupPreferences.captcha_attempts} failed attempts before getting kicked.' +
'')
sent_message = bot.send_photo(
chat_id=chatId,
photo=imbytes,
caption=captcha_message,
parse_mode=ParseMode.HTML,
)
pcu.captcha_message_id = sent_message.message_id
pcu.save()
if not telegramGroupPreferences.captcha_leave_mess:
try:
bot.delete_message(
chat_id=chatId,
message_id=chatMessageId,
)
except:
pass
else:
send_hello(telegramMember)
else:
pendingCaptcha = models.PendingCaptchaUser.objects.filter(
group=telegramGroup, user=telegramUser).first()
if pendingCaptcha is not None:
if not telegramGroupPreferences.captcha_leave_mess:
try:
bot.delete_message(
chat_id=chatId,
message_id=pendingCaptcha.captcha_message_id,
)
except:
pass
try:
bot.delete_message(
chat_id=chatId,
message_id=chatMessageId,
)
except:
pass
if leftMemberUpdate is not None:
pass
elif chatMessage == pendingCaptcha.captcha_answer:
pendingCaptcha.delete()
send_hello(telegramUser)
else:
pendingCaptcha.attempts_left -= 1
td: datetime.timedelta = (pendingCaptcha.created + datetime.timedelta(
seconds=pendingCaptcha.lifetime)) - django.utils.timezone.now()
sec_remain = int(
td.total_seconds())
if pendingCaptcha.attempts_left > 0 and sec_remain > 0:
secret = ''
while len(secret) < telegramGroupPreferences.captcha_digits:
secret += random.choice(
telegramGroupPreferences.captcha_chars)
imbytes = imageCaptcha.generate(
secret)
captcha_message = (telegramGroupPreferences.captcha_retry_message
.replace('{NAME}', html.escape(telegramUser.name))
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
.replace('{TIMEOUT}', str(sec_remain))
.replace('{ATTEMPTS}', str(pendingCaptcha.attempts_left))
.replace('&nbsp;', ' ')
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
.strip())[:1000]
if len(captcha_message) <= 0:
captcha_message = (f'Oops... that was not the answer, {telegramUser.name}.\n' +
f'Let\'s try solving the CAPTCHA again!\n' +
f'You have {sec_remain} seconds or {pendingCaptcha.attempts_left} failed attempts before getting kicked.' +
'')
sent_message = bot.send_photo(
chat_id=chatId,
photo=imbytes,
caption=captcha_message,
parse_mode=ParseMode.HTML,
)
pendingCaptcha.captcha_answer = secret
pendingCaptcha.captcha_message_id = sent_message.message_id
pendingCaptcha.save()
else:
try:
bot.kick_chat_member(
chat_id=chatId,
user_id=telegramUser.telegram_id,
)
if chatType == 'supergroup':
bot.unban_chat_member(
chat_id=chatId,
user_id=telegramUser.telegram_id,
)
except:
traceback.print_exc()
pendingCaptcha.delete()
else:
for canned_message in telegramGroupPreferences.canned_messages.all():
if canned_message.listen in chatMessage:
try:
bot.send_message(
chat_id=chatId,
reply_to_message_id=chatMessageId,
text=canned_message.reply_with
.replace('&nbsp;', ' ')
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', ''),
parse_mode=ParseMode.HTML,
)
except TelegramBadRequest:
traceback.print_exc()
if leftMemberUpdate is not None:
if fromId != myId:
telegramMember = getTelegramUser(
leftMemberUpdate)
leave_message = (telegramGroupPreferences.leave_message
.replace('{NAME}', html.escape(telegramMember.name))
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
.replace('&nbsp;', ' ')
.replace('<br>', '')
.replace('<br >', '')
.replace('<br/>', '')
.replace('<br />', '')
.strip())[:4000]
if len(leave_message) > 0:
bot.send_message(
chat_id=chatId,
text=leave_message,
parse_mode=ParseMode.HTML,
)
except TelegramUnauthorized:
pass