Maak LangChain Agents voor Geautomatiseerde Workflow Orkestratie en Taakplanning
description
Maakt het mogelijk om LangChain agents te bouwen die complexe workflows met meerdere stappen kunnen beheren met nauwkeurige controle over de uitvoering van taken en timing, waardoor automatisering betrouwbaarder en efficiënter wordt dan met eenvoudige chains. Deze prompt behandelt geavanceerde orkestratie- en planningsbehoeften die niet in standaard LangChain tutorials voorkomen.
prompt
Help mij een LangChain agent te maken die een workflow kan automatiseren bestaande uit de volgende taken: <beschrijf de volgorde van taken of processen>. Voeg methoden toe om taakafhankelijkheden, conditionele vertakkingen en planning of tijdscontrole binnen de agent te beh ...
try_prompt
generate_helper
disclaimerOnPageApi
tags
ai_answers
provider: openai
model: gpt-4.1-nano
Hier is een uitgebreide aanpak om een LangChain-agent te maken die jouw beschreven workflow automatiseert, inclusief taakafhankelijkheden, conditionele vertakkingen, planning, en fouttolerantie met retries en exponentiële backoff. Ik geef je ook codevoorbeelden voor de orkestratie van meerdere chains en het afhandelen van asynchrone of vertraagde taken.
---
### 1. Opzet en basisstructuur
We gebruiken Python met de `langchain` library en enkele ondersteunende modules zoals `asyncio`, `tenacity` voor retries, en `datetime` voor planning.
**Benodigdheden:**
```bash
pip install langchain tenacity
```
### 2. Definieer de taken als aparte chains
Elke taak wordt geïmplementeerd als een aparte chain/functie.
```python
from langchain import LLMChain
from langchain.chat_models import ChatOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
import asyncio
from datetime import datetime, timedelta
# Initialiseer het LLM-model
llm = ChatOpenAI(model="gpt-4")
# Lead data verzameling
def lead_data_verzameling():
# Placeholder voor daadwerkelijke data verzameling
return {"leads": [{"naam": "Jan", "email": "jan@example.com"}]}
lead_data_chain = LLMChain(llm=llm, prompt="Verzamel en structureer lead data.")
# E-mail segmentatie
def email_segmentatie(lead_data):
# Placeholder segmentatie
return [{"naam": lead["naam"], "email": lead["email"], "segment": "A"} for lead in lead_data["leads"]]
segmentatie_chain = LLMChain(llm=llm, prompt="Segmenteer de leads op basis van hun data.")
# Campagne lancering
def campagne_lancering(segment):
# Placeholder voor campagne lancering
print(f"Campagne gelanceerd voor {segment['naam']} in segment {segment['segment']}.")
# Prestatiebewaking
def prestatie_bewaking():
# Placeholder voor prestatie monitoring
print("Prestatiegegevens verzameld en geanalyseerd.")
```
---
### 3. Decorators voor retries met exponentiële backoff
Gebruik `tenacity` om retries te implementeren:
```python
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def veilige_functie(func, *args, **kwargs):
return func(*args, **kwargs)
```
Voor asynchrone taken:
```python
from tenacity.asyncio import AsyncRetrying
async def async_veilige_functie(func, *args, **kwargs):
async for attempt in AsyncRetrying(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)):
with attempt:
return await func(*args, **kwargs)
```
---
### 4. Workflow orkestratie met afhankelijkheden en conditionele vertakkingen
Hier is een voorbeeld van een hoofd-functie die de workflow beheert:
```python
async def automatiserings_workflow():
# Lead data verzamelen (met retries)
lead_data = await asyncio.get_event_loop().run_in_executor(
None, veilige_functie, lead_data_verzameling
)
# Segmentatie
segments = await asyncio.get_event_loop().run_in_executor(
None, veilige_functie, email_segmentatie, lead_data
)
# Voor elke lead in elk segment: campagne lanceren
campagnes = []
for segment in segments:
# Bijvoorbeeld: alleen lanceren voor segment 'A'
if segment['segment'] == 'A':
campagne_task = asyncio.create_task(
asyncio.get_event_loop().run_in_executor(
None, veilige_functie, campagne_lancering, segment
)
)
campagnes.append(campagne_task)
else:
# Conditionele vertakking: andere segmenten worden overgeslagen
print(f"Segment {segment['segment']} wordt overgeslagen.")
# Wacht tot alle campagnes gelanceerd zijn
await asyncio.gather(*campagnes)
# Prestatiebewaking (met retries)
await asyncio.get_event_loop().run_in_executor(
None, veilige_functie, prestatie_bewaking
)
# Planning: controle of taak gestart moet worden op bepaald tijdstip
geplande_start = datetime.now() + timedelta(seconds=10)
while datetime.now() < geplande_start:
await asyncio.sleep(1)
print("Workflow gestart op geplande tijd.")
```
### 5. Tijdscontrole en planning
In het bovenstaande voorbeeld wordt de workflow pas gestart na een geplande tijd (`geplande_start`). Je kunt dit uitbreiden met meer complexe planningslogica.
---
### 6. Samenvatting en extra tips
- **Taakafhankelijkheden:** worden beheerd door `asyncio.gather()` en volgorde van functies.
- **Conditionele vertakkingen:** worden met eenvoudige if-else statements geïmplementeerd.
- **Planning:** wordt met `datetime` en `sleep` gecontroleerd.
- **Fouttolerantie:** met `tenacity` retries en exponentiële backoff.
- **Asynchrone verwerking:** via `asyncio.create_task()` en `asyncio.gather()` voor gelijktijdige uitvoering.
---
### 7. Volledige voorbeeld code (samenvoeging)
Hier is een samengevoegde, vereenvoudigde versie:
```python
import asyncio
from datetime import datetime, timedelta
from tenacity import retry, stop_after_attempt, wait_exponential
from langchain.chat_models import ChatOpenAI
from langchain import LLMChain
llm = ChatOpenAI(model="gpt-4")
def lead_data_verzameling():
return {"leads": [{"naam": "Jan", "email": "jan@example.com"}]}
def email_segmentatie(lead_data):
return [{"naam": lead["naam"], "email": lead["email"], "segment": "A"} for lead in lead_data["leads"]]
def campagne_lancering(segment):
print(f"Campagne gelanceerd voor {segment['naam']} in segment {segment['segment']}.")
def prestatie_bewaking():
print("Prestatiegegevens verzameld en geanalyseerd.")
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def veilige_functie(func, *args, **kwargs):
return func(*args, **kwargs)
async def main():
geplande_start = datetime.now() + timedelta(seconds=10)
while datetime.now() < geplande_start:
await asyncio.sleep(1)
print("Workflow gestart op geplande tijd.")
lead_data = await asyncio.get_event_loop().run_in_executor(None, veilige_functie, lead_data_verzameling)
segments = await asyncio.get_event_loop().run_in_executor(None, veilige_functie, email_segmentatie, lead_data)
campagnes = []
for segment in segments:
if segment['segment'] == 'A':
campagne_task = asyncio.create_task(
asyncio.get_event_loop().run_in_executor(None, veilige_functie, campagne_lancering, segment)
)
campagnes.append(campagne_task)
await asyncio.gather(*campagnes)
await asyncio.get_event_loop().run_in_executor(None, veilige_functie, prestatie_bewaking)
print("Workflow voltooid.")
if __name__ == "__main__":
asyncio.run(main())
```
---
### Conclusie
Met deze structuur kun je een uitgebreide en robuuste LangChain-gebaseerde workflow bouwen die afhankelijkheden, conditionele vertakkingen, planning, en fouttolerantie integreert. Je kunt dit verder uitbreiden met meer complexe planningsmechanismen, logging, en integraties naar jouw specifieke systemen.
provider: deepseek
model: deepseek-chat
Zeker, ik help je graag een robuuste LangChain-agent te bouwen. Hier is een complete implementatie in het Nederlands:
```python
import asyncio
import time
from typing import Dict, List, Any, Optional
from langchain.agents import AgentType, initialize_agent, Tool
from langchain.schema import SystemMessage
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.agents import tool
from tenacity import retry, stop_after_attempt, wait_exponential
class WorkflowAutomationAgent:
def __init__(self):
self.llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo")
self.setup_tools()
self.setup_agent()
def setup_tools(self):
# Tools voor elke taak met retry-logica
self.tools = [
Tool(
name="lead_data_verzameling",
func=self._with_retry(self.verzamel_lead_data),
description="Verzamel lead data uit verschillende bronnen"
),
Tool(
name="email_segmentatie",
func=self._with_retry(self.segmenteer_emails),
description="Segmenteer e-mails op basis van lead data"
),
Tool(
name="campagne_lancering",
func=self._with_retry(self.lanceer_campagne),
description="Lanceer e-mailcampagne voor gesegmenteerde groepen"
),
Tool(
name="prestatie_bewaking",
func=self._with_retry(self.monitor_prestaties),
description="Monitor campagneprestaties en rapporteer metrics"
)
]
def setup_agent(self):
system_message = SystemMessage(content="""
Je bent een Workflow Automation Agent. Beheer de volgende taken:
1. Lead data verzameling
2. E-mail segmentatie (afhankelijk van 1)
3. Campagne lancering (afhankelijk van 2)
4. Prestatiebewaking (asynchroon na 3)
Houd rekening met taakafhankelijkheden en voer conditionele checks uit.
Gebruik exponentiële backoff voor retry-pogingen.
""")
self.agent = initialize_agent(
tools=self.tools,
llm=self.llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
agent_kwargs={"system_message": system_message}
)
# Retry decorator met exponentiële backoff
def _with_retry(self, func):
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"Poging mislukt: {e}")
raise
return wrapper
# Task implementaties
def verzamel_lead_data(self, query: str) -> str:
"""Verzamel lead data met afhankelijkheidscheck"""
print("Lead data verzamelen...")
# Simulatie dataverzameling
time.sleep(1)
return "Lead data succesvol verzameld: 1000 leads"
def segmenteer_emails(self, query: str) -> str:
"""Segmenteer e-mails met conditionele check op lead data"""
if "lead" not in query.lower():
raise ValueError("Lead data vereist voor segmentatie")
print("E-mails segmenteren...")
time.sleep(1)
return "E-mails gesegmenteerd in 3 groepen: A, B, C"
def lanceer_campagne(self, query: str) -> str:
"""Lanceer campagne met dependency check"""
if "segment" not in query.lower():
raise ValueError("Segmentatie data vereist voor campagne")
print("Campagne lancering...")
time.sleep(2)
return "Campagne succesvol gelanceerd voor alle segmenten"
def monitor_prestaties(self, query: str) -> str:
"""Asynchrone prestatiebewaking"""
print("Prestaties monitoren...")
time.sleep(3)
return "Prestaties: Open rate 25%, CTR 5%, Conversie 2%"
# Workflow orchestration
async def execute_workflow(self):
"""Orchestreer de complete workflow met afhankelijkheden"""
try:
# Sequentiële uitvoering met dependencies
result1 = await asyncio.to_thread(
self.agent.run,
"Voer lead data verzameling uit"
)
# Conditionele vertakking op basis van resultaat
if "succesvol" in result1.lower():
result2 = await asyncio.to_thread(
self.agent.run,
f"Voer e-mail segmentatie uit op basis van: {result1}"
)
result3 = await asyncio.to_thread(
self.agent.run,
f"Lanceer campagne gebruikmakend van: {result2}"
)
# Asynchrone prestatiebewaking met vertraging
await asyncio.sleep(300) # Wacht 5 minuten
result4 = await asyncio.to_thread(
self.agent.run,
"Monitor campagne prestaties"
)
return {
"lead_verzameling": result1,
"email_segmentatie": result2,
"campagne_lancering": result3,
"prestatie_bewaking": result4
}
else:
raise Exception("Lead verzameling mislukt, workflow gestopt")
except Exception as e:
print(f"Workflow mislukt: {e}")
return self.handle_failure(e)
def handle_failure(self, error: Exception) -> Dict:
"""Foutafhandeling met state recovery"""
return {
"status": "mislukt",
"error": str(error),
"retry_count": 0,
"next_retry": time.time() + 60 # Retry na 60 seconden
}
# Gebruiksvoorbeeld
async def main():
agent = WorkflowAutomationAgent()
# Start de workflow
result = await agent.execute_workflow()
print("Workflow Resultaat:", result)
# Voorbeeld van conditionele vertakking
if result["status"] == "succesvol":
print("Workflow voltooid - analyseer resultaten")
else:
print("Workflow mislukt - initieer retry")
# Uitvoeren
if __name__ == "__main__":
asyncio.run(main())
```
## Belangrijke Kenmerken:
1. **Taakafhankelijkheden**: Elke taak checkt of vereiste data aanwezig is
2. **Conditionele Vertakkingen**: `if "succesvol" in result1` bepaalt workflow voortgang
3. **Asynchrone Uitvoering**: `asyncio` voor niet-blokkerende operaties
4. **Vertraagde Uitvoering**: `await asyncio.sleep(300)` voor prestatiebewaking
5. **Fouttolerantie**: Retry-logica met exponentiële backoff (max 3 pogingen)
6. **State Management**: Foutafhandeling met recovery informatie
## Vereisten:
```bash
pip install langchain openai tenacity asyncio
```
## Aanvullende Optimalisaties:
```python
# Voor parallelle uitvoering van onafhankelijke taken
async def parallel_execution(self):
task1 = asyncio.create_task(self.agent.run("Taak 1"))
task2 = asyncio.create_task(self.agent.run("Taak 2"))
results = await asyncio.gather(task1, task2, return_exceptions=True)
# Error handling voor parallelle taken
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Taak {i+1} mislukt: {result}")
```
Deze implementatie biedt een robuuste basis voor workflow automatisering met alle gevraagde functionaliteiten.
error: Generation failed [deepseek] prompt_id=2245 locale=en err=DeepSeek connection/timeout | Generation failed [deepseek] prompt_id=2245 locale=pt err=DeepSeek connection/timeout