langgraph-fundamentals

Par langchain-ai · langchain-skills

INVOQUER CETTE SKILL lors de l'écriture de tout code LangGraph. Couvre StateGraph, les schémas d'état, les nœuds, les arêtes, Command, Send, invoke, le streaming et la gestion des erreurs.

npx skills add https://github.com/langchain-ai/langchain-skills --skill langgraph-fundamentals

<overview> LangGraph modélise les workflows d'agent comme des graphes orientés :

  • StateGraph : classe principale pour construire des graphes stateful
  • Nœuds : fonctions qui effectuent le travail et mettent à jour l'état
  • Arêtes : définissent l'ordre d'exécution (statique ou conditionnel)
  • START/END : nœuds spéciaux marquant les points d'entrée et de sortie
  • State with Reducers : contrôlent la fusion des mises à jour d'état

Les graphes doivent être compile()s avant l'exécution. </overview>

<design-methodology>

Concevoir une application LangGraph

Suivez ces 5 étapes lors de la création d'un nouveau graphe :

  1. Cartographiez les étapes discrètes — esquissez un organigramme de votre workflow. Chaque étape devient un nœud.
  2. Identifiez ce que chaque étape fait — catégorisez les nœuds : étape LLM, étape données, étape action ou étape entrée utilisateur. Pour chacun, déterminez le contexte statique (prompt), le contexte dynamique (depuis l'état), la stratégie de réessai et le résultat souhaité.
  3. Concevez votre état — l'état est la mémoire partagée de tous les nœuds. Stockez les données brutes, formatez les prompts à la demande dans les nœuds.
  4. Construisez vos nœuds — implémentez chaque étape comme une fonction qui accepte l'état et retourne des mises à jour partielles.
  5. Câblez-les ensemble — connectez les nœuds avec des arêtes, ajoutez un routage conditionnel, compilez avec un checkpointer si nécessaire.

</design-methodology>

<when-to-use-langgraph>

Utiliser LangGraph quand Utiliser des alternatives quand
Besoin de contrôle fin de l'orchestration d'agent Prototypage rapide → agents LangChain
Construction de workflows complexes avec branchement/boucles Workflows simples sans état → LangChain direct
Requiert boucle humaine, persistance Fonctionnalités clés en main → Deep Agents

</when-to-use-langgraph>


Gestion de l'état

<state-update-strategies>

Besoin Solution Exemple
Remplacer la valeur Pas de reducer (par défaut) Champs simples comme les compteurs
Ajouter à une liste Reducer (operator.add / concat) Historique de messages, logs
Logique personnalisée Fonction reducer personnalisée Fusion complexe

</state-update-strategies>

<ex-state-with-reducer> <python> Définissez le schéma d'état avec des reducers pour accumuler les listes et sommer les entiers.

from typing_extensions import TypedDict, Annotated
import operator

class State(TypedDict):
    name: str  # Par défaut : remplace à la mise à jour
    messages: Annotated[list, operator.add]  # Ajoute à la liste
    total: Annotated[int, operator.add]  # Somme les entiers

</python> <typescript> Utilisez StateSchema avec ReducedValue pour accumuler les tableaux.

import { StateSchema, ReducedValue, MessagesValue } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  name: z.string(),  // Par défaut : remplace
  messages: MessagesValue,  // Intégré pour les messages
  items: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (current, update) => current.concat(update) }
  ),
});

</typescript> </ex-state-with-reducer>

<fix-forgot-reducer-for-list> <python> Sans reducer, retourner une liste remplace les valeurs précédentes.

# MAUVAIS : La liste sera REMPLACÉE
class State(TypedDict):
    messages: list  # Pas de reducer !

# Nœud 1 retourne : {"messages": ["A"]}
# Nœud 2 retourne : {"messages": ["B"]}
# Résultat final : {"messages": ["B"]}  # "A" est PERDU !

