langgraph-human-in-the-loop

By langchain-ai · langchain-skills

Invoke this skill when implementing human-in-the-loop patterns, approval pauses, or error handling in LangGraph. Covers `interrupt()`, `Command(resume=...)`, approval/validation workflows, and the 4-level error-handling strategy.

npx skills add https://github.com/langchain-ai/langchain-skills --skill langgraph-human-in-the-loop

<overview> LangGraph's human-in-the-loop patterns let you pause graph execution, surface data to users, and resume with their input:

  • interrupt(value) — pauses execution and surfaces a value to the caller
  • Command(resume=value) — resumes execution, supplying the value returned by interrupt()
  • Checkpointer — required to save state while paused
  • Thread ID — required to identify which paused run to resume </overview>

Requirements

Three things are required for interrupts to work:

  1. Checkpointer — compile with checkpointer=InMemorySaver() (dev) or PostgresSaver (prod)
  2. Thread ID — pass {"configurable": {"thread_id": "..."}} on every invoke/stream call
  3. JSON-serializable payload — the value passed to interrupt() must be JSON-serializable

Basic interrupt + resume

interrupt(value) pauses the graph. The value appears in the result under __interrupt__. Command(resume=value) resumes — the resume value becomes the return value of interrupt().

Critical: when the graph resumes, the node restarts from the beginning — all code before interrupt() runs again.

<ex-basic-interrupt-resume> <python> Pause execution for human review and resume with Command.

from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class State(TypedDict):
    approved: bool

def approval_node(state: State):
    # Pause and ask for approval
    approved = interrupt("Do you approve this action?")
    # When resumed, Command(resume=...) returns that value here
    return {"approved": approved}

checkpointer = InMemorySaver()
graph = (
    StateGraph(State)
    .add_node("approval", approval_node)
    .add_edge(START, "approval")
    .add_edge("approval", END)
    .compile(checkpointer=checkpointer)
)

config = {"configurable": {"thread_id": "thread-1"}}

# Initial run — hits interrupt and pauses
result = graph.invoke({"approved": False}, config)
print(result["__interrupt__"])
# [Interrupt(value='Do you approve this action?')]

# Resume with the human's response
result = graph.invoke(Command(resume=True), config)
print(result["approved"])  # True

</python> <typescript> Pause execution for human review and resume with Command.

import { interrupt, Command, MemorySaver, StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  approved: z.boolean().default(false),
});

const approvalNode = async (state: typeof State.State) => {
  // Pause and ask for approval
  const approved = interrupt("Do you approve this action?");
  // When resumed, Command({ resume }) returns that value here
  return { approved };
};

const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
  .addNode("approval", approvalNode)
  .addEdge(START, "approval")
  .addEdge("approval", END)
  .compile({ checkpointer });

const config = { configurable: { thread_id: "thread-1" } };

// Initial run — hits interrupt and pauses
let result = await graph.invoke({ approved: false }, config);
console.log(result.__interrupt__);
// [{ value: 'Do you approve this action?', ... }]

// Resume with the human's response
result = await graph.invoke(new Command({ resume: true }), config);
console.log(result.approved);  // true

</typescript> </ex-basic-interrupt-resume>


Approval workflow

A common pattern: interrupt to show a draft, then route based on the human's decision.

<ex-approval-workflow> <python> Interrupt for human review, then route to send or end based on the decision.

from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from typing import Literal
from typing_extensions import TypedDict

class EmailAgentState(TypedDict):
    email_content: str
    draft_response: str
    classification: dict

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", "__end__"]]:
    """Pause for human review using interrupt and route based on decision."""
    classification = state.get("classification", {})

    # interrupt() must come first — any code before it will re-run on resume
    human_decision = interrupt({
        "email_id": state.get("email_content", ""),
        "draft_response": state.get("draft_response", ""),
        "urgency": classification.get("urgency"),
        "action": "Please review and approve/edit this response"
    })

    # Process the human's decision
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))},
            goto="send_reply"
        )
    else:
        # Rejection — human will handle directly
        return Command(update={}, goto=END)

</python> <typescript> Interrupt for human review, then route to send or end based on the decision.

import { interrupt, Command, END, GraphNode } from "@langchain/langgraph";

