slogan3

Maak LangChain Agents voor Geautomatiseerde Workflow Orkestratie en Taakplanning

description

Maakt het mogelijk om LangChain agents te bouwen die complexe workflows met meerdere stappen kunnen beheren met nauwkeurige controle over de uitvoering van taken en timing, waardoor automatisering betrouwbaarder en efficiënter wordt dan met eenvoudige chains. Deze prompt behandelt geavanceerde orkestratie- en planningsbehoeften die niet in standaard LangChain tutorials voorkomen.

prompt

Help mij een LangChain agent te maken die een workflow kan automatiseren bestaande uit de volgende taken: <beschrijf de volgorde van taken of processen>. Voeg methoden toe om taakafhankelijkheden, conditionele vertakkingen en planning of tijdscontrole binnen de agent te beh ...

try_prompt

generate_helper
disclaimerOnPageApi

ai_answers

provider: openai
model: gpt-4.1-nano
Hier is een uitgebreide aanpak om een LangChain-agent te maken die jouw beschreven workflow automatiseert, inclusief taakafhankelijkheden, conditionele vertakkingen, planning, en fouttolerantie met retries en exponentiële backoff. Ik geef je ook codevoorbeelden voor de orkestratie van meerdere chains en het afhandelen van asynchrone of vertraagde taken. --- ### 1. Opzet en basisstructuur We gebruiken Python met de `langchain` library en enkele ondersteunende modules zoals `asyncio`, `tenacity` voor retries, en `datetime` voor planning. **Benodigdheden:** ```bash pip install langchain tenacity ``` ### 2. Definieer de taken als aparte chains Elke taak wordt geïmplementeerd als een aparte chain/functie. ```python from langchain import LLMChain from langchain.chat_models import ChatOpenAI from tenacity import retry, stop_after_attempt, wait_exponential import asyncio from datetime import datetime, timedelta # Initialiseer het LLM-model llm = ChatOpenAI(model="gpt-4") # Lead data verzameling def lead_data_verzameling(): # Placeholder voor daadwerkelijke data verzameling return {"leads": [{"naam": "Jan", "email": "jan@example.com"}]} lead_data_chain = LLMChain(llm=llm, prompt="Verzamel en structureer lead data.") # E-mail segmentatie def email_segmentatie(lead_data): # Placeholder segmentatie return [{"naam": lead["naam"], "email": lead["email"], "segment": "A"} for lead in lead_data["leads"]] segmentatie_chain = LLMChain(llm=llm, prompt="Segmenteer de leads op basis van hun data.") # Campagne lancering def campagne_lancering(segment): # Placeholder voor campagne lancering print(f"Campagne gelanceerd voor {segment['naam']} in segment {segment['segment']}.") # Prestatiebewaking def prestatie_bewaking(): # Placeholder voor prestatie monitoring print("Prestatiegegevens verzameld en geanalyseerd.") ``` --- ### 3. Decorators voor retries met exponentiële backoff Gebruik `tenacity` om retries te implementeren: ```python @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def veilige_functie(func, *args, **kwargs): return func(*args, **kwargs) ``` Voor asynchrone taken: ```python from tenacity.asyncio import AsyncRetrying async def async_veilige_functie(func, *args, **kwargs): async for attempt in AsyncRetrying(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)): with attempt: return await func(*args, **kwargs) ``` --- ### 4. Workflow orkestratie met afhankelijkheden en conditionele vertakkingen Hier is een voorbeeld van een hoofd-functie die de workflow beheert: ```python async def automatiserings_workflow(): # Lead data verzamelen (met retries) lead_data = await asyncio.get_event_loop().run_in_executor( None, veilige_functie, lead_data_verzameling ) # Segmentatie segments = await asyncio.get_event_loop().run_in_executor( None, veilige_functie, email_segmentatie, lead_data ) # Voor elke lead in elk segment: campagne lanceren campagnes = [] for segment in segments: # Bijvoorbeeld: alleen lanceren voor segment 'A' if segment['segment'] == 'A': campagne_task = asyncio.create_task( asyncio.get_event_loop().run_in_executor( None, veilige_functie, campagne_lancering, segment ) ) campagnes.append(campagne_task) else: # Conditionele vertakking: andere segmenten worden overgeslagen print(f"Segment {segment['segment']} wordt overgeslagen.") # Wacht tot alle campagnes gelanceerd zijn await asyncio.gather(*campagnes) # Prestatiebewaking (met retries) await asyncio.get_event_loop().run_in_executor( None, veilige_functie, prestatie_bewaking ) # Planning: controle of taak gestart moet worden op bepaald tijdstip geplande_start = datetime.now() + timedelta(seconds=10) while datetime.now() < geplande_start: await asyncio.sleep(1) print("Workflow gestart op geplande tijd.") ``` ### 5. Tijdscontrole en planning In het bovenstaande voorbeeld wordt de workflow pas gestart na een geplande tijd (`geplande_start`). Je kunt dit uitbreiden met meer complexe planningslogica. --- ### 6. Samenvatting en extra tips - **Taakafhankelijkheden:** worden beheerd door `asyncio.gather()` en volgorde van functies. - **Conditionele vertakkingen:** worden met eenvoudige if-else statements geïmplementeerd. - **Planning:** wordt met `datetime` en `sleep` gecontroleerd. - **Fouttolerantie:** met `tenacity` retries en exponentiële backoff. - **Asynchrone verwerking:** via `asyncio.create_task()` en `asyncio.gather()` voor gelijktijdige uitvoering. --- ### 7. Volledige voorbeeld code (samenvoeging) Hier is een samengevoegde, vereenvoudigde versie: ```python import asyncio from datetime import datetime, timedelta from tenacity import retry, stop_after_attempt, wait_exponential from langchain.chat_models import ChatOpenAI from langchain import LLMChain llm = ChatOpenAI(model="gpt-4") def lead_data_verzameling(): return {"leads": [{"naam": "Jan", "email": "jan@example.com"}]} def email_segmentatie(lead_data): return [{"naam": lead["naam"], "email": lead["email"], "segment": "A"} for lead in lead_data["leads"]] def campagne_lancering(segment): print(f"Campagne gelanceerd voor {segment['naam']} in segment {segment['segment']}.") def prestatie_bewaking(): print("Prestatiegegevens verzameld en geanalyseerd.") @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def veilige_functie(func, *args, **kwargs): return func(*args, **kwargs) async def main(): geplande_start = datetime.now() + timedelta(seconds=10) while datetime.now() < geplande_start: await asyncio.sleep(1) print("Workflow gestart op geplande tijd.") lead_data = await asyncio.get_event_loop().run_in_executor(None, veilige_functie, lead_data_verzameling) segments = await asyncio.get_event_loop().run_in_executor(None, veilige_functie, email_segmentatie, lead_data) campagnes = [] for segment in segments: if segment['segment'] == 'A': campagne_task = asyncio.create_task( asyncio.get_event_loop().run_in_executor(None, veilige_functie, campagne_lancering, segment) ) campagnes.append(campagne_task) await asyncio.gather(*campagnes) await asyncio.get_event_loop().run_in_executor(None, veilige_functie, prestatie_bewaking) print("Workflow voltooid.") if __name__ == "__main__": asyncio.run(main()) ``` --- ### Conclusie Met deze structuur kun je een uitgebreide en robuuste LangChain-gebaseerde workflow bouwen die afhankelijkheden, conditionele vertakkingen, planning, en fouttolerantie integreert. Je kunt dit verder uitbreiden met meer complexe planningsmechanismen, logging, en integraties naar jouw specifieke systemen.
provider: deepseek
model: deepseek-chat
Zeker, ik help je graag een robuuste LangChain-agent te bouwen. Hier is een complete implementatie in het Nederlands: ```python import asyncio import time from typing import Dict, List, Any, Optional from langchain.agents import AgentType, initialize_agent, Tool from langchain.schema import SystemMessage from langchain.chains import LLMChain, SequentialChain from langchain.prompts import PromptTemplate from langchain.chat_models import ChatOpenAI from langchain.agents import tool from tenacity import retry, stop_after_attempt, wait_exponential class WorkflowAutomationAgent: def __init__(self): self.llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo") self.setup_tools() self.setup_agent() def setup_tools(self): # Tools voor elke taak met retry-logica self.tools = [ Tool( name="lead_data_verzameling", func=self._with_retry(self.verzamel_lead_data), description="Verzamel lead data uit verschillende bronnen" ), Tool( name="email_segmentatie", func=self._with_retry(self.segmenteer_emails), description="Segmenteer e-mails op basis van lead data" ), Tool( name="campagne_lancering", func=self._with_retry(self.lanceer_campagne), description="Lanceer e-mailcampagne voor gesegmenteerde groepen" ), Tool( name="prestatie_bewaking", func=self._with_retry(self.monitor_prestaties), description="Monitor campagneprestaties en rapporteer metrics" ) ] def setup_agent(self): system_message = SystemMessage(content=""" Je bent een Workflow Automation Agent. Beheer de volgende taken: 1. Lead data verzameling 2. E-mail segmentatie (afhankelijk van 1) 3. Campagne lancering (afhankelijk van 2) 4. Prestatiebewaking (asynchroon na 3) Houd rekening met taakafhankelijkheden en voer conditionele checks uit. Gebruik exponentiële backoff voor retry-pogingen. """) self.agent = initialize_agent( tools=self.tools, llm=self.llm, agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, verbose=True, agent_kwargs={"system_message": system_message} ) # Retry decorator met exponentiële backoff def _with_retry(self, func): @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: print(f"Poging mislukt: {e}") raise return wrapper # Task implementaties def verzamel_lead_data(self, query: str) -> str: """Verzamel lead data met afhankelijkheidscheck""" print("Lead data verzamelen...") # Simulatie dataverzameling time.sleep(1) return "Lead data succesvol verzameld: 1000 leads" def segmenteer_emails(self, query: str) -> str: """Segmenteer e-mails met conditionele check op lead data""" if "lead" not in query.lower(): raise ValueError("Lead data vereist voor segmentatie") print("E-mails segmenteren...") time.sleep(1) return "E-mails gesegmenteerd in 3 groepen: A, B, C" def lanceer_campagne(self, query: str) -> str: """Lanceer campagne met dependency check""" if "segment" not in query.lower(): raise ValueError("Segmentatie data vereist voor campagne") print("Campagne lancering...") time.sleep(2) return "Campagne succesvol gelanceerd voor alle segmenten" def monitor_prestaties(self, query: str) -> str: """Asynchrone prestatiebewaking""" print("Prestaties monitoren...") time.sleep(3) return "Prestaties: Open rate 25%, CTR 5%, Conversie 2%" # Workflow orchestration async def execute_workflow(self): """Orchestreer de complete workflow met afhankelijkheden""" try: # Sequentiële uitvoering met dependencies result1 = await asyncio.to_thread( self.agent.run, "Voer lead data verzameling uit" ) # Conditionele vertakking op basis van resultaat if "succesvol" in result1.lower(): result2 = await asyncio.to_thread( self.agent.run, f"Voer e-mail segmentatie uit op basis van: {result1}" ) result3 = await asyncio.to_thread( self.agent.run, f"Lanceer campagne gebruikmakend van: {result2}" ) # Asynchrone prestatiebewaking met vertraging await asyncio.sleep(300) # Wacht 5 minuten result4 = await asyncio.to_thread( self.agent.run, "Monitor campagne prestaties" ) return { "lead_verzameling": result1, "email_segmentatie": result2, "campagne_lancering": result3, "prestatie_bewaking": result4 } else: raise Exception("Lead verzameling mislukt, workflow gestopt") except Exception as e: print(f"Workflow mislukt: {e}") return self.handle_failure(e) def handle_failure(self, error: Exception) -> Dict: """Foutafhandeling met state recovery""" return { "status": "mislukt", "error": str(error), "retry_count": 0, "next_retry": time.time() + 60 # Retry na 60 seconden } # Gebruiksvoorbeeld async def main(): agent = WorkflowAutomationAgent() # Start de workflow result = await agent.execute_workflow() print("Workflow Resultaat:", result) # Voorbeeld van conditionele vertakking if result["status"] == "succesvol": print("Workflow voltooid - analyseer resultaten") else: print("Workflow mislukt - initieer retry") # Uitvoeren if __name__ == "__main__": asyncio.run(main()) ``` ## Belangrijke Kenmerken: 1. **Taakafhankelijkheden**: Elke taak checkt of vereiste data aanwezig is 2. **Conditionele Vertakkingen**: `if "succesvol" in result1` bepaalt workflow voortgang 3. **Asynchrone Uitvoering**: `asyncio` voor niet-blokkerende operaties 4. **Vertraagde Uitvoering**: `await asyncio.sleep(300)` voor prestatiebewaking 5. **Fouttolerantie**: Retry-logica met exponentiële backoff (max 3 pogingen) 6. **State Management**: Foutafhandeling met recovery informatie ## Vereisten: ```bash pip install langchain openai tenacity asyncio ``` ## Aanvullende Optimalisaties: ```python # Voor parallelle uitvoering van onafhankelijke taken async def parallel_execution(self): task1 = asyncio.create_task(self.agent.run("Taak 1")) task2 = asyncio.create_task(self.agent.run("Taak 2")) results = await asyncio.gather(task1, task2, return_exceptions=True) # Error handling voor parallelle taken for i, result in enumerate(results): if isinstance(result, Exception): print(f"Taak {i+1} mislukt: {result}") ``` Deze implementatie biedt een robuuste basis voor workflow automatisering met alle gevraagde functionaliteiten.
error: Generation failed [deepseek] prompt_id=2245 locale=en err=DeepSeek connection/timeout | Generation failed [deepseek] prompt_id=2245 locale=pt err=DeepSeek connection/timeout