LangGraph Checkpointer: Automatic State Persistence with PostgreSQL

Hoang Dang Tan Phat (Kane)

Feb 22, 2026

In Part 1, we built a LangGraph agent with intent routing. In Part 2, we added manual history loading from PostgreSQL. This post introduces LangGraph’s built-in checkpointer: automatic state persistence that removes most of the manual history management.

What is a Checkpointer?

A checkpointer saves a snapshot of the entire graph state at every super-step, which in a simple sequential graph means after each node runs. On subsequent requests with the same thread_id, LangGraph automatically restores the latest saved state, including conversation history, intermediate results, and any custom state fields.

Benefits over manual history loading:

  • Zero boilerplate: No need to query/save history manually
  • Full state recovery: Restores all state fields, not just messages
  • Automatic versioning: Built-in checkpoint history with parent references
  • Interrupt/resume: Pause and resume workflows across requests
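To make the "parent references" idea concrete, here is a minimal in-memory sketch of a checkpoint chain. It is not LangGraph's implementation: the ToyCheckpointLog class, its methods, and the cp-0001 style IDs are hypothetical names chosen for illustration only.

```python
import itertools
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Checkpoint:
    checkpoint_id: str
    parent_checkpoint_id: Optional[str]  # None for the first checkpoint in a thread
    state: dict


class ToyCheckpointLog:
    """Hypothetical append-only checkpoint log for a single thread."""

    def __init__(self) -> None:
        self._by_id = {}
        self._latest_id = None
        self._counter = itertools.count(1)

    def save(self, state: dict) -> Checkpoint:
        # Each new checkpoint points back at the one saved before it.
        cp = Checkpoint(
            checkpoint_id=f"cp-{next(self._counter):04d}",
            parent_checkpoint_id=self._latest_id,
            state=dict(state),  # copy so later mutations do not alter history
        )
        self._by_id[cp.checkpoint_id] = cp
        self._latest_id = cp.checkpoint_id
        return cp

    def lineage(self, checkpoint_id: str) -> list:
        """Walk parent references from a checkpoint back to the root."""
        chain = []
        current = self._by_id.get(checkpoint_id)
        while current is not None:
            chain.append(current.checkpoint_id)
            parent = current.parent_checkpoint_id
            current = self._by_id.get(parent) if parent else None
        return chain


log = ToyCheckpointLog()
first = log.save({"step": "router"})
second = log.save({"step": "history_loader"})
third = log.save({"step": "responder"})
print(log.lineage(third.checkpoint_id))
```

Because every checkpoint records its parent, any earlier point in the thread can be located by walking the chain, which is what makes recovery from "any checkpoint" possible.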

Architecture with Checkpointer

flowchart LR
    A[API Request] --> B{Checkpointer}
    B --> |thread_id exists| C[Restore State]
    B --> |new thread| D[Fresh State]
    C --> E[Graph Execution]
    D --> E
    E --> F[Save Checkpoint]
    F --> G[(PostgreSQL)]

    style B fill:#4f46e5,color:#fff
    style G fill:#059669,color:#fff

The checkpointer intercepts every invocation. If a checkpoint exists for the thread_id, state is restored automatically. After execution, the new state is persisted.
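The restore, execute, save cycle in the diagram can be modeled in a few lines. This is a simplified sketch under stated assumptions: the _store dictionary stands in for PostgreSQL, and run_with_checkpoint and counting_graph are hypothetical helpers, not LangGraph APIs.

```python
from typing import Callable

# Hypothetical in-memory stand-in for the PostgreSQL checkpoint store.
_store = {}


def run_with_checkpoint(thread_id: str, inputs: dict, graph: Callable[[dict], dict]) -> dict:
    # Restore the previous state for this thread, or start fresh.
    state = dict(_store.get(thread_id, {}))
    # New inputs overwrite matching keys; everything else carries over.
    state.update(inputs)
    new_state = graph(state)
    # Persist the result so the next request on this thread can resume from it.
    _store[thread_id] = dict(new_state)
    return new_state


def counting_graph(state: dict) -> dict:
    """Toy 'graph' that appends the incoming message to a running history."""
    history = list(state.get("conversation_history", []))
    history.append(state["message"])
    return {**state, "conversation_history": history}


first = run_with_checkpoint("conv-1", {"message": "hi"}, counting_graph)
second = run_with_checkpoint("conv-1", {"message": "how are you?"}, counting_graph)
other = run_with_checkpoint("conv-2", {"message": "hello"}, counting_graph)
print(second["conversation_history"], other["conversation_history"])
```

The second call on conv-1 sees the history left by the first call, while conv-2 starts from an empty state.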

Setting Up PostgresSaver

Install the checkpoint package:

uv add langgraph-checkpoint-postgres

Create an async context manager for the checkpointer:

# agent/graph.py
import os

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import StateGraph, END

from agent.state import AgentState  # state schema, also imported by history_loader.py

DATABASE_URL = os.getenv("DATABASE_URL", "")


def build_graph():
    graph = StateGraph(AgentState)
    # ... add nodes and edges
    return graph


def get_checkpointer_context():
    """Get async context manager for PostgresSaver checkpointer."""
    if not DATABASE_URL:
        raise ValueError("DATABASE_URL environment variable is required")
    return AsyncPostgresSaver.from_conn_string(DATABASE_URL)


def compile_with_checkpointer(checkpointer: AsyncPostgresSaver):
    """Compile graph with the given checkpointer."""
    graph = build_graph()
    return graph.compile(checkpointer=checkpointer)
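One pitfall worth guarding against: the Postgres checkpointer connects through psycopg, which expects a plain postgresql:// (or postgres://) URI. If the project shares DATABASE_URL with SQLAlchemy, a driver-qualified URL such as postgresql+asyncpg:// may slip through. The helper below is a hypothetical pre-check, not part of LangGraph, and it assumes the URL comes from the same environment variable used above.

```python
from urllib.parse import urlparse


def validate_postgres_url(url: str) -> str:
    """Fail fast on an empty or driver-qualified connection string (hypothetical helper)."""
    if not url:
        raise ValueError("DATABASE_URL environment variable is required")
    scheme = urlparse(url).scheme
    if scheme not in ("postgres", "postgresql"):
        # e.g. "postgresql+asyncpg" is a SQLAlchemy dialect URL, not a libpq URI.
        raise ValueError(f"Expected a postgresql:// URL, got scheme {scheme!r}")
    return url


print(validate_postgres_url("postgresql://user:secret@localhost:5432/chat"))
```

Calling this inside get_checkpointer_context() before from_conn_string() would surface a misconfiguration at startup rather than at the first request.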

FastAPI Lifespan Integration

Initialize the checkpointer in FastAPI’s lifespan context:

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from agent.graph import get_checkpointer_context, compile_with_checkpointer

agent = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage app lifecycle: startup and shutdown."""
    global agent
    async with get_checkpointer_context() as checkpointer:
        await checkpointer.setup()  # Creates tables if not exist
        agent = compile_with_checkpointer(checkpointer)
        yield


app = FastAPI(title="AI Chat Service", lifespan=lifespan)

Calling checkpointer.setup() creates the required tables if they do not already exist, so it is safe to run on every startup. On shutdown, exiting the async context manager closes the database connection cleanly.
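The ordering guarantees of the lifespan pattern can be checked without FastAPI or a database. The sketch below uses only the standard library; fake_checkpointer is a hypothetical stand-in for the real checkpointer context manager, and the events list exists purely to record the order of operations.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def fake_checkpointer():
    # Stand-in for opening the database connection.
    events.append("connect")
    try:
        yield "checkpointer"
    finally:
        # Runs when the surrounding 'async with' block exits.
        events.append("close")


@asynccontextmanager
async def lifespan():
    async with fake_checkpointer() as cp:
        events.append(f"setup:{cp}")  # where setup() and compile would happen
        yield                         # the app serves requests while suspended here
    events.append("lifespan-exit")


async def main():
    async with lifespan():
        events.append("serving")


asyncio.run(main())
print(events)
```

Everything before the yield runs once at startup, and the connection is closed only after the application has stopped serving.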

Thread ID Configuration

Pass thread_id in the config to isolate conversations:

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    # Thread config - uses conversationId as thread_id
    config = {"configurable": {"thread_id": req.conversationId}}

    result = await agent.ainvoke(
        {
            "message": req.message,
            "conversation_id": req.conversationId,
            "user_id": req.userId,
            "needs_human": False,
        },
        config,  # Pass config as second argument
    )
    return ChatResponse(
        reply=result.get("reply"),
        needs_human=result.get("needs_human", False),
        confidence=result.get("confidence", 0.0),
    )

Each unique thread_id gets its own checkpoint chain. The conversationId is a natural choice — each conversation maintains its own state.
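Since the thread_id decides which checkpoint chain a request reads and writes, it can be worth validating it before invoking the graph. The helper below is a hypothetical sketch (make_thread_config is not a LangGraph function); it only builds the same config dictionary shown in the endpoint above.

```python
def make_thread_config(conversation_id: str) -> dict:
    """Build the checkpointer config, rejecting blank IDs (hypothetical helper)."""
    thread_id = (conversation_id or "").strip()
    if not thread_id:
        # A blank ID would not identify a single conversation.
        raise ValueError("conversationId must be a non-empty string")
    return {"configurable": {"thread_id": thread_id}}


config_a = make_thread_config("conv-123")
config_b = make_thread_config("conv-456")
print(config_a, config_b)
```

Two different conversation IDs produce two different configs, so their checkpoint chains never mix.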

Checkpoint Tables Schema

PostgresSaver stores state in three main tables (setup() also creates a small checkpoint_migrations table to track schema versions):

-- Main checkpoint storage
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

-- Binary blob storage for large state values
CREATE TABLE checkpoint_blobs (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    channel TEXT NOT NULL,
    version TEXT NOT NULL,
    type TEXT NOT NULL,
    blob BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);

-- Write operations for durability
CREATE TABLE checkpoint_writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    blob BYTEA NOT NULL,
    task_path TEXT NOT NULL DEFAULT '',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
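To see how the composite key and parent column support lookups, the sketch below recreates a trimmed-down checkpoints table in SQLite and fetches the newest checkpoint for a thread. SQLite is only a stand-in for PostgreSQL here, the short IDs are made up, and the query assumes checkpoint IDs sort chronologically as text. Treat it as an illustration, not as the query the library runs.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE checkpoints (
        thread_id TEXT NOT NULL,
        checkpoint_ns TEXT NOT NULL DEFAULT '',
        checkpoint_id TEXT NOT NULL,
        parent_checkpoint_id TEXT,
        checkpoint TEXT NOT NULL,  -- JSONB in PostgreSQL; plain text in this sketch
        PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
    )
""")
rows = [
    ("conv-1", "", "0001", None, json.dumps({"step": 1})),
    ("conv-1", "", "0002", "0001", json.dumps({"step": 2})),
    ("conv-2", "", "0001", None, json.dumps({"step": 1})),
]
conn.executemany("INSERT INTO checkpoints VALUES (?, ?, ?, ?, ?)", rows)


def latest_checkpoint(thread_id: str):
    """Return the newest checkpoint for a thread, or None if the thread is unknown."""
    row = conn.execute(
        "SELECT checkpoint_id, parent_checkpoint_id, checkpoint "
        "FROM checkpoints "
        "WHERE thread_id = ? AND checkpoint_ns = '' "
        "ORDER BY checkpoint_id DESC LIMIT 1",
        (thread_id,),
    ).fetchone()
    if row is None:
        return None
    return {
        "checkpoint_id": row[0],
        "parent_checkpoint_id": row[1],
        "state": json.loads(row[2]),
    }


print(latest_checkpoint("conv-1"))
```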

If Prisma manages this database’s schema, define the tables as models so that Prisma is aware of them:

model LangGraphCheckpoint {
  threadId           String  @map("thread_id") @db.Text
  checkpointNs       String  @default("") @map("checkpoint_ns") @db.Text
  checkpointId       String  @map("checkpoint_id") @db.Text
  parentCheckpointId String? @map("parent_checkpoint_id") @db.Text
  type               String? @db.Text
  checkpoint         Json
  metadata           Json    @default("{}")

  @@id([threadId, checkpointNs, checkpointId])
  @@map("checkpoints")
}

model LangGraphCheckpointBlob {
  threadId     String @map("thread_id") @db.Text
  checkpointNs String @default("") @map("checkpoint_ns") @db.Text
  channel      String @db.Text
  version      String @db.Text
  type         String @db.Text
  blob         Bytes?

  @@id([threadId, checkpointNs, channel, version])
  @@map("checkpoint_blobs")
}

model LangGraphCheckpointWrite {
  threadId     String  @map("thread_id") @db.Text
  checkpointNs String  @default("") @map("checkpoint_ns") @db.Text
  checkpointId String  @map("checkpoint_id") @db.Text
  taskId       String  @map("task_id") @db.Text
  idx          Int
  channel      String  @db.Text
  type         String? @db.Text
  blob         Bytes
  taskPath     String  @default("") @map("task_path") @db.Text

  @@id([threadId, checkpointNs, checkpointId, taskId, idx])
  @@map("checkpoint_writes")
}

Hybrid History Loading

With a checkpointer, the history_loader node becomes simpler. It checks whether history was restored from a checkpoint and falls back to the database otherwise:

# agent/nodes/history_loader.py
from agent.state import AgentState
from db import engine
from sqlalchemy import text


async def history_loader_node(state: AgentState) -> dict:
    """Load history from DB if not restored from checkpoint."""
    existing_history = state.get("conversation_history") or []
    current_message = state.get("message", "")

    # If history exists from checkpoint, just append current message
    if existing_history:
        if current_message:
            updated = existing_history + [{"role": "user", "content": current_message}]
            return {"conversation_history": updated}
        return {}

    # First message or no checkpoint - load from Messages table
    conversation_id = state.get("conversation_id")
    if not conversation_id:
        return {"conversation_history": []}

    query = text("""
        SELECT role, content
        FROM messages
        WHERE conversation_id = :conv_id
        ORDER BY created_at DESC
        LIMIT 10
    """)

    async with engine.connect() as conn:
        result = await conn.execute(query, {"conv_id": conversation_id})
        rows = result.fetchall()

    # Reverse to chronological, add current message
    history = [{"role": row.role.lower(), "content": row.content} for row in reversed(rows)]
    if current_message:
        history.append({"role": "user", "content": current_message})

    return {"conversation_history": history}

This hybrid approach:

  1. Checkpoint exists: Use restored state, append current message
  2. No checkpoint: Query Messages table (first message or checkpoint cleared)
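The two branches above can be isolated into a pure function, which makes the decision easy to unit test without a database. This is a sketch: build_history is a hypothetical helper, and it assumes the fallback rows arrive newest first as (role, content) pairs, matching the ORDER BY created_at DESC query in the node.

```python
def build_history(existing, db_rows_newest_first, message):
    """Pick the history source, then append the current user message."""
    if existing:
        # Branch 1: state was restored from a checkpoint.
        history = list(existing)
    else:
        # Branch 2: no checkpoint, so rebuild from the messages table in chronological order.
        history = [
            {"role": role.lower(), "content": content}
            for role, content in reversed(db_rows_newest_first)
        ]
    if message:
        history.append({"role": "user", "content": message})
    return history


restored = build_history(
    [{"role": "user", "content": "hi"}],
    [("USER", "ignored because a checkpoint exists")],
    "what is my order status?",
)
fallback = build_history(
    [],
    [("ASSISTANT", "Hello! How can I help?"), ("USER", "hi")],
    "what is my order status?",
)
print(restored)
print(fallback)
```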

Checkpointer vs Manual History

| Aspect | Manual History | Checkpointer |
| --- | --- | --- |
| State saved | Only messages | Entire graph state |
| Code needed | Query + save logic | Just config param |
| Versioning | Manual | Built-in with parents |
| Recovery | Latest only | Any checkpoint |
| Performance | Optimized queries | Automatic caching |

Use checkpointer when:

  • You need full state recovery (not just messages)
  • You want interrupt/resume capability
  • You’re building multi-step workflows with intermediate state

Use manual history when:

  • You only need conversation messages
  • You have existing message storage (e.g., chat app DB)
  • You need custom retention policies
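As an example of a custom retention policy on manually stored messages, the sketch below keeps only recent messages and caps the total. The apply_retention helper, the created_at field, and the 30-day window are assumptions for illustration. The cap of 10 mirrors the LIMIT 10 in the fallback query above.

```python
from datetime import datetime, timedelta, timezone


def apply_retention(messages, now, max_age, max_messages):
    """Drop messages older than max_age, then keep the newest max_messages."""
    if max_messages <= 0:
        return []
    cutoff = now - max_age
    fresh = [m for m in messages if m["created_at"] >= cutoff]
    fresh.sort(key=lambda m: m["created_at"])  # chronological order
    return fresh[-max_messages:]


now = datetime(2026, 2, 22, 12, 0, tzinfo=timezone.utc)
messages = [
    {"role": "user", "content": "old question", "created_at": now - timedelta(days=40)},
    {"role": "assistant", "content": "old answer", "created_at": now - timedelta(days=39)},
    {"role": "user", "content": "recent question", "created_at": now - timedelta(hours=2)},
    {"role": "assistant", "content": "recent answer", "created_at": now - timedelta(hours=1)},
]
kept = apply_retention(messages, now, max_age=timedelta(days=30), max_messages=10)
print([m["content"] for m in kept])
```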

Conclusion

LangGraph’s PostgresSaver checkpointer automates state persistence:

  • Setup once: Context manager + compile(checkpointer=...)
  • Thread isolation: Pass thread_id in config
  • Automatic tables: setup() creates schema
  • Hybrid loading: Checkpoint + DB fallback

The checkpointer handles the boring parts — you focus on graph logic. State persists automatically across requests, enabling stateful conversations without manual history management.
