404 lines
24 KiB
Python
404 lines
24 KiB
Python
import datetime
|
|
import random
|
|
import traceback
|
|
import html
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import django.utils.timezone
|
|
import requests
|
|
from django.conf import settings
|
|
from django.urls import reverse
|
|
from telegram import ChatMember
|
|
from telegram.bot import Bot
|
|
from telegram.error import BadRequest as TelegramBadRequest
|
|
from telegram.error import Unauthorized as TelegramUnauthorized
|
|
from telegram.parsemode import ParseMode
|
|
|
|
from ..captcher.generator import ImageCaptchaSubprocessGenerator
|
|
from ..mwt import MWT
|
|
from . import models
|
|
|
|
imageCaptcha = ImageCaptchaSubprocessGenerator()
|
|
|
|
|
|
def getTelegramUser(obj: Dict[str, Any]) -> models.TelegramUser:
|
|
telegramUser = models.TelegramUser.objects.filter(
|
|
telegram_id=obj['id']).first()
|
|
if telegramUser is None:
|
|
telegramUser = models.TelegramUser()
|
|
telegramUser.telegram_id = obj['id']
|
|
telegramUser.telegram_first_name = obj['first_name']
|
|
telegramUser.telegram_last_name = obj.get('last_name', None)
|
|
telegramUser.telegram_username = obj.get('username', None)
|
|
telegramUser.save()
|
|
return telegramUser
|
|
|
|
|
|
def getTelegramGroup(obj: Dict[str, Any]) -> models.TelegramGroup:
|
|
telegramGroup = models.TelegramGroup.objects.filter(
|
|
telegram_id=obj['id']).first()
|
|
if telegramGroup is None:
|
|
telegramGroup = models.TelegramGroup()
|
|
telegramGroup.telegram_id = obj['id']
|
|
telegramGroup.telegram_title = obj['title']
|
|
telegramGroup.save()
|
|
return telegramGroup
|
|
|
|
|
|
def get_bot_self_uncached(token: str) -> int:
|
|
tg_me = Bot(token).get_me()
|
|
getTelegramUser(tg_me.__dict__)
|
|
tg_bt = models.TelegramUserBots.objects.filter(token=token).first()
|
|
if tg_bt is not None:
|
|
tg_bt.name = f'{tg_me.first_name} {tg_me.first_name or ""}'.strip()
|
|
tg_bt.telegram_username = tg_me.username
|
|
tg_bt.telegram_id = tg_me.id
|
|
tg_bt.save()
|
|
return tg_me.id
|
|
|
|
|
|
get_bot_self = MWT(timeout=60)(get_bot_self_uncached)
|
|
|
|
|
|
@MWT(timeout=60)
|
|
def get_admins(bot: Bot, chat_id: int) -> Tuple[List[int], Optional[int]]:
|
|
url = '{}/getChatAdministrators'.format(bot.base_url)
|
|
result = bot._request.post(url, {'chat_id': chat_id}, timeout=None)
|
|
lst = list()
|
|
try:
|
|
lst = result
|
|
except BaseException as e:
|
|
raise TelegramUnauthorized(e)
|
|
chat_owner = None
|
|
chat_admins: List[ChatMember] = list()
|
|
for obj in lst:
|
|
member = ChatMember.de_json(obj, bot)
|
|
chat_admins.append(member)
|
|
if obj['status'] == 'creator':
|
|
chat_owner = int(member.user.id)
|
|
return (
|
|
[int(getTelegramUser(admin.user.__dict__).telegram_id)
|
|
for admin in chat_admins],
|
|
chat_owner
|
|
)
|
|
|
|
|
|
def ensure_group_owner_is_set_correctly(group: models.TelegramGroup, owner_id: int) -> None:
|
|
owner = models.TelegramUser.objects.filter(telegram_id=owner_id).first()
|
|
if owner is not None:
|
|
if group.owner != owner:
|
|
group.owner = owner
|
|
group.save()
|
|
owner.save()
|
|
|
|
|
|
def run_telebot(telegram, update, request):
|
|
chatIsFromBot: bool = update.get('message', {}).get(
|
|
'from', {}).get('is_bot', True)
|
|
chatMessageId: id = update.get('message', {}).get('message_id', 0)
|
|
chatMessage: str = update.get('message', {}).get('text', '')
|
|
chatType: str = update.get('message', {}).get(
|
|
'chat', {}).get('type', '')
|
|
chatId: int = update.get('message', {}).get('chat', {}).get('id', 0)
|
|
fromId: int = update.get('message', {}).get('from', {}).get('id', 0)
|
|
newChatMembers: List[Dict[str, Any]] = update.get(
|
|
'message', {}).get('new_chat_members', [])
|
|
try:
|
|
if not chatIsFromBot:
|
|
registeredBot = models.TelegramUserBots.objects.filter(
|
|
token=telegram).first()
|
|
if registeredBot is None and telegram != settings.TELEGRAM_TOKEN:
|
|
Bot(telegram).set_webhook(url='')
|
|
elif chatType == 'channel':
|
|
Bot(telegram).leave_chat(chat_id=chatId)
|
|
else:
|
|
myId = get_bot_self(telegram)
|
|
botUser = models.TelegramUser.objects.get(telegram_id=myId)
|
|
bot = Bot(telegram)
|
|
telegramUser = getTelegramUser(
|
|
update.get('message', {}).get('from', {}))
|
|
if chatType == 'private':
|
|
if registeredBot is None:
|
|
if chatMessage == '/login':
|
|
loginHash = models.LoginHash(
|
|
telegram_user=telegramUser)
|
|
loginHash.save()
|
|
bot.send_message(
|
|
chat_id=chatId,
|
|
text=f'Do NOT share this link with anyone else:\n{request.build_absolute_uri(reverse("login", args=(loginHash.pk, loginHash.uuid)))}',
|
|
disable_web_page_preview=True
|
|
)
|
|
elif chatMessage in ('/start', '/help'):
|
|
bot.send_message(
|
|
chat_id=chatId,
|
|
text='I am a bot that is able to provide the following services to your group:\n' +
|
|
' - Anti-Spam through CAS (https://cas.chat)\n' +
|
|
' - Anti-Spam through CAPTCHA\n' +
|
|
' - Join/Leave messages\n' +
|
|
' - Canned responses\n' +
|
|
'\n' +
|
|
f'I can only be managed from https://{settings.FIRST_DOMAIN}\n' +
|
|
' - Here, you can only /login or ask me for /help.\n'
|
|
'\n' +
|
|
'You can run your custom-branded bot for free*.\n' +
|
|
' * might change in the future' +
|
|
''
|
|
)
|
|
elif chatMessage == '/previewCAPTCHA':
|
|
secret = ''
|
|
while len(secret) < 7:
|
|
secret += str(random.randint(0, 9))
|
|
imbytes = imageCaptcha.generate(secret)
|
|
bot.send_photo(
|
|
chat_id=chatId,
|
|
photo=imbytes,
|
|
caption=f'Sample CAPTCHA for secret "{secret}"'
|
|
)
|
|
else:
|
|
bot.send_message(
|
|
chat_id=chatId,
|
|
text=f'Sorry, I could not understand you.\nTry /help.'
|
|
)
|
|
else:
|
|
bot.send_message(
|
|
chat_id=chatId,
|
|
text=f'I am a custom bot configured at https://{settings.FIRST_DOMAIN}.\nCheck @{settings.BOT_NAME} for more details.',
|
|
)
|
|
elif chatType in ('group', 'supergroup'):
|
|
chatAdminsTelegramIds, chatOwnerId = get_admins(
|
|
bot, chatId)
|
|
telegramGroup = getTelegramGroup(
|
|
update.get('message', {}).get('chat', {}))
|
|
telegramGroup.admins.set(models.TelegramUser.objects.filter(
|
|
telegram_id__in=chatAdminsTelegramIds).all())
|
|
telegramGroup.save()
|
|
if chatOwnerId is not None:
|
|
ensure_group_owner_is_set_correctly(
|
|
telegramGroup, chatOwnerId)
|
|
leftMemberUpdate = update.get(
|
|
'message', {}).get('left_chat_member', None)
|
|
telegramGroupPreferences = models.GroupPreferences.objects.filter(
|
|
group=telegramGroup).first()
|
|
if myId in chatAdminsTelegramIds:
|
|
if telegramGroupPreferences is not None:
|
|
def send_hello(member):
|
|
join_message = (telegramGroupPreferences.join_message
|
|
.replace('{NAME}', html.escape(member.name))
|
|
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
|
|
.replace(' ', ' ')
|
|
.replace('<br>', '')
|
|
.replace('<br >', '')
|
|
.replace('<br/>', '')
|
|
.replace('<br />', '')
|
|
.strip())[:4000]
|
|
if len(join_message) > 0:
|
|
bot.send_message(
|
|
chat_id=chatId,
|
|
text=join_message,
|
|
parse_mode=ParseMode.HTML,
|
|
)
|
|
if len(newChatMembers) > 0:
|
|
for newChatMember in newChatMembers:
|
|
telegramMember = getTelegramUser(
|
|
newChatMember)
|
|
if telegramGroupPreferences.combot_anti_spam_use:
|
|
response = requests.get(
|
|
f'https://api.cas.chat/check?user_id={telegramMember.telegram_id}')
|
|
jsonResponse = None if response is None else response.json()
|
|
banned = None if jsonResponse is None else jsonResponse.get(
|
|
'ok', False)
|
|
if banned:
|
|
bot.kick_chat_member(
|
|
chat_id=chatId,
|
|
user_id=telegramMember.telegram_id,
|
|
)
|
|
if chatType == 'supergroup':
|
|
bot.unban_chat_member(
|
|
chat_id=chatId,
|
|
user_id=telegramMember.telegram_id,
|
|
)
|
|
banned_message = (telegramGroupPreferences.combot_anti_spam_notification
|
|
.replace('{NAME}', html.escape(telegramMember.name))
|
|
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
|
|
.replace(' ', ' ')
|
|
.replace('<br>', '')
|
|
.replace('<br >', '')
|
|
.replace('<br/>', '')
|
|
.replace('<br />', '')
|
|
.strip())[:4000]
|
|
if len(banned_message):
|
|
bot.send_message(
|
|
chat_id=chatId,
|
|
text=banned_message,
|
|
parse_mode=ParseMode.HTML,
|
|
)
|
|
continue
|
|
if telegramGroupPreferences.captcha_use:
|
|
secret = ''
|
|
while len(secret) < telegramGroupPreferences.captcha_digits:
|
|
secret += random.choice(
|
|
telegramGroupPreferences.captcha_chars)
|
|
models.PendingCaptchaUser.objects.filter(
|
|
group=telegramGroup, user=telegramMember).delete()
|
|
pcu = models.PendingCaptchaUser(
|
|
bot_token=telegram,
|
|
user=telegramMember,
|
|
group=telegramGroup,
|
|
captcha_answer=secret,
|
|
attempts_left=telegramGroupPreferences.captcha_attempts,
|
|
lifetime=telegramGroupPreferences.captcha_timeout,
|
|
captcha_message_id=0,
|
|
)
|
|
pcu.save()
|
|
imbytes = imageCaptcha.generate(
|
|
secret)
|
|
captcha_message = (telegramGroupPreferences.captcha_first_message
|
|
.replace('{NAME}', html.escape(telegramMember.name))
|
|
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
|
|
.replace('{TIMEOUT}', str(telegramGroupPreferences.captcha_timeout))
|
|
.replace('{ATTEMPTS}', str(telegramGroupPreferences.captcha_attempts))
|
|
.replace(' ', ' ')
|
|
.replace('<br>', '')
|
|
.replace('<br >', '')
|
|
.replace('<br/>', '')
|
|
.replace('<br />', '')
|
|
.strip())[:1000]
|
|
if len(captcha_message) <= 0:
|
|
captcha_message = (f'Hello, {telegramMember.name}!\n' +
|
|
f'To prevent automated spam on {telegramGroup.telegram_title}, we require every new user to solve at CAPTCHA\n' +
|
|
f'You have {telegramGroupPreferences.captcha_timeout} seconds or {telegramGroupPreferences.captcha_attempts} failed attempts before getting kicked.' +
|
|
'')
|
|
sent_message = bot.send_photo(
|
|
chat_id=chatId,
|
|
photo=imbytes,
|
|
caption=captcha_message,
|
|
parse_mode=ParseMode.HTML,
|
|
)
|
|
pcu.captcha_message_id = sent_message.message_id
|
|
pcu.save()
|
|
if not telegramGroupPreferences.captcha_leave_mess:
|
|
try:
|
|
bot.delete_message(
|
|
chat_id=chatId,
|
|
message_id=chatMessageId,
|
|
)
|
|
except:
|
|
pass
|
|
else:
|
|
send_hello(telegramMember)
|
|
else:
|
|
pendingCaptcha = models.PendingCaptchaUser.objects.filter(
|
|
group=telegramGroup, user=telegramUser).first()
|
|
if pendingCaptcha is not None:
|
|
if not telegramGroupPreferences.captcha_leave_mess:
|
|
try:
|
|
bot.delete_message(
|
|
chat_id=chatId,
|
|
message_id=pendingCaptcha.captcha_message_id,
|
|
)
|
|
except:
|
|
pass
|
|
try:
|
|
bot.delete_message(
|
|
chat_id=chatId,
|
|
message_id=chatMessageId,
|
|
)
|
|
except:
|
|
pass
|
|
if leftMemberUpdate is not None:
|
|
pass
|
|
elif chatMessage == pendingCaptcha.captcha_answer:
|
|
pendingCaptcha.delete()
|
|
send_hello(telegramUser)
|
|
else:
|
|
pendingCaptcha.attempts_left -= 1
|
|
td: datetime.timedelta = (pendingCaptcha.created + datetime.timedelta(
|
|
seconds=pendingCaptcha.lifetime)) - django.utils.timezone.now()
|
|
sec_remain = int(
|
|
td.total_seconds())
|
|
if pendingCaptcha.attempts_left > 0 and sec_remain > 0:
|
|
secret = ''
|
|
while len(secret) < telegramGroupPreferences.captcha_digits:
|
|
secret += random.choice(
|
|
telegramGroupPreferences.captcha_chars)
|
|
imbytes = imageCaptcha.generate(
|
|
secret)
|
|
captcha_message = (telegramGroupPreferences.captcha_retry_message
|
|
.replace('{NAME}', html.escape(telegramUser.name))
|
|
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
|
|
.replace('{TIMEOUT}', str(sec_remain))
|
|
.replace('{ATTEMPTS}', str(pendingCaptcha.attempts_left))
|
|
.replace(' ', ' ')
|
|
.replace('<br>', '')
|
|
.replace('<br >', '')
|
|
.replace('<br/>', '')
|
|
.replace('<br />', '')
|
|
.strip())[:1000]
|
|
if len(captcha_message) <= 0:
|
|
captcha_message = (f'Oops... that was not the answer, {telegramUser.name}.\n' +
|
|
f'Let\'s try solving the CAPTCHA again!\n' +
|
|
f'You have {sec_remain} seconds or {pendingCaptcha.attempts_left} failed attempts before getting kicked.' +
|
|
'')
|
|
sent_message = bot.send_photo(
|
|
chat_id=chatId,
|
|
photo=imbytes,
|
|
caption=captcha_message,
|
|
parse_mode=ParseMode.HTML,
|
|
)
|
|
pendingCaptcha.captcha_answer = secret
|
|
pendingCaptcha.captcha_message_id = sent_message.message_id
|
|
pendingCaptcha.save()
|
|
else:
|
|
try:
|
|
bot.kick_chat_member(
|
|
chat_id=chatId,
|
|
user_id=telegramUser.telegram_id,
|
|
)
|
|
if chatType == 'supergroup':
|
|
bot.unban_chat_member(
|
|
chat_id=chatId,
|
|
user_id=telegramUser.telegram_id,
|
|
)
|
|
except:
|
|
traceback.print_exc()
|
|
pendingCaptcha.delete()
|
|
else:
|
|
for canned_message in telegramGroupPreferences.canned_messages.all():
|
|
if canned_message.listen in chatMessage:
|
|
try:
|
|
bot.send_message(
|
|
chat_id=chatId,
|
|
reply_to_message_id=chatMessageId,
|
|
text=canned_message.reply_with
|
|
.replace(' ', ' ')
|
|
.replace('<br>', '')
|
|
.replace('<br >', '')
|
|
.replace('<br/>', '')
|
|
.replace('<br />', ''),
|
|
parse_mode=ParseMode.HTML,
|
|
)
|
|
except TelegramBadRequest:
|
|
traceback.print_exc()
|
|
if leftMemberUpdate is not None:
|
|
if fromId != myId:
|
|
telegramMember = getTelegramUser(
|
|
leftMemberUpdate)
|
|
leave_message = (telegramGroupPreferences.leave_message
|
|
.replace('{NAME}', html.escape(telegramMember.name))
|
|
.replace('{GROUP}', html.escape(telegramGroup.telegram_title))
|
|
.replace(' ', ' ')
|
|
.replace('<br>', '')
|
|
.replace('<br >', '')
|
|
.replace('<br/>', '')
|
|
.replace('<br />', '')
|
|
.strip())[:4000]
|
|
if len(leave_message) > 0:
|
|
bot.send_message(
|
|
chat_id=chatId,
|
|
text=leave_message,
|
|
parse_mode=ParseMode.HTML,
|
|
)
|
|
except TelegramUnauthorized:
|
|
pass
|