# CORRECT : Utilisez Annotated avec operator.add
from typing import Annotated
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]
# Résultat final : {"messages": ["A", "B"]}

</python> <typescript> Sans ReducedValue, les tableaux sont remplacés, non ajoutés.

// MAUVAIS : Le tableau sera remplacé
const State = new StateSchema({
  items: z.array(z.string()),  // Pas de reducer !
});
// Nœud 1 : { items: ["A"] }, Nœud 2 : { items: ["B"] }
// Résultat final : { items: ["B"] }  // A est perdu !

// CORRECT : Utilisez ReducedValue
const State = new StateSchema({
  items: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (current, update) => current.concat(update) }
  ),
});
// Résultat final : { items: ["A", "B"] }

</typescript> </fix-forgot-reducer-for-list>

<fix-state-must-return-dict> <python> Les nœuds doivent retourner des mises à jour partielles, pas muter et retourner l'état complet.

# MAUVAIS : Retourner l'objet d'état entier
def my_node(state: State) -> State:
    state["field"] = "updated"
    return state  # Ne pas muter et retourner !

# CORRECT : Retourner un dict avec seulement les mises à jour
def my_node(state: State) -> dict:
    return {"field": "updated"}

</python> <typescript> Retournez uniquement les mises à jour partielles, pas l'objet d'état complet.

// MAUVAIS : Retourner l'état entier
const myNode = async (state: typeof State.State) => {
  state.field = "updated";
  return state;  // Ne faites pas cela !
};

// CORRECT : Retourner les mises à jour partielles
const myNode = async (state: typeof State.State) => {
  return { field: "updated" };
};

</typescript> </fix-state-must-return-dict>


Nœuds

<node-function-signatures>

Les fonctions de nœud acceptent ces arguments :

<python>

Signature Quand l'utiliser
def node(state: State) Nœuds simples qui nécessitent seulement l'état
def node(state: State, config: RunnableConfig) Besoin de thread_id, tags ou valeurs configurables
def node(state: State, runtime: Runtime[Context]) Besoin du contexte runtime, du store ou du stream_writer
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime

def plain_node(state: State):
    return {"results": "done"}

def node_with_config(state: State, config: RunnableConfig):
    thread_id = config["configurable"]["thread_id"]
    return {"results": f"Thread: {thread_id}"}

def node_with_runtime(state: State, runtime: Runtime[Context]):
    user_id = runtime.context.user_id
    return {"results": f"User: {user_id}"}

</python> <typescript>

Signature Quand l'utiliser
(state) => {...} Nœuds simples qui nécessitent seulement l'état
(state, config) => {...} Besoin de thread_id, tags ou valeurs configurables
import { GraphNode, StateSchema } from "@langchain/langgraph";

const plainNode: GraphNode<typeof State> = (state) => {
  return { results: "done" };
};

const nodeWithConfig: GraphNode<typeof State> = (state, config) => {
  const threadId = config?.configurable?.thread_id;
  return { results: `Thread: ${threadId}` };
};

</typescript>

</node-function-signatures>


Arêtes

<edge-type-selection>

Besoin Type d'arête Quand l'utiliser
Toujours aller au même nœud add_edge() Flux fixe, déterministe
Router en fonction de l'état add_conditional_edges() Branchement dynamique
Mettre à jour l'état ET router Command Combiner la logique dans un seul nœud
Fan-out vers plusieurs nœuds Send Traitement parallèle avec entrées dynamiques

</edge-type-selection>

<ex-basic-graph> <python> Graphe simple à deux nœuds avec arêtes linéaires.

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class State(TypedDict):
    input: str
    output: str

def process_input(state: State) -> dict:
    return {"output": f"Processed: {state['input']}"}

def finalize(state: State) -> dict:
    return {"output": state["output"].upper()}

graph = (
    StateGraph(State)
    .add_node("process", process_input)
    .add_node("finalize", finalize)
    .add_edge(START, "process")
    .add_edge("process", "finalize")
    .add_edge("finalize", END)
    .compile()
)

