telegram---papagaio-fur-bot/papagaio/telebot.py

204 lines
7.4 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
from .mwt import MWT
import telegram
import telegram.ext
from telegram.ext import Updater
from telegram.ext import CommandHandler
import logging
from django.utils import timezone
import random
import datetime
Group: 'papagaio.models.Group' = None
ParrotMessage: 'papagaio.models.ParrotMessage' = None
@MWT(timeout=60*5)
def get_admin_ids(bot, chat_id):
"""Returns a list of admin IDs for a given chat. Results are cached for 1 hour."""
return [admin.user.id for admin in bot.get_chat_administrators(chat_id)]
def callback_repeating(context: telegram.ext.CallbackContext):
for group in Group.objects.filter(next_activation__lte=timezone.now(), enabled=True).all():
choices = list(group.messages.all())
if len(choices) > 0:
pmsg = random.choice(choices)
try:
context.bot.send_message(chat_id=group.chat_id,
text=pmsg.message)
except telegram.error.Unauthorized:
group.enabled = False
finally:
group.next_activation = timezone.now() + datetime.timedelta(
seconds=group.interval
)
group.save()
def start(update: telegram.Update, context: telegram.ext.CallbackContext):
chattype = update.message.chat.type
if chattype in ['private']:
context.bot.send_message(
chat_id=update.message.chat_id,
text="Invite me to your server!"
)
elif chattype in ['group', 'supergroup']:
admins = get_admin_ids(context.bot, update.message.chat_id)
if update.message.from_user.id in admins:
is_group_admin(update, context)
context.bot.send_message(
chat_id=update.message.chat_id,
text="I'm here!\nTry using my services with /msglst, /msgadd, /msgrm, and /msgfreq"
)
def is_group_admin(update: telegram.Update, context: telegram.ext.CallbackContext):
if update.message.chat.type in ['group', 'supergroup']:
admins = get_admin_ids(context.bot, update.message.chat_id)
if update.message.from_user.id in admins:
group = Group.objects.filter(
chat_id=update.message.chat_id).first()
if group is None:
group = Group(
chat_id=update.message.chat_id,
name=update.message.chat.title,
enabled=True,
interval=600
)
group.save()
else:
if group.name != update.message.chat.title:
group.name = update.message.chat.title
group.save()
if not group.enabled:
group.enabled = True
group.save()
return group
return False
def cmd_msglst(update: telegram.Update, context: telegram.ext.CallbackContext):
group: 'papagaio.models.Group' = is_group_admin(update, context)
if group:
msgs = list(group.messages.all())
msgs_opl = "\n".join(list(map(str, msgs)))
reply = f'You told me {len(msgs)} phrases to repeat:\n\n{msgs_opl}'
context.bot.send_message(
chat_id=update.message.chat_id,
text=reply
)
def cmd_msgadd(update: telegram.Update, context: telegram.ext.CallbackContext):
group: 'papagaio.models.Group' = is_group_admin(update, context)
if group:
body = update.message.text.split(' ', 1)[-1].strip()
if body.startswith('/'):
context.bot.send_message(
chat_id=update.message.chat_id,
text="This command should be used like this:\n" +
"/msgadd No ones needs fursuit to attend any convention"
)
else:
pmsg = ParrotMessage(group=group, message=body)
pmsg.save()
context.bot.send_message(
chat_id=update.message.chat_id,
text=f'Added:\n{pmsg}'
)
def cmd_msgrm(update: telegram.Update, context: telegram.ext.CallbackContext):
group: 'papagaio.models.Group' = is_group_admin(update, context)
if group:
body = update.message.text.split(' ', 1)[-1].strip()
if body.startswith('/'):
context.bot.send_message(
chat_id=update.message.chat_id,
text="This command should be used like this:\n/msgrm <message's_number>"
)
else:
msgid = 0
try:
msgid = int(body)
except:
pass
if msgid > 0:
pmsg = group.messages.filter(pk=msgid).first()
if pmsg is None:
context.bot.send_message(
chat_id=update.message.chat_id,
text=f'The message #{msgid} was not found'
)
else:
context.bot.send_message(
chat_id=update.message.chat_id,
text=f'The message #{msgid} was successfully deleted:\n{pmsg}'
)
pmsg.delete()
else:
context.bot.send_message(
chat_id=update.message.chat_id,
text='The message ID should be a positive number'
)
def cmd_msgfreq(update: telegram.Update, context: telegram.ext.CallbackContext):
group: 'papagaio.models.Group' = is_group_admin(update, context)
if group:
print(group)
body = update.message.text.split(' ', 1)[-1].strip()
ogi = group.interval
if body.startswith('/'):
context.bot.send_message(
chat_id=update.message.chat_id,
text=f"Current message frequency: {ogi} seconds\n" +
"Use \"/msgfreq <seconds_between_each_message>\" to update this behavior."
)
else:
newfreq = 0
try:
newfreq = int(body)
except:
pass
if newfreq < 15 or newfreq > 21600:
context.bot.send_message(
chat_id=update.message.chat_id,
text=f'The frequency should be a number between 15 and 21600 (15 seconds and 6 hours).\n' +
'Nothing has been updated.'
)
else:
group.interval = newfreq
group.save()
context.bot.send_message(
chat_id=update.message.chat_id,
text=f'The message frequency has been updated to one every {group.interval} seconds'
)
def start_bot(token):
global Group
global ParrotMessage
from .models import Group
from .models import ParrotMessage
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger()
# logger.setLevel(logging.INFO)
u = Updater(token, use_context=True)
d = u.dispatcher
d.add_handler(CommandHandler('start', start))
d.add_handler(CommandHandler('msglst', cmd_msglst))
d.add_handler(CommandHandler('msgadd', cmd_msgadd))
d.add_handler(CommandHandler('msgrm', cmd_msgrm))
d.add_handler(CommandHandler('msgfreq', cmd_msgfreq))
j = u.job_queue
j.run_repeating(callback_repeating, interval=15, first=0)
u.start_polling()