Building Production Agent APIs with FastAPI: Implementation Guide

Author: Fei Wang
Affiliation: The AI Engineer

Transform agent architecture concepts into working production code. This implementation guide builds the complete MessageManager class with Redis L1 caching and PostgreSQL L2 persistence, implements session management with state tracking, creates streaming response handlers using Server-Sent Events, shows how interactive tools pause and resume across requests, and designs the frontend integration layer. Each pattern is demonstrated with real code that handles production traffic, error recovery, and scalability concerns.

This article implements the architectural patterns from The Anatomy of a Production Agent. We’ll build production-ready code for the MessageManager abstraction, covering Redis/PostgreSQL integration, token budget queries, streaming responses, and interactive tool state management. The following diagram shows the complete system architecture:

Production Agent System Architecture Overview

MessageManager: The State Management Layer

MessageManager hides the complexity of persistence, caching, and loop state management—keeping your agent code clean and focused on logic. It handles the messy details: Redis/PostgreSQL dual-layer caching, token budget queries, sequence number tracking, context injection, and loop state persistence for interactive tools.

MessageManager handles persistence, injections, and context engineering. The storage abstraction (HybridStorage) manages Redis L1 + PostgreSQL L2 caching behind the scenes.

# Define injection functions (system context added before loading history)
async def inject_current_time():
    return {
        "role": "user",
        "content": f"[System] Current time: {datetime.now().isoformat()}",
        "sequence": 0  # Injections get sequence 0
    }

# Usage in FastAPI endpoint
@app.post("/chat/{session_id}")
async def stream_chat(session_id: str, request: ChatMessageRequest):
    async def event_stream():
        # Context manager handles lifecycle
        async with MessageManager(
            session_id=session_id,
            user_id="demo_user",
            storage=storage,  # HybridStorage singleton
            injections=[inject_current_time]
        ) as msg_mgr:
            # Load history (injections applied first, then DB messages)
            await msg_mgr.load(token_budget=180000)

            # Add user message (auto-saved to storage)
            await msg_mgr.add_user_message(request.message)

            # Run loop - MessageManager handles persistence
            async for event in run_agent_loop(
                message_manager=msg_mgr,  # Pass instance, not messages
                tools=TOOLS,
                tool_executors=EXECUTORS
            ):
                yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

MessageManager class interface - handles persistence, injections, and context engineering.

class MessageManager:
    """Conversation state management with smart loading and auto-persistence"""

    def __init__(self, session_id, user_id, storage, injections=None):
        self.session_id = session_id
        self.user_id = user_id
        self.storage = storage  # HybridStorage (Redis + PostgreSQL)
        self.injections = injections or []
        self._messages = []  # In-memory working set

    async def load(self, token_budget=180000):
        """Load messages within token budget

        Order: injections → load from storage → trim to budget
        """

    async def add_user_message(self, content):
        """Add user message, estimate tokens, auto-save to storage"""

    async def add_assistant_message(self, content_blocks, usage):
        """Add assistant message, use accurate tokens from API, auto-save"""

    def to_api_format(self, keep_full_detail_for_last_n=30):
        """Convert to API format, strip metadata, apply context engineering

        Old interactions: Strip tool_use/tool_result/thinking blocks
        Recent interactions: Keep full detail
        """

    def get_for_display(self):
        """Filter for frontend - show user msgs + assistant text only"""

    async def save_loop_state(self, step, awaiting_tool, tool_use_id):
        """Save loop state for interactive tool pause"""

    async def load_loop_state(self):
        """Load saved state for resume"""

    async def clear_loop_state(self):
        """Clear state after resumption"""

    # Context manager support
    async def __aenter__(self): return self
    async def __aexit__(self, *args): pass

Token tracking: User messages use estimation (1 token ≈ 4 chars), assistant messages use accurate count from API response. Both auto-save to storage.

async def add_user_message(self, content):
    """Add user message with estimated tokens, auto-save"""
    # Estimate: 1 token ≈ 4 chars (acceptable for budget queries)
    estimated_tokens = len(content) // 4

    msg = {
        "sequence": self._get_next_sequence(),
        "role": "user",
        "content": content,  # str or list (tool results)
        "tokens": estimated_tokens,
        "tokens_estimated": True,
        "created_at": datetime.now().isoformat()
    }

    self._messages.append(msg)
    await self.storage.save_message(self.session_id, self.user_id, msg)


async def add_assistant_message(self, content_blocks, usage):
    """Add assistant message with accurate tokens from API, auto-save"""
    # Use ACCURATE tokens from API response
    accurate_tokens = usage["output_tokens"]

    # Handle Pydantic models from Anthropic SDK
    if content_blocks and hasattr(content_blocks[0], 'model_dump'):
        content_blocks = [block.model_dump() for block in content_blocks]

    msg = {
        "sequence": self._get_next_sequence(),
        "role": "assistant",
        "content": content_blocks,  # Keep thinking blocks
        "tokens": accurate_tokens,
        "tokens_estimated": False,
        "created_at": datetime.now().isoformat()
    }

    self._messages.append(msg)
    await self.storage.save_message(self.session_id, self.user_id, msg)


def _get_next_sequence(self):
    """Get next sequence number"""
    if not self._messages:
        return 1
    return self._messages[-1]["sequence"] + 1

Why this works: Estimation for user input is fast and accurate enough (±10-20% error acceptable). API provides exact tokens for assistant output. Avoids complex reconciliation.

Three key operations: load from storage with budget, transform to API format with context engineering, filter for frontend display.

async def load(self, token_budget=180000):
    """Load messages within token budget

    Order: Execute injections → Load from storage → Trim to budget
    """
    # Step 1: Apply injections (system context)
    for inject_fn in self.injections:
        injected_msg = await inject_fn()
        self._messages.append(injected_msg)

    # Step 2: Load from storage (Redis first, PostgreSQL fallback)
    history = await self.storage.load_recent_messages(
        session_id=self.session_id,
        limit=100
    )

    # Step 3: Trim to token budget
    total_tokens = sum(m.get("tokens", 0) for m in self._messages)  # injections may not carry a token count
    for msg in history:
        if total_tokens + msg["tokens"] > token_budget:
            break
        self._messages.append(msg)
        total_tokens += msg["tokens"]

    return self


def to_api_format(self, keep_full_detail_for_last_n=30):
    """Transform to API format with progressive detail reduction

    Context engineering:
    - Old interactions: Strip tool_use/tool_result/thinking (keep text only)
    - Recent interactions: Keep full detail for reasoning continuity
    """
    # Group into interactions (user input → assistant → tool results)
    interactions = self._group_into_interactions()

    if len(interactions) <= keep_full_detail_for_last_n:
        # Few interactions - keep everything
        return self._messages_to_api_format(self._messages)

    # Split: old interactions vs recent
    old_count = len(interactions) - keep_full_detail_for_last_n
    old = interactions[:old_count]
    recent = interactions[old_count:]

    # Strip old interactions (remove tool blocks)
    stripped = []
    for interaction in old:
        for msg in interaction:
            # Keep only text blocks from assistant messages
            # Remove tool_result user messages entirely
            stripped_msg = self._strip_tool_blocks(msg)
            if stripped_msg:
                stripped.append(stripped_msg)

    # Combine: stripped old + full recent
    all_messages = stripped + [msg for interaction in recent for msg in interaction]

    # Convert to API format (strip metadata, clean extra fields)
    return self._messages_to_api_format(all_messages)


def _messages_to_api_format(self, messages):
    """Strip metadata, remove extra fields from tool_result blocks"""
    api_messages = []
    for msg in messages:
        api_msg = {"role": msg["role"], "content": msg["content"]}

        # Clean tool_result blocks (remove display_data, artifact_data, etc.)
        if isinstance(api_msg["content"], list):
            cleaned_content = []
            for block in api_msg["content"]:
                if block.get("type") == "tool_result":
                    # Only allowed fields: type, tool_use_id, content, is_error
                    cleaned_block = {
                        "type": "tool_result",
                        "tool_use_id": block["tool_use_id"],
                        "content": block["content"]
                    }
                    if "is_error" in block:
                        cleaned_block["is_error"] = block["is_error"]
                    cleaned_content.append(cleaned_block)
                else:
                    cleaned_content.append(block)
            api_msg["content"] = cleaned_content

        api_messages.append(api_msg)
    return api_messages

Key insight: Tool blocks (tool_use, tool_result) consume tokens but add little value after the first few interactions. Stripping them from old messages saves 50-70% tokens while preserving conversation flow.
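
The two helpers referenced above, _group_into_interactions and _strip_tool_blocks, are not shown in full. A minimal sketch of what they might look like, assuming an interaction starts at each plain-text user message and that assistant content is a list of blocks:

def _group_into_interactions(self):
    """Group messages into interactions: user input → assistant → tool results

    Assumes a new interaction starts whenever a user message carries plain
    text (tool_result user messages belong to the interaction they answer).
    """
    interactions, current = [], []
    for msg in self._messages:
        is_user_text = msg["role"] == "user" and isinstance(msg["content"], str)
        if is_user_text and current:
            interactions.append(current)
            current = []
        current.append(msg)
    if current:
        interactions.append(current)
    return interactions


def _strip_tool_blocks(self, msg):
    """Reduce an old message to text only

    Assistant messages: keep text blocks, drop tool_use/thinking blocks.
    User messages made of tool_result blocks: drop entirely (return None).
    """
    if msg["role"] == "assistant" and isinstance(msg["content"], list):
        text_blocks = [b for b in msg["content"] if b.get("type") == "text"]
        return {**msg, "content": text_blocks} if text_blocks else None

    if msg["role"] == "user" and isinstance(msg["content"], list):
        # Old tool results add little value - remove the whole message
        return None

    return msg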

HybridStorage: L1/L2 Cache Architecture

HybridStorage implements two-tier caching: Redis (L1) for fast access with 1-hour TTL, PostgreSQL (L2) for durable persistence. The load pattern tries Redis first, falls back to PostgreSQL on cache miss, then warms the cache. The save pattern invalidates cache after PostgreSQL writes, ensuring consistency.

Two storage layers: PostgreSQL (L2) for durable persistence with indexed queries, Redis (L1) for fast caching with JSON serialization.

PostgreSQL Schema:

class Message(Base):
    """Message storage with token tracking for budget queries"""
    __tablename__ = "messages"

    # Identifiers
    message_id: Mapped[str] = mapped_column(String(36), primary_key=True)
    session_id: Mapped[str] = mapped_column(String(36), index=True)
    user_id: Mapped[str] = mapped_column(String(255), index=True)

    # Message data
    sequence: Mapped[int] = mapped_column(Integer)  # Order within session
    role: Mapped[str] = mapped_column(String(20))  # 'user' | 'assistant'
    content: Mapped[dict] = mapped_column(JSON)  # Flexible structure

    # Token tracking
    tokens: Mapped[int] = mapped_column(Integer)
    tokens_estimated: Mapped[bool] = mapped_column(Boolean, default=True)

    # Timestamp (required by the idx_session_created index below)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    __table_args__ = (
        # Composite index for token budget queries
        Index('idx_session_sequence_tokens', 'session_id', 'sequence', 'tokens'),
        # Index for recent message queries
        Index('idx_session_created', 'session_id', 'created_at'),
        # Unique constraint per session
        UniqueConstraint('session_id', 'sequence')
    )

Redis Structure:

# Key: {session_id}:messages
# TTL: 3600 seconds (1 hour)
# Format: JSON with messages list

{
    "messages": [
        {"sequence": 1, "role": "user", "content": "...", "tokens": 150},
        {"sequence": 2, "role": "assistant", "content": [...], "tokens": 450}
    ],
    "total_tokens": 600,
    "last_sequence": 2
}

# Operations
await redis.get(f"{session_id}:messages")  # Load cache
await redis.set(f"{session_id}:messages", data, ex=3600)  # Cache with TTL
await redis.delete(f"{session_id}:messages")  # Invalidate

Why this works: Composite index enables token budget queries without full table scan. JSON column stores flexible content (text, tool blocks, artifacts). Redis caches recent 100 messages for instant access.

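The get_messages / set_messages / invalidate_messages calls used by HybridStorage map onto these operations. A minimal sketch of the RedisStorage wrapper, assuming redis.asyncio and the key layout above:

import json
import redis.asyncio as redis

class RedisStorage:
    """Thin wrapper around the Redis cache layer (L1)"""

    def __init__(self, ttl_seconds: int = 3600):
        self.ttl = ttl_seconds
        self.client = None

    async def connect(self, redis_url: str):
        self.client = redis.from_url(redis_url, decode_responses=True)

    async def get_messages(self, session_id: str) -> dict | None:
        """Return cached payload ({"messages": [...], ...}) or None on miss"""
        raw = await self.client.get(f"{session_id}:messages")
        return json.loads(raw) if raw else None

    async def set_messages(self, session_id: str, data: dict):
        """Cache payload with TTL so idle sessions expire on their own"""
        await self.client.set(f"{session_id}:messages", json.dumps(data), ex=self.ttl)

    async def invalidate_messages(self, session_id: str):
        """Drop the cache entry - next load rebuilds it from PostgreSQL"""
        await self.client.delete(f"{session_id}:messages")
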
Cache-first pattern: Try Redis (L1) → Fallback to PostgreSQL (L2) → Warm cache. Achieves >95% cache hit rate in production.

async def load_recent_messages(self, session_id: str, limit=100) -> list[dict]:
    """Load recent messages: Redis first, PostgreSQL fallback"""

    # Step 1: Try Redis cache (L1) - sub-millisecond
    cached_data = await self.redis.get_messages(session_id)

    if cached_data:
        # Cache hit
        messages = cached_data["messages"]
        if len(messages) > limit:
            messages = messages[-limit:]  # Trim to limit
        return messages

    # Step 2: Cache miss - load from PostgreSQL (L2) - 5-20ms
    messages = await self.postgres.load_recent_messages(session_id, limit)

    # Step 3: Warm Redis cache for next request
    if messages:
        cache_data = {
            "messages": messages,
            "total_tokens": sum(m["tokens"] for m in messages),
            "last_sequence": messages[-1]["sequence"]
        }
        await self.redis.set_messages(session_id, cache_data)

    return messages

Why this works: Redis stores recent messages (last 100) for instant access. PostgreSQL provides durable fallback and full history. Cache hit rate >95% means most requests never query database.

Write-through with invalidation: PostgreSQL first (durable) → Invalidate Redis cache. Simple and prevents stale data.

async def save_message(self, session_id: str, user_id: str, message: dict):
    """Save to both layers: PostgreSQL first, then invalidate Redis cache"""

    # Step 1: Save to PostgreSQL (L2) - durable layer
    try:
        await self.postgres.save_message(session_id, user_id, message)
    except Exception as e:
        # PostgreSQL failure is critical - don't update cache
        logger.error(f"PostgreSQL save failed: {e}")
        raise  # Fail-fast

    # Step 2: Invalidate Redis cache (L1)
    try:
        # Simple invalidation - next load will rebuild from PostgreSQL
        await self.redis.invalidate_messages(session_id)
    except Exception as e:
        # Redis failure doesn't block - degraded performance only
        logger.warning(f"Redis cache invalidation failed: {e}")
        # Continue - PostgreSQL save succeeded

Why this works: Invalidation is simpler than cache append and prevents stale data. Next load rebuilds cache from PostgreSQL. PostgreSQL failure blocks (fail-fast for data consistency). Redis failure logged but non-blocking (graceful degradation).
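
The PostgreSQL side that HybridStorage delegates to is not shown; a possible sketch, assuming the Message model above and the async_session_maker from app/store/db.py shown below, with the query ordered by sequence so the composite index is used:

import uuid
from sqlalchemy import select
from app.models.message import Message
from app.store.db import async_session_maker

class PostgreSQLStorage:
    """Durable persistence layer (L2) built on the Message ORM model"""

    async def load_recent_messages(self, session_id: str, limit: int = 100) -> list[dict]:
        """Return the last `limit` messages for a session, oldest first"""
        async with async_session_maker() as db:
            stmt = (
                select(Message)
                .where(Message.session_id == session_id)
                .order_by(Message.sequence.desc())
                .limit(limit)
            )
            rows = (await db.execute(stmt)).scalars().all()

        # Query returns newest first; reverse to chronological order
        return [
            {
                "sequence": r.sequence,
                "role": r.role,
                "content": r.content,
                "tokens": r.tokens,
                "tokens_estimated": r.tokens_estimated,
            }
            for r in reversed(rows)
        ]

    async def save_message(self, session_id: str, user_id: str, message: dict):
        """Insert one row - unique (session_id, sequence) guards against duplicates"""
        async with async_session_maker() as db:
            db.add(Message(
                message_id=str(uuid.uuid4()),
                session_id=session_id,
                user_id=user_id,
                sequence=message["sequence"],
                role=message["role"],
                content=message["content"],
                tokens=message["tokens"],
                tokens_estimated=message["tokens_estimated"],
            ))
            await db.commit()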

Database setup includes initialization, module organization, and CRUD pattern for storage layer.

Database Setup (app/store/db.py):

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

# Create async engine
async_engine = create_async_engine(
    settings.POSTGRESQL_URI,
    poolclass=NullPool,  # No pooling - each session gets a fresh connection
    echo=False
)

# Create session maker
async_session_maker = sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

async def create_all_tables():
    """Create schema and all tables from ORM models"""
    async with async_engine.begin() as conn:
        # Create schema
        await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
        # Create tables
        await conn.run_sync(Base.metadata.create_all)

Module Organization:

app/
├── storage/                  # Storage abstractions
│   ├── hybrid_storage.py    # L1 (Redis) + L2 (PostgreSQL)
│   ├── redis_storage.py     # Cache operations
│   └── postgresql_storage.py # Durable persistence
├── store/                    # Database layer
│   ├── db.py                # Engine, session maker, init
│   └── crud_base.py         # Generic CRUD operations
└── models/                   # ORM models
    ├── orm_base.py          # SQLAlchemy Base
    ├── message.py           # Message model
    └── session.py           # UserSession model

CRUD Pattern (app/store/crud_base.py):

class CRUDBase(Generic[ModelType, CreateSchema, UpdateSchema]):
    """Generic CRUD operations for any model"""

    def __init__(self, model: Type[ModelType]):
        self._model = model

    async def create(self, db: AsyncSession, obj_in: CreateSchema):
        """Create new record"""
        db_obj = self._model(**obj_in.model_dump())
        db.add(db_obj)
        await db.commit()
        return db_obj

    async def get(self, db: AsyncSession, **filters):
        """Get single record by filters"""
        stmt = select(self._model).filter_by(**filters)
        result = await db.execute(stmt)
        return result.scalars().first()

    async def get_multi(self, db: AsyncSession, limit=100):
        """Get multiple records with pagination"""
        # ...

# Usage
from app.models.message import Message
crud_message = CRUDBase[Message, CreateMessage, UpdateMessage](Message)

async with async_session_maker() as db:
    message = await crud_message.create(db, obj_in=data)

Why this works: Separation of concerns - storage/ handles caching strategy, store/ handles database operations, models/ defines schema. CRUD base provides type-safe operations for all models.

FastAPI Application Structure

FastAPI provides the application structure: lifespan management for storage initialization, route organization for chat endpoints, and streaming responses via Server-Sent Events. This section shows how MessageManager and HybridStorage integrate into a working API.

Production FastAPI apps use layered architecture: routes delegate to services, services use CRUD/storage, storage interacts with databases. This separation keeps concerns isolated and code maintainable as the system scales.

app/
├── main.py              # Application entry, lifespan, CORS
├── api/                 # Route handlers (thin layer)
│   ├── __init__.py
│   ├── chat.py         # Chat streaming endpoints
│   └── sessions.py     # Session management
├── services/            # Business logic
│   ├── message_manager.py
│   └── agent_loop.py
├── storage/             # Data access layer
│   ├── hybrid_storage.py
│   ├── redis_storage.py
│   └── postgresql_storage.py
├── models/              # SQLAlchemy ORM models
│   ├── orm_base.py
│   └── message.py
├── schemas/             # Pydantic request/response models
│   ├── chat.py
│   └── session.py
├── auth/                # Authentication
│   └── jwt.py
├── core/                # Configuration
│   └── config.py
└── tools/               # Agent tools (optional)
    └── ...

Layered flow: Request → Route (validation) → Service (business logic) → Storage (CRUD) → Database. Each layer has single responsibility. Routes are thin (just validation and delegation). Services contain logic (MessageManager, agent loop). Storage handles persistence (HybridStorage). This organization scales to hundreds of endpoints without becoming unmaintainable.
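
Request and response shapes live in schemas/ as small Pydantic models. A minimal sketch of schemas/chat.py covering the ChatMessageRequest used by the endpoints below (the session summary model is an assumption based on the list_chats response):

# app/schemas/chat.py
from pydantic import BaseModel, Field

class ChatMessageRequest(BaseModel):
    """Body for POST /chat/{session_id}"""
    message: str = Field(..., min_length=1, description="User's chat input")

class SessionSummary(BaseModel):
    """Item shape returned by GET /chats"""
    session_id: str
    title: str
    message_count: int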

Lifespan pattern manages resources: validate → initialize → connect → cleanup. Ensures proper initialization order and guaranteed cleanup.

# app/main.py

storage = HybridStorage()  # Global singleton

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup: validate → create tables → connect storage"""

    # Validate API key (fail fast)
    if not settings.ANTHROPIC_API_KEY:
        raise ValueError("ANTHROPIC_API_KEY required")

    # Create database tables
    await create_all_tables()
    logger.info("✓ Database initialized")

    # Connect storage (Redis + PostgreSQL)
    await storage.connect(settings.get_redis_url())
    logger.info("✓ Storage connected")

    yield  # App runs

    # Shutdown: cleanup
    await storage.disconnect()
    logger.info("✓ Shutdown complete")


# Create app
app = FastAPI(title="Agent API", lifespan=lifespan)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

# Include routers
app.include_router(chat_router)

Why this order? Validate first (fail fast), then create tables, then connect to external services. Lifespan ensures cleanup even on errors.
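
The settings object referenced throughout comes from core/config.py. A minimal sketch using pydantic-settings; only the fields this article actually reads are included, and the defaults are assumptions:

# app/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """Environment-driven configuration (loaded from env vars / .env)"""
    model_config = SettingsConfigDict(env_file=".env")

    ANTHROPIC_API_KEY: str = ""
    POSTGRESQL_URI: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/agent"
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379

    def get_redis_url(self) -> str:
        return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/0"

settings = Settings()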

Routes use APIRouter for modularity. Current implementation has no auth (future enhancement). Auto-creates sessions if they don’t exist.

# app/api/chat.py

router = APIRouter(prefix="/api/v1", tags=["chat"])

@router.post("/chat/{session_id}")
async def stream_chat(session_id: str, request: ChatMessageRequest):
    """Stream chat using SSE - auto-creates session if doesn't exist"""

    # Ensure session exists (auto-create)
    async with async_session_maker() as db:
        result = await db.execute(
            select(UserSession).where(UserSession.session_id == session_id)
        )
        session = result.scalar_one_or_none()

        if not session:
            # Auto-create
            new_session = UserSession(
                session_id=session_id,
                user_id="demo_user",  # TODO: Add auth
                created_at=datetime.now()
            )
            db.add(new_session)
            await db.commit()

    # Event generator for SSE
    async def event_stream():
        async with MessageManager(
            session_id=session_id,
            user_id="demo_user",
            storage=storage,
            injections=[inject_current_time]
        ) as msg_mgr:
            await msg_mgr.load(token_budget=180000)
            await msg_mgr.add_user_message(request.message)

            # Stream agent loop (MessageManager handles saves)
            async for event in run_agent_loop(
                message_manager=msg_mgr,
                tools=TEST_TOOLS,
                tool_executors=TEST_EXECUTORS
            ):
                yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")


@router.get("/chats")
async def list_chats(limit: int = 50):
    """List user's chats"""
    # ... query sessions and return

Simple pattern: Auto-create session, use MessageManager with storage, stream events. Auth planned for future. Agent loop saves messages automatically via MessageManager.
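
On the client side, the stream is plain SSE and can be read line by line. A quick consumer sketch using httpx; the URL and event fields match the endpoint above, everything else is illustrative:

import json
import httpx

async def consume_chat_stream(session_id: str, message: str):
    """Read SSE events from the streaming chat endpoint"""
    url = f"http://localhost:8000/api/v1/chat/{session_id}"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json={"message": message}) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip blank separator lines
                event = json.loads(line[len("data: "):])
                if event["type"] == "awaiting_user_input":
                    print("Agent asks:", event["question"], event["options"])
                elif event["type"] == "complete":
                    break
                else:
                    print(event)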

Interactive tool resumption, background title generation, and event types (display, artifact, interactive).

# app/api/chat.py

@router.post("/chat/{session_id}")
async def stream_chat(session_id: str, request: ChatMessageRequest):
    """Stream chat - handles new messages and interactive tool resumption"""

    async def event_stream():
        async with MessageManager(...) as msg_mgr:
            # Check if resuming from interactive tool
            loop_state = await msg_mgr.load_loop_state()

            if loop_state:
                # RESUME: User responded to interactive tool
                await msg_mgr.load()
                await msg_mgr.add_user_message([{
                    "type": "tool_result",
                    "tool_use_id": loop_state["tool_use_id"],
                    "content": f"User selected: {request.message}"
                }])
                await msg_mgr.clear_loop_state()
            else:
                # NEW MESSAGE
                await msg_mgr.load()
                await msg_mgr.add_user_message(request.message)

            # Run loop - agent_loop handles saves via MessageManager
            async for event in run_agent_loop(message_manager=msg_mgr, ...):
                yield f"data: {json.dumps(event)}\n\n"

                # Event types: display, artifact, awaiting_user_input, complete

                if event["type"] == "complete":
                    # Trigger title generation after first interaction
                    await maybe_generate_title(session_id, msg_mgr)

    return StreamingResponse(event_stream(), media_type="text/event-stream")


@router.get("/chats")
async def list_chats(limit: int = 50):
    """List sessions with AI-generated titles"""
    async with async_session_maker() as db:
        result = await db.execute(select(UserSession).limit(limit))
        sessions = result.scalars().all()
    return [
        {
            "session_id": s.session_id,
            "title": s.ai_generated_title or "New Chat",
            "message_count": s.message_count
        }
        for s in sessions
    ]


# Helper: Background title generation
async def maybe_generate_title(session_id, msg_mgr):
    """Generate title if session doesn't have one yet"""
    # Check if first interaction
    user_msgs = [m for m in msg_mgr._messages if _is_user_input(m)]
    assistant_msgs = [m for m in msg_mgr._messages if m["role"] == "assistant"]

    if user_msgs and assistant_msgs:
        # Trigger background task
        asyncio.create_task(
            generate_session_title(session_id, user_msgs[0], assistant_msgs[0])
        )

Key patterns: Check loop_state for resumption, yield events for SSE streaming, background title generation after first interaction. Agent loop saves via MessageManager automatically.
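
The loop-state methods themselves only need to persist a small keyed record. One possible sketch stores it in Redis under a {session_id}:loop_state key; the field names mirror save_loop_state's signature, and direct access to the underlying Redis client is an assumption about the storage layer:

LOOP_STATE_TTL = 3600  # assumed: abandoned pauses expire with the cache

async def save_loop_state(self, step, awaiting_tool, tool_use_id):
    """Persist enough state to resume the loop after the user responds"""
    state = {"step": step, "awaiting_tool": awaiting_tool, "tool_use_id": tool_use_id}
    await self.storage.redis.client.set(
        f"{self.session_id}:loop_state", json.dumps(state), ex=LOOP_STATE_TTL
    )

async def load_loop_state(self):
    """Return the saved state dict, or None if no pause is pending"""
    raw = await self.storage.redis.client.get(f"{self.session_id}:loop_state")
    return json.loads(raw) if raw else None

async def clear_loop_state(self):
    """Remove the pause marker once the user's tool_result has been recorded"""
    await self.storage.redis.client.delete(f"{self.session_id}:loop_state")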

Advanced Tool Patterns: Display, User Interaction, and Artifacts

Production agents use three tool patterns: display tools (rich inline data), interactive tools (pause for user input), and artifact tools (separate panel rendering for complex content). Each pattern requires different handling for tool results, state management, and frontend integration.

Tool functions return varied data structures, but only specific fields are allowed when sending to Claude’s API. This creates a design challenge: preserve extra fields for routing logic while stripping them before API calls.

Level 4: Claude API

Receives: Clean tool_result blocks only

Allowed fields: type, tool_use_id, content, is_error

❌ Extra fields (display_data, artifact_data, requires_user_input) cause API errors

All tools: tool_use block has input, tool_result block has output

Tool Result Content by Type:

Regular Tool
{
  "type": "tool_result",
  "content": "Result: 42"
}

Actual result from tool

Display Tool
{
  "type": "tool_result",
  "content": "Analysis displayed"
}

Confirmation only (display_data stripped)

Interactive Tool
{
  "type": "tool_result",
  "content": "User selected: C++"
}

User's choice (frontend provides selection)

Level 3: Agent Loop

Receives: tool_result with extra fields preserved

Uses extra fields for control flow:

  • display_data → yield display event to frontend
    Structured results from tool execution
  • requires_user_input → pause loop & save state
    + interactive_data → structured options for frontend rendering
    ⚠️ User's choice becomes tool_result content for Level 4
    (tool_use input already has question/options - Claude knows context)
  • artifact_data → yield artifact event to frontend
    [Separate article - most complex pattern]

Level 2: Tool Executor

Receives: Varied returns from tool functions

Creates: Standard tool_result structure

Preserves: Extra fields for agent loop

if "display_data" in result:
    tool_result["display_data"] = result["display_data"]
if "artifact_data" in result:
    tool_result["artifact_data"] = result["artifact_data"]
if "requires_user_input" in result:
    tool_result["requires_user_input"] = result["requires_user_input"]

Level 1: Tool Functions (Varied Returns)

Regular Tool

calculate

return {
  "success": True,
  "content": "Result: 42"
}

Content can be string, text blocks, documents, images (depends on tool design)

Display Tool

analyze_text

return {
  "success": True,
  "content": "Analysis displayed",
  "display_data": {
    "word_count": 150,
    "readability": 85
  }
}

+ display_data (structured results from tool execution) for inline rendering

Interactive Tool

propose_options

return {
  "success": True,
  "requires_user_input": True,
  "interactive_data": {
    "question": "Choose learning path",
    "options": ["Python", "JavaScript", "C++"]
  }
  # Content added later: "User selected: C++"
}

Signals pause + structured data for frontend. Content filled with user's choice.

Why this works:

  • Separation of concerns: Each layer has single responsibility
  • Flexibility: Tools can return rich data without breaking API contract
  • Persistence: Extra fields stored in database for frontend reconstruction
  • Type safety: Claude API validates strict schema, catches errors early

Future Extensibility:

Anthropic’s API supports rich content beyond strings (source: tool use documentation):

  • Text blocks: {"type": "text", "text": "..."}
  • Image blocks: {"type": "image", ...}
  • Document blocks: {"type": "document", ...}

Our architecture supports these formats when needed—simply update tool functions and preserve the blocks through the layers.
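
For example, a tool whose result is a list of text blocks rather than a plain string only changes Level 1; the tool name and input here are illustrative:

async def fetch_report_excerpt_execute(tool_use):
    """Tool returning structured content blocks instead of a single string"""
    section = tool_use["input"]["section"]

    return {
        "success": True,
        # tool_result content as a list of text blocks (supported by the API)
        "content": [
            {"type": "text", "text": f"Excerpt from section '{section}':"},
            {"type": "text", "text": "Revenue grew 12% quarter over quarter..."},
        ],
    }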

Display tools have two patterns depending on who computes the data:

  1. Claude-computed: Claude analyzes and returns structured output in tool_use input. Server just displays what Claude computed. Content can be minimal since Claude already knows the results.

  2. Server-computed: Server processes input and generates new data Claude hasn’t seen. Content must include results so Claude can reason about them.

Key question: Does Claude need to know the results to continue reasoning? If yes, include in content. If no, minimal confirmation suffices.


Pattern 1: Claude-Computed (Minimal Content)

# Tool: display_chart (Claude already analyzed data)
# tool_use input: {"data": [10, 20, 30], "type": "bar"}

# LEVEL 1: Tool Function
async def display_chart_execute(tool_use):
    chart_data = tool_use["input"]["data"]  # Claude provided this

    return {
        "success": True,
        "content": "Chart displayed to user.",  # ✅ Minimal - Claude has data
        "display_data": {"type": "bar", "data": chart_data}
    }

# LEVEL 2-3: Preserve → Yield event → Save to DB
# LEVEL 4: Strip display_data, send to Claude
# Claude sees: tool_use (data) + tool_result ("Chart displayed") ✅

Pattern 2: Server-Computed (Informative Content)

# Tool: analyze_text (Server computes new data)
# tool_use input: {"text": "some text"}

# LEVEL 1: Tool Function
async def analyze_text_execute(tool_use):
    text = tool_use["input"]["text"]

    # SERVER computes analysis (Claude doesn't know this yet!)
    analysis = {
        "word_count": 150,
        "readability": 85,
        "avg_word_length": 4.5
    }

    return {
        "success": True,
        # ✅ Include results - Claude needs to know!
        "content": "Analysis: 150 words, 85% readability, avg 4.5 chars/word",
        "display_data": analysis
    }

# LEVEL 2-3: Preserve → Yield event → Save to DB
# LEVEL 4: Strip display_data, send to Claude
# Claude sees: tool_use (text) + tool_result (analysis results) ✅
# Claude can now reason: "The text has good readability at 85%..."

Key insight: If server generates data Claude hasn’t seen, put it in content. If Claude already computed it, minimal confirmation is fine.

Interactive tools pause the loop, wait for user input, then resume. Frontend collects user’s choice and backend prepares the tool_result content.

# ============================================================
# Interactive Tools: Full Flow Example (propose_options)
# ============================================================

# LEVEL 1: Tool Function
async def propose_options_execute(tool_use):
    # Return: signal + structured data for frontend
    return {
        "success": True,
        "requires_user_input": True,
        "interactive_data": {
            "question": "Choose learning path",
            "options": ["Python", "JavaScript", "C++"]
        }
        # No content - filled when user responds
    }


# LEVEL 2: Tool Executor
# Preserves requires_user_input and interactive_data
tool_result = {
    "type": "tool_result",
    "tool_use_id": "toolu_456",
    "requires_user_input": True,  # ✅ Pause signal
    "interactive_data": {...}      # ✅ Frontend needs this
}


# LEVEL 3: Agent Loop
# Detects pause signal, yields event, saves state
if result.get("requires_user_input"):
    # Extract question/options from tool_use input
    tool_input = tool_call["input"]

    yield {
        "type": "awaiting_user_input",
        "tool": tool_call["name"],
        "question": tool_input["question"],
        "options": tool_input["options"]
    }

    # Save loop state and STOP
    await message_manager.save_loop_state(
        step, awaiting_tool=tool_call["name"], tool_use_id=tool_call["id"]
    )
    return  # ⚠️ LOOP PAUSED - waiting for user


# [User selects "C++" in frontend]
# [Frontend sends selection back to backend]


# LEVEL 4: Backend Prepares tool_result with User's Choice
# When user responds, backend adds tool_result:
tool_result = {
    "type": "tool_result",
    "tool_use_id": "toolu_456",
    "content": "User selected: C++"  # User's choice
}

# MessageManager sends to Claude (requires_user_input stripped)
# Claude sees:
#   - tool_use (question/options)
#   - tool_result (user's choice)
# Complete context without redundancy ✅

Key insight: Interactive tools don’t return content immediately. Content is filled later with user’s choice. The tool_use block already has question/options, so Claude has full context when the loop resumes.

Artifact tools handle complex, long-running content generation requiring separate rendering panels: blog posts, code files, mermaid diagrams, generated images, or interactive visualizations.

Unlike display tools (inline rendering) or interactive tools (user input collection), artifacts often require:

  • Background task processing (Celery, Redis Queue, or distributed task systems)
  • Compute-intensive generation that extends beyond HTTP request timeouts
  • Scheduled or long-running operations (multi-step LLM calls, image rendering, video processing)
  • Rich interaction capabilities (edit, download, export to multiple formats, version control)

The artifact pattern combines display tool data flow with asynchronous task management. Implementation involves task queue integration, progress streaming via WebSockets or SSE, cancellation handling, and frontend artifact panel design with state synchronization.

A dedicated article will cover artifact implementation patterns in depth.

Putting It All Together

This article covered the building blocks of production agent APIs: MessageManager for state management, HybridStorage for L1/L2 caching, FastAPI integration for streaming responses, and advanced tool patterns for display, user interaction, and artifacts. The following architecture shows how these components integrate in a working system.

FastAPI Endpoint

Input: session_id, message

Uses: user_id = "demo_user" (hardcoded - auth planned)

Output: SSE event stream

Role: API layer - routing, session auto-create, streaming

→ Creates MessageManager and Agent Loop

⚙️ Tool results: Loop saves via MessageManager (with extra fields)

Message Manager

Input: session_id, user_id, storage instance

Output: Messages in API format (chat history)

Role: Load history, add messages, auto-save, context engineering

→ Provides messages to Agent Loop

↔ Loads/saves via Storage

⚙️ Tool results: Stores with extra fields → Strips before Claude API

Hybrid Storage

Input: session_id, message data

Output: Messages list

Role: Cache-first loading (Redis → PostgreSQL), write-through saving

↔ Used by MessageManager for all persistence

Redis (L1)

Fast cache, 1hr TTL

PostgreSQL (L2)

Durable persistence

Agent Loop

Input: MessageManager instance, tools, executors

Output: Event stream (messages, tool execution, display, interactive)

Role: Multi-turn orchestration, tool execution, state management

← Receives messages from MessageManager

→ Yields events to FastAPI for streaming

Uses: Agent Turn + Tool Executor primitives

⚙️ Tool results: Preserves extra fields, yields display/artifact/interactive events

Agent Turn

run_agent_turn()

In: client, model, messages, system, tools

Out: response, content_blocks, usage

Single LLM API call - returns thinking, text, tool_use blocks

⚙️ Messages contain API-compatible tool_results (cleaned)

Tool Executor

execute_tools()

In: content_blocks, tool_executors

Out: tool_results[]

Executes tool_use blocks

⚙️ Preserves: display_data, artifact_data, requires_user_input

Next steps: With this foundation in place, you can add authentication, build the frontend to handle display/artifact/interactive events, create domain-specific tools, and deploy to production. The architecture scales from prototypes to production traffic through careful separation of concerns and smart caching strategies.

References

This implementation draws on patterns from the following Anthropic resources: