<overview> LangGraph modélise les workflows d'agent comme des graphes orientés :
- StateGraph : classe principale pour construire des graphes stateful
- Nœuds : fonctions qui effectuent le travail et mettent à jour l'état
- Arêtes : définissent l'ordre d'exécution (statique ou conditionnel)
- START/END : nœuds spéciaux marquant les points d'entrée et de sortie
- State with Reducers : contrôlent la fusion des mises à jour d'état
Les graphes doivent être compile()s avant l'exécution.
</overview>
<design-methodology>
Concevoir une application LangGraph
Suivez ces 5 étapes lors de la création d'un nouveau graphe :
- Cartographiez les étapes discrètes — esquissez un organigramme de votre workflow. Chaque étape devient un nœud.
- Identifiez ce que chaque étape fait — catégorisez les nœuds : étape LLM, étape données, étape action ou étape entrée utilisateur. Pour chacun, déterminez le contexte statique (prompt), le contexte dynamique (depuis l'état), la stratégie de réessai et le résultat souhaité.
- Concevez votre état — l'état est la mémoire partagée de tous les nœuds. Stockez les données brutes, formatez les prompts à la demande dans les nœuds.
- Construisez vos nœuds — implémentez chaque étape comme une fonction qui accepte l'état et retourne des mises à jour partielles.
- Câblez-les ensemble — connectez les nœuds avec des arêtes, ajoutez un routage conditionnel, compilez avec un checkpointer si nécessaire.
</design-methodology>
<when-to-use-langgraph>
| Utiliser LangGraph quand | Utiliser des alternatives quand |
|---|---|
| Besoin de contrôle fin de l'orchestration d'agent | Prototypage rapide → agents LangChain |
| Construction de workflows complexes avec branchement/boucles | Workflows simples sans état → LangChain direct |
| Requiert boucle humaine, persistance | Fonctionnalités clés en main → Deep Agents |
</when-to-use-langgraph>
Gestion de l'état
<state-update-strategies>
| Besoin | Solution | Exemple |
|---|---|---|
| Remplacer la valeur | Pas de reducer (par défaut) | Champs simples comme les compteurs |
| Ajouter à une liste | Reducer (operator.add / concat) | Historique de messages, logs |
| Logique personnalisée | Fonction reducer personnalisée | Fusion complexe |
</state-update-strategies>
<ex-state-with-reducer> <python> Définissez le schéma d'état avec des reducers pour accumuler les listes et sommer les entiers.
from typing_extensions import TypedDict, Annotated
import operator
class State(TypedDict):
name: str # Par défaut : remplace à la mise à jour
messages: Annotated[list, operator.add] # Ajoute à la liste
total: Annotated[int, operator.add] # Somme les entiers
</python> <typescript> Utilisez StateSchema avec ReducedValue pour accumuler les tableaux.
import { StateSchema, ReducedValue, MessagesValue } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
name: z.string(), // Par défaut : remplace
messages: MessagesValue, // Intégré pour les messages
items: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (current, update) => current.concat(update) }
),
});
</typescript> </ex-state-with-reducer>
<fix-forgot-reducer-for-list> <python> Sans reducer, retourner une liste remplace les valeurs précédentes.
# MAUVAIS : La liste sera REMPLACÉE
class State(TypedDict):
messages: list # Pas de reducer !
# Nœud 1 retourne : {"messages": ["A"]}
# Nœud 2 retourne : {"messages": ["B"]}
# Résultat final : {"messages": ["B"]} # "A" est PERDU !
# CORRECT : Utilisez Annotated avec operator.add
from typing import Annotated
import operator
class State(TypedDict):
messages: Annotated[list, operator.add]
# Résultat final : {"messages": ["A", "B"]}
</python> <typescript> Sans ReducedValue, les tableaux sont remplacés, non ajoutés.
// MAUVAIS : Le tableau sera remplacé
const State = new StateSchema({
items: z.array(z.string()), // Pas de reducer !
});
// Nœud 1 : { items: ["A"] }, Nœud 2 : { items: ["B"] }
// Résultat final : { items: ["B"] } // A est perdu !
// CORRECT : Utilisez ReducedValue
const State = new StateSchema({
items: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (current, update) => current.concat(update) }
),
});
// Résultat final : { items: ["A", "B"] }
</typescript> </fix-forgot-reducer-for-list>
<fix-state-must-return-dict> <python> Les nœuds doivent retourner des mises à jour partielles, pas muter et retourner l'état complet.
# MAUVAIS : Retourner l'objet d'état entier
def my_node(state: State) -> State:
state["field"] = "updated"
return state # Ne pas muter et retourner !
# CORRECT : Retourner un dict avec seulement les mises à jour
def my_node(state: State) -> dict:
return {"field": "updated"}
</python> <typescript> Retournez uniquement les mises à jour partielles, pas l'objet d'état complet.
// MAUVAIS : Retourner l'état entier
const myNode = async (state: typeof State.State) => {
state.field = "updated";
return state; // Ne faites pas cela !
};
// CORRECT : Retourner les mises à jour partielles
const myNode = async (state: typeof State.State) => {
return { field: "updated" };
};
</typescript> </fix-state-must-return-dict>
Nœuds
<node-function-signatures>
Les fonctions de nœud acceptent ces arguments :
<python>
| Signature | Quand l'utiliser |
|---|---|
def node(state: State) |
Nœuds simples qui nécessitent seulement l'état |
def node(state: State, config: RunnableConfig) |
Besoin de thread_id, tags ou valeurs configurables |
def node(state: State, runtime: Runtime[Context]) |
Besoin du contexte runtime, du store ou du stream_writer |
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
def plain_node(state: State):
return {"results": "done"}
def node_with_config(state: State, config: RunnableConfig):
thread_id = config["configurable"]["thread_id"]
return {"results": f"Thread: {thread_id}"}
def node_with_runtime(state: State, runtime: Runtime[Context]):
user_id = runtime.context.user_id
return {"results": f"User: {user_id}"}
</python> <typescript>
| Signature | Quand l'utiliser |
|---|---|
(state) => {...} |
Nœuds simples qui nécessitent seulement l'état |
(state, config) => {...} |
Besoin de thread_id, tags ou valeurs configurables |
import { GraphNode, StateSchema } from "@langchain/langgraph";
const plainNode: GraphNode<typeof State> = (state) => {
return { results: "done" };
};
const nodeWithConfig: GraphNode<typeof State> = (state, config) => {
const threadId = config?.configurable?.thread_id;
return { results: `Thread: ${threadId}` };
};
</typescript>
</node-function-signatures>
Arêtes
<edge-type-selection>
| Besoin | Type d'arête | Quand l'utiliser |
|---|---|---|
| Toujours aller au même nœud | add_edge() |
Flux fixe, déterministe |
| Router en fonction de l'état | add_conditional_edges() |
Branchement dynamique |
| Mettre à jour l'état ET router | Command |
Combiner la logique dans un seul nœud |
| Fan-out vers plusieurs nœuds | Send |
Traitement parallèle avec entrées dynamiques |
</edge-type-selection>
<ex-basic-graph> <python> Graphe simple à deux nœuds avec arêtes linéaires.
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class State(TypedDict):
input: str
output: str
def process_input(state: State) -> dict:
return {"output": f"Processed: {state['input']}"}
def finalize(state: State) -> dict:
return {"output": state["output"].upper()}
graph = (
StateGraph(State)
.add_node("process", process_input)
.add_node("finalize", finalize)
.add_edge(START, "process")
.add_edge("process", "finalize")
.add_edge("finalize", END)
.compile()
)
result = graph.invoke({"input": "hello"})
print(result["output"]) # "PROCESSED: HELLO"
</python> <typescript> Chaînez les nœuds avec addEdge et compilez avant d'invoquer.
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
input: z.string(),
output: z.string().default(""),
});
const processInput = async (state: typeof State.State) => {
return { output: `Processed: ${state.input}` };
};
const finalize = async (state: typeof State.State) => {
return { output: state.output.toUpperCase() };
};
const graph = new StateGraph(State)
.addNode("process", processInput)
.addNode("finalize", finalize)
.addEdge(START, "process")
.addEdge("process", "finalize")
.addEdge("finalize", END)
.compile();
const result = await graph.invoke({ input: "hello" });
console.log(result.output); // "PROCESSED: HELLO"
</typescript> </ex-basic-graph>
<ex-conditional-edges> <python> Router vers différents nœuds en fonction de l'état avec des arêtes conditionnelles.
from typing import Literal
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
query: str
route: str
result: str
def classify(state: State) -> dict:
if "weather" in state["query"].lower():
return {"route": "weather"}
return {"route": "general"}
def route_query(state: State) -> Literal["weather", "general"]:
return state["route"]
graph = (
StateGraph(State)
.add_node("classify", classify)
.add_node("weather", lambda s: {"result": "Sunny, 72F"})
.add_node("general", lambda s: {"result": "General response"})
.add_edge(START, "classify")
.add_conditional_edges("classify", route_query, ["weather", "general"])
.add_edge("weather", END)
.add_edge("general", END)
.compile()
)
</python> <typescript> addConditionalEdges route en fonction de la valeur retournée par la fonction.
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
query: z.string(),
route: z.string().default(""),
result: z.string().default(""),
});
const classify = async (state: typeof State.State) => {
if (state.query.toLowerCase().includes("weather")) {
return { route: "weather" };
}
return { route: "general" };
};
const routeQuery = (state: typeof State.State) => state.route;
const graph = new StateGraph(State)
.addNode("classify", classify)
.addNode("weather", async () => ({ result: "Sunny, 72F" }))
.addNode("general", async () => ({ result: "General response" }))
.addEdge(START, "classify")
.addConditionalEdges("classify", routeQuery, ["weather", "general"])
.addEdge("weather", END)
.addEdge("general", END)
.compile();
</typescript> </ex-conditional-edges>
Command
Command combine les mises à jour d'état et le routage dans une seule valeur de retour. Champs :
update: mises à jour d'état à appliquer (comme retourner un dict depuis un nœud)goto: nom(s) du/des nœud(s) vers lesquels naviguer ensuiteresume: valeur à reprendre aprèsinterrupt()— voir la skill boucle humaine
<ex-command-state-and-routing> <python> Command vous permet de mettre à jour l'état ET de choisir le nœud suivant en un seul retour.
from langgraph.types import Command
from typing import Literal
class State(TypedDict):
count: int
result: str
def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
"""Mettre à jour l'état ET décider du nœud suivant en un seul retour."""
new_count = state["count"] + 1
if new_count > 5:
return Command(update={"count": new_count}, goto="node_c")
return Command(update={"count": new_count}, goto="node_b")
graph = (
StateGraph(State)
.add_node("node_a", node_a)
.add_node("node_b", lambda s: {"result": "B"})
.add_node("node_c", lambda s: {"result": "C"})
.add_edge(START, "node_a")
.add_edge("node_b", END)
.add_edge("node_c", END)
.compile()
)
</python> <typescript> Retournez Command avec update et goto pour combiner changement d'état et routage.
import { StateGraph, StateSchema, START, END, Command } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
count: z.number().default(0),
result: z.string().default(""),
});
const nodeA = async (state: typeof State.State) => {
const newCount = state.count + 1;
if (newCount > 5) {
return new Command({ update: { count: newCount }, goto: "node_c" });
}
return new Command({ update: { count: newCount }, goto: "node_b" });
};
const graph = new StateGraph(State)
.addNode("node_a", nodeA, { ends: ["node_b", "node_c"] })
.addNode("node_b", async () => ({ result: "B" }))
.addNode("node_c", async () => ({ result: "C" }))
.addEdge(START, "node_a")
.addEdge("node_b", END)
.addEdge("node_c", END)
.compile();
</typescript> </ex-command-state-and-routing>
<command-return-type-annotations>
Python : utilisez Command[Literal["node_a", "node_b"]] comme annotation de type de retour pour déclarer les destinations goto valides.
TypeScript : passez { ends: ["node_a", "node_b"] } comme troisième argument à addNode pour déclarer les destinations goto valides.
</command-return-type-annotations>
<warning-command-static-edges>
Attention : Command ajoute uniquement des arêtes dynamiques — les arêtes statiques définies avec add_edge / addEdge s'exécutent quand même. Si node_a retourne Command(goto="node_c") et que vous avez aussi graph.add_edge("node_a", "node_b"), les deux node_b et node_c s'exécuteront.
</warning-command-static-edges>
API Send
Fan-out avec Send : retournez [Send("worker", {...})] depuis une arête conditionnelle pour créer des workers parallèles. Nécessite un reducer sur le champ résultats.
<ex-orchestrator-worker> <python> Fan out les tâches vers des workers parallèles en utilisant l'API Send et agrégez les résultats.
from langgraph.types import Send
from typing import Annotated
import operator
class OrchestratorState(TypedDict):
tasks: list[str]
results: Annotated[list, operator.add]
summary: str
def orchestrator(state: OrchestratorState):
"""Fan out les tâches vers les workers."""
return [Send("worker", {"task": task}) for task in state["tasks"]]
def worker(state: dict) -> dict:
return {"results": [f"Completed: {state['task']}"]}
def synthesize(state: OrchestratorState) -> dict:
return {"summary": f"Processed {len(state['results'])} tasks"}
graph = (
StateGraph(OrchestratorState)
.add_node("worker", worker)
.add_node("synthesize", synthesize)
.add_conditional_edges(START, orchestrator, ["worker"])
.add_edge("worker", "synthesize")
.add_edge("synthesize", END)
.compile()
)
result = graph.invoke({"tasks": ["Task A", "Task B", "Task C"]})
</python> <typescript> Fan out les tâches vers des workers parallèles en utilisant l'API Send et agrégez les résultats.
import { Send, StateGraph, StateSchema, ReducedValue, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
tasks: z.array(z.string()),
results: new ReducedValue(
z.array(z.string()).default(() => []),
{ reducer: (curr, upd) => curr.concat(upd) }
),
summary: z.string().default(""),
});
const orchestrator = (state: typeof State.State) => {
return state.tasks.map((task) => new Send("worker", { task }));
};
const worker = async (state: { task: string }) => {
return { results: [`Completed: ${state.task}`] };
};
const synthesize = async (state: typeof State.State) => {
return { summary: `Processed ${state.results.length} tasks` };
};
const graph = new StateGraph(State)
.addNode("worker", worker)
.addNode("synthesize", synthesize)
.addConditionalEdges(START, orchestrator, ["worker"])
.addEdge("worker", "synthesize")
.addEdge("synthesize", END)
.compile();
</typescript> </ex-orchestrator-worker>
<fix-send-accumulator> <python> Utilisez un reducer pour accumuler les résultats des workers parallèles (sinon le dernier worker remplace).
# MAUVAIS : Pas de reducer - le dernier worker remplace
class State(TypedDict):
results: list
# CORRECT
class State(TypedDict):
results: Annotated[list, operator.add] # Accumule
</python> <typescript> Utilisez ReducedValue pour accumuler les résultats des workers parallèles.
// MAUVAIS : Pas de reducer
const State = new StateSchema({ results: z.array(z.string()) });
// CORRECT
const State = new StateSchema({
results: new ReducedValue(z.array(z.string()).default(() => []), { reducer: (curr, upd) => curr.concat(upd) }),
});
</typescript> </fix-send-accumulator>
Exécuter les graphes : Invoke et Stream
<invoke-basics>
Appelez graph.invoke(input, config) pour exécuter un graphe jusqu'à son terme et retourner l'état final.
<python>
result = graph.invoke({"input": "hello"})
# Avec config (pour la persistance, tags, etc.)
result = graph.invoke({"input": "hello"}, {"configurable": {"thread_id": "1"}})
</python> <typescript>
const result = await graph.invoke({ input: "hello" });
// Avec config
const result = await graph.invoke({ input: "hello" }, { configurable: { thread_id: "1" } });
</typescript>
</invoke-basics>
<stream-mode-selection>
| Mode | Ce qu'il streame | Cas d'usage |
|---|---|---|
values |
État complet après chaque étape | Surveiller l'état complet |
updates |
Deltas d'état | Suivre les mises à jour incrémentielles |
messages |
Tokens LLM + métadonnées | IUs de chat |
custom |
Données définies par l'utilisateur | Indicateurs de progression |
</stream-mode-selection>
<ex-stream-llm-tokens> <python> Streamez les tokens LLM en temps réel pour l'affichage dans l'UI de chat.
for chunk in graph.stream(
{"messages": [HumanMessage("Hello")]},
stream_mode="messages"
):
token, metadata = chunk
if hasattr(token, "content"):
print(token.content, end="", flush=True)
</python> <typescript> Streamez les tokens LLM en temps réel pour l'affichage dans l'UI de chat.
for await (const chunk of graph.stream(
{ messages: [new HumanMessage("Hello")] },
{ streamMode: "messages" }
)) {
const [token, metadata] = chunk;
if (token.content) {
process.stdout.write(token.content);
}
}
</typescript> </ex-stream-llm-tokens>
<ex-stream-custom-data> <python> Émettez des mises à jour de progression personnalisées depuis les nœuds en utilisant le stream writer.
from langgraph.config import get_stream_writer
def my_node(state):
writer = get_stream_writer()
writer("Processing step 1...")
# Effectuer le travail
writer("Complete!")
return {"result": "done"}
for chunk in graph.stream({"data": "test"}, stream_mode="custom"):
print(chunk)
</python> <typescript> Émettez des mises à jour de progression personnalisées depuis les nœuds en utilisant le stream writer.
import { getWriter } from "@langchain/langgraph";
const myNode = async (state: typeof State.State) => {
const writer = getWriter();
writer("Processing step 1...");
// Effectuer le travail
writer("Complete!");
return { result: "done" };
};
for await (const chunk of graph.stream({ data: "test" }, { streamMode: "custom" })) {
console.log(chunk);
}
</typescript> </ex-stream-custom-data>
Gestion des erreurs
Associez le type d'erreur au bon gestionnaire :
<error-handling-table>
| Type d'erreur | Qui corrige | Stratégie | Exemple |
|---|---|---|---|
| Transitoire (réseau, rate limits) | Système | RetryPolicy(max_attempts=3) |
add_node(..., retry_policy=...) |
| Récupérable par LLM (défaillances d'outils) | LLM | ToolNode(tools, handle_tool_errors=True) |
Erreur retournée comme ToolMessage |
| Correctible par utilisateur (infos manquantes) | Humain | interrupt({"message": ...}) |
Collecter les données manquantes (voir skill HITL) |
| Inattendue | Développeur | Laisser remonter | raise |
</error-handling-table>
<ex-retry-policy> <python> Utilisez RetryPolicy pour les erreurs transitoires (problèmes réseau, rate limits).
from langgraph.types import RetryPolicy
workflow.add_node(
"search_documentation",
search_documentation,
retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)
</python> <typescript> Utilisez retryPolicy pour les erreurs transitoires.
workflow.addNode(
"searchDocumentation",
searchDocumentation,
{
retryPolicy: { maxAttempts: 3, initialInterval: 1.0 },
},
);
</typescript> </ex-retry-policy>
<ex-tool-node-error-handling> <python> Utilisez ToolNode depuis langgraph.prebuilt pour gérer l'exécution des outils et les erreurs. Quand handle_tool_errors=True, les erreurs sont retournées comme ToolMessages pour que le LLM puisse récupérer.
from langgraph.prebuilt import ToolNode
tool_node = ToolNode(tools, handle_tool_errors=True)
workflow.add_node("tools", tool_node)
</python> <typescript> Utilisez ToolNode depuis @langchain/langgraph/prebuilt pour gérer l'exécution des outils et les erreurs. Quand handleToolErrors est true, les erreurs sont retournées comme ToolMessages pour que le LLM puisse récupérer.
import { ToolNode } from "@langchain/langgraph/prebuilt";
const toolNode = new ToolNode(tools, { handleToolErrors: true });
workflow.addNode("tools", toolNode);
</typescript> </ex-tool-node-error-handling>
Corrections courantes
<fix-compile-before-execution> <python> Vous devez compiler() pour obtenir un graphe exécutable.
# MAUVAIS
builder.invoke({"input": "test"}) # AttributeError !
# CORRECT
graph = builder.compile()
graph.invoke({"input": "test"})
</python> <typescript> Vous devez compiler() pour obtenir un graphe exécutable.
// MAUVAIS
await builder.invoke({ input: "test" });
// CORRECT
const graph = builder.compile();
await graph.invoke({ input: "test" });
</typescript> </fix-compile-before-execution>
<fix-infinite-loop-needs-exit> <python> Fournissez un chemin conditionnel vers END pour éviter les boucles infinies.
# MAUVAIS : Boucle à l'infini
builder.add_edge("node_a", "node_b")
builder.add_edge("node_b", "node_a")
# CORRECT
def should_continue(state):
return END if state["count"] > 10 else "node_b"
builder.add_conditional_edges("node_a", should_continue)
</python> <typescript> Utilisez des arêtes conditionnelles avec retour END pour casser les boucles.
// MAUVAIS : Boucle à l'infini
builder.addEdge("node_a", "node_b").addEdge("node_b", "node_a");
// CORRECT
builder.addConditionalEdges("node_a", (state) => state.count > 10 ? END : "node_b");
</typescript> </fix-infinite-loop-needs-exit>
<fix-common-mistakes> Autres erreurs courantes :
# Le router doit retourner les noms des nœuds qui existent dans le graphe
builder.add_node("my_node", func) # Ajoutez le nœud AVANT de le référencer dans les arêtes
builder.add_conditional_edges("node_a", router, ["my_node"])
# Le type de retour de Command a besoin de Literal pour les destinations de routage (Python)
def node_a(state) -> Command[Literal["node_b", "node_c"]]:
return Command(goto="node_b")
# START est entry-only - on ne peut pas router vers lui
builder.add_edge("node_a", START) # MAUVAIS !
builder.add_edge("node_a", "entry") # Utilisez un nœud d'entrée nommé à la place
# Le reducer attend des types correspondants
return {"items": ["item"]} # Liste pour un reducer de liste, pas une chaîne
// Toujours awaiter graph.invoke() - il retourne une Promise
const result = await graph.invoke({ input: "test" });
// Les nœuds TS Command ont besoin de { ends } pour déclarer les destinations de routage
builder.addNode("router", routerFn, { ends: ["node_b", "node_c"] });
</fix-common-mistakes>
<boundaries>
Ce que vous NE DEVEZ PAS faire
- Muter l'état directement — toujours retourner des dicts de mises à jour partielles depuis les nœuds
- Router vers START — c'est entry-only; utilisez un nœud nommé à la place
- Oublier les reducers sur les champs de liste — sans eux, le dernier écrit gagne
- Mélanger des arêtes statiques avec Command goto sans comprendre que les deux s'exécuteront </boundaries>