331 lines
14 KiB
Python
331 lines
14 KiB
Python
import datetime
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from ckeditor.fields import RichTextField
|
|
from django.core.validators import (MaxValueValidator, MinLengthValidator,
|
|
MinValueValidator)
|
|
from django.db import models
|
|
from django.db.models import Q
|
|
from django.db.models.signals import post_delete, pre_delete
|
|
from django.dispatch import receiver
|
|
from django.utils import timezone
|
|
from telegram.bot import Bot
|
|
from timezone_field import TimeZoneField
|
|
from django.conf import settings
|
|
|
|
# Create your models here.
|
|
|
|
|
|
class Timestampable(models.Model):
|
|
AUTO_COLLECT = False
|
|
COLLECTION_TIMEOUT = 3600*24*28
|
|
created = models.DateTimeField(auto_now_add=True)
|
|
modified = models.DateTimeField(auto_now=True)
|
|
uuid = models.UUIDField(
|
|
default=uuid.uuid4, editable=False, primary_key=False)
|
|
|
|
class Meta:
|
|
abstract = True
|
|
|
|
def __str__(self):
|
|
return repr(self)
|
|
|
|
def __repr__(self):
|
|
internal_dict_copy = self.__dict__.copy()
|
|
if '_state' in internal_dict_copy:
|
|
del internal_dict_copy['_state']
|
|
return f'{type(self).__name__}(**{repr(internal_dict_copy)})'
|
|
|
|
def on_pre_delete(self, sender, instance, **kwargs):
|
|
pass
|
|
|
|
def on_post_delete(self, sender, instance, **kwargs):
|
|
pass
|
|
|
|
|
|
@receiver(models.signals.post_delete)
|
|
def timestampable_on_post_delete(sender, instance, **kwargs):
|
|
if issubclass(sender, Timestampable):
|
|
instance.on_post_delete(sender, instance, **kwargs)
|
|
|
|
|
|
@receiver(models.signals.pre_delete)
|
|
def timestampable_on_pre_delete(sender, instance, **kwargs):
|
|
if issubclass(sender, Timestampable):
|
|
instance.on_pre_delete(sender, instance, **kwargs)
|
|
|
|
|
|
class TelegramGroup(Timestampable):
|
|
# AUTO_COLLECT = True
|
|
telegram_id = models.BigIntegerField()
|
|
telegram_title = models.CharField(max_length=255)
|
|
owner = models.ForeignKey(to='TelegramUser', on_delete=models.SET_NULL,
|
|
blank=False, null=True, related_name='owns')
|
|
# preferences <- GroupPreferences:group
|
|
# users_with_pending_captcha <- PendingCaptchaUser:group
|
|
# admins <- TelegramUser:admins
|
|
|
|
class Meta:
|
|
ordering = ['telegram_title', 'telegram_id']
|
|
|
|
|
|
class TelegramUser(Timestampable):
|
|
# AUTO_COLLECT = True
|
|
telegram_id = models.BigIntegerField(blank=False, null=False)
|
|
telegram_first_name = models.CharField(
|
|
max_length=255, blank=False, null=False)
|
|
telegram_last_name = models.CharField(
|
|
max_length=255, blank=True, null=True)
|
|
telegram_username = models.CharField(max_length=255, blank=True, null=True)
|
|
admins = models.ManyToManyField(to=TelegramGroup, related_name='admins')
|
|
# login_hashes <- LoginHash:telegram_user
|
|
# pending_captchas <- PendingCaptchaUser:user
|
|
# bots <- TelegramUserBots:owner
|
|
# owns <- TelegramGroup:owner
|
|
|
|
class Meta:
|
|
ordering = ['telegram_first_name', 'telegram_id']
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return f'{self.telegram_first_name} {self.telegram_last_name or ""}'.strip()
|
|
|
|
@property
|
|
def as_bot(self) -> Optional['TelegramUserBots']:
|
|
if self.telegram_username == settings.BOT_NAME:
|
|
return TelegramUserBots(
|
|
name="@" + settings.BOT_NAME,
|
|
telegram_username=settings.BOT_NAME,
|
|
telegram_id=-1,
|
|
token=settings.BOT_TOKEN
|
|
)
|
|
return TelegramUserBots.objects.filter(telegram_id=self.telegram_id).first()
|
|
|
|
@property
|
|
def manages(self) -> models.QuerySet[TelegramGroup]:
|
|
grants_user_has = set(
|
|
(x.group_preferences.group.id for x in self.preferences_access_control_grants.all()))
|
|
return self.admins.filter(
|
|
Q(preferences__access_control_use=False)
|
|
| Q(preferences__access_control_use=True, id__in=grants_user_has)
|
|
| Q(owner=self))
|
|
|
|
|
|
class TelegramUserBots(Timestampable):
|
|
owner = models.ForeignKey(to=TelegramUser, on_delete=models.CASCADE,
|
|
blank=False, null=False, related_name='bots')
|
|
name = models.CharField(max_length=255, blank=False, null=False)
|
|
token = models.CharField(max_length=255, blank=False, null=False)
|
|
telegram_username = models.CharField(
|
|
max_length=255, blank=False, null=False)
|
|
telegram_id = models.BigIntegerField(blank=False, null=False)
|
|
|
|
def on_pre_delete(self, sender, instance, **kwargs):
|
|
if instance.token is not None and instance.token != '':
|
|
Bot(instance.token).set_webhook(url='')
|
|
|
|
@property
|
|
def as_user(self):
|
|
return TelegramUser.objects.get(telegram_id=self.telegram_id)
|
|
|
|
class Meta:
|
|
ordering = ['name', 'telegram_username', 'telegram_id']
|
|
|
|
|
|
class SelfDestructingMessage(Timestampable):
|
|
telegram_bot = models.ForeignKey(to=TelegramUserBots, on_delete=models.CASCADE,
|
|
blank=False, null=False, related_name='self_destructing_messages')
|
|
duration_seconds = models.BigIntegerField(blank=False, null=False)
|
|
chat_id = models.IntegerField(blank=False, null=False)
|
|
message_id = models.IntegerField(blank=False, null=False)
|
|
|
|
|
|
class LoginHash(Timestampable):
|
|
AUTO_COLLECT = True
|
|
COLLECTION_TIMEOUT = 3600
|
|
telegram_user = models.ForeignKey(
|
|
to=TelegramUser, on_delete=models.CASCADE, blank=False, null=False, related_name='login_hashes')
|
|
|
|
class Meta:
|
|
ordering = ['telegram_user']
|
|
|
|
|
|
class GroupPreferences(Timestampable):
|
|
group = models.OneToOneField(to=TelegramGroup, on_delete=models.CASCADE,
|
|
blank=False, null=False, related_name='preferences')
|
|
# Access control
|
|
access_control_use = models.BooleanField(
|
|
default=False, blank=True, null=False)
|
|
# access_control_grants <- GroupAccessControlGrant:group_preferences
|
|
# Anti-Spam
|
|
combot_anti_spam_use = models.BooleanField(
|
|
default=False, blank=True, null=False)
|
|
combot_anti_spam_notification = RichTextField(
|
|
max_length=4000, default='', blank=True, null=False)
|
|
# Greetings
|
|
join_message = RichTextField(
|
|
max_length=4000, default='', blank=True, null=False)
|
|
leave_message = RichTextField(
|
|
max_length=4000, default='', blank=True, null=False)
|
|
# Captcha
|
|
captcha_use = models.BooleanField(default=False, blank=True, null=False)
|
|
captcha_chars = models.CharField(
|
|
max_length=255, default='0123456789', blank=False, null=False, validators=[MinLengthValidator(2)])
|
|
captcha_digits = models.IntegerField(default=7, blank=True, null=False, validators=[
|
|
MinValueValidator(1), MaxValueValidator(24)])
|
|
captcha_timeout = models.IntegerField(default=240, blank=True, null=False, validators=[
|
|
MinValueValidator(15), MaxValueValidator(600)])
|
|
captcha_attempts = models.IntegerField(default=5, blank=True, null=False, validators=[
|
|
MinValueValidator(2), MaxValueValidator(20)])
|
|
captcha_first_message = RichTextField(
|
|
max_length=1000, default='', blank=True, null=False)
|
|
captcha_retry_message = RichTextField(
|
|
max_length=1000, default='', blank=True, null=False)
|
|
captcha_leave_mess = models.BooleanField(
|
|
default=False, blank=True, null=False)
|
|
# Canned Messages
|
|
# canned_messages <- GroupCannedMessage:group_preferences
|
|
# Planned messages
|
|
planned_message_enabled = models.BooleanField(
|
|
default=False, blank=True, null=False)
|
|
planned_message_fails = models.IntegerField(
|
|
default=0, blank=True, null=False)
|
|
planned_message_issuer = models.ForeignKey(
|
|
to=TelegramUserBots, default=None, blank=True, null=True, on_delete=models.SET_NULL)
|
|
planned_message_timezone = TimeZoneField(
|
|
default='UTC', blank=True, null=False)
|
|
# planned_messages <- GroupCannedMessage:group_preferences
|
|
# planned_message_dispatches <- GroupPlannedMessage:group_preferences
|
|
|
|
class Meta:
|
|
ordering = ['group']
|
|
|
|
|
|
class GroupAccessControlGrant(Timestampable):
|
|
group_preferences = models.ForeignKey(
|
|
to=GroupPreferences, on_delete=models.CASCADE, blank=False, null=False, related_name='access_control_grants')
|
|
allows = models.ForeignKey(to=TelegramUser, on_delete=models.CASCADE,
|
|
blank=False, null=False, related_name='preferences_access_control_grants')
|
|
|
|
class Meta:
|
|
ordering = ['group_preferences', 'allows']
|
|
|
|
|
|
class GroupCannedMessage(Timestampable):
|
|
group_preferences = models.ForeignKey(
|
|
to=GroupPreferences, on_delete=models.CASCADE, blank=False, null=False, related_name='canned_messages')
|
|
listen = models.CharField(max_length=255)
|
|
reply_with = RichTextField(max_length=4000)
|
|
|
|
class Meta:
|
|
ordering = ['group_preferences', 'listen', 'reply_with']
|
|
|
|
@property
|
|
def tags_as_list(self):
|
|
return sorted(list(set(filter(len, ','.join(self.tags.split(' ')).split(',')))))
|
|
|
|
|
|
class GroupPlannedMessage(Timestampable):
|
|
group_preferences = models.ForeignKey(
|
|
to=GroupPreferences, on_delete=models.CASCADE, blank=False, null=False, related_name='planned_messages')
|
|
tags = models.TextField(default='', blank=True, null=False)
|
|
message = RichTextField(max_length=4000, default='',
|
|
blank=True, null=False)
|
|
|
|
class Meta:
|
|
ordering = ['group_preferences', 'tags', 'message']
|
|
|
|
@property
|
|
def tags_as_list(self):
|
|
return sorted(list(set(filter(len, ','.join(self.tags.split(' ')).split(',')))))
|
|
|
|
|
|
class GroupPlannedDispatch(Timestampable):
|
|
group_preferences = models.ForeignKey(
|
|
to=GroupPreferences, on_delete=models.CASCADE, blank=False, null=False, related_name='planned_message_dispatches')
|
|
tags = models.TextField(default='', blank=True, null=False)
|
|
day_sunday = models.BooleanField(default=False, blank=True, null=False)
|
|
day_monday = models.BooleanField(default=False, blank=True, null=False)
|
|
day_tuesday = models.BooleanField(default=False, blank=True, null=False)
|
|
day_wednesday = models.BooleanField(default=False, blank=True, null=False)
|
|
day_thursday = models.BooleanField(default=False, blank=True, null=False)
|
|
day_friday = models.BooleanField(default=False, blank=True, null=False)
|
|
day_saturday = models.BooleanField(default=False, blank=True, null=False)
|
|
time_start = models.TimeField(default=datetime.time(
|
|
0, 0, 0), auto_now=False, auto_now_add=False)
|
|
time_end = models.TimeField(default=datetime.time(
|
|
23, 59, 59), auto_now=False, auto_now_add=False)
|
|
time_repeat = models.TimeField(default=datetime.time(
|
|
0, 30, 0), auto_now=False, auto_now_add=False)
|
|
time_last_dispatch = models.DateTimeField(
|
|
default=timezone.now, blank=True, null=False)
|
|
|
|
class Meta:
|
|
ordering = ['group_preferences', '-time_last_dispatch',
|
|
'time_start', 'time_end', 'tags']
|
|
|
|
@property
|
|
def days_of_week(self):
|
|
return (self.day_monday, self.day_tuesday, self.day_wednesday, self.day_thursday, self.day_friday, self.day_saturday, self.day_sunday)
|
|
|
|
@property
|
|
def triggering_times(self):
|
|
repeat_diff = datetime.datetime.combine(
|
|
datetime.datetime.min, self.time_repeat) - datetime.datetime.min
|
|
if self.time_start > self.time_end:
|
|
self.time_start, self.time_end = self.time_end, self.time_start
|
|
self.save()
|
|
times = [self.time_start]
|
|
if repeat_diff.total_seconds() > 0:
|
|
while (last_added_time := times[-1]) < self.time_end:
|
|
new_time_candidate = (datetime.datetime.combine(
|
|
datetime.datetime.min, last_added_time) + repeat_diff).time()
|
|
if new_time_candidate > last_added_time:
|
|
times.append(new_time_candidate)
|
|
else:
|
|
break
|
|
return times
|
|
|
|
@property
|
|
def time_next_dispatch(self):
|
|
if any((days_of_week := self.days_of_week)):
|
|
tzinfo = self.group_preferences.planned_message_timezone
|
|
triggering_times = self.triggering_times
|
|
now = self.time_last_dispatch
|
|
today = datetime.datetime.combine(now, datetime.time(
|
|
hour=0, minute=0, second=0), tzinfo=tzinfo)
|
|
dispatches = []
|
|
for day_index in range(15):
|
|
hypothetical_day = today + datetime.timedelta(days=day_index)
|
|
if days_of_week[hypothetical_day.weekday()]:
|
|
for triggering_time in triggering_times:
|
|
dispatch = datetime.datetime.combine(
|
|
hypothetical_day, triggering_time, tzinfo=tzinfo)
|
|
if dispatch >= now:
|
|
dispatches.append(dispatch)
|
|
if len(dispatches) > 0:
|
|
return dispatches[0]
|
|
return None
|
|
|
|
@property
|
|
def tags_as_list(self):
|
|
return sorted(list(set(filter(len, ','.join(self.tags.split(' ')).split(',')))))
|
|
|
|
|
|
class PendingCaptchaUser(Timestampable):
|
|
AUTO_COLLECT = True
|
|
bot_token = models.CharField(max_length=255, blank=False, null=False)
|
|
captcha_answer = models.CharField(max_length=255, blank=False, null=False)
|
|
group = models.ForeignKey(
|
|
to=TelegramGroup, on_delete=models.CASCADE, related_name='users_with_pending_captcha')
|
|
user = models.ForeignKey(
|
|
to=TelegramUser, on_delete=models.CASCADE, related_name='pending_captchas')
|
|
lifetime = models.IntegerField(blank=True, null=False)
|
|
attempts_left = models.IntegerField(blank=True, null=False)
|
|
captcha_message_id = models.IntegerField(blank=True, null=False)
|
|
|
|
class Meta:
|
|
ordering = ['-lifetime', 'attempts_left',
|
|
'user', 'group', 'captcha_answer']
|