Geavanceerde API Rate Limiting en Beveiliging met Flask
description
Met deze prompt kan de gebruiker geavanceerde methoden voor API-beveiliging en rate limiting toepassen die verder gaan dan standaardlimieten. Dit helpt bij het voorkomen van misbruik, het verbeteren van de API-prestaties en het verhogen van de veiligheid, specifiek afgestemd op hun huidige configuratie. Het biedt een diepgaander en flexibeler alternatief dan basis rate limiting technieken.
prompt
Help me geavanceerde API rate limiting en beveiligingsmaatregelen te implementeren in mijn Flask applicatie. Mijn huidige API configuratie is: <beschrijf je huidige API configuratie>. Leg uit hoe ik dynamische rate ...
try_prompt
generate_helper
disclaimerOnPageApi
tags
ai_answers
provider: openai
model: gpt-4.1-nano
Natuurlijk! Hier is een uitgebreide gids voor het implementeren van geavanceerde API rate limiting en beveiligingsmaatregelen in je Flask applicatie:
1. Gebruik van Flask-Limiter voor dynamische rate limiting
Flask-Limiter is een krachtige extensie die rate limiting ondersteunt en kan worden aangepast op basis van gebruikersrollen, IP-adressen en andere criteria.
2. Installatie
```bash
pip install flask-limiter
```
3. Basis setup in je Flask app
```python
from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app = Flask(__name__)
limiter = Limiter(
app,
key_func=get_remote_address, # standaard op IP-adres
default_limits=["200 per day", "50 per hour"]
)
```
4. Dynamische rate limiting op basis van gebruikersrollen of API-sleutels
Je kunt een aangepaste `key_func` schrijven die o.a. de JWT token controleert en op basis daarvan de limieten bepaalt.
```python
import jwt
def dynamic_rate_limit():
auth_header = request.headers.get('Authorization', None)
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split()[1]
try:
payload = jwt.decode(token, 'je_secret_key', algorithms=['HS256'])
rol = payload.get('role', 'user')
if rol == 'admin':
return "1000 per day" # hogere limiet voor admins
elif rol == 'user':
return "200 per day"
else:
return "50 per day"
except jwt.InvalidTokenError:
pass
# fallback op IP-adres
return get_remote_address()
limiter = Limiter(
app,
key_func=dynamic_rate_limit,
default_limits=[]
)
```
5. IP-whitelisting en zwarte lijsten
Voor IP-whitelisting en blacklisting kun je middleware of decorator toevoegen. Bijvoorbeeld:
```python
whitelist = {"123.456.78.9", "111.222.33.44"}
blacklist = {"222.222.222.222"}
@app.before_request
def restrict_ip():
ip = get_remote_address()
if ip in blacklist:
return jsonify({"error": "Toegang geweigerd"}), 403
if whitelist and ip not in whitelist:
return jsonify({"error": "IP niet toegestaan"}), 403
```
6. Automatische blokkades bij verdacht gedrag
Je kunt gedrag monitoren en verdachte patronen detecteren, bijvoorbeeld te veel failed authenticaties of te snelle opeenvolgende requests.
Voor automatische blokkades:
- Gebruik een in-memory store zoals Redis om verzoeken te tellen en verdachte patronen te detecteren.
- Wanneer verdachte activiteit wordt gedetecteerd, voeg het IP toe aan de zwarte lijst.
Voorbeeld met Redis:
```python
import redis
redis_db = redis.StrictRedis(host='localhost', port=6379, db=0)
def monitor_request():
ip = get_remote_address()
key = f"req_count:{ip}"
count = redis_db.incr(key)
redis_db.expire(key, 60) # tel in 60 seconden
if count > 100:
# Verdacht gedrag, IP blacklisten
blacklist.add(ip)
return jsonify({"error": "Verdacht gedrag gedetecteerd, toegang geblokkeerd."}), 403
@app.before_request
def security_monitor():
response = monitor_request()
if response:
return response
```
7. Extra beveiligingsmaatregelen
- **Gebruik HTTPS:** voor versleutelde communicatie
- **JWT beveiliging:** kortere token vervaltijden, refresh tokens
- **Input validatie:** voorkom injectie en andere aanvallen
- **Logging en monitoring:** houd verdachte activiteiten bij
---
Door bovenstaande methoden te combineren, creëer je een flexibele, beveiligde API die dynamisch reageert op gebruikersrollen, IP-adressen en gedragspatronen.
provider: deepseek
model: deepseek-chat
Zeker, ik help je met geavanceerde rate limiting en beveiligingsmaatregelen voor je Flask applicatie. Hier is een stapsgewijze implementatie:
## 1. Installatie benodigde packages
```bash
pip install flask flask-jwt-extended flask-limiter redis
```
## 2. Geavanceerde Rate Limiting met Flask-Limiter
```python
from flask import Flask, request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_jwt_extended import JWTManager, get_jwt_identity, get_jwt
import redis
app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'je-super-geheime-sleutel'
app.config['REDIS_URL'] = 'redis://localhost:6379/0'
jwt = JWTManager(app)
redis_client = redis.from_url(app.config['REDIS_URL'])
# Dynamische rate limiting functie
def get_role_based_limit():
try:
claims = get_jwt()
user_role = claims.get('role', 'guest')
limits = {
'admin': '1000 per minute',
'premium': '500 per minute',
'user': '100 per minute',
'guest': '10 per minute'
}
return limits.get(user_role, '10 per minute')
except:
return '10 per minute'
# Rate limiter initialiseren
limiter = Limiter(
app=app,
key_func=get_remote_address,
storage_uri=app.config['REDIS_URL']
)
# Dynamische rate limiting op endpoints
@app.route('/api/users', methods=['GET'])
@limiter.limit(get_role_based_limit)
def get_users():
# Je bestaande logica
pass
```
## 3. Geïntegreerde Beveiligingsmaatregelen
```python
from datetime import datetime, timedelta
import re
class SecurityManager:
def __init__(self, redis_client):
self.redis = redis_client
self.whitelist_key = 'ip_whitelist'
self.blacklist_key = 'ip_blacklist'
self.suspicious_key = 'suspicious_activity'
def is_ip_whitelisted(self, ip):
return self.redis.sismember(self.whitelist_key, ip)
def is_ip_blacklisted(self, ip):
return self.redis.sismember(self.blacklist_key, ip)
def add_to_blacklist(self, ip, duration_hours=24):
self.redis.sadd(self.blacklist_key, ip)
self.redis.expire(self.blacklist_key, duration_hours * 3600)
def track_suspicious_activity(self, ip, reason):
key = f"{self.suspicious_key}:{ip}"
current_count = self.redis.incr(key)
self.redis.expire(key, 3600) # 1 uur retentie
if current_count >= 5: # Na 5 verdachte activiteiten
self.add_to_blacklist(ip)
self.redis.lpush('security_log',
f"{datetime.now()}: {ip} - {reason} - Count: {current_count}")
# Initialiseren
security_manager = SecurityManager(redis_client)
# Voorbeeld van automatische blokkade
def check_and_block_suspicious():
ip = get_remote_address()
if security_manager.is_ip_blacklisted(ip):
return {"error": "IP geblokkeerd"}, 403
# Controleer op verdachte patronen
if detect_suspicious_pattern(request):
security_manager.track_suspicious_activity(ip, "Verdacht request patroon")
return {"error": "Verdachte activiteit gedetecteerd"}, 429
def detect_suspicious_pattern(request):
# Implementeer je eigen detectielogica
user_agent = request.headers.get('User-Agent', '')
path = request.path
# Voorbeeld: Detecteer scraping patterns
if 'python-requests' in user_agent.lower() and '/api/users' in path:
return True
# Snelle opeenvolgende requests
request_key = f"request_count:{ip}"
count = redis_client.incr(request_key)
redis_client.expire(request_key, 1) # 1 seconde window
if count > 10: # Meer dan 10 requests per seconde
return True
return False
```
## 4. API Key Authenticatie met Rate Limiting
```python
from functools import wraps
def api_key_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key or not validate_api_key(api_key):
return {"error": "Ongeldige API sleutel"}, 401
return f(*args, **kwargs)
return decorated_function
def validate_api_key(api_key):
key_data = redis_client.get(f"api_key:{api_key}")
if not key_data:
return False
# Verhoog gebruikerscount en check limits
usage_key = f"api_usage:{api_key}"
current_usage = redis_client.incr(usage_key)
redis_client.expire(usage_key, 3600) # Reset elk uur
max_usage = 1000 # Stel limiet in op basis van key type
return current_usage <= max_usage
@app.route('/api/protected', methods=['GET'])
@api_key_required
@limiter.limit("100 per hour") aparte limiet voor API keys
def protected_endpoint():
return {"message": "API key authenticatie succesvol"}
```
## 5. Middleware voor Real-time Beveiliging
```python
@app.before_request
def security_check():
ip = get_remote_address()
# Whitelist check (hoogste prioriteit)
if security_manager.is_ip_whitelisted(ip):
return None
# Blacklist check
if security_manager.is_ip_blacklisted(ip):
return {"error": "Toegang geweigerd"}, 403
# Verdachte activiteiten check
suspicious_response = check_and_block_suspicious()
if suspicious_response:
return suspicious_response
# Whitelist/Blacklist management endpoints
@app.route('/admin/security/whitelist/<ip>', methods=['POST'])
@jwt_required()
def add_to_whitelist(ip):
if get_jwt().get('role') != 'admin':
return {"error": "Geen toegang"}, 403
security_manager.redis.sadd('ip_whitelist', ip)
return {"message": f"IP {ip} toegevoegd aan whitelist"}
@app.route('/admin/security/blacklist/<ip>', methods=['POST'])
@jwt_required()
def add_to_blacklist(ip):
if get_jwt().get('role') != 'admin':
return {"error": "Geen toegang"}, 403
security_manager.add_to_blacklist(ip)
return {"message": f"IP {ip} toegevoegd aan blacklist"}
```
## 6. Geavanceerde Monitoring en Logging
```python
import logging
from logging.handlers import RotatingFileHandler
# Configureer logging
security_logger = logging.getLogger('security')
security_logger.setLevel(logging.INFO)
handler = RotatingFileHandler('security.log', maxBytes=10000, backupCount=5)
security_logger.addHandler(handler)
@app.after_request
def log_security_events(response):
if response.status_code >= 400:
security_logger.warning(
f"Security event: {request.remote_addr} - "
f"{request.method} {request.path} - "
f"Status: {response.status_code}"
)
return response
```
## 7. Configuratie Tips
1. **Redis Configuratie**: Gebruik Redis voor persistente storage van rate limits en blacklists
2. **Limieten Afstemmen**: Pas limieten aan op basis van je gebruikersprofielen
3. **Real-time Monitoring**: Implementeer dashboard voor security events
4. **Automatische Reviews**: Plan regelmatige security audits
5. **Backup Systemen**: Zorg voor fallback mechanismen bij storage failures
Deze implementatie biedt een robuuste basis voor geavanceerde rate limiting en beveiliging in je Flask applicatie. Pas de limieten en detectieregels aan op basis van je specifieke use case en monitoring behoeften.