const humanReview: GraphNode<typeof EmailAgentState> = async (state) => {
  const classification = state.classification!;

  // interrupt() must come first — any code before it will re-run on resume
  const humanDecision = interrupt({
    emailId: state.emailContent,
    draftResponse: state.responseText,
    urgency: classification.urgency,
    action: "Please review and approve/edit this response",
  });

  // Process the human's decision
  if (humanDecision.approved) {
    return new Command({
      update: { responseText: humanDecision.editedResponse || state.responseText },
      goto: "sendReply",
    });
  } else {
    return new Command({ update: {}, goto: END });
  }
};

</typescript> </ex-approval-workflow>


Validation loop

Use interrupt() in a loop to validate human input and re-prompt when it is invalid.

<ex-validation-loop> <python> Validate human input in a loop, re-prompting until valid.

from langgraph.types import interrupt

def get_age_node(state):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)

        # Validate the input
        if isinstance(answer, int) and answer > 0:
            break
        else:
            # Invalid input — ask again with a more specific prompt
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."

    return {"age": answer}

Each Command(resume=...) call provides the next answer. If invalid, the loop re-interrupts with a clearer message.

config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config)
# __interrupt__: "What is your age?"

retry = graph.invoke(Command(resume="thirty"), config)
# __interrupt__: "'thirty' is not a valid age..."

final = graph.invoke(Command(resume=30), config)
print(final["age"])  # 30

</python> <typescript> Validate human input in a loop, re-prompting until valid.

import { interrupt } from "@langchain/langgraph";

const getAgeNode = (state: typeof State.State) => {
  let prompt = "What is your age?";

  while (true) {
    const answer = interrupt(prompt);

    // Validate the input
    if (typeof answer === "number" && answer > 0) {
      return { age: answer };
    } else {
      // Invalid input — ask again with a more specific prompt
      prompt = `'${answer}' is not a valid age. Please enter a positive number.`;
    }
  }
};

</typescript> </ex-validation-loop>


Multiple interrupts

When parallel branches each call interrupt(), resume them all in a single invocation by mapping each interrupt ID to its resume value.

<ex-multiple-interrupts> <python> Resume multiple parallel interrupts by mapping interrupt IDs to values.

from typing import Annotated, TypedDict
import operator
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt

class State(TypedDict):
    vals: Annotated[list[str], operator.add]

def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}

def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}

graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# Both parallel nodes hit interrupt() and pause
result = graph.invoke({"vals": []}, config)
# result["__interrupt__"] contains both Interrupt objects with IDs

# Resume all pending interrupts at once using a map of id -> value
resume_map = {
    i.id: f"answer for {i.value}"
    for i in result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)
# result["vals"] = ["a:answer for question_a", "b:answer for question_b"]

</python> <typescript> Resume multiple parallel interrupts by mapping interrupt IDs to values.

import { Command, END, MemorySaver, START, StateGraph, interrupt, isInterrupted, INTERRUPT, Annotation } from "@langchain/langgraph";

const State = Annotation.Root({
  vals: Annotation<string[]>({
    reducer: (left, right) => left.concat(Array.isArray(right) ? right : [right]),
    default: () => [],
  }),
});

function nodeA(_state: typeof State.State) {
  const answer = interrupt("question_a") as string;
  return { vals: [`a:${answer}`] };
}

function nodeB(_state: typeof State.State) {
  const answer = interrupt("question_b") as string;
  return { vals: [`b:${answer}`] };
}

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addEdge(START, "b")
  .addEdge("a", END)
  .addEdge("b", END)
  .compile({ checkpointer: new MemorySaver() });

const config = { configurable: { thread_id: "1" } };

const interruptedResult = await graph.invoke({ vals: [] }, config);

// Resume all pending interrupts at once
const resumeMap: Record<string, string> = {};
if (isInterrupted(interruptedResult)) {
  for (const i of interruptedResult[INTERRUPT]) {
    if (i.id != null) {
      resumeMap[i.id] = `answer for ${i.value}`;
    }
  }
}
const result = await graph.invoke(new Command({ resume: resumeMap }), config);
// result.vals = ["a:answer for question_a", "b:answer for question_b"]

</typescript> </ex-multiple-interrupts>

User-correctable errors use interrupt() to pause and collect the missing data — that is the pattern this skill covers. For the full 4-level error-handling strategy (RetryPolicy, Command error loops, etc.), see the fundamentals skill.


Side effects before the interrupt must be idempotent

When the graph resumes, the node restarts from the beginning — ALL code before interrupt() runs again. With subgraphs, both the parent node AND the subgraph node re-execute.

<idempotency-rules>

Do:

  • Use upsert operations (not insert) before interrupt()
  • Use check-before-create patterns
  • Place side effects after interrupt() when possible
  • Isolate side effects in their own nodes