result = graph.invoke({"input": "hello"})
print(result["output"])  # "PROCESSED: HELLO"

</python> <typescript> Chaînez les nœuds avec addEdge et compilez avant d'invoquer.

import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  input: z.string(),
  output: z.string().default(""),
});

const processInput = async (state: typeof State.State) => {
  return { output: `Processed: ${state.input}` };
};

const finalize = async (state: typeof State.State) => {
  return { output: state.output.toUpperCase() };
};

const graph = new StateGraph(State)
  .addNode("process", processInput)
  .addNode("finalize", finalize)
  .addEdge(START, "process")
  .addEdge("process", "finalize")
  .addEdge("finalize", END)
  .compile();

const result = await graph.invoke({ input: "hello" });
console.log(result.output);  // "PROCESSED: HELLO"

</typescript> </ex-basic-graph>

<ex-conditional-edges> <python> Router vers différents nœuds en fonction de l'état avec des arêtes conditionnelles.

from typing import Literal
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    query: str
    route: str
    result: str

def classify(state: State) -> dict:
    if "weather" in state["query"].lower():
        return {"route": "weather"}
    return {"route": "general"}

def route_query(state: State) -> Literal["weather", "general"]:
    return state["route"]

graph = (
    StateGraph(State)
    .add_node("classify", classify)
    .add_node("weather", lambda s: {"result": "Sunny, 72F"})
    .add_node("general", lambda s: {"result": "General response"})
    .add_edge(START, "classify")
    .add_conditional_edges("classify", route_query, ["weather", "general"])
    .add_edge("weather", END)
    .add_edge("general", END)
    .compile()
)

</python> <typescript> addConditionalEdges route en fonction de la valeur retournée par la fonction.

import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  query: z.string(),
  route: z.string().default(""),
  result: z.string().default(""),
});

const classify = async (state: typeof State.State) => {
  if (state.query.toLowerCase().includes("weather")) {
    return { route: "weather" };
  }
  return { route: "general" };
};

const routeQuery = (state: typeof State.State) => state.route;

const graph = new StateGraph(State)
  .addNode("classify", classify)
  .addNode("weather", async () => ({ result: "Sunny, 72F" }))
  .addNode("general", async () => ({ result: "General response" }))
  .addEdge(START, "classify")
  .addConditionalEdges("classify", routeQuery, ["weather", "general"])
  .addEdge("weather", END)
  .addEdge("general", END)
  .compile();

</typescript> </ex-conditional-edges>


Command

Command combine les mises à jour d'état et le routage dans une seule valeur de retour. Champs :

  • update : mises à jour d'état à appliquer (comme retourner un dict depuis un nœud)
  • goto : nom(s) du/des nœud(s) vers lesquels naviguer ensuite
  • resume : valeur à reprendre après interrupt() — voir la skill boucle humaine

<ex-command-state-and-routing> <python> Command vous permet de mettre à jour l'état ET de choisir le nœud suivant en un seul retour.

from langgraph.types import Command
from typing import Literal

class State(TypedDict):
    count: int
    result: str

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    """Mettre à jour l'état ET décider du nœud suivant en un seul retour."""
    new_count = state["count"] + 1
    if new_count > 5:
        return Command(update={"count": new_count}, goto="node_c")
    return Command(update={"count": new_count}, goto="node_b")

graph = (
    StateGraph(State)
    .add_node("node_a", node_a)
    .add_node("node_b", lambda s: {"result": "B"})
    .add_node("node_c", lambda s: {"result": "C"})
    .add_edge(START, "node_a")
    .add_edge("node_b", END)
    .add_edge("node_c", END)
    .compile()
)

</python> <typescript> Retournez Command avec update et goto pour combiner changement d'état et routage.

import { StateGraph, StateSchema, START, END, Command } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  count: z.number().default(0),
  result: z.string().default(""),
});

