After some time I came up with a working solution. I decided to use a Celery task to calculate the next occurrence date once a training has passed.
Future improvements could include a celery-beat schedule to make sure all trainings have been updated by the end of the day.
I can't claim that it is the best solution, so if anyone has more experience with this kind of problem, I would appreciate any changes or improvements.
models.py
class Training(TimestampModel):
    """A recurring training built from a ``TrainingTemplate``.

    ``next_occurrence`` is a derived value: it is recomputed from
    ``recurrences`` and ``start_time`` every time the instance is saved.
    """

    template = models.ForeignKey(TrainingTemplate, on_delete=models.CASCADE, related_name='trainings')
    start_time = models.TimeField()
    end_time = models.TimeField()
    location = models.ForeignKey(TrainingLocation, on_delete=models.PROTECT, related_name='trainings')
    recurrences = RecurrenceField()
    # Derived in save(); None when no upcoming occurrence could be computed.
    next_occurrence = models.DateTimeField(null=True, editable=False)
    # Id of the Celery task scheduled for ``next_occurrence`` (written by the
    # post_save receiver, never edited by hand).
    next_occurrence_celery_task_id = models.UUIDField(null=True, editable=False)
    members = models.ManyToManyField(Member, through=MemberTraining, related_name='trainings', blank=True)

    objects = TrainingManager()

    class Meta:
        ordering = ['next_occurrence', 'start_time']
        indexes = [models.Index(fields=['next_occurrence', 'start_time'])]

    def save(self, *args, **kwargs):
        """Refresh ``next_occurrence`` and then persist the instance."""
        upcoming = self.calculate_next_occurrence()
        self.next_occurrence = upcoming
        super().save(*args, **kwargs)

    def calculate_next_occurrence(self) -> datetime.datetime | None:
        """Return the aware start datetime of the next occurrence, if any.

        The recurrence rule is searched from today's local midnight
        (inclusive). If the start found that way is not later than the
        current moment, the search is repeated from tomorrow's midnight.

        Returns:
            An aware ``datetime`` built from the occurrence date and
            ``start_time``, or ``None`` when the lookup raises
            ``AttributeError``.
        """

        def start_on_or_after(day_start):
            # First occurrence at/after ``day_start``, moved to ``start_time``.
            occurrence = self.recurrences.after(day_start, inc=True)
            naive_start = datetime.datetime.combine(occurrence.date(), self.start_time)
            return timezone.make_aware(naive_start)

        try:
            current_moment = timezone.now()
            midnight = timezone.make_naive(current_moment).replace(
                hour=0, minute=0, second=0, microsecond=0,
            )
            first_start = start_on_or_after(midnight)
            if current_moment < first_start:
                return first_start
            # Today's start is not in the future any more: look from tomorrow on.
            return start_on_or_after(midnight + timedelta(days=1))
        except AttributeError:
            # Best-effort path: e.g. the lookup produced no occurrence object,
            # so ``.date()`` could not be called on it.
            return None
signals.py
@receiver(post_save, sender=Training)
def calculate_next_occurrence(sender, instance, *args, **kwargs):
    """Keep at most one pending recalculation task per saved training.

    Any task previously recorded on the instance is revoked. If the training
    has a ``next_occurrence``, a new ``calculate_next_training_occurrence``
    task is scheduled to run at that moment. The resulting task id (or
    ``None``) is stored both in the database and on the in-memory instance.

    Args:
        sender: The model class that emitted the signal (``Training``).
        instance: The ``Training`` instance that was just saved.
        *args: Unused extra positional arguments.
        **kwargs: Unused extra signal keyword arguments.
    """
    previous_task_id = instance.next_occurrence_celery_task_id
    if previous_task_id:
        # Revoke even when nothing new gets scheduled, so a training whose
        # recurrences ended does not keep a stale task in the queue.
        AsyncResult(str(previous_task_id)).revoke()

    new_task_id = None
    if instance.next_occurrence:
        result = calculate_next_training_occurrence.apply_async(
            args=(instance.id, ),
            eta=instance.next_occurrence,
            retry=False,
        )
        new_task_id = result.id

    if new_task_id or previous_task_id:
        # A queryset update is used instead of instance.save() so this
        # receiver is not triggered again.
        Training.objects.filter(id=instance.id).update(next_occurrence_celery_task_id=new_task_id)
        # Mirror the stored id on the in-memory object; otherwise a second
        # save() of the same instance would revoke an outdated id and leave
        # the task scheduled above pending as a duplicate.
        instance.next_occurrence_celery_task_id = new_task_id
tasks.py
@shared_task
def calculate_next_training_occurrence(training_id: int):
    """Recompute and store ``next_occurrence`` for a single training.

    Args:
        training_id: Value used for the ``id`` lookup on ``Training``.

    Returns:
        Always ``None``. A missing training is ignored silently.
    """
    try:
        record = Training.objects.get(id=training_id)
        upcoming = record.calculate_next_occurrence()
        record.next_occurrence = upcoming
        record.save(update_fields=['next_occurrence'])
    except ObjectDoesNotExist:
        # No matching row: there is nothing to update.
        return None