agentic with langgraph

This commit is contained in:
2026-04-10 14:56:17 +00:00
parent 8c191e79e8
commit aac7f2ce47
3 changed files with 180 additions and 41 deletions

View File

@@ -1,49 +1,61 @@
import asyncio
import os
from typing import Annotated
from typing_extensions import TypedDict # Plus robuste pour le State
import sys
from dotenv import load_dotenv
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_google_vertexai import ChatVertexAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
# Chargement de la clé API
load_dotenv()
# 1. Configuration du modèle (Syntaxe actuelle)
# Assure-hui que GOOGLE_APPLICATION_CREDENTIALS ou GOOGLE_CLOUD_PROJECT est dans ton .env
llm = ChatVertexAI(
model="gemini-1.5-flash",
temperature=0,
)
async def lancer_agent():
print("🤖 Initialisation de l'Agent Gemini...")
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash")
# 2. Définition de l'état
# add_messages permet d'accumuler l'historique au lieu de l'écraser
class State(TypedDict):
messages: Annotated[list, add_messages]
# 3. Le nœud de l'agent
def call_model(state: State):
response = llm.invoke(state["messages"])
# On retourne un dictionnaire qui sera mergé avec l'état actuel
return {"messages": [response]}
# 4. Construction du graphe (Architecture actuelle)
builder = StateGraph(State)
builder.add_node("agent", call_model)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)
# Compilation
graph = builder.compile()
# 5. Exécution propre
if __name__ == "__main__":
initial_state = {"messages": [("user", "Salut, tu tournes sur quelle version de Gemini ?")]}
print("🔌 Configuration de la connexion au serveur MCP local...")
# Utilisation de .stream() pour voir ce qui se passe
for chunk in graph.stream(initial_state):
for node, values in chunk.items():
print(f"--- Node: {node} ---")
print(values["messages"][-1].content)
# NOUVELLE SYNTAXE : On passe la configuration directement dans le client
client = MultiServerMCPClient({
"serveur_taches": {
"command": sys.executable, # 🌟 LA MODIFICATION EST ICI
"args": ["serveur_mcp.py"],
"transport": "stdio"
}
})
# On récupère les outils (attention, c'est maintenant une fonction asynchrone qui nécessite 'await')
outils = await client.get_tools()
print(f"✅ Outils détectés : {[outil.name for outil in outils]}")
# 🌟 Création de l'Agent LangGraph
agent = create_react_agent(llm, tools=outils)
# --- LE TEST DE L'AGENT ---
consigne = (
"1. Ajoute la tâche 'Acheter du pain'. "
"2. Ajoute la tâche 'Appeler le client'. "
"3. Fais-moi un résumé de toutes mes tâches actuelles."
)
print(f"\n🗣️ Consigne donnée à l'Agent : {consigne}\n")
print("⏳ L'Agent réfléchit et travaille en autonomie (patientez un peu)...\n")
# On lance l'agent avec notre message
resultat = await agent.ainvoke({
"messages": [HumanMessage(content=consigne)]
})
# On affiche uniquement la réponse finale de l'agent
reponse_finale = resultat["messages"][-1].content
print("✅ RÉPONSE FINALE DE L'AGENT :")
print("-" * 40)
print(reponse_finale)
print("-" * 40)
if __name__ == "__main__":
asyncio.run(lancer_agent())

47
client_mcp_http.py Normal file
View File

@@ -0,0 +1,47 @@
import asyncio
import os
from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
# Imports spécifiques pour la connexion réseau (SSE)
from mcp.client.sse import sse_client
from mcp.client.session import ClientSession
from langchain_mcp_adapters.tools import load_mcp_tools
# Chargement de la clé API
load_dotenv()
async def lancer_client_reseau():
print("🤖 Initialisation de Gemini...")
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash")
# L'adresse de votre serveur
url = "http://127.0.0.1:9000/sse"
print(f"🔌 Connexion au serveur distant sur {url}...")
# 1. On ouvre le canal de communication réseau
async with sse_client(url) as (read_stream, write_stream):
# 2. On démarre la session MCP sur ce canal
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
# 3. Récupération et conversion des outils au format LangChain
outils = await load_mcp_tools(session)
print(f"✅ Outils détectés : {[outil.name for outil in outils]}")
# 4. On équipe l'IA avec ces outils
llm_avec_outils = llm.bind_tools(outils)
# --- LE TEST ---
question = "Quelles sont les tâches sur ma liste ? Ajoute ensuite la tâche 'Finir le projet réseau'."
print(f"\n🗣️ Question posée : {question}")
reponse = await llm_avec_outils.ainvoke(question)
if reponse.tool_calls:
print("\n⚙️ DÉCISION DE L'IA :")
for outil in reponse.tool_calls:
print(f"- Outil demandé : {outil['name']} | Paramètres : {outil['args']}")
if __name__ == "__main__":
asyncio.run(lancer_client_reseau())

80
serveur_mcp.py Normal file
View File

@@ -0,0 +1,80 @@
from pydantic import BaseModel
from typing import List, Optional
from mcp.server.fastmcp import FastMCP
import json
# 1. Création du serveur MCP
#mcp = FastMCP("Serveur_API_Tasks", host="0.0.0.0", port=9000)
mcp = FastMCP("Serveur_API_Tasks")
# --- Modèles de données ---
class TaskCreate(BaseModel):
title: str
description: Optional[str] = None
completed: bool = False
class Task(TaskCreate):
id: int
# --- Base de données en mémoire ---
tasks_db = []
current_id = 1
# --- Routes CRUD ---
# 1. CREATE : Ajouter une nouvelle tâche
@mcp.tool()
#@app.post("/tasks", response_model=Task, status_code=201)
def create_task(task: TaskCreate):
global current_id
# Création de la tâche avec l'ID actuel
new_task = Task(id=current_id, **task.model_dump())
tasks_db.append(new_task)
current_id += 1
return new_task
# 2. READ : Récupérer toutes les tâches
#@app.get("/tasks", response_model=List[Task])
@mcp.tool()
def get_tasks():
return tasks_db
# 3. READ : Récupérer une tâche spécifique via son ID
#@app.get("/tasks/{task_id}", response_model=Task)
@mcp.tool()
def get_task(task_id: int):
for task in tasks_db:
if task.id == task_id:
return task
raise HTTPException(status_code=404, detail="Tâche non trouvée")
# 4. UPDATE : Mettre à jour une tâche
#@app.put("/tasks/{task_id}", response_model=Task)
@mcp.tool()
def update_task(task_id: int, updated_task: TaskCreate):
for index, task in enumerate(tasks_db):
if task.id == task_id:
# Remplacement de l'ancienne tâche par la nouvelle
tasks_db[index] = Task(id=task_id, **updated_task.model_dump())
return tasks_db[index]
raise HTTPException(status_code=404, detail="Tâche non trouvée")
# 5. DELETE : Supprimer une tâche
#@app.delete("/tasks/{task_id}", status_code=204)
@mcp.tool()
def delete_task(task_id: int):
for index, task in enumerate(tasks_db):
if task.id == task_id:
tasks_db.pop(index)
return
raise HTTPException(status_code=404, detail="Tâche non trouvée")
print("🚀 Démarrage du serveur MCP sur le port 9000...")
# transport="sse" active le mode serveur web au lieu du mode terminal
#mcp.run(transport="sse")
mcp.run()