<overview> La couche de persistence de LangGraph permet une exécution durable en créant des points de contrôle de l'état du graphe :
- Checkpointer : Enregistre/charge l'état du graphe à chaque super-étape
- Thread ID : Identifie les séquences de points de contrôle distinctes (conversations)
- Store : Mémoire multi-threads pour les préférences utilisateur, les faits
Deux types de mémoire :
- Court terme (checkpointer) : Historique de conversation limité au thread
- Long terme (store) : Préférences et faits utilisateur multi-threads </overview>
<checkpointer-selection>
| Checkpointer | Cas d'usage | Prêt pour la production |
|---|---|---|
InMemorySaver |
Tests, développement | Non |
SqliteSaver |
Développement local | Partiel |
PostgresSaver |
Production | Oui |
</checkpointer-selection>
Configuration du Checkpointer
<ex-basic-persistence> <python> Configurez un graphe basique avec création de points de contrôle en mémoire et persistence d'état basée sur les threads.
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict, Annotated
import operator
class State(TypedDict):
messages: Annotated[list, operator.add]
def add_message(state: State) -> dict:
return {"messages": ["Bot response"]}
checkpointer = InMemorySaver()
graph = (
StateGraph(State)
.add_node("respond", add_message)
.add_edge(START, "respond")
.add_edge("respond", END)
.compile(checkpointer=checkpointer) # Passé à la compilation
)
# TOUJOURS fournir thread_id
config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hello"]}, config)
print(len(result1["messages"])) # 2
result2 = graph.invoke({"messages": ["How are you?"]}, config)
print(len(result2["messages"])) # 4 (précédent + nouveau)
</python> <typescript> Configurez un graphe basique avec création de points de contrôle en mémoire et persistence d'état basée sur les threads.
import { MemorySaver, StateGraph, StateSchema, MessagesValue, START, END } from "@langchain/langgraph";
import { HumanMessage } from "@langchain/core/messages";
const State = new StateSchema({ messages: MessagesValue });
const addMessage = async (state: typeof State.State) => {
return { messages: [{ role: "assistant", content: "Bot response" }] };
};
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("respond", addMessage)
.addEdge(START, "respond")
.addEdge("respond", END)
.compile({ checkpointer });
// TOUJOURS fournir thread_id
const config = { configurable: { thread_id: "conversation-1" } };
const result1 = await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
console.log(result1.messages.length); // 2
const result2 = await graph.invoke({ messages: [new HumanMessage("How are you?")] }, config);
console.log(result2.messages.length); // 4 (précédent + nouveau)
</typescript> </ex-basic-persistence>
<ex-production-postgres> <python> Configurez la création de points de contrôle sauvegardés dans PostgreSQL pour les déploiements en production.
import os
from langgraph.checkpoint.postgres import PostgresSaver
# Exécutez une fois lors du déploiement (pas au démarrage de l'application) :
# PostgresSaver.from_conn_string(os.environ["DATABASE_URL"]).setup()
with PostgresSaver.from_conn_string(os.environ["DATABASE_URL"]) as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
</python> <typescript> Configurez la création de points de contrôle sauvegardés dans PostgreSQL pour les déploiements en production.
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
// Exécutez une fois lors du déploiement (pas au démarrage de l'application) :
// await PostgresSaver.fromConnString(process.env.DATABASE_URL!).setup();
const checkpointer = PostgresSaver.fromConnString(process.env.DATABASE_URL!);
const graph = builder.compile({ checkpointer });
</typescript> </ex-production-postgres>
Gestion des Threads
<ex-separate-threads> <python> Démontrez l'isolation de l'état entre différents ID de thread.
# Les threads différents maintiennent des états séparés
alice_config = {"configurable": {"thread_id": "user-alice"}}
bob_config = {"configurable": {"thread_id": "user-bob"}}
graph.invoke({"messages": ["Hi from Alice"]}, alice_config)
graph.invoke({"messages": ["Hi from Bob"]}, bob_config)
# L'état d'Alice est isolé de celui de Bob
</python> <typescript> Démontrez l'isolation de l'état entre différents ID de thread.
// Les threads différents maintiennent des états séparés
const aliceConfig = { configurable: { thread_id: "user-alice" } };
const bobConfig = { configurable: { thread_id: "user-bob" } };
await graph.invoke({ messages: [new HumanMessage("Hi from Alice")] }, aliceConfig);
await graph.invoke({ messages: [new HumanMessage("Hi from Bob")] }, bobConfig);
// L'état d'Alice est isolé de celui de Bob
</typescript> </ex-separate-threads>
Historique d'État et Voyage dans le Temps
<ex-resume-from-checkpoint> <python> Voyage dans le temps : parcourez l'historique des points de contrôle et rejouez ou divisez depuis un état passé.
config = {"configurable": {"thread_id": "session-1"}}
result = graph.invoke({"messages": ["start"]}, config)
# Parcourez l'historique des points de contrôle
states = list(graph.get_state_history(config))
# Rejouez depuis un point de contrôle passé
past = states[-2]
result = graph.invoke(None, past.config) # None = reprendre depuis le point de contrôle
# Ou divisez : mettez à jour l'état à un point de contrôle passé, puis reprenez
fork_config = graph.update_state(past.config, {"messages": ["edited"]})
result = graph.invoke(None, fork_config)
</python> <typescript> Voyage dans le temps : parcourez l'historique des points de contrôle et rejouez ou divisez depuis un état passé.
const config = { configurable: { thread_id: "session-1" } };
const result = await graph.invoke({ messages: ["start"] }, config);
// Parcourez l'historique des points de contrôle (itérable asynchrone, collectez en tableau)
const states: Awaited<ReturnType<typeof graph.getState>>[] = [];
for await (const state of graph.getStateHistory(config)) {
states.push(state);
}
// Rejouez depuis un point de contrôle passé
const past = states[states.length - 2];
const replayed = await graph.invoke(null, past.config); // null = reprendre depuis le point de contrôle
// Ou divisez : mettez à jour l'état à un point de contrôle passé, puis reprenez
const forkConfig = await graph.updateState(past.config, { messages: ["edited"] });
const forked = await graph.invoke(null, forkConfig);
</typescript> </ex-resume-from-checkpoint>
<ex-update-state> <python> Mettez à jour manuellement l'état du graphe avant de reprendre l'exécution.
config = {"configurable": {"thread_id": "session-1"}}
# Modifiez l'état avant de reprendre
graph.update_state(config, {"data": "manually_updated"})
# Reprenez avec l'état mis à jour
result = graph.invoke(None, config)
</python> <typescript> Mettez à jour manuellement l'état du graphe avant de reprendre l'exécution.
const config = { configurable: { thread_id: "session-1" } };
// Modifiez l'état avant de reprendre
await graph.updateState(config, { data: "manually_updated" });
// Reprenez avec l'état mis à jour
const result = await graph.invoke(null, config);
</typescript> </ex-update-state>
Portée du Checkpointer du Sous-graphe
Lors de la compilation d'un sous-graphe, le paramètre checkpointer contrôle le comportement de persistence. C'est crucial pour les sous-graphes qui utilisent les interruptions, ont besoin de mémoire multi-tours, ou s'exécutent en parallèle.
<subgraph-checkpointer-scoping-table>
| Fonctionnalité | checkpointer=False |
None (défaut) |
True |
|---|---|---|---|
| Interruptions (HITL) | Non | Oui | Oui |
| Mémoire multi-tours | Non | Non | Oui |
| Appels multiples (sous-graphes différents) | Oui | Oui | Avertissement (conflits d'espace de noms possibles) |
| Appels multiples (même sous-graphe) | Oui | Oui | Non |
| Inspection d'état | Non | Avertissement (invocation actuelle uniquement) | Oui |
</subgraph-checkpointer-scoping-table>
<subgraph-checkpointer-when-to-use>
Quand utiliser chaque mode
checkpointer=False— Le sous-graphe n'a pas besoin d'interruptions ou de persistence. Option la plus simple, aucun surcharge de point de contrôle.None(défaut / omettezcheckpointer) — Le sous-graphe a besoin deinterrupt()mais pas de mémoire multi-tours. Chaque invocation commence de zéro mais peut faire une pause/reprendre. L'exécution parallèle fonctionne car chaque invocation obtient un espace de noms unique.checkpointer=True— Le sous-graphe doit se souvenir de l'état entre les invocations (conversations multi-tours). Chaque appel reprend où le dernier s'est arrêté.
</subgraph-checkpointer-when-to-use>
<warning-stateful-subgraphs-parallel>
Avertissement : Les sous-graphes avec état (checkpointer=True) NE SUPPORTENT PAS l'appel de la même instance de sous-graphe plusieurs fois dans un seul nœud — les appels écrivent dans le même espace de noms de point de contrôle et entrent en conflit.
</warning-stateful-subgraphs-parallel>
<ex-subgraph-checkpointer-modes> <python> Choisissez le bon mode de checkpointer pour votre sous-graphe.
# Aucune interruption nécessaire — refusez la création de points de contrôle
subgraph = subgraph_builder.compile(checkpointer=False)
# Besoin d'interruptions mais pas de persistence entre invocations (défaut)
subgraph = subgraph_builder.compile()
# Besoin de persistence entre invocations (avec état)
subgraph = subgraph_builder.compile(checkpointer=True)
</python> <typescript> Choisissez le bon mode de checkpointer pour votre sous-graphe.
// Aucune interruption nécessaire — refusez la création de points de contrôle
const subgraph = subgraphBuilder.compile({ checkpointer: false });
// Besoin d'interruptions mais pas de persistence entre invocations (défaut)
const subgraph = subgraphBuilder.compile();
// Besoin de persistence entre invocations (avec état)
const subgraph = subgraphBuilder.compile({ checkpointer: true });
</typescript> </ex-subgraph-checkpointer-modes>
<parallel-subgraph-namespacing>
Namespacing parallèle du sous-graphe
Quand plusieurs sous-graphes avec état différents s'exécutent en parallèle, enveloppez chacun dans son propre StateGraph avec un nom de nœud unique pour une isolation d'espace de noms stable :
<python>
from langgraph.graph import MessagesState, StateGraph
def create_sub_agent(model, *, name, **kwargs):
"""Enveloppe un agent avec un nom de nœud unique pour l'isolation d'espace de noms."""
agent = create_agent(model=model, name=name, **kwargs)
return (
StateGraph(MessagesState)
.add_node(name, agent) # nom unique -> espace de noms stable
.add_edge("__start__", name)
.compile()
)
fruit_agent = create_sub_agent(
"gpt-4.1-mini", name="fruit_agent",
tools=[fruit_info], prompt="...", checkpointer=True,
)
veggie_agent = create_sub_agent(
"gpt-4.1-mini", name="veggie_agent",
tools=[veggie_info], prompt="...", checkpointer=True,
)
</python> <typescript>
import { StateGraph, StateSchema, MessagesValue, START } from "@langchain/langgraph";
function createSubAgent(model: string, { name, ...kwargs }: { name: string; [key: string]: any }) {
const agent = createAgent({ model, name, ...kwargs });
return new StateGraph(new StateSchema({ messages: MessagesValue }))
.addNode(name, agent) // nom unique -> espace de noms stable
.addEdge(START, name)
.compile();
}
const fruitAgent = createSubAgent("gpt-4.1-mini", {
name: "fruit_agent", tools: [fruitInfo], prompt: "...", checkpointer: true,
});
const veggieAgent = createSubAgent("gpt-4.1-mini", {
name: "veggie_agent", tools: [veggieInfo], prompt: "...", checkpointer: true,
});
</typescript>
Remarque : Les sous-graphes ajoutés comme nœuds (via add_node) obtiennent déjà automatiquement des espaces de noms basés sur les noms et n'ont pas besoin de ce wrapper.
</parallel-subgraph-namespacing>
Mémoire Long Terme (Store)
<ex-long-term-memory-store> <python> Utilisez un Store pour la mémoire multi-threads et partagez les préférences utilisateur entre les conversations.
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
# Enregistrez la préférence utilisateur (disponible sur TOUS les threads)
store.put(("alice", "preferences"), "language", {"preference": "short responses"})
# Nœud avec store — accédez via runtime
from langgraph.runtime import Runtime
def respond(state, runtime: Runtime):
prefs = runtime.store.get((state["user_id"], "preferences"), "language")
return {"response": f"Using preference: {prefs.value}"}
# Compilez avec BOTH checkpointer et store
graph = builder.compile(checkpointer=checkpointer, store=store)
# Les deux threads accèdent à la même mémoire long terme
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-1"}})
graph.invoke({"user_id": "alice"}, {"configurable": {"thread_id": "thread-2"}}) # Mêmes préférences !
</python> <typescript> Utilisez un Store pour la mémoire multi-threads et partagez les préférences utilisateur entre les conversations.
import { MemoryStore } from "@langchain/langgraph";
const store = new MemoryStore();
// Enregistrez la préférence utilisateur (disponible sur TOUS les threads)
await store.put(["alice", "preferences"], "language", { preference: "short responses" });
// Nœud avec store — accédez via runtime
const respond = async (state: typeof State.State, runtime: any) => {
const item = await runtime.store?.get(["alice", "preferences"], "language");
return { response: `Using preference: ${item?.value?.preference}` };
};
// Compilez avec BOTH checkpointer et store
const graph = builder.compile({ checkpointer, store });
// Les deux threads accèdent à la même mémoire long terme
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-1" } });
await graph.invoke({ userId: "alice" }, { configurable: { thread_id: "thread-2" } }); // Mêmes préférences !
</typescript> </ex-long-term-memory-store>
<ex-store-operations> <python> Opérations basiques du store : put, get, search, et delete.
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
store.put(("user-123", "facts"), "location", {"city": "San Francisco"}) # Put
item = store.get(("user-123", "facts"), "location") # Get
results = store.search(("user-123", "facts"), filter={"city": "San Francisco"}) # Search
store.delete(("user-123", "facts"), "location") # Delete
</python> </ex-store-operations>
Corrections
<fix-thread-id-required> <python> Fournissez toujours thread_id dans config pour activer la persistence d'état.
# INCORRECT : Pas de thread_id - l'état N'est PAS persisté !
graph.invoke({"messages": ["Hello"]})
graph.invoke({"messages": ["What did I say?"]}) # N'oublie pas !
# CORRECT : Fournissez toujours thread_id
config = {"configurable": {"thread_id": "session-1"}}
graph.invoke({"messages": ["Hello"]}, config)
graph.invoke({"messages": ["What did I say?"]}, config) # Se souvient !
</python> <typescript> Fournissez toujours thread_id dans config pour activer la persistence d'état.
// INCORRECT : Pas de thread_id - l'état N'est PAS persisté !
await graph.invoke({ messages: [new HumanMessage("Hello")] });
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }); // N'oublie pas !
// CORRECT : Fournissez toujours thread_id
const config = { configurable: { thread_id: "session-1" } };
await graph.invoke({ messages: [new HumanMessage("Hello")] }, config);
await graph.invoke({ messages: [new HumanMessage("What did I say?")] }, config); // Se souvient !
</typescript> </fix-thread-id-required>
<fix-inmemory-not-for-production> <python> Utilisez PostgresSaver au lieu de InMemorySaver pour la persistence en production.
# INCORRECT : Données perdues au redémarrage du processus
checkpointer = InMemorySaver() # En mémoire uniquement !
# CORRECT : Utilisez le stockage persistant pour la production
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
checkpointer.setup() # seulement nécessaire à la première utilisation pour créer les tables
graph = builder.compile(checkpointer=checkpointer)
</python> <typescript> Utilisez PostgresSaver au lieu de MemorySaver pour la persistence en production.
// INCORRECT : Données perdues au redémarrage du processus
const checkpointer = new MemorySaver(); // En mémoire uniquement !
// CORRECT : Utilisez le stockage persistant pour la production
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
const checkpointer = PostgresSaver.fromConnString("postgresql://...");
await checkpointer.setup(); // seulement nécessaire à la première utilisation pour créer les tables
</typescript> </fix-inmemory-not-for-production>
<fix-update-state-with-reducers> <python> Utilisez Overwrite pour remplacer les valeurs d'état au lieu de les transmettre via les réducteurs.
from langgraph.types import Overwrite
# État avec réducteur : items: Annotated[list, operator.add]
# État actuel : {"items": ["A", "B"]}
# update_state TRANSMET VIA les réducteurs
graph.update_state(config, {"items": ["C"]}) # Résultat : ["A", "B", "C"] - Ajouté !
# Pour REMPLACER à la place, utilisez Overwrite
graph.update_state(config, {"items": Overwrite(["C"])}) # Résultat : ["C"] - Remplacé
</python> <typescript> Utilisez Overwrite pour remplacer les valeurs d'état au lieu de les transmettre via les réducteurs.
import { Overwrite } from "@langchain/langgraph";
// État avec réducteur : items utilise le réducteur concat
// État actuel : { items: ["A", "B"] }
// updateState TRANSMET VIA les réducteurs
await graph.updateState(config, { items: ["C"] }); // Résultat : ["A", "B", "C"] - Ajouté !
// Pour REMPLACER à la place, utilisez Overwrite
await graph.updateState(config, { items: new Overwrite(["C"]) }); // Résultat : ["C"] - Remplacé
</typescript> </fix-update-state-with-reducers>
<fix-store-injection> <python> Accédez au store via l'objet Runtime dans les nœuds du graphe.
# INCORRECT : Store non disponible dans le nœud
def my_node(state):
store.put(...) # NameError ! store non défini
# CORRECT : Accédez au store via runtime
from langgraph.runtime import Runtime
def my_node(state, runtime: Runtime):
runtime.store.put(...) # Instance correcte du store
</python> <typescript> Accédez au store via le paramètre runtime dans les nœuds du graphe.
// INCORRECT : Store non disponible dans le nœud
const myNode = async (state) => {
store.put(...); // ReferenceError !
};
// CORRECT : Accédez au store via runtime
const myNode = async (state, runtime) => {
await runtime.store?.put(...); // Instance correcte du store
};
</typescript> </fix-store-injection>
<boundaries>
Ce que vous NE DEVEZ PAS Faire
- Utilisez
InMemorySaveren production — données perdues au redémarrage ; utilisezPostgresSaver - Oubliez
thread_id— l'état ne persiste pas sans - Attendez-vous à ce que
update_statecontourne les réducteurs — elle les transmet ; utilisezOverwritepour remplacer - Exécutez le même sous-graphe avec état (
checkpointer=True) en parallèle dans un seul nœud — conflit d'espace de noms - Accédez directement au store dans un nœud — utilisez
runtime.storevia le paramètreRuntime</boundaries>