from pydantic import BaseModel from typing import List, Optional from mcp.server.fastmcp import FastMCP import json # 1. Création du serveur MCP mcp = FastMCP("Serveur_API_Tasks", host="0.0.0.0", port=9000) # --- Modèles de données --- class TaskCreate(BaseModel): title: str description: Optional[str] = None completed: bool = False class Task(TaskCreate): id: int # --- Base de données en mémoire --- tasks_db = [] current_id = 1 # --- Routes CRUD --- # 1. CREATE : Ajouter une nouvelle tâche @mcp.tool() #@app.post("/tasks", response_model=Task, status_code=201) def create_task(task: TaskCreate): global current_id # Création de la tâche avec l'ID actuel new_task = Task(id=current_id, **task.model_dump()) tasks_db.append(new_task) current_id += 1 return new_task # 2. READ : Récupérer toutes les tâches #@app.get("/tasks", response_model=List[Task]) @mcp.tool() def get_tasks(): return tasks_db # 3. READ : Récupérer une tâche spécifique via son ID #@app.get("/tasks/{task_id}", response_model=Task) @mcp.tool() def get_task(task_id: int): for task in tasks_db: if task.id == task_id: return task raise HTTPException(status_code=404, detail="Tâche non trouvée") # 4. UPDATE : Mettre à jour une tâche #@app.put("/tasks/{task_id}", response_model=Task) @mcp.tool() def update_task(task_id: int, updated_task: TaskCreate): for index, task in enumerate(tasks_db): if task.id == task_id: # Remplacement de l'ancienne tâche par la nouvelle tasks_db[index] = Task(id=task_id, **updated_task.model_dump()) return tasks_db[index] raise HTTPException(status_code=404, detail="Tâche non trouvée") # 5. DELETE : Supprimer une tâche #@app.delete("/tasks/{task_id}", status_code=204) @mcp.tool() def delete_task(task_id: int): for index, task in enumerate(tasks_db): if task.id == task_id: tasks_db.pop(index) return raise HTTPException(status_code=404, detail="Tâche non trouvée") print("🚀 Démarrage du serveur MCP sur le port 9000...") # transport="sse" active le mode serveur web au lieu du mode terminal # host="0.0.0.0" permet d'écouter toutes les connexions (indispensable si tu es sur une VM ou Docker) mcp.run(transport="sse")