Help me bij het implementeren van een taakmonitoringsysteem voor mijn Django-toepassing.
description
Door een taakmonitoringsysteem te implementeren, kun je de betrouwbaarheid van je toepassing verbeteren, snel problemen met achtergrondtaken identificeren en de algehele gebruikerservaring verbeteren.
prompt
try_prompt
Help me een taakmonitoringsysteem op te zetten in mijn Django-toepassing om de status en prestaties van achtergrondtaken te volgen. Beschrijf mijn huidige taakbehe ... more
generate_helper
...
tags
ai_answers
provider: openai
model: gpt-4.1-nano
Om de duur van je taken te volgen in je Django-toepassing met Celery en Redis, kun je de volgende aanpak implementeren:
1. **Gebruik van Celery-signalen:**
Maak gebruik van de signalen `task_prerun` en `task_postrun` om de start- en eindtijden van elke taak vast te leggen. Hiermee kun je de duur van elke taak meten.
2. **Implementatie voorbeeld:**
```python
from celery.signals import task_prerun, task_postrun
import time
# Dictionary om starttijden op te slaan
task_start_times = {}
@task_prerun.connect
def task_started(sender=None, task_id=None, **kwargs):
task_start_times[task_id] = time.time()
@task_postrun.connect
def task_finished(sender=None, task_id=None, **kwargs):
start_time = task_start_times.pop(task_id, None)
if start_time:
duration = time.time() - start_time
# Hier kun je de duur loggen of opslaan
print(f'Taak {task_id} duurde {duration:.2f} seconden.')
# Bijvoorbeeld opslaan in een database of monitoring tool
```
3. **Metrics verzamelen en melden:**
- **Opslaan:** Sla de meetwaarden op in een database, bijvoorbeeld een model `TaskMetrics` met velden zoals `task_id`, `task_name`, `start_time`, `end_time`, `duration`.
- **Dashboards:** Gebruik tools zoals Grafana, Prometheus of een Django admin-dashboard om de gegevens te visualiseren.
4. **Monitoring en alerts:**
- Stel drempels in voor taakduur (bijvoorbeeld taken die langer duren dan 10 seconden).
- Configuratie van meldingen via e-mail, Slack of andere communicatiekanalen wanneer taken de verwachte duur overschrijden.
Door deze aanpak houd je nauwkeurig bij hoe lang je taken duren en kun je tijdige acties ondernemen bij afwijkingen.
provider: deepseek
model: deepseek-chat
Beste collega, ik help je graag een uitgebreid taakmonitoringsysteem op te zetten voor je Celery + Django configuratie. Hier is een complete implementatiegids:
## 1. Basis Celery Monitoring Configuratie
Voeg eerst toe aan je `celery.py`:
```python
from celery import Celery
from django.conf import settings
app = Celery('jouw_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# Monitoring configuratie
app.conf.update(
task_track_started=True,
task_send_sent_event=True,
worker_send_task_events=True,
result_extended=True,
)
```
## 2. Taakduur Monitoring Implementatie
### A. Custom Task Klasse met timing:
```python
# tasks.py
from celery import Task
import time
from django.utils import timezone
import logging
logger = logging.getLogger(__name__)
class MonitoredTask(Task):
def __call__(self, *args, **kwargs):
start_time = time.time()
task_id = self.request.id
logger.info(f"Taak {self.name} gestart (ID: {task_id})")
try:
result = super().__call__(*args, **kwargs)
duration = time.time() - start_time
# Log taakduur
logger.info(
f"Taak {self.name} voltooid in {duration:.2f} seconden "
f"(ID: {task_id})"
)
# Metrics opslaan (zie volgende sectie)
self._store_task_metrics(self.name, duration, 'success')
return result
except Exception as e:
duration = time.time() - start_time
logger.error(
f"Taak {self.name} mislukt na {duration:.2f} seconden "
f"(ID: {task_id}): {str(e)}"
)
self._store_task_metrics(self.name, duration, 'failure')
raise
def _store_task_metrics(self, task_name, duration, status):
from .models import TaskMetrics # Je custom model
TaskMetrics.objects.create(
task_name=task_name,
duration=duration,
status=status,
completed_at=timezone.now()
)
```
### B. Gebruik in je taken:
```python
@app.task(base=MonitoredTask, bind=True)
def mijn_achtergrond_taak(self, data):
# Je taaklogica hier
time.sleep(5) # Simulatie werk
return {"result": "succes"}
```
## 3. Django Models voor Metrics Opslag
```python
# models.py
from django.db import models
from django.utils import timezone
class TaskMetrics(models.Model):
STATUS_CHOICES = [
('success', 'Succes'),
('failure', 'Mislukt'),
('retry', 'Opnieuw geprobeerd'),
]
task_name = models.CharField(max_length=255)
duration = models.FloatField(help_text="Duur in seconden")
status = models.CharField(max_length=20, choices=STATUS_CHOICES)
task_id = models.CharField(max_length=255, blank=True)
created_at = models.DateTimeField(default=timezone.now)
completed_at = models.DateTimeField()
error_message = models.TextField(blank=True)
class Meta:
indexes = [
models.Index(fields=['task_name']),
models.Index(fields=['created_at']),
models.Index(fields=['status']),
]
def __str__(self):
return f"{self.task_name} - {self.duration:.2f}s - {self.status}"
```
## 4. Flower voor Real-time Monitoring
Installeer en configureer Flower:
```bash
pip install flower
```
```bash
celery -A jouw_project flower \
--port=5555 \
--broker=redis://localhost:6379/0 \
--basic_auth=gebruiker:wachtwoord
```
## 5. Custom Django Admin voor Metrics
```python
# admin.py
from django.contrib import admin
from .models import TaskMetrics
@admin.register(TaskMetrics)
class TaskMetricsAdmin(admin.ModelAdmin):
list_display = ['task_name', 'duration', 'status', 'created_at']
list_filter = ['task_name', 'status', 'created_at']
readonly_fields = ['task_name', 'duration', 'status', 'created_at']
date_hierarchy = 'created_at'
def has_add_permission(self, request):
return False # Alleen via monitoring
```
## 6. Metrics en Statistieken
### A. Query's voor taakduur analyse:
```python
# utils/metrics.py
from django.db.models import Avg, Max, Min, Count
from .models import TaskMetrics
from django.utils import timezone
from datetime import timedelta
def get_task_duration_stats(task_name=None, hours=24):
"""Haal taakduur statistieken op"""
since = timezone.now() - timedelta(hours=hours)
queryset = TaskMetrics.objects.filter(created_at__gte=since)
if task_name:
queryset = queryset.filter(task_name=task_name)
stats = queryset.aggregate(
avg_duration=Avg('duration'),
max_duration=Max('duration'),
min_duration=Min('duration'),
total_tasks=Count('id'),
success_rate=Count('id', filter=models.Q(status='success')) * 100.0 / Count('id')
)
return stats
def get_slow_tasks(threshold_seconds=60):
"""Haal trage taken op"""
return TaskMetrics.objects.filter(
duration__gt=threshold_seconds
).order_by('-duration')
```
### B. API Endpoints voor metrics:
```python
# views.py
from django.http import JsonResponse
from .utils.metrics import get_task_duration_stats
def task_metrics_api(request):
task_name = request.GET.get('task_name')
hours = int(request.GET.get('hours', 24))
stats = get_task_duration_stats(task_name, hours)
return JsonResponse(stats)
```
## 7. Notificaties voor Trage Taken
```python
# signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.core.mail import send_mail
from .models import TaskMetrics
@receiver(post_save, sender=TaskMetrics)
def check_slow_tasks(sender, instance, created, **kwargs):
if created and instance.duration > 300: > 5 minuten
send_mail(
f"Trage taak gedetecteerd: {instance.task_name}",
f"Taak {instance.task_name} duurde {instance.duration:.2f} seconden",
'monitoring@jouwdomein.nl',
['admin@jouwdomein.nl'],
)
```
## 8. Celery Beat voor Periodieke Monitoring
```python
# periodic_tasks.py
from celery.schedules import crontab
app.conf.beat_schedule = {
'monitor-task-metrics-hourly': {
'task': 'jouw_app.tasks.cleanup_old_metrics',
'schedule': crontab(hour='*/1'), # Elk uur
},
}
@app.task
def cleanup_old_metrics():
"""Opruimen oude metrics"""
from django.utils import timezone
from datetime import timedelta
from .models import TaskMetrics
cutoff = timezone.now() - timedelta(days=30)
deleted_count = TaskMetrics.objects.filter(created_at__lt=cutoff).delete()
logger.info(f"Oude metrics opgeruimd: {deleted_count} records")
```
## 9. Gebruik in Productie
Voeg toe aan je `settings.py`:
```python
# Monitoring instellingen
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXTENDED = True
# Flower basic auth
FLOWER_BASIC_AUTH = ['gebruiker:wachtwoord']
# Task timeouts
CELERY_TASK_SOFT_TIME_LIMIT = 600 # 10 minuten
CELERY_TASK_TIME_LIMIT = 900 # 15 minuten
```
Dit systeem geeft je:
- **Real-time taakduur monitoring**
- **Historische trends en statistieken**
- **Automatische notificaties voor trage taken**
- **Web interface via Flower**
- **Django admin integratie**
Start met `celery -A jouw_project worker -l info` en `celery -A jouw_project flower` voor complete monitoring!

