#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""Telegram bot that periodically repeats stored messages in group chats.

Group admins manage the phrases and the repeat interval through the
/msglst, /msgadd, /msgrm and /msgfreq commands registered in ``start_bot``.
"""

import datetime
import logging
import random

import telegram
import telegram.ext
from django.utils import timezone
from telegram.ext import CommandHandler
from telegram.ext import Updater

from .mwt import MWT

# The Django models are bound lazily by ``start_bot`` (which imports them from
# ``.models``); until then these module-level names are ``None``.
Group: 'papagaio.models.Group' = None
ParrotMessage: 'papagaio.models.ParrotMessage' = None


@MWT(timeout=60*5)
def get_admin_ids(bot, chat_id):
    """Return the list of admin user IDs for the given chat.

    Results are memoized by the ``MWT`` decorator with ``timeout=60*5``
    (presumably seconds, i.e. 5 minutes -- confirm against ``.mwt``).
    """
    return [admin.user.id for admin in bot.get_chat_administrators(chat_id)]


def _command_body(update: telegram.Update) -> str:
    """Return the text following the command, stripped.

    When the message holds no space (command without argument) the result is
    the command itself, so callers detect "no argument" by checking for a
    leading ``/``.
    """
    return update.message.text.split(' ', 1)[-1].strip()


def callback_repeating(context: telegram.ext.CallbackContext):
    """Send one random stored message to every enabled group that is due.

    A group is due when its ``next_activation`` is not in the future. Groups
    without messages are skipped and left untouched. A group from which the
    bot is no longer authorized is disabled. In every case where a send was
    attempted, the next activation is rescheduled ``group.interval`` seconds
    from now and the group is saved.
    """
    due_groups = Group.objects.filter(
        next_activation__lte=timezone.now(),
        enabled=True
    )
    for group in due_groups:
        choices = list(group.messages.all())
        if not choices:
            continue
        pmsg = random.choice(choices)
        try:
            context.bot.send_message(chat_id=group.chat_id, text=pmsg.message)
        except telegram.error.Unauthorized:
            # The bot cannot post to this chat any more; stop trying until an
            # admin re-enables the group through a command.
            group.enabled = False
        finally:
            # Reschedule even on failure so a broken chat is not retried on
            # every tick of the job.
            group.next_activation = timezone.now() + datetime.timedelta(
                seconds=group.interval
            )
            group.save()


def start(update: telegram.Update, context: telegram.ext.CallbackContext):
    """Handle /start: greet in private chats, register the group for admins."""
    chattype = update.message.chat.type
    if chattype in ['private']:
        context.bot.send_message(
            chat_id=update.message.chat_id,
            text="Invite me to your server!"
        )
    elif chattype in ['group', 'supergroup']:
        admins = get_admin_ids(context.bot, update.message.chat_id)
        if update.message.from_user.id in admins:
            # Called for its side effect: creates/refreshes the Group row.
            is_group_admin(update, context)
            context.bot.send_message(
                chat_id=update.message.chat_id,
                text="I'm here!\nTry using my services with /msglst, /msgadd, /msgrm, and /msgfreq"
            )


def is_group_admin(update: telegram.Update, context: telegram.ext.CallbackContext):
    """Return the ``Group`` for this chat if the sender is a group admin.

    The group row is created on first use (enabled, 600 second interval). An
    existing row has its name synchronized with the chat title and is
    re-enabled if it was disabled.

    Returns ``False`` when the chat is not a group/supergroup or the sender
    is not one of its admins.
    """
    if update.message.chat.type not in ['group', 'supergroup']:
        return False

    admins = get_admin_ids(context.bot, update.message.chat_id)
    if update.message.from_user.id not in admins:
        return False

    group = Group.objects.filter(chat_id=update.message.chat_id).first()
    if group is None:
        group = Group(
            chat_id=update.message.chat_id,
            name=update.message.chat.title,
            enabled=True,
            interval=600
        )
        group.save()
        return group

    # Collect all pending changes and write them with a single save.
    changed = False
    if group.name != update.message.chat.title:
        group.name = update.message.chat.title
        changed = True
    if not group.enabled:
        group.enabled = True
        changed = True
    if changed:
        group.save()
    return group


def cmd_msglst(update: telegram.Update, context: telegram.ext.CallbackContext):
    """Handle /msglst: list every message stored for the group (admins only)."""
    group: 'papagaio.models.Group' = is_group_admin(update, context)
    if not group:
        return
    msgs = list(group.messages.all())
    msgs_opl = "\n".join(map(str, msgs))
    reply = f'You told me {len(msgs)} phrases to repeat:\n\n{msgs_opl}'
    context.bot.send_message(
        chat_id=update.message.chat_id,
        text=reply
    )


def cmd_msgadd(update: telegram.Update, context: telegram.ext.CallbackContext):
    """Handle /msgadd <text>: store a new message for the group (admins only)."""
    group: 'papagaio.models.Group' = is_group_admin(update, context)
    if not group:
        return
    body = _command_body(update)
    if body.startswith('/'):
        # No argument was given (or the argument itself looks like a command).
        context.bot.send_message(
            chat_id=update.message.chat_id,
            text="This command should be used like this:\n" +
            "/msgadd No ones needs fursuit to attend any convention"
        )
        return
    pmsg = ParrotMessage(group=group, message=body)
    pmsg.save()
    context.bot.send_message(
        chat_id=update.message.chat_id,
        text=f'Added:\n{pmsg}'
    )


def cmd_msgrm(update: telegram.Update, context: telegram.ext.CallbackContext):
    """Handle /msgrm <id>: delete one of the group's messages (admins only)."""
    group: 'papagaio.models.Group' = is_group_admin(update, context)
    if not group:
        return
    body = _command_body(update)
    if body.startswith('/'):
        context.bot.send_message(
            chat_id=update.message.chat_id,
            text="This command should be used like this:\n/msgrm "
        )
        return

    try:
        msgid = int(body)
    except ValueError:
        # Non-numeric input falls through to the "positive number" reply.
        msgid = 0

    if msgid <= 0:
        context.bot.send_message(
            chat_id=update.message.chat_id,
            text='The message ID should be a positive number'
        )
        return

    # Look the message up through the group's relation so that one group
    # cannot delete another group's messages.
    pmsg = group.messages.filter(pk=msgid).first()
    if pmsg is None:
        context.bot.send_message(
            chat_id=update.message.chat_id,
            text=f'The message #{msgid} was not found'
        )
        return

    # Render the confirmation before deleting (the instance is still intact),
    # but only send it once the deletion has actually happened.
    confirmation = f'The message #{msgid} was successfully deleted:\n{pmsg}'
    pmsg.delete()
    context.bot.send_message(
        chat_id=update.message.chat_id,
        text=confirmation
    )