const nodeA = async (state: typeof State.State) => {
  const newCount = state.count + 1;
  if (newCount > 5) {
    return new Command({ update: { count: newCount }, goto: "node_c" });
  }
  return new Command({ update: { count: newCount }, goto: "node_b" });
};

const graph = new StateGraph(State)
  .addNode("node_a", nodeA, { ends: ["node_b", "node_c"] })
  .addNode("node_b", async () => ({ result: "B" }))
  .addNode("node_c", async () => ({ result: "C" }))
  .addEdge(START, "node_a")
  .addEdge("node_b", END)
  .addEdge("node_c", END)
  .compile();

</typescript> </ex-command-state-and-routing>

<command-return-type-annotations>

Python : utilisez Command[Literal["node_a", "node_b"]] comme annotation de type de retour pour déclarer les destinations goto valides.

TypeScript : passez { ends: ["node_a", "node_b"] } comme troisième argument à addNode pour déclarer les destinations goto valides.

</command-return-type-annotations>

<warning-command-static-edges>

Attention : Command ajoute uniquement des arêtes dynamiques — les arêtes statiques définies avec add_edge / addEdge s'exécutent quand même. Si node_a retourne Command(goto="node_c") et que vous avez aussi graph.add_edge("node_a", "node_b"), les deux node_b et node_c s'exécuteront.

</warning-command-static-edges>


API Send

Fan-out avec Send : retournez [Send("worker", {...})] depuis une arête conditionnelle pour créer des workers parallèles. Nécessite un reducer sur le champ résultats.

<ex-orchestrator-worker> <python> Fan out les tâches vers des workers parallèles en utilisant l'API Send et agrégez les résultats.

from langgraph.types import Send
from typing import Annotated
import operator

class OrchestratorState(TypedDict):
    tasks: list[str]
    results: Annotated[list, operator.add]
    summary: str

def orchestrator(state: OrchestratorState):
    """Fan out les tâches vers les workers."""
    return [Send("worker", {"task": task}) for task in state["tasks"]]

def worker(state: dict) -> dict:
    return {"results": [f"Completed: {state['task']}"]}

def synthesize(state: OrchestratorState) -> dict:
    return {"summary": f"Processed {len(state['results'])} tasks"}

graph = (
    StateGraph(OrchestratorState)
    .add_node("worker", worker)
    .add_node("synthesize", synthesize)
    .add_conditional_edges(START, orchestrator, ["worker"])
    .add_edge("worker", "synthesize")
    .add_edge("synthesize", END)
    .compile()
)

result = graph.invoke({"tasks": ["Task A", "Task B", "Task C"]})

</python> <typescript> Fan out les tâches vers des workers parallèles en utilisant l'API Send et agrégez les résultats.

import { Send, StateGraph, StateSchema, ReducedValue, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  tasks: z.array(z.string()),
  results: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (curr, upd) => curr.concat(upd) }
  ),
  summary: z.string().default(""),
});

const orchestrator = (state: typeof State.State) => {
  return state.tasks.map((task) => new Send("worker", { task }));
};

const worker = async (state: { task: string }) => {
  return { results: [`Completed: ${state.task}`] };
};

const synthesize = async (state: typeof State.State) => {
  return { summary: `Processed ${state.results.length} tasks` };
};

const graph = new StateGraph(State)
  .addNode("worker", worker)
  .addNode("synthesize", synthesize)
  .addConditionalEdges(START, orchestrator, ["worker"])
  .addEdge("worker", "synthesize")
  .addEdge("synthesize", END)
  .compile();

</typescript> </ex-orchestrator-worker>

<fix-send-accumulator> <python> Utilisez un reducer pour accumuler les résultats des workers parallèles (sinon le dernier worker remplace).

# MAUVAIS : Pas de reducer - le dernier worker remplace
class State(TypedDict):
    results: list

# CORRECT
class State(TypedDict):
    results: Annotated[list, operator.add]  # Accumule

</python> <typescript> Utilisez ReducedValue pour accumuler les résultats des workers parallèles.

