Building AI Agents with LangChain and FastAPI: A Complete Guide

Introduction

AI agents are programs that use a language model to reason about a task, decide which tools to use, and take actions to complete it — autonomously, step by step. In 2025-2026, building and deploying AI agents has become a core skill for backend developers.

LangChain is the most widely used Python framework for building LLM-powered applications. FastAPI is the best way to expose those applications as production-grade REST APIs. Together, they give you a powerful stack for building and serving AI agents at scale.

In this guide, you'll build a production-ready AI agent using LangChain, expose it as a FastAPI endpoint with streaming support, add memory for multi-turn conversations, and learn how to test and deploy it.

🔗 Make sure your Python environment is set up: uv: The Fast Python Package Manager Replacing pip in 2026

What Are AI Agents and Why Do They Matter?

A traditional LLM call takes an input and returns an output in one step. An agent is different — it uses the LLM's reasoning capabilities to decide what to do next, which might involve:

  1. Calling a tool (searching the web, querying a database, running code)
  2. Processing the tool's output
  3. Calling another tool if needed
  4. Repeating until the task is complete
  5. Returning a final answer

This "think → act → observe → think" loop is what makes agents powerful: they can handle complex, multi-step tasks that a single LLM call cannot.

When to Use Agents vs. Simple LLM Calls

| Use case | Approach |
| --- | --- |
| Answer a factual question from context | Simple chain |
| Summarize a document | Simple chain |
| Research a topic (needs web search) | Agent |
| Debug code (needs to read files, run code) | Agent |
| Book a calendar event (needs tools + reasoning) | Agent |
| Complex data analysis over multiple tables | Agent with RAG |

Understanding LangChain's Agent Architecture

LangChain's agent architecture consists of several components:

Core Components

  • LLM / Chat Model: The underlying language model (OpenAI, Anthropic, local models)
  • Tools: Python functions the agent can call, described with a name, description, and input schema
  • Agent: The reasoning loop that decides which tools to call and in what order
  • AgentExecutor: The runtime that runs the agent loop, handles errors, and enforces limits
  • Memory: Stores conversation history for multi-turn interactions

The ReAct Pattern

LangChain's most common agent type uses the ReAct (Reasoning + Acting) pattern:

Thought: I need to find the current weather in London
Action: web_search
Action Input: "current weather London"
Observation: Weather in London: 12°C, partly cloudy
Thought: I have the information. I can now answer the user's question.
Final Answer: The current weather in London is 12°C and partly cloudy.
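
The executor recovers this structure from the raw model text with simple pattern matching. A simplified sketch (not LangChain's actual parser, which handles more edge cases):

```python
import re

def parse_react_step(text: str) -> dict:
    """Parse one ReAct step: either a tool call or a final answer."""
    final = re.search(r"Final Answer:\s*(.*)", text, re.DOTALL)
    if final:
        return {"final_answer": final.group(1).strip()}
    action = re.search(r"Action:\s*(\S+)", text)
    action_input = re.search(r"Action Input:\s*(.*)", text)
    if action and action_input:
        return {
            "action": action.group(1),
            "action_input": action_input.group(1).strip().strip('"'),
        }
    raise ValueError("Could not parse agent output")

step = parse_react_step(
    'Thought: I need the weather\nAction: web_search\nAction Input: "current weather London"'
)
print(step)  # {'action': 'web_search', 'action_input': 'current weather London'}
```

This is also why `handle_parsing_errors` matters later: when the model's output doesn't match the expected format, the parse step raises and the executor needs a recovery strategy.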

LangChain Expression Language (LCEL)

Modern LangChain uses LCEL to compose chains using the | operator:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{input}"),
])
parser = StrOutputParser()

chain = prompt | llm | parser
result = chain.invoke({"input": "What is FastAPI?"})

Project Setup: FastAPI + LangChain + LLM Provider

Install Dependencies

uv init langchain-agent-api
cd langchain-agent-api

uv add fastapi "uvicorn[standard]" \
    langchain langchain-openai langchain-community \
    pydantic-settings python-dotenv \
    httpx chromadb "redis[hiredis]"

uv add --dev pytest pytest-asyncio httpx ruff

Project Structure

langchain-agent-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── agents/
│   │   ├── __init__.py
│   │   ├── base.py          # Base agent setup
│   │   └── research.py      # Research agent
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── search.py        # Web search tool
│   │   └── calculator.py    # Math tool
│   ├── memory/
│   │   └── redis_memory.py  # Redis-backed conversation memory
│   └── core/
│       ├── config.py
│       └── dependencies.py
├── pyproject.toml
└── .env

Configuration

# app/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    OPENAI_API_KEY: str
    TAVILY_API_KEY: str
    REDIS_URL: str = "redis://localhost:6379/0"
    AGENT_MAX_ITERATIONS: int = 10
    AGENT_TIMEOUT: int = 60  # seconds


settings = Settings()
# .env
OPENAI_API_KEY=sk-...
TAVILY_API_KEY=tvly-...
REDIS_URL=redis://localhost:6379/0

Building Your First ReAct Agent with Tool Calling

# app/tools/search.py
import httpx
from langchain_core.tools import tool

from app.core.config import settings


@tool
async def web_search(query: str) -> str:
    """Search the web for current information on a topic.
    Use this when you need up-to-date information or facts you're not sure about.
    """
    # In production, use a real search API (Tavily, SerpAPI, etc.)
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.tavily.com/search",
            json={"query": query, "max_results": 3},
            headers={"Authorization": f"Bearer {settings.TAVILY_API_KEY}"},
        )
        response.raise_for_status()
        results = response.json()["results"]
        return "\n\n".join(
            f"**{r['title']}**\n{r['content']}" for r in results
        )


@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression. Input should be a valid Python math expression.
    Example: '2 ** 10', '(15 + 7) * 3'
    """
    import ast
    import operator

    allowed_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.USub: operator.neg,
    }

    def eval_expr(node):
        if isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            return allowed_ops[type(node.op)](eval_expr(node.left), eval_expr(node.right))
        elif isinstance(node, ast.UnaryOp):
            return allowed_ops[type(node.op)](eval_expr(node.operand))
        raise ValueError(f"Unsupported operation: {type(node).__name__}")

    try:
        tree = ast.parse(expression, mode="eval")
        result = eval_expr(tree.body)
        return str(result)
    except Exception as e:
        return f"Error evaluating expression: {e}"
# app/agents/base.py
from langchain_openai import ChatOpenAI
from langchain.agents import create_react_agent, AgentExecutor
from langchain import hub
from app.core.config import settings
from app.tools.search import web_search, calculate


def create_agent_executor() -> AgentExecutor:
    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        api_key=settings.OPENAI_API_KEY,
    )

    tools = [web_search, calculate]

    # Pull the conversational ReAct prompt (includes chat_history) from LangChain Hub
    prompt = hub.pull("hwchase17/react-chat")

    agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)

    return AgentExecutor(
        agent=agent,
        tools=tools,
        max_iterations=settings.AGENT_MAX_ITERATIONS,
        max_execution_time=settings.AGENT_TIMEOUT,
        handle_parsing_errors=True,
        verbose=True,
    )

Adding Memory: Conversation History and Context Windows

For multi-turn conversations, the agent needs to remember previous exchanges. We'll use Redis to store conversation history:

# app/memory/redis_memory.py
import json
import redis.asyncio as aioredis
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from app.core.config import settings


class RedisConversationMemory:
    def __init__(self, session_id: str, ttl: int = 3600):
        self.session_id = session_id
        self.ttl = ttl
        self.key = f"chat:history:{session_id}"
        self._redis: aioredis.Redis | None = None

    async def get_redis(self) -> aioredis.Redis:
        if self._redis is None:
            self._redis = aioredis.from_url(settings.REDIS_URL, decode_responses=True)
        return self._redis

    async def get_messages(self) -> list[BaseMessage]:
        redis = await self.get_redis()
        raw = await redis.get(self.key)
        if not raw:
            return []

        messages = []
        for item in json.loads(raw):
            if item["role"] == "human":
                messages.append(HumanMessage(content=item["content"]))
            elif item["role"] == "ai":
                messages.append(AIMessage(content=item["content"]))
        return messages

    async def add_messages(self, human_msg: str, ai_msg: str):
        redis = await self.get_redis()
        messages = await self.get_messages()

        messages.append(HumanMessage(content=human_msg))
        messages.append(AIMessage(content=ai_msg))

        # Keep last 20 messages to avoid exceeding context window
        messages = messages[-20:]

        serialized = [
            {"role": "human" if isinstance(m, HumanMessage) else "ai", "content": m.content}
            for m in messages
        ]
        await redis.setex(self.key, self.ttl, json.dumps(serialized))

    async def clear(self):
        redis = await self.get_redis()
        await redis.delete(self.key)

🔗 See: Redis Caching in Django and FastAPI: A Practical Guide

Building a RAG Pipeline with a Vector Store

Retrieval-Augmented Generation (RAG) lets your agent answer questions based on your own documents:

# app/rag/pipeline.py
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document


class RAGPipeline:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.embeddings = OpenAIEmbeddings()
        self.persist_directory = persist_directory
        self._vectorstore: Chroma | None = None

    def get_vectorstore(self) -> Chroma:
        if self._vectorstore is None:
            self._vectorstore = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings,
            )
        return self._vectorstore

    def ingest_documents(self, documents: list[Document]):
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        chunks = splitter.split_documents(documents)
        vectorstore = self.get_vectorstore()
        vectorstore.add_documents(chunks)

    def as_retriever_tool(self):
        from langchain_core.tools import Tool

        retriever = self.get_vectorstore().as_retriever(
            search_type="mmr",
            search_kwargs={"k": 5, "fetch_k": 20},
        )

        def retrieve(query: str) -> str:
            docs = retriever.invoke(query)
            return "\n\n".join(doc.page_content for doc in docs)

        return Tool(
            name="search_knowledge_base",
            description=(
                "Search the internal knowledge base for relevant information. "
                "Use this before searching the web for topics the knowledge base might cover."
            ),
            func=retrieve,
        )
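
The effect of chunk_size and chunk_overlap is easier to see with a simplified character-level version (an illustrative sketch; the real RecursiveCharacterTextSplitter also tries to split on paragraph and sentence boundaries):

```python
def split_with_overlap(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Naive fixed-size chunker: each chunk repeats the last `overlap` chars of the previous one."""
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

print(split_with_overlap("abcdefghij", chunk_size=4, overlap=2))
# ['abcd', 'cdef', 'efgh', 'ghij', 'ij']
```

The overlap means a sentence that straddles a chunk boundary still appears intact in at least one chunk, at the cost of some duplicated storage and embedding spend.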

Exposing the Agent as a FastAPI Endpoint with SSE Streaming

Agents can take time to respond. Streaming the response token-by-token provides a much better user experience:

# app/main.py
import asyncio
from typing import AsyncGenerator

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from langchain_core.messages import HumanMessage

from app.agents.base import create_agent_executor
from app.memory.redis_memory import RedisConversationMemory

app = FastAPI(title="AI Agent API")


class ChatRequest(BaseModel):
    message: str
    session_id: str = "default"


class ChatResponse(BaseModel):
    response: str
    session_id: str


@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Non-streaming chat endpoint."""
    memory = RedisConversationMemory(request.session_id)
    history = await memory.get_messages()

    agent_executor = create_agent_executor()

    # Format history as a string for the ReAct agent
    chat_history = "\n".join(
        f"{'Human' if isinstance(m, HumanMessage) else 'AI'}: {m.content}"
        for m in history
    )

    try:
        result = await asyncio.wait_for(
            agent_executor.ainvoke({
                "input": request.message,
                "chat_history": chat_history,
            }),
            timeout=60.0,
        )
        response = result["output"]
        await memory.add_messages(request.message, response)
        return ChatResponse(response=response, session_id=request.session_id)

    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Agent timed out")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    """Streaming chat endpoint using Server-Sent Events."""

    async def generate() -> AsyncGenerator[str, None]:
        memory = RedisConversationMemory(request.session_id)
        history = await memory.get_messages()
        agent_executor = create_agent_executor()

        chat_history = "\n".join(
            f"{'Human' if isinstance(m, HumanMessage) else 'AI'}: {m.content}"
            for m in history
        )

        full_response = ""
        try:
            async for event in agent_executor.astream_events(
                {"input": request.message, "chat_history": chat_history},
                version="v1",
            ):
                kind = event["event"]
                # Chat models emit "on_chat_model_stream"; completion LLMs emit "on_llm_stream"
                if kind in ("on_chat_model_stream", "on_llm_stream"):
                    chunk = event["data"]["chunk"].content
                    if chunk:
                        full_response += chunk
                        yield f"data: {chunk}\n\n"
                elif kind == "on_tool_start":
                    tool_name = event["name"]
                    yield f"data: [Using tool: {tool_name}...]\n\n"

            # Store in memory after full response
            await memory.add_messages(request.message, full_response)
            yield "data: [DONE]\n\n"

        except Exception as e:
            yield f"data: [ERROR: {e}]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


@app.delete("/api/chat/{session_id}")
async def clear_conversation(session_id: str):
    """Clear a conversation's history."""
    memory = RedisConversationMemory(session_id)
    await memory.clear()
    return {"message": f"Session {session_id} cleared"}

🔗 For WebSocket alternatives to SSE: Implementing Real-Time Features with WebSockets in FastAPI and Django

Handling Agent State: Sessions and Multi-Turn Conversations

Each conversation should be isolated by session_id. Here's a complete example of a multi-turn exchange:

# Client usage
import httpx

BASE_URL = "http://localhost:8000"
SESSION_ID = "user-123-session-1"

with httpx.Client() as client:
    # Turn 1
    r1 = client.post(f"{BASE_URL}/api/chat", json={
        "message": "What is the current price of Bitcoin?",
        "session_id": SESSION_ID,
    })
    print(r1.json()["response"])

    # Turn 2 — agent remembers the context
    r2 = client.post(f"{BASE_URL}/api/chat", json={
        "message": "How does that compare to last year?",
        "session_id": SESSION_ID,
    })
    print(r2.json()["response"])

    # Clear session
    client.delete(f"{BASE_URL}/api/chat/{SESSION_ID}")

Error Handling, Retries, and Timeouts in Agentic Workflows

Agents can fail in ways that regular API calls don't: they might loop, hit rate limits, or have tools that fail. Robust error handling is critical:

import asyncio
import logging

from langchain import hub
from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.exceptions import OutputParserException
from langchain_openai import ChatOpenAI

from app.tools.search import web_search, calculate

logger = logging.getLogger(__name__)


def create_robust_agent_executor() -> AgentExecutor:
    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        # LangChain retries rate-limit errors automatically, up to max_retries
        max_retries=3,
    )
    tools = [web_search, calculate]
    prompt = hub.pull("hwchase17/react-chat")

    return AgentExecutor(
        agent=create_react_agent(llm=llm, tools=tools, prompt=prompt),
        tools=tools,
        max_iterations=10,          # prevent infinite loops
        max_execution_time=60,      # timeout in seconds
        handle_parsing_errors=True, # don't crash on malformed LLM output
        early_stopping_method="force",  # return a stop message when limits are hit
    )


@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        result = await asyncio.wait_for(
            agent_executor.ainvoke({"input": request.message}),
            timeout=65.0,  # slightly longer than agent timeout
        )
        return ChatResponse(response=result["output"])

    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail="The agent took too long to respond. Please try a simpler question."
        )
    except OutputParserException:
        raise HTTPException(
            status_code=500,
            detail="The agent produced an unexpected response format."
        )
    except Exception as e:
        # Log the full error for debugging
        logger.error(f"Agent error for session {request.session_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="An unexpected error occurred.")

Async Agents: Running Tools Concurrently with asyncio

When an agent calls multiple tools, running them concurrently can dramatically reduce latency:

from langchain_core.tools import StructuredTool
import asyncio


async def parallel_search(queries: list[str]) -> list[str]:
    """Run multiple searches concurrently."""
    tasks = [web_search.ainvoke({"query": q}) for q in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r if isinstance(r, str) else f"Error: {r}" for r in results]


@tool
async def research_topic(topic: str) -> str:
    """Research a topic from multiple angles simultaneously.
    Better than a single search when you need comprehensive information.
    """
    queries = [
        f"{topic} overview",
        f"{topic} latest news 2026",
        f"{topic} technical details",
    ]
    results = await parallel_search(queries)
    return "\n\n---\n\n".join(results)

🔗 For a deep dive into async patterns: Understanding Async Programming in Python: A Comprehensive Guide

Testing AI Agents

Testing agents requires mocking LLM calls so tests are fast, deterministic, and don't incur API costs:

# tests/test_agent.py
import asyncio

import pytest
from unittest.mock import AsyncMock, patch
from httpx import AsyncClient, ASGITransport
from app.main import app


@pytest.fixture
async def client():
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as c:
        yield c


@pytest.mark.asyncio
async def test_simple_chat(client):
    """Test that the chat endpoint returns a response."""
    with patch("app.agents.base.ChatOpenAI") as mock_llm:
        # Mock the LLM response
        mock_llm.return_value.ainvoke = AsyncMock(
            return_value={"output": "Hello! How can I help you today?"}
        )

        response = await client.post("/api/chat", json={
            "message": "Hello",
            "session_id": "test-session",
        })

        assert response.status_code == 200
        assert "response" in response.json()


@pytest.mark.asyncio
async def test_agent_timeout(client):
    """Test that the endpoint handles agent timeouts gracefully."""
    async def slow_invoke(*args, **kwargs):
        await asyncio.sleep(100)  # Simulate a very slow agent

    with patch("app.agents.base.AgentExecutor.ainvoke", side_effect=slow_invoke):
        response = await client.post("/api/chat", json={
            "message": "Research every topic ever",
            "session_id": "test-timeout",
        })
        assert response.status_code == 504


@pytest.mark.asyncio
async def test_clear_conversation(client):
    """Test clearing a conversation session."""
    response = await client.delete("/api/chat/test-session")
    assert response.status_code == 200
    assert "cleared" in response.json()["message"]

🔗 Related: Testing APIs in Django and FastAPI: Best Practices and Tools

Deployment Considerations and Cost Management

Caching LLM Responses

For deterministic queries (same input → same output), cache LLM responses to reduce costs:

from langchain.globals import set_llm_cache
from langchain_community.cache import RedisCache
import redis

from app.core.config import settings

# Cache LLM responses in Redis
set_llm_cache(RedisCache(redis.from_url(settings.REDIS_URL)))

🔗 See: Redis Caching in Django and FastAPI: A Practical Guide

Background Task Processing for Long Agents

For agents that take more than ~30 seconds, use background tasks:

import json
import uuid

import redis.asyncio as aioredis
from fastapi import BackgroundTasks, Depends

from app.core.config import settings
from app.core.dependencies import get_redis  # assumed to yield a Redis client


async def run_agent_job(job_id: str, request: ChatRequest):
    """Run the agent and store the result in Redis, keyed by job ID."""
    agent_executor = create_agent_executor()
    result = await agent_executor.ainvoke({"input": request.message, "chat_history": ""})
    redis = aioredis.from_url(settings.REDIS_URL)
    await redis.setex(f"job:{job_id}", 3600, json.dumps({"output": result["output"]}))


@app.post("/api/agent/jobs")
async def create_agent_job(request: ChatRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    background_tasks.add_task(run_agent_job, job_id, request)
    return {"job_id": job_id}


@app.get("/api/agent/jobs/{job_id}")
async def get_job_result(job_id: str, redis: aioredis.Redis = Depends(get_redis)):
    result = await redis.get(f"job:{job_id}")
    if result is None:
        return {"status": "pending"}
    return {"status": "completed", "result": json.loads(result)}

🔗 Related: Using Background Tasks for Heavy Operations in FastAPI

Cost Estimation

| Model | Typical agent cost per query |
| --- | --- |
| GPT-4o | $0.002–$0.02 (varies by tools used) |
| GPT-4o-mini | $0.0002–$0.002 |
| Claude 3.5 Sonnet | $0.003–$0.03 |

For production, always set usage limits and monitor costs via your provider's dashboard.
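
For a back-of-the-envelope check before the bill arrives, cost per request is just token counts times per-token prices. The prices in this sketch are illustrative placeholders; check your provider's current pricing page:

```python
# Illustrative per-million-token prices in USD. Placeholders, not live pricing.
PRICES = {
    "gpt-4o":      {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimated USD cost of one request, given token counts."""
    p = PRICES[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000

# An agent query with a few tool round-trips might use ~3k input / ~500 output tokens
print(f"${estimate_cost('gpt-4o-mini', 3000, 500):.6f}")  # $0.000750
```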

Conclusion: Next Steps — CrewAI, LangGraph, AutoGen

You've built a production-ready AI agent with LangChain and FastAPI. The patterns here — tool calling, memory, streaming, and async execution — apply across the agentic AI ecosystem.

What to explore next:

  • LangGraph: LangChain's framework for building stateful, graph-based agent workflows. Better for complex multi-agent systems with conditional logic
  • CrewAI: Framework for orchestrating multiple specialized agents working together on a task
  • AutoGen: Microsoft's multi-agent conversation framework, good for agent-to-agent communication
  • MCP: If you want your agents to integrate with AI assistants like Claude, see Building MCP Servers with Python: The Complete Guide for 2026

The key architectural decisions you made — exposing agents via FastAPI, using Redis for session memory, streaming responses — will serve you well as you move to more complex multi-agent systems.

🔗 Related: Building an AI-Powered Code Generator with OpenAI API | Running Lightweight Open-Source LLMs Locally


📌 Up next: Deploying Python APIs to Production: Railway, Fly.io, and Render