def cmd_msgfreq(update: telegram.Update, context: telegram.ext.CallbackContext):
    """Handle /msgfreq [seconds]: show or update the repeat interval.

    Without an argument the current interval is reported. With an argument,
    the value must be an integer between 15 and 21600 inclusive; anything
    else is rejected and nothing is changed. Admins only.
    """
    group: 'papagaio.models.Group' = is_group_admin(update, context)
    if not group:
        return
    body = _command_body(update)
    ogi = group.interval
    if body.startswith('/'):
        context.bot.send_message(
            chat_id=update.message.chat_id,
            text=f"Current message frequency: {ogi} seconds\n" +
            "Use \"/msgfreq \" to update this behavior."
        )
        return

    try:
        newfreq = int(body)
    except ValueError:
        # Non-numeric input is treated as out of range below.
        newfreq = 0

    if newfreq < 15 or newfreq > 21600:
        context.bot.send_message(
            chat_id=update.message.chat_id,
            text='The frequency should be a number between 15 and 21600 (15 seconds and 6 hours).\n' +
            'Nothing has been updated.'
        )
        return

    group.interval = newfreq
    group.save()
    context.bot.send_message(
        chat_id=update.message.chat_id,
        text=f'The message frequency has been updated to one every {group.interval} seconds'
    )


def start_bot(token):
    """Configure logging, register the command handlers and start polling.

    Also binds the module-level ``Group`` and ``ParrotMessage`` names to the
    Django models; the import happens here rather than at module import time.
    A repeating job runs ``callback_repeating`` every 15 seconds, starting
    immediately.
    """
    global Group
    global ParrotMessage
    from .models import Group
    from .models import ParrotMessage

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    updater = Updater(token, use_context=True)
    dispatcher = updater.dispatcher
    dispatcher.add_handler(CommandHandler('start', start))
    dispatcher.add_handler(CommandHandler('msglst', cmd_msglst))
    dispatcher.add_handler(CommandHandler('msgadd', cmd_msgadd))
    dispatcher.add_handler(CommandHandler('msgrm', cmd_msgrm))
    dispatcher.add_handler(CommandHandler('msgfreq', cmd_msgfreq))

    job_queue = updater.job_queue
    job_queue.run_repeating(callback_repeating, interval=15, first=0)
    updater.start_polling()