// MAUVAIS : Pas de reducer
const State = new StateSchema({ results: z.array(z.string()) });

// CORRECT
const State = new StateSchema({
  results: new ReducedValue(z.array(z.string()).default(() => []), { reducer: (curr, upd) => curr.concat(upd) }),
});

</typescript> </fix-send-accumulator>


Exécuter les graphes : Invoke et Stream

<invoke-basics>

Appelez graph.invoke(input, config) pour exécuter un graphe jusqu'à son terme et retourner l'état final.

<python>

result = graph.invoke({"input": "hello"})
# Avec config (pour la persistance, tags, etc.)
result = graph.invoke({"input": "hello"}, {"configurable": {"thread_id": "1"}})

</python> <typescript>

const result = await graph.invoke({ input: "hello" });
// Avec config
const result = await graph.invoke({ input: "hello" }, { configurable: { thread_id: "1" } });

</typescript>

</invoke-basics>

<stream-mode-selection>

Mode Ce qu'il streame Cas d'usage
values État complet après chaque étape Surveiller l'état complet
updates Deltas d'état Suivre les mises à jour incrémentielles
messages Tokens LLM + métadonnées IUs de chat
custom Données définies par l'utilisateur Indicateurs de progression

</stream-mode-selection>

<ex-stream-llm-tokens> <python> Streamez les tokens LLM en temps réel pour l'affichage dans l'UI de chat.

for chunk in graph.stream(
    {"messages": [HumanMessage("Hello")]},
    stream_mode="messages"
):
    token, metadata = chunk
    if hasattr(token, "content"):
        print(token.content, end="", flush=True)

</python> <typescript> Streamez les tokens LLM en temps réel pour l'affichage dans l'UI de chat.

for await (const chunk of graph.stream(
  { messages: [new HumanMessage("Hello")] },
  { streamMode: "messages" }
)) {
  const [token, metadata] = chunk;
  if (token.content) {
    process.stdout.write(token.content);
  }
}

</typescript> </ex-stream-llm-tokens>

<ex-stream-custom-data> <python> Émettez des mises à jour de progression personnalisées depuis les nœuds en utilisant le stream writer.

from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    writer("Processing step 1...")
    # Effectuer le travail
    writer("Complete!")
    return {"result": "done"}

for chunk in graph.stream({"data": "test"}, stream_mode="custom"):
    print(chunk)

</python> <typescript> Émettez des mises à jour de progression personnalisées depuis les nœuds en utilisant le stream writer.

import { getWriter } from "@langchain/langgraph";

const myNode = async (state: typeof State.State) => {
  const writer = getWriter();
  writer("Processing step 1...");
  // Effectuer le travail
  writer("Complete!");
  return { result: "done" };
};

for await (const chunk of graph.stream({ data: "test" }, { streamMode: "custom" })) {
  console.log(chunk);
}

</typescript> </ex-stream-custom-data>


Gestion des erreurs

Associez le type d'erreur au bon gestionnaire :

<error-handling-table>

Type d'erreur Qui corrige Stratégie Exemple
Transitoire (réseau, rate limits) Système RetryPolicy(max_attempts=3) add_node(..., retry_policy=...)
Récupérable par LLM (défaillances d'outils) LLM ToolNode(tools, handle_tool_errors=True) Erreur retournée comme ToolMessage
Correctible par utilisateur (infos manquantes) Humain interrupt({"message": ...}) Collecter les données manquantes (voir skill HITL)
Inattendue Développeur Laisser remonter raise

</error-handling-table>

<ex-retry-policy> <python> Utilisez RetryPolicy pour les erreurs transitoires (problèmes réseau, rate limits).

from langgraph.types import RetryPolicy

workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)

</python> <typescript> Utilisez retryPolicy pour les erreurs transitoires.

workflow.addNode(
  "searchDocumentation",
  searchDocumentation,
  {
    retryPolicy: { maxAttempts: 3, initialInterval: 1.0 },
  },
);

