Implementeer Asynchrone Taken in Flask met Celery
description
Met deze prompt leer je hoe je achtergrondtaken efficiënt en betrouwbaar kunt verwerken in je Flask-applicatie. Het helpt problemen met traagheid door synchronisatie op te lossen en verhoogt de schaalbaarheid van je backend. Deze aanpak is praktischer en toekomstbestendiger dan eenvoudige synchronisatie of handmatige threading.
prompt
Help me met het integreren van asynchrone taakverwerking in mijn Flask-applicatie met Celery. Beschrijf hoe ik de taakdefinities moet opzetten, de broker en backend moet configureren (bijvoorbeeld Redis of R ...
try_prompt
generate_helper
disclaimerOnPageApi
tags
ai_answers
provider: openai
model: gpt-4.1-nano
Natuurlijk! Hier is een stapsgewijze gids om asynchrone taakverwerking te integreren in je Flask-applicatie met Celery, gebruikmakend van Redis als broker en result backend:
1. Installatie van benodigde pakketten:
Zorg dat je Celery en Redis-py hebt geïnstalleerd:
```bash
pip install celery redis
```
2. Celery configuratie in je Flask-app:
Maak een aparte Celery-configuratie in bijvoorbeeld een bestand `celery_worker.py` of direct in je Flask-app.
Voorbeeld `celery_worker.py`:
```python
from celery import Celery
def make_celery(app):
# Configureer Celery met dezelfde settings als je Flask app
celery = Celery(
app.import_name,
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
# Optioneel: gebruik app.config om configuraties over te nemen
# celery.conf.update(app.config)
return celery
```
3. Definieer je taken:
Maak een module, bijvoorbeeld `tasks.py`, waarin je je taken definieert:
```python
from celery_worker import make_celery
celery = make_celery(app)
@celery.task
def add(x, y):
return x + y
```
4. Integratie met je Flask app:
In je Flask-app kun je de Celery instantie importeren en gebruiken om taken aan te roepen:
```python
from flask import Flask
from tasks import add
app = Flask(__name__)
@app.route('/start-task')
def start_task():
result = add.delay(4, 6) # Voer de taak op de achtergrond uit
return f"Taak gestart, result id: {result.id}"
```
5. Taak beheren en resultaten ophalen:
Je kunt de status en resultaten van taken opvragen via:
```python
result = add.AsyncResult(task_id)
if result.state == 'PENDING':
# taak wacht op uitvoering
elif result.state == 'SUCCESS':
resultaat_waarde = result.get()
```
6. Celery worker starten:
Start de Celery worker vanuit de command line:
```bash
celery -A celery_worker import make_celery worker --beat --loglevel=info
```
Samenvatting:
- Gebruik Redis (`redis://localhost:6379/0`) als broker en backend.
- Definieer taken met de `@celery.task` decorator.
- Roep taken asynchroon aan met `.delay()`.
- Beheer taken door resultaat objecten te controleren.
Met deze aanpak kun je je Flask-app uitbreiden met robuuste achtergrondtaken, verbeterde schaalbaarheid en betere gebruikerservaring.
provider: deepseek
model: deepseek-chat
Zeker, ik help je met het integreren van Celery in je Flask-applicatie met Redis als broker. Hier is een stapsgewijze handleiding:
## 1. Installatie vereiste packages
```bash
pip install celery redis flask
```
## 2. Basis Celery configuratie
Maak een bestand `celery_worker.py`:
```python
from celery import Celery
from flask import Flask
def make_celery(app):
# Configureer Celery met Redis als broker en backend
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
# Flask app configuratie
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379/0',
CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)
# Initialiseer Celery
celery = make_celery(app)
```
## 3. Taakdefinities
Maak een bestand `tasks.py`:
```python
from celery_worker import celery
import time
from datetime import datetime
@celery.task(bind=True)
def lange_berekening(self, x, y):
"""Voorbeeld van een lange taak"""
try:
self.update_state(state='PROGRESS', meta={'current': 0, 'total': 100})
time.sleep(5) # Simuleer zware berekening
self.update_state(state='PROGRESS', meta={'current': 50, 'total': 100})
time.sleep(5)
resultaat = x * y
return {'resultaat': resultaat, 'voltooid_op': str(datetime.now())}
except Exception as e:
self.update_state(state='FAILURE', meta={'exception': str(e)})
raise
@celery.task
def email_verzenden(ontvanger, onderwerp, bericht):
"""Taak voor het verzenden van emails"""
# Hier zou je je email logica implementeren
time.sleep(2) # Simuleer email verzending
return f"Email verzonden naar {ontvanger}"
@celery.task
def data_verwerken(data):
"""Taak voor dataverwerking"""
# Verwerkingslogica hier
verwerkte_data = [item * 2 for item in data]
return {'origineel': data, 'verwerkt': verwerkte_data}
```
## 4. Flask routes voor taakbeheer
Voeg toe aan je Flask app:
```python
from flask import jsonify, request
from tasks import lange_berekening, email_verzenden, data_verwerken
from celery_worker import celery
@app.route('/start-taak/<int:x>/<int:y>')
def start_taak(x, y):
"""Start een lange berekeningstaak"""
taak = lange_berekening.delay(x, y)
return jsonify({'taak_id': taak.id}), 202
@app.route('/taak-status/<taak_id>')
def taak_status(taak_id):
"""Controleer de status van een taak"""
taak = lange_berekening.AsyncResult(taak_id)
if taak.state == 'PENDING':
response = {'status': 'Bezig met verwerken...'}
elif taak.state == 'PROGRESS':
response = {
'status': 'Bezig',
'voortgang': taak.info.get('current', 0),
'totaal': taak.info.get('total', 100)
}
elif taak.state == 'SUCCESS':
response = {
'status': 'Voltooid',
'resultaat': taak.result
}
else:
response = {
'status': 'Mislukt',
'fout': str(taak.info)
}
return jsonify(response)
@app.route('/verstuur-email', methods=['POST'])
def verstuur_email():
"""Start een email verzendtaak"""
data = request.json
taak = email_verzenden.delay(
data['ontvanger'],
data['onderwerp'],
data['bericht']
)
return jsonify({'taak_id': taak.id}), 202
@app.route('/verwerk-data', methods=['POST'])
def verwerk_data():
"""Start een dataverwerkingstaak"""
data = request.json.get('data', [])
taak = data_verwerken.delay(data)
return jsonify({'taak_id': taak.id}), 202
```
## 5. Celery worker starten
Start de worker in een apart terminalvenster:
```bash
celery -A celery_worker.celery worker --loglevel=info
```
## 6. Optionele configuratie voor productie
Voeg toe aan je configuratie voor betere performance:
```python
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379/0',
CELERY_RESULT_BACKEND='redis://localhost:6379/0',
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TIMEZONE='Europe/Amsterdam',
CELERY_ENABLE_UTC=True
)
```
## 7. Gebruiksvoorbeelden
```python
# Taken aanroepen
taak = lange_berekening.delay(10, 20)
# Synchronous call (niet aanbevolen voor lange taken)
resultaat = lange_berekening.apply(args=[10, 20])
# Taak met timeout
taak = lange_berekening.apply_async(args=[10, 20], timeout=30)
# Scheduled taak (uitvoeren over 1 minuut)
from datetime import datetime, timedelta
eta = datetime.now() + timedelta(minutes=1)
taak = lange_berekening.apply_async(args=[10, 20], eta=eta)
```
## 8. Monitoring (optioneel)
Installeer Flower voor monitoring:
```bash
pip install flower
celery -A celery_worker.celery flower
```
Bezoek dan `http://localhost:5555` voor de monitoring interface.
Zorg ervoor dat Redis draait op `localhost:6379` voordat je de worker start. Je kunt Redis starten met:
```bash
redis-server
```
Deze setup biedt een solide basis voor asynchrone taakverwerking in je Flask-applicatie met Redis als message broker.