Don't:

  • Create new records before interrupt() — duplicates them on every resume
  • Append to lists before interrupt() — duplicates entries on every resume

</idempotency-rules>

<ex-idempotent-patterns> <python> Idempotent operations before interrupt vs non-idempotent (wrong).

# GOOD: Upsert is idempotent — safe before interrupt
def node_a(state: State):
    db.upsert_user(user_id=state["user_id"], status="pending_approval")
    approved = interrupt("Approve this change?")
    return {"approved": approved}

# GOOD: Side effect AFTER interrupt — only runs once
def node_a(state: State):
    approved = interrupt("Approve this change?")
    if approved:
        db.create_audit_log(user_id=state["user_id"], action="approved")
    return {"approved": approved}

# BAD: Insert creates duplicates on each resume!
def node_a(state: State):
    audit_id = db.create_audit_log({  # Runs again on resume!
        "user_id": state["user_id"],
        "action": "pending_approval",
    })
    approved = interrupt("Approve this change?")
    return {"approved": approved}

</python> <typescript> Idempotent operations before interrupt vs non-idempotent (wrong).

// GOOD: Upsert is idempotent — safe before interrupt
const nodeA = async (state: typeof State.State) => {
  await db.upsertUser({ userId: state.userId, status: "pending_approval" });
  const approved = interrupt("Approve this change?");
  return { approved };
};

// GOOD: Side effect AFTER interrupt — only runs once
const nodeA = async (state: typeof State.State) => {
  const approved = interrupt("Approve this change?");
  if (approved) {
    await db.createAuditLog({ userId: state.userId, action: "approved" });
  }
  return { approved };
};

// BAD: Insert creates duplicates on each resume!
const nodeA = async (state: typeof State.State) => {
  await db.createAuditLog({  // Runs again on resume!
    userId: state.userId,
    action: "pending_approval",
  });
  const approved = interrupt("Approve this change?");
  return { approved };
};

</typescript> </ex-idempotent-patterns>

<subgraph-interrupt-re-execution>

Subgraph re-execution on resume

When a subgraph contains an interrupt(), resuming re-executes BOTH the parent node (which invoked the subgraph) AND the subgraph node (which called interrupt()):

<python>

def node_in_parent_graph(state: State):
    some_code()  # <-- Re-executes on resume
    subgraph_result = subgraph.invoke(some_input)
    # ...

def node_in_subgraph(state: State):
    some_other_code()  # <-- Also re-executes on resume
    result = interrupt("What's your name?")
    # ...

</python> <typescript>

async function nodeInParentGraph(state: State) {
  someCode();  // <-- Re-executes on resume
  const subgraphResult = await subgraph.invoke(someInput);
  // ...
}

async function nodeInSubgraph(state: State) {
  someOtherCode();  // <-- Also re-executes on resume
  const result = interrupt("What's your name?");
  // ...
}

</typescript> </subgraph-interrupt-re-execution>


Command(resume) caveat

Command(resume=...) is the only Command pattern meant as input to invoke()/stream(). Do NOT pass Command(update=...) as input — it resumes from the last checkpoint, so the graph appears stuck. See the fundamentals skill for the full explanation of the antipattern.


Corrections

<fix-checkpointer-required-for-interrupts> <python> Checkpointer required for interrupt functionality.

# WRONG
graph = builder.compile()

# CORRECT
graph = builder.compile(checkpointer=InMemorySaver())

</python> <typescript> Checkpointer required for interrupt functionality.

// WRONG
const graph = builder.compile();

// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });

</typescript> </fix-checkpointer-required-for-interrupts>

<fix-resume-with-command> <python> Use Command to resume from an interrupt (regular dict restarts graph).

# WRONG
graph.invoke({"resume_data": "approve"}, config)

# CORRECT
graph.invoke(Command(resume="approve"), config)

</python> <typescript> Use Command to resume from an interrupt (regular object restarts graph).

// WRONG
await graph.invoke({ resumeData: "approve" }, config);

// CORRECT
await graph.invoke(new Command({ resume: "approve" }), config);

</typescript> </fix-resume-with-command>

<boundaries>

What you should NOT do

  • Use interrupts without a checkpointer — they will fail
  • Resume without the same thread_id — this creates a new thread instead of resuming
  • Pass Command(update=...) as invoke input — the graph appears stuck (use a plain dict instead)
  • Perform non-idempotent side effects before interrupt() — they are duplicated on resume
  • Assume code before interrupt() runs only once — it re-runs on every resume </boundaries>

Similar skills