</typescript> </ex-retry-policy>

<ex-tool-node-error-handling> <python> Utilisez ToolNode depuis langgraph.prebuilt pour gérer l'exécution des outils et les erreurs. Quand handle_tool_errors=True, les erreurs sont retournées comme ToolMessages pour que le LLM puisse récupérer.

from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools, handle_tool_errors=True)

workflow.add_node("tools", tool_node)

</python> <typescript> Utilisez ToolNode depuis @langchain/langgraph/prebuilt pour gérer l'exécution des outils et les erreurs. Quand handleToolErrors est true, les erreurs sont retournées comme ToolMessages pour que le LLM puisse récupérer.

import { ToolNode } from "@langchain/langgraph/prebuilt";

const toolNode = new ToolNode(tools, { handleToolErrors: true });

workflow.addNode("tools", toolNode);

</typescript> </ex-tool-node-error-handling>


Corrections courantes

<fix-compile-before-execution> <python> Vous devez compiler() pour obtenir un graphe exécutable.

# MAUVAIS
builder.invoke({"input": "test"})  # AttributeError !

# CORRECT
graph = builder.compile()
graph.invoke({"input": "test"})

</python> <typescript> Vous devez compiler() pour obtenir un graphe exécutable.

// MAUVAIS
await builder.invoke({ input: "test" });

// CORRECT
const graph = builder.compile();
await graph.invoke({ input: "test" });

</typescript> </fix-compile-before-execution>

<fix-infinite-loop-needs-exit> <python> Fournissez un chemin conditionnel vers END pour éviter les boucles infinies.

# MAUVAIS : Boucle à l'infini
builder.add_edge("node_a", "node_b")
builder.add_edge("node_b", "node_a")

# CORRECT
def should_continue(state):
    return END if state["count"] > 10 else "node_b"
builder.add_conditional_edges("node_a", should_continue)

</python> <typescript> Utilisez des arêtes conditionnelles avec retour END pour casser les boucles.

// MAUVAIS : Boucle à l'infini
builder.addEdge("node_a", "node_b").addEdge("node_b", "node_a");

// CORRECT
builder.addConditionalEdges("node_a", (state) => state.count > 10 ? END : "node_b");

</typescript> </fix-infinite-loop-needs-exit>

<fix-common-mistakes> Autres erreurs courantes :

# Le router doit retourner les noms des nœuds qui existent dans le graphe
builder.add_node("my_node", func)  # Ajoutez le nœud AVANT de le référencer dans les arêtes
builder.add_conditional_edges("node_a", router, ["my_node"])

# Le type de retour de Command a besoin de Literal pour les destinations de routage (Python)
def node_a(state) -> Command[Literal["node_b", "node_c"]]:
    return Command(goto="node_b")

# START est entry-only - on ne peut pas router vers lui
builder.add_edge("node_a", START)  # MAUVAIS !
builder.add_edge("node_a", "entry")  # Utilisez un nœud d'entrée nommé à la place

# Le reducer attend des types correspondants
return {"items": ["item"]}  # Liste pour un reducer de liste, pas une chaîne
// Toujours awaiter graph.invoke() - il retourne une Promise
const result = await graph.invoke({ input: "test" });

// Les nœuds TS Command ont besoin de { ends } pour déclarer les destinations de routage
builder.addNode("router", routerFn, { ends: ["node_b", "node_c"] });

</fix-common-mistakes>

<boundaries>

Ce que vous NE DEVEZ PAS faire

  • Muter l'état directement — toujours retourner des dicts de mises à jour partielles depuis les nœuds
  • Router vers START — c'est entry-only; utilisez un nœud nommé à la place
  • Oublier les reducers sur les champs de liste — sans eux, le dernier écrit gagne
  • Mélanger des arêtes statiques avec Command goto sans comprendre que les deux s'exécuteront </boundaries>

Skills similaires