Help me een Python-script ontwikkelen dat integreert met een berichtbroker om communicatie tussen microservices te faciliteren.
description
Door deze prompt te volgen, leer je hoe je efficiënte communicatie tussen microservices kunt implementeren met behulp van een berichtbroker, wat de schaalbaarheid en robuustheid van je toepassingen kan verbeteren.
prompt
try_prompt
Ik wil een Python-script maken dat verbinding maakt met een berichtbroker zoals RabbitMQ of Kafka om berichten te verzenden en ontvangen tussen verschillende microservices. Geef de stappen voor het instellen van de verbinding, he ... more
generate_helper
...
tags
ai_answers
provider: openai
model: gpt-4.1-nano
Hier zijn de stappen voor het maken van een Python-script dat verbinding maakt met RabbitMQ, berichten verzendt en ontvangt, en omgaat met bevestigingen en retries voor de microservice "Gebruiker Service":
### 1. Installeer benodigde Python-bibliotheken
Gebruik bijvoorbeeld `pika` voor RabbitMQ:
```bash
pip install pika
```
### 2. Verbinding maken met RabbitMQ
Maak een verbinding en kanaal:
```python
import pika
# Verbinding maken
connection_params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
```
### 3. Berichtwachtrijen beheren
Declareer een wachtrij voor de "Gebruiker Service"
```python
queue_name = 'gebruiker_service_queue'
channel.queue_declare(queue=queue_name, durable=True)
```
### 4. Bericht verzenden
Gebruik `basic_publish` met bevestiging:
```python
def verzend_bericht(bericht):
# Zet het bericht om naar bytes
bericht_bytes = bericht.encode('utf-8')
# Verstuur met persistentie om verlies te voorkomen
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=bericht_bytes,
properties=pika.BasicProperties(delivery_mode=2) # persistentie
)
print(f"Verzonden: {bericht}")
```
### 5. Bericht ontvangen en verwerken
Gebruik een consumer met bevestiging:
```python
def callback(ch, method, properties, body):
bericht = body.decode('utf-8')
print(f"Ontvangen bericht: {bericht}")
# Hier verwerk je het bericht
# Na succesvolle verwerking bevestig je
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) # voorkom overload
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print('Wacht op berichten...')
channel.start_consuming()
```
### 6. Afhandelen van bevestigingen en retries
Voor betrouwbare levering:
- Gebruik publisher confirms:
```python
channel.confirm_delivery()
try:
verzend_bericht("Testbericht")
except pika.exceptions.UnroutableError:
print("Bericht kon niet worden afgeleverd, probeer opnieuw.")
```
- Voor retries bij mislukking:
```python
import time
def verzend_met_retries(bericht, max_retries=3):
for attempt in range(max_retries):
try:
verzend_bericht(bericht)
print("Bericht succesvol verzonden.")
break
except pika.exceptions.AMQPError as e:
print(f"Fout bij verzenden: {e}, poging {attempt + 1}")
time.sleep(2 ** attempt) # Exponentiële backoff
else:
print("Maximaal aantal retries bereikt, bericht niet verzonden.")
```
### Samenvatting
- Maak verbinding en declareer wachtrijen
- Verstuur berichten met persistentie en bevestiging
- Ontvang berichten en bevestig na verwerking
- Implementeer retries voor betrouwbaarheid
Wil je een volledig voorbeeld met al deze onderdelen samengevoegd?
provider: deepseek
model: deepseek-chat
Hier is een stapsgewijze handleiding voor het maken van een Python-script dat verbinding maakt met RabbitMQ voor de Gebruiker Service:
## 1. Vereisten installeren
```bash
pip install pika
```
## 2. Basis RabbitMQ verbinding opzetten
```python
import pika
import json
import logging
from typing import Dict, Any
# Configuratie
RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'guest'
RABBITMQ_PASSWORD = 'guest'
# Wachtrij namen
USER_CREATED_QUEUE = 'user_created'
USER_UPDATED_QUEUE = 'user_updated'
class RabbitMQConnection:
def __init__(self):
self.connection = None
self.channel = None
def connect(self):
"""Maak verbinding met RabbitMQ"""
try:
credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
credentials=credentials,
heartbeat=600
)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
logging.info("Verbonden met RabbitMQ")
except Exception as e:
logging.error(f"Fout bij verbinden met RabbitMQ: {e}")
raise
def close(self):
"""Sluit de verbinding"""
if self.connection and not self.connection.is_closed:
self.connection.close()
logging.info("RabbitMQ verbinding gesloten")
```
## 3. Bericht Producer (Verzenden)
```python
class UserServiceProducer:
def __init__(self, rabbitmq_connection: RabbitMQConnection):
self.connection = rabbitmq_connection
self.setup_queues()
def setup_queues(self):
"""Stel wachtrijen in met durability"""
# Wachtrij voor aangemaakte gebruikers
self.connection.channel.queue_declare(
queue=USER_CREATED_QUEUE,
durable=True, # Berichten blijven behouden bij herstart
arguments={'x-queue-type': 'classic'}
)
# Wachtrij voor bijgewerkte gebruikers
self.connection.channel.queue_declare(
queue=USER_UPDATED_QUEUE,
durable=True,
arguments={'x-queue-type': 'classic'}
)
def send_user_created_message(self, user_data: Dict[str, Any]):
"""Verstuur bericht wanneer een gebruiker wordt aangemaakt"""
try:
message = {
'event_type': 'USER_CREATED',
'user_id': user_data['id'],
'email': user_data['email'],
'timestamp': user_data.get('timestamp')
}
self.connection.channel.basic_publish(
exchange='',
routing_key=USER_CREATED_QUEUE,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # Maak bericht persistent
content_type='application/json'
)
)
logging.info(f"USER_CREATED bericht verzonden: {user_data['id']}")
except Exception as e:
logging.error(f"Fout bij verzenden USER_CREATED bericht: {e}")
raise
def send_user_updated_message(self, user_data: Dict[str, Any]):
"""Verstuur bericht wanneer een gebruiker wordt bijgewerkt"""
try:
message = {
'event_type': 'USER_UPDATED',
'user_id': user_data['id'],
'changes': user_data['changes'],
'timestamp': user_data.get('timestamp')
}
self.connection.channel.basic_publish(
exchange='',
routing_key=USER_UPDATED_QUEUE,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json'
)
)
logging.info(f"USER_UPDATED bericht verzonden: {user_data['id']}")
except Exception as e:
logging.error(f"Fout bij verzenden USER_UPDATED bericht: {e}")
raise
```
## 4. Bericht Consumer (Ontvangen) met Bevestiging en Retries
```python
class UserServiceConsumer:
def __init__(self, rabbitmq_connection: RabbitMQConnection):
self.connection = rabbitmq_connection
self.setup_consumption()
def setup_consumption(self):
"""Stel consumenten in met QoS voor fair dispatch"""
# Quality of Service - beperk het aantal unacknowledged berichten
self.connection.channel.basic_qos(prefetch_count=1)
def process_user_created(self, ch, method, properties, body):
"""Verwerk USER_CREATED berichten met retry mechanisme"""
max_retries = 3
retry_count = 0
while retry_count <= max_retries:
try:
message = json.loads(body)
logging.info(f"Verwerk USER_CREATED: {message}")
# Simuleer verwerking
self.handle_user_creation(message)
# Bevestig succesvolle verwerking
ch.basic_ack(delivery_tag=method.delivery_tag)
logging.info("Bericht succesvol verwerkt en bevestigd")
break
except Exception as e:
retry_count += 1
logging.error(f"Fout bij verwerken bericht (poging {retry_count}/{max_retries}): {e}")
if retry_count > max_retries:
logging.error("Max retries bereikt, bericht wordt afgewezen")
# Bericht afwijzen zonder terug in wachtrij te plaatsen
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
else:
# Wacht even voor retry
import time
time.sleep(2 ** retry_count) # Exponential backoff
def handle_user_creation(self, message: Dict[str, Any]):
"""Handel gebruiker creatie af - vervang met echte logica"""
# Simuleer mogelijke fout voor demonstratie
if 'error' in message.get('email', ''):
raise Exception("Simulated processing error")
# Echte verwerkingslogica hier
print(f"Gebruiker aangemaakt: {message['user_id']} - {message['email']}")
def start_consuming_user_created(self):
"""Start met consumeren van USER_CREATED berichten"""
self.connection.channel.basic_consume(
queue=USER_CREATED_QUEUE,
on_message_callback=self.process_user_created,
auto_ack=False # Handmatige bevestiging
)
logging.info("Start met consumeren van USER_CREATED berichten")
self.connection.channel.start_consuming()
```
## 5. Gebruiksvoorbeeld
```python
import signal
import sys
def main():
# Stel logging in
logging.basicConfig(level=logging.INFO)
# Maak verbinding
rabbitmq = RabbitMQConnection()
rabbitmq.connect()
# Initialiseer producer en consumer
producer = UserServiceProducer(rabbitmq)
consumer = UserServiceConsumer(rabbitmq)
def signal_handler(sig, frame):
"""Handel afsluiting netjes af"""
logging.info("Afsluiten...")
rabbitmq.close()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
# Voorbeeld: Verstuur een bericht
user_data = {
'id': '12345',
'email': 'gebruiker@voorbeeld.nl',
'timestamp': '2024-01-01T10:00:00Z'
}
producer.send_user_created_message(user_data)
# Start met consumeren (blokkeert)
try:
consumer.start_consuming_user_created()
except KeyboardInterrupt:
rabbitmq.close()
if __name__ == "__main__":
main()
```
## 6. Geavanceerde Retry Strategie met Dead Letter Queue
```python
class RetryMechanism:
def __init__(self, rabbitmq_connection: RabbitMQConnection):
self.connection = rabbitmq_connection
self.setup_retry_infrastructure()
def setup_retry_infrastructure(self):
"""Stel DLQ en retry exchange in"""
# Dead Letter Exchange
self.connection.channel.exchange_declare(
exchange='dlx',
exchange_type='direct',
durable=True
)
# Retry wachtrij
self.connection.channel.queue_declare(
queue='user_created_retry',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx',
'x-message-ttl': 60000, # 60 seconden
'x-dead-letter-routing-key': USER_CREATED_QUEUE
}
)
def send_to_retry_queue(self, message_body: str, retry_count: int):
"""Plaats bericht in retry wachtrij"""
if retry_count < 3:
self.connection.channel.basic_publish(
exchange='',
routing_key='user_created_retry',
body=message_body,
properties=pika.BasicProperties(
delivery_mode=2,
headers={'retry_count': retry_count + 1}
)
)
```
## Belangrijke punten:
1. **Bevestiging**: Gebruik `auto_ack=False` en bevestig handmatig met `basic_ack`
2. **Durability**: Maak wachtrijen en berichten persistent
3. **QoS**: Beperk prefetch count voor fair message distribution
4. **Error Handling**: Implementeer retry mechanisme met exponential backoff
5. **Resource Management**: Sluit verbindingen netjes af
6. **Logging**: Log alle belangrijke gebeurtenissen voor debugging
Dit script biedt een robuuste basis voor berichtuitwisseling tussen microservices met RabbitMQ.

