import os from dotenv import load_dotenv import sys import asyncio from typing import Annotated, Literal, TypedDict from langchain_google_genai import ChatGoogleGenerativeAI from langgraph.graph import StateGraph, END, START from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage from langgraph.prebuilt import create_react_agent load_dotenv() # 1. Définition de l'état du graphe (ce que les agents se partagent) class AgentState(TypedDict): messages: Annotated[list[BaseMessage], "L'historique des échanges"] # 2. Création des Spécialistes llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash") # Agent Verlant agent_verlant = create_react_agent( llm, tools=[], prompt="Tu es un expert en verlan. Ta seule mission est de traduire les textes qu'on te donne en verlan (ex: bizarre -> zarbi). Sois bref." ) # Agent Argo agent_argo = create_react_agent( llm, tools=[], prompt="Tu es un expert en argot parisien. Traduis les textes en utilisant un langage très fleuri et argotique. Sois bref." ) # 3. Logique du Coordinateur (Le Routeur) def routeur_coordinateur(state: AgentState) -> Literal["agent_verlant", "agent_argo", "__end__"]: dernier_message = state["messages"][-1].content.lower() if "verlan" in dernier_message: return "agent_verlant" elif "argo" in dernier_message or "argot" in dernier_message: return "agent_argo" else: return "__end__" # Il répond directement s'il ne comprend pas la consigne # 4. Construction du Graphe workflow = StateGraph(AgentState) # Ajout des nœuds #workflow.add_node("agent_verlant", lambda state: agent_verlant.ainvoke(state)) #workflow.add_node("agent_argo", lambda state: agent_argo.ainvoke(state)) # ✅ LangGraph va automatiquement comprendre qu'il doit faire un "await" sur ces agents workflow.add_node("agent_verlant", agent_verlant) workflow.add_node("agent_argo", agent_argo) # Logique de départ workflow.add_conditional_edges(START, routeur_coordinateur) # Une fois qu'un spécialiste a répondu, on va vers la fin workflow.add_edge("agent_verlant", END) workflow.add_edge("agent_argo", END) # Compilation app = workflow.compile() # --- TEST --- async def main(): consigne = "Dis moi que cette fête est folle en verlan" async for event in app.astream({"messages": [HumanMessage(content=consigne)]}): print(event) if __name__ == "__main__": asyncio.run(main())