<overview> Les patterns human-in-the-loop de LangGraph te permettent de mettre en pause l'exécution du graphe, d'exposer les données aux utilisateurs et de reprendre avec leur entrée :
interrupt(value)— met en pause l'exécution, expose une valeur à l'appelantCommand(resume=value)— reprend l'exécution, fournissant la valeur àinterrupt()- Checkpointer — obligatoire pour sauvegarder l'état lors de la mise en pause
- Thread ID — obligatoire pour identifier quelle exécution en pause reprendre </overview>
Conditions requises
Trois éléments sont nécessaires pour que les interruptions fonctionnent :
- Checkpointer — compiler avec
checkpointer=InMemorySaver()(dev) ouPostgresSaver(prod) - Thread ID — passer
{"configurable": {"thread_id": "..."}}à chaque appelinvoke/stream - Payload sérialisable en JSON — la valeur passée à
interrupt()doit être sérialisable en JSON
Interruption + Reprise basiques
interrupt(value) met en pause le graphe. La valeur apparaît dans le résultat sous __interrupt__. Command(resume=value) reprend — la valeur de reprise devient la valeur de retour de interrupt().
Critique : quand le graphe reprend, le nœud redémarre depuis le début — tout le code avant interrupt() s'exécute à nouveau.
<ex-basic-interrupt-resume> <python> Pause execution for human review and resume with Command.
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class State(TypedDict):
approved: bool
def approval_node(state: State):
# Pause and ask for approval
approved = interrupt("Do you approve this action?")
# When resumed, Command(resume=...) returns that value here
return {"approved": approved}
checkpointer = InMemorySaver()
graph = (
StateGraph(State)
.add_node("approval", approval_node)
.add_edge(START, "approval")
.add_edge("approval", END)
.compile(checkpointer=checkpointer)
)
config = {"configurable": {"thread_id": "thread-1"}}
# Initial run — hits interrupt and pauses
result = graph.invoke({"approved": False}, config)
print(result["__interrupt__"])
# [Interrupt(value='Do you approve this action?')]
# Resume with the human's response
result = graph.invoke(Command(resume=True), config)
print(result["approved"]) # True
</python> <typescript> Pause execution for human review and resume with Command.
import { interrupt, Command, MemorySaver, StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";
const State = new StateSchema({
approved: z.boolean().default(false),
});
const approvalNode = async (state: typeof State.State) => {
// Pause and ask for approval
const approved = interrupt("Do you approve this action?");
// When resumed, Command({ resume }) returns that value here
return { approved };
};
const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
.addNode("approval", approvalNode)
.addEdge(START, "approval")
.addEdge("approval", END)
.compile({ checkpointer });
const config = { configurable: { thread_id: "thread-1" } };
// Initial run — hits interrupt and pauses
let result = await graph.invoke({ approved: false }, config);
console.log(result.__interrupt__);
// [{ value: 'Do you approve this action?', ... }]
// Resume with the human's response
result = await graph.invoke(new Command({ resume: true }), config);
console.log(result.approved); // true
</typescript> </ex-basic-interrupt-resume>
Flux de travail d'approbation
Un pattern courant : interrompre pour afficher un brouillon, puis router en fonction de la décision de l'humain.
<ex-approval-workflow> <python> Interrupt for human review, then route to send or end based on the decision.
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from typing import Literal
from typing_extensions import TypedDict
class EmailAgentState(TypedDict):
email_content: str
draft_response: str
classification: dict
def human_review(state: EmailAgentState) -> Command[Literal["send_reply", "__end__"]]:
"""Pause for human review using interrupt and route based on decision."""
classification = state.get("classification", {})
# interrupt() must come first — any code before it will re-run on resume
human_decision = interrupt({
"email_id": state.get("email_content", ""),
"draft_response": state.get("draft_response", ""),
"urgency": classification.get("urgency"),
"action": "Please review and approve/edit this response"
})
# Process the human's decision
if human_decision.get("approved"):
return Command(
update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))},
goto="send_reply"
)
else:
# Rejection — human will handle directly
return Command(update={}, goto=END)
</python> <typescript> Interrupt for human review, then route to send or end based on the decision.
import { interrupt, Command, END, GraphNode } from "@langchain/langgraph";
const humanReview: GraphNode<typeof EmailAgentState> = async (state) => {
const classification = state.classification!;
// interrupt() must come first — any code before it will re-run on resume
const humanDecision = interrupt({
emailId: state.emailContent,
draftResponse: state.responseText,
urgency: classification.urgency,
action: "Please review and approve/edit this response",
});
// Process the human's decision
if (humanDecision.approved) {
return new Command({
update: { responseText: humanDecision.editedResponse || state.responseText },
goto: "sendReply",
});
} else {
return new Command({ update: {}, goto: END });
}
};
</typescript> </ex-approval-workflow>
Boucle de validation
Utilise interrupt() dans une boucle pour valider l'entrée humaine et re-demander si invalide.
<ex-validation-loop> <python> Validate human input in a loop, re-prompting until valid.
from langgraph.types import interrupt
def get_age_node(state):
prompt = "What is your age?"
while True:
answer = interrupt(prompt)
# Validate the input
if isinstance(answer, int) and answer > 0:
break
else:
# Invalid input — ask again with a more specific prompt
prompt = f"'{answer}' is not a valid age. Please enter a positive number."
return {"age": answer}
Each Command(resume=...) call provides the next answer. If invalid, the loop re-interrupts with a clearer message.
config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config)
# __interrupt__: "What is your age?"
retry = graph.invoke(Command(resume="thirty"), config)
# __interrupt__: "'thirty' is not a valid age..."
final = graph.invoke(Command(resume=30), config)
print(final["age"]) # 30
</python> <typescript> Validate human input in a loop, re-prompting until valid.
import { interrupt } from "@langchain/langgraph";
const getAgeNode = (state: typeof State.State) => {
let prompt = "What is your age?";
while (true) {
const answer = interrupt(prompt);
// Validate the input
if (typeof answer === "number" && answer > 0) {
return { age: answer };
} else {
// Invalid input — ask again with a more specific prompt
prompt = `'${answer}' is not a valid age. Please enter a positive number.`;
}
}
};
</typescript> </ex-validation-loop>
Interruptions multiples
Quand des branches parallèles appellent chacune interrupt(), reprends-les toutes en une seule invocation en mappant chaque ID d'interruption à sa valeur de reprise.
<ex-multiple-interrupts> <python> Resume multiple parallel interrupts by mapping interrupt IDs to values.
from typing import Annotated, TypedDict
import operator
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt
class State(TypedDict):
vals: Annotated[list[str], operator.add]
def node_a(state):
answer = interrupt("question_a")
return {"vals": [f"a:{answer}"]}
def node_b(state):
answer = interrupt("question_b")
return {"vals": [f"b:{answer}"]}
graph = (
StateGraph(State)
.add_node("a", node_a)
.add_node("b", node_b)
.add_edge(START, "a")
.add_edge(START, "b")
.add_edge("a", END)
.add_edge("b", END)
.compile(checkpointer=InMemorySaver())
)
config = {"configurable": {"thread_id": "1"}}
# Both parallel nodes hit interrupt() and pause
result = graph.invoke({"vals": []}, config)
# result["__interrupt__"] contains both Interrupt objects with IDs
# Resume all pending interrupts at once using a map of id -> value
resume_map = {
i.id: f"answer for {i.value}"
for i in result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)
# result["vals"] = ["a:answer for question_a", "b:answer for question_b"]
</python> <typescript> Resume multiple parallel interrupts by mapping interrupt IDs to values.
import { Command, END, MemorySaver, START, StateGraph, interrupt, isInterrupted, INTERRUPT, Annotation } from "@langchain/langgraph";
const State = Annotation.Root({
vals: Annotation<string[]>({
reducer: (left, right) => left.concat(Array.isArray(right) ? right : [right]),
default: () => [],
}),
});
function nodeA(_state: typeof State.State) {
const answer = interrupt("question_a") as string;
return { vals: [`a:${answer}`] };
}
function nodeB(_state: typeof State.State) {
const answer = interrupt("question_b") as string;
return { vals: [`b:${answer}`] };
}
const graph = new StateGraph(State)
.addNode("a", nodeA)
.addNode("b", nodeB)
.addEdge(START, "a")
.addEdge(START, "b")
.addEdge("a", END)
.addEdge("b", END)
.compile({ checkpointer: new MemorySaver() });
const config = { configurable: { thread_id: "1" } };
const interruptedResult = await graph.invoke({ vals: [] }, config);
// Resume all pending interrupts at once
const resumeMap: Record<string, string> = {};
if (isInterrupted(interruptedResult)) {
for (const i of interruptedResult[INTERRUPT]) {
if (i.id != null) {
resumeMap[i.id] = `answer for ${i.value}`;
}
}
}
const result = await graph.invoke(new Command({ resume: resumeMap }), config);
// result.vals = ["a:answer for question_a", "b:answer for question_b"]
</typescript> </ex-multiple-interrupts>
Les erreurs corrigeables par l'utilisateur utilisent interrupt() pour mettre en pause et collecter les données manquantes — c'est le pattern couvert par cette compétence. Pour la stratégie complète de gestion des erreurs à 4 niveaux (RetryPolicy, boucles d'erreur Command, etc.), voir la compétence fundamentals.
Les effets secondaires avant l'interruption doivent être idempotents
Quand le graphe reprend, le nœud redémarre depuis le début — TOUT le code avant interrupt() s'exécute à nouveau. Dans les subgraphes, le nœud parent ET le nœud subgraphe se réexécutent.
<idempotency-rules>
À faire :
- Utiliser les opérations upsert (pas insert) avant
interrupt() - Utiliser les patterns check-before-create
- Placer les effets secondaires après
interrupt()quand possible - Séparer les effets secondaires dans leurs propres nœuds
À ne pas faire :
- Créer de nouveaux enregistrements avant
interrupt()— crée des doublons à chaque reprise - Ajouter à des listes avant
interrupt()— crée des entrées dupliquées à chaque reprise
</idempotency-rules>
<ex-idempotent-patterns> <python> Idempotent operations before interrupt vs non-idempotent (wrong).
# GOOD: Upsert is idempotent — safe before interrupt
def node_a(state: State):
db.upsert_user(user_id=state["user_id"], status="pending_approval")
approved = interrupt("Approve this change?")
return {"approved": approved}
# GOOD: Side effect AFTER interrupt — only runs once
def node_a(state: State):
approved = interrupt("Approve this change?")
if approved:
db.create_audit_log(user_id=state["user_id"], action="approved")
return {"approved": approved}
# BAD: Insert creates duplicates on each resume!
def node_a(state: State):
audit_id = db.create_audit_log({ # Runs again on resume!
"user_id": state["user_id"],
"action": "pending_approval",
})
approved = interrupt("Approve this change?")
return {"approved": approved}
</python> <typescript> Idempotent operations before interrupt vs non-idempotent (wrong).
// GOOD: Upsert is idempotent — safe before interrupt
const nodeA = async (state: typeof State.State) => {
await db.upsertUser({ userId: state.userId, status: "pending_approval" });
const approved = interrupt("Approve this change?");
return { approved };
};
// GOOD: Side effect AFTER interrupt — only runs once
const nodeA = async (state: typeof State.State) => {
const approved = interrupt("Approve this change?");
if (approved) {
await db.createAuditLog({ userId: state.userId, action: "approved" });
}
return { approved };
};
// BAD: Insert creates duplicates on each resume!
const nodeA = async (state: typeof State.State) => {
await db.createAuditLog({ // Runs again on resume!
userId: state.userId,
action: "pending_approval",
});
const approved = interrupt("Approve this change?");
return { approved };
};
</typescript> </ex-idempotent-patterns>
<subgraph-interrupt-re-execution>
Réexécution du subgraphe à la reprise
Quand un subgraphe contient une interrupt(), reprendre réexécute TANT le nœud parent (qui a invoqué le subgraphe) QUE le nœud subgraphe (qui a appelé interrupt()) :
<python>
def node_in_parent_graph(state: State):
some_code() # <-- Re-executes on resume
subgraph_result = subgraph.invoke(some_input)
# ...
def node_in_subgraph(state: State):
some_other_code() # <-- Also re-executes on resume
result = interrupt("What's your name?")
# ...
</python> <typescript>
async function nodeInParentGraph(state: State) {
someCode(); // <-- Re-executes on resume
const subgraphResult = await subgraph.invoke(someInput);
// ...
}
async function nodeInSubgraph(state: State) {
someOtherCode(); // <-- Also re-executes on resume
const result = interrupt("What's your name?");
// ...
}
</typescript> </subgraph-interrupt-re-execution>
Avertissement Command(resume)
Command(resume=...) est le seul pattern Command destiné comme entrée à invoke()/stream(). Ne passe PAS Command(update=...) en entrée — cela reprend depuis le dernier checkpoint et le graphe semble bloqué. Voir la compétence fundamentals pour l'explication complète de l'antipattern.
Corrections
<fix-checkpointer-required-for-interrupts> <python> Checkpointer required for interrupt functionality.
# WRONG
graph = builder.compile()
# CORRECT
graph = builder.compile(checkpointer=InMemorySaver())
</python> <typescript> Checkpointer required for interrupt functionality.
// WRONG
const graph = builder.compile();
// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });
</typescript> </fix-checkpointer-required-for-interrupts>
<fix-resume-with-command> <python> Use Command to resume from an interrupt (regular dict restarts graph).
# WRONG
graph.invoke({"resume_data": "approve"}, config)
# CORRECT
graph.invoke(Command(resume="approve"), config)
</python> <typescript> Use Command to resume from an interrupt (regular object restarts graph).
// WRONG
await graph.invoke({ resumeData: "approve" }, config);
// CORRECT
await graph.invoke(new Command({ resume: "approve" }), config);
</typescript> </fix-resume-with-command>
<boundaries>
Ce que tu ne devrais PAS faire
- Utiliser les interruptions sans checkpointer — échouera
- Reprendre sans le même thread_id — crée un nouveau thread au lieu de reprendre
- Passer
Command(update=...)en entrée d'invoke — le graphe semble bloqué (utilise un dict simple) - Effectuer des effets secondaires non-idempotents avant
interrupt()— crée des doublons à la reprise - Supposer que le code avant
interrupt()s'exécute une seule fois — il réexécute à chaque reprise </boundaries>