Why Multi-Agent Over Single-Agent?
Single-agent systems hit limits on complex tasks:
Problem 1: Context window overflow
- Task: "Research competitors, write a report, generate charts"
- Single agent: 50K token context → expensive + slow
Problem 2: Tool use complexity
- Single agent with 20 tools → poor tool selection (hallucinated tool calls)
Problem 3: Quality degradation
- Single agent doing research + writing → mediocre at both
Multi-agent solution:
Researcher Agent (tools: search, scrape)
→ Analyst Agent (tools: pandas, matplotlib)
→ Writer Agent (tools: none, just synthesis)
Each agent specializes, reducing context size and improving quality.
Framework Comparison
| Feature |
LangGraph |
CrewAI |
AutoGen |
| Orchestration |
Directed graph (nodes + edges) |
Role-based crews (sequential/hierarchical) |
Conversational GroupChat |
| Learning Curve |
Medium (graph concepts) |
Lowest (role-based DSL) |
Medium (conversation patterns) |
| Streaming |
✅ Per-node streaming |
❌ Limited |
❌ Limited |
| Debugging |
✅ Time-travel via LangSmith |
⚠️ Basic logging |
⚠️ Message history |
| Production Readiness |
Highest |
Medium |
Medium |
| Checkpointing |
✅ Built-in (resume workflows) |
❌ |
❌ |
| Parallelization |
✅ Concurrent nodes |
⚠️ Sequential by default |
❌ Turn-based |
| Cost/Latency |
Medium |
Lowest (simple workflows) |
Highest (debate loops) |
LangGraph: Graph-Based Orchestration
LangGraph models workflows as state machines with conditional edges.
Architecture
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Sequence
import operator
# Define shared state
class AgentState(TypedDict):
messages: Annotated[Sequence[str], operator.add]
research_results: str
report: str
next_agent: str
# Create graph
workflow = StateGraph(AgentState)
# Define nodes (agents)
def researcher_node(state: AgentState):
llm = ChatOpenAI(model="gpt-4")
query = state["messages"][-1]
# Use tools to search
results = search_tool.invoke(query)
return {
"research_results": results,
"next_agent": "analyst"
}
def analyst_node(state: AgentState):
llm = ChatOpenAI(model="gpt-4")
# Analyze research results
analysis = llm.invoke(f"Analyze: {state['research_results']}")
return {
"messages": [analysis.content],
"next_agent": "writer"
}
def writer_node(state: AgentState):
llm = ChatOpenAI(model="gpt-4")
# Synthesize final report
report = llm.invoke(f"Write report based on: {state['messages']}")
return {
"report": report.content,
"next_agent": END
}
# Add nodes
workflow.add_node("researcher", researcher_node)
workflow.add_node("analyst", analyst_node)
workflow.add_node("writer", writer_node)
# Add edges
workflow.add_edge("researcher", "analyst")
workflow.add_edge("analyst", "writer")
workflow.add_edge("writer", END)
# Set entry point
workflow.set_entry_point("researcher")
# Compile
app = workflow.compile()
# Execute
result = app.invoke({
"messages": ["Research AI agent frameworks"]
})
print(result["report"])
Advanced: Conditional Routing
def router(state: AgentState):
"""Route based on state"""
if "error" in state.get("research_results", ""):
return "retry_researcher"
elif len(state.get("research_results", "")) < 100:
return "deep_researcher" # Need more data
else:
return "analyst"
# Conditional edges
workflow.add_conditional_edges(
"researcher",
router,
{
"retry_researcher": "researcher",
"deep_researcher": "deep_researcher",
"analyst": "analyst"
}
)
Checkpointing (Resume Workflows)
from langgraph.checkpoint.sqlite import SqliteSaver
# Persistent checkpointing
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
app = workflow.compile(checkpointer=checkpointer)
# Run with thread_id
config = {"configurable": {"thread_id": "task-123"}}
result = app.invoke({"messages": ["Research..."]}, config)
# Later: resume from checkpoint
resumed = app.invoke(None, config) # Continues from last state
Streaming
# Stream tokens as they arrive
for chunk in app.stream({"messages": ["Research AI agents"]}, stream_mode="values"):
if "report" in chunk:
print(chunk["report"], end="", flush=True)
CrewAI: Role-Based Teams
CrewAI uses a declarative DSL with roles, goals, and tools.
Basic Implementation
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from crewai_tools import SerperDevTool, ScrapeWebsiteTool
# Define agents
researcher = Agent(
role="Senior Researcher",
goal="Uncover cutting-edge developments in AI agents",
backstory="You're a researcher at a leading tech think tank. "
"Your expertise lies in identifying emerging trends.",
tools=[SerperDevTool(), ScrapeWebsiteTool()],
llm=ChatOpenAI(model="gpt-4"),
verbose=True
)
analyst = Agent(
role="Data Analyst",
goal="Analyze research findings and extract insights",
backstory="You're a data analyst with expertise in AI/ML. "
"You excel at finding patterns in research data.",
tools=[], # No tools, just reasoning
llm=ChatOpenAI(model="gpt-4"),
verbose=True
)
writer = Agent(
role="Technical Writer",
goal="Craft compelling technical content",
backstory="You're a technical writer known for making complex "
"topics accessible to developers.",
tools=[],
llm=ChatOpenAI(model="gpt-4"),
verbose=True
)
# Define tasks
research_task = Task(
description="Research the latest AI agent frameworks (LangGraph, CrewAI, AutoGen). "
"Focus on architecture, performance, and use cases.",
expected_output="Detailed research report with sources",
agent=researcher
)
analysis_task = Task(
description="Analyze the research findings and compare frameworks. "
"Identify strengths, weaknesses, and best use cases.",
expected_output="Comparative analysis with recommendations",
agent=analyst,
context=[research_task] # Depends on research_task
)
writing_task = Task(
description="Write a technical blog post comparing the frameworks. "
"Include code examples and recommendations.",
expected_output="2000-word blog post in markdown",
agent=writer,
context=[research_task, analysis_task]
)
# Create crew
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
process=Process.sequential, # or Process.hierarchical
verbose=True
)
# Execute
result = crew.kickoff()
print(result)
Hierarchical Process (Manager Agent)
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
process=Process.hierarchical, # Manager delegates tasks
manager_llm=ChatOpenAI(model="gpt-4")
)
# Manager agent decides task order and delegation
result = crew.kickoff()
AutoGen: Conversational Agents
AutoGen uses multi-agent conversations with debate-style collaboration.
Basic Implementation
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
import openai
openai.api_key = "sk-..."
config_list = [{"model": "gpt-4", "api_key": openai.api_key}]
# Define agents
researcher = AssistantAgent(
name="Researcher",
system_message="You are a researcher. Use web search to find information.",
llm_config={"config_list": config_list},
)
analyst = AssistantAgent(
name="Analyst",
system_message="You analyze data and provide insights.",
llm_config={"config_list": config_list},
)
writer = AssistantAgent(
name="Writer",
system_message="You write technical content based on research and analysis.",
llm_config={"config_list": config_list},
)
user_proxy = UserProxyAgent(
name="User",
human_input_mode="NEVER",
max_consecutive_auto_reply=0,
code_execution_config=False,
)
# Create group chat
groupchat = GroupChat(
agents=[researcher, analyst, writer, user_proxy],
messages=[],
max_round=10,
speaker_selection_method="round_robin" # or "auto"
)
manager = GroupChatManager(groupchat=groupchat, llm_config={"config_list": config_list})
# Start conversation
user_proxy.initiate_chat(
manager,
message="Research and write a blog post about AI agent frameworks."
)
Custom Speaker Selection
def custom_speaker_selector(last_speaker, groupchat):
"""Route based on last message"""
last_message = groupchat.messages[-1]["content"]
if "research" in last_message.lower():
return groupchat.agent_by_name("Researcher")
elif "analyze" in last_message.lower():
return groupchat.agent_by_name("Analyst")
elif "write" in last_message.lower():
return groupchat.agent_by_name("Writer")
else:
return groupchat.agent_by_name("User")
groupchat = GroupChat(
agents=[researcher, analyst, writer, user_proxy],
messages=[],
max_round=10,
speaker_selection_method=custom_speaker_selector
)
Cost & Latency Benchmarks
Tested on task: "Research competitors, analyze data, write report"
| Framework |
LLM Calls |
Total Tokens |
Cost |
Latency |
Quality |
| LangGraph |
3 |
8,500 |
$0.17 |
12s |
High |
| CrewAI |
3 |
9,200 |
$0.18 |
14s |
High |
| AutoGen (3 agents, 5 rounds) |
22 |
45,000 |
$0.90 |
38s |
Highest |
Analysis:
- LangGraph/CrewAI: One LLM call per agent (3 total)
- AutoGen: Each agent responds to every message → exponential growth
- AutoGen quality: Collaborative reasoning produces best output, but at 5x cost
Cost Optimization for AutoGen
# Limit debate rounds
groupchat = GroupChat(
agents=[researcher, analyst, writer],
messages=[],
max_round=3, # Reduce from 10 to 3
)
# Use cheaper model for intermediate agents
researcher_config = {"config_list": [{"model": "gpt-3.5-turbo"}]}
analyst_config = {"config_list": [{"model": "gpt-3.5-turbo"}]}
writer_config = {"config_list": [{"model": "gpt-4"}]} # Only final agent uses GPT-4
researcher = AssistantAgent("Researcher", llm_config=researcher_config)
analyst = AssistantAgent("Analyst", llm_config=analyst_config)
writer = AssistantAgent("Writer", llm_config=writer_config)
Production Patterns
1. Fallback & Retry Logic
# LangGraph: Retry failed nodes
def researcher_with_retry(state: AgentState, max_retries=3):
for attempt in range(max_retries):
try:
result = researcher_node(state)
if result["research_results"]:
return result
except Exception as e:
if attempt == max_retries - 1:
return {"research_results": "ERROR: Failed after retries", "next_agent": END}
time.sleep(2 ** attempt) # Exponential backoff
return {"research_results": "ERROR", "next_agent": END}
2. Human-in-the-Loop
# CrewAI: Require human approval for critical tasks
human_approval_task = Task(
description="Review the report before publishing",
expected_output="Approved or rejected with feedback",
agent=None, # Human task
human_input=True
)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task, human_approval_task],
process=Process.sequential
)
3. Per-Agent Cost Tracking
# LangGraph: Track token usage
from langchain.callbacks import get_openai_callback
agent_costs = {}
def tracked_node(agent_name):
def node_fn(state):
with get_openai_callback() as cb:
result = agent_function(state)
agent_costs[agent_name] = {
"tokens": cb.total_tokens,
"cost": cb.total_cost
}
return result
return node_fn
workflow.add_node("researcher", tracked_node("researcher"))
workflow.add_node("writer", tracked_node("writer"))
# After execution
print(f"Researcher cost: ${agent_costs['researcher']['cost']:.4f}")
print(f"Writer cost: ${agent_costs['writer']['cost']:.4f}")
4. Parallel Agent Execution
# LangGraph: Run independent agents in parallel
from langgraph.graph import START
workflow.add_node("researcher_tech", researcher_tech_node)
workflow.add_node("researcher_market", researcher_market_node)
workflow.add_node("synthesizer", synthesizer_node)
# Both researchers run in parallel
workflow.add_edge(START, "researcher_tech")
workflow.add_edge(START, "researcher_market")
# Synthesizer waits for both
workflow.add_edge("researcher_tech", "synthesizer")
workflow.add_edge("researcher_market", "synthesizer")
# Execution: researcher_tech and researcher_market run concurrently
# Result: 2x faster than sequential
Real-World Use Cases
Use Case 1: Code Generation + Review
LangGraph implementation:
class CodeState(TypedDict):
requirements: str
code: str
tests: str
review: str
approved: bool
def coder_node(state):
llm = ChatOpenAI(model="gpt-4")
code = llm.invoke(f"Write code for: {state['requirements']}")
return {"code": code.content}
def tester_node(state):
llm = ChatOpenAI(model="gpt-4")
tests = llm.invoke(f"Write tests for: {state['code']}")
return {"tests": tests.content}
def reviewer_node(state):
llm = ChatOpenAI(model="gpt-4")
review = llm.invoke(f"Review code:\n{state['code']}\n\nTests:\n{state['tests']}")
# Extract approval decision
approved = "approved" in review.content.lower()
return {"review": review.content, "approved": approved}
workflow.add_node("coder", coder_node)
workflow.add_node("tester", tester_node)
workflow.add_node("reviewer", reviewer_node)
workflow.add_edge("coder", "tester")
workflow.add_edge("tester", "reviewer")
# Conditional: If not approved, retry coder
def should_retry(state):
return "coder" if not state["approved"] else END
workflow.add_conditional_edges("reviewer", should_retry)
Use Case 2: Research Report Generation
CrewAI implementation:
researcher = Agent(
role="Research Specialist",
goal="Find data on {topic}",
tools=[SerperDevTool(), ScrapeWebsiteTool()],
llm=ChatOpenAI(model="gpt-4")
)
fact_checker = Agent(
role="Fact Checker",
goal="Verify claims in research",
tools=[SerperDevTool()],
llm=ChatOpenAI(model="gpt-4")
)
writer = Agent(
role="Writer",
goal="Create compelling report",
tools=[],
llm=ChatOpenAI(model="gpt-4")
)
research_task = Task(
description="Research {topic}. Find statistics, case studies, and expert opinions.",
agent=researcher
)
fact_check_task = Task(
description="Verify all claims from research. Flag any unverified statements.",
agent=fact_checker,
context=[research_task]
)
writing_task = Task(
description="Write 2000-word report. Use only fact-checked information.",
agent=writer,
context=[research_task, fact_check_task]
)
crew = Crew(
agents=[researcher, fact_checker, writer],
tasks=[research_task, fact_check_task, writing_task],
process=Process.sequential
)
result = crew.kickoff(inputs={"topic": "AI agent frameworks"})
Framework Selection Guide
Choose LangGraph if:
- ✅ Complex workflows with conditional logic
- ✅ Need production debugging (LangSmith integration)
- ✅ Streaming is critical for UX
- ✅ Checkpointing for long-running tasks
- ✅ Team familiar with graph-based orchestration
Choose CrewAI if:
- ✅ Fast prototyping (< 1 day to working system)
- ✅ Simple sequential workflows
- ✅ Non-technical stakeholders need to understand flow (role-based is intuitive)
- ✅ Budget-constrained (lowest LLM costs)
- ✅ Don't need advanced observability
Choose AutoGen if:
- ✅ Quality > cost (collaborative reasoning improves output)
- ✅ Research/exploration tasks (debate generates novel ideas)
- ✅ Offline batch processing (latency not critical)
- ✅ Small number of iterations (< 5 rounds)
- ✅ Team comfortable with conversational patterns
Monitoring & Observability
LangSmith (LangGraph)
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls-..."
# Automatic tracing to LangSmith dashboard
result = app.invoke({"messages": ["Research AI agents"]})
# View in dashboard:
# - Per-node latency
# - Token usage per agent
# - Full conversation history
# - Time-travel debugging (replay from any state)
Custom Metrics (All Frameworks)
from prometheus_client import Counter, Histogram
agent_calls = Counter(
'multi_agent_calls_total',
'Total agent invocations',
['agent_name', 'status']
)
agent_latency = Histogram(
'multi_agent_latency_seconds',
'Agent execution time',
['agent_name']
)
def monitored_node(agent_name):
def node_fn(state):
with agent_latency.labels(agent_name=agent_name).time():
try:
result = agent_function(state)
agent_calls.labels(agent_name=agent_name, status='success').inc()
return result
except Exception as e:
agent_calls.labels(agent_name=agent_name, status='error').inc()
raise
return node_fn
Common Pitfalls
1. Infinite Loops
Problem: Conditional routing creates endless cycles
# BAD: No termination condition
def router(state):
if state["quality_score"] < 8:
return "writer" # Infinite loop if quality never improves
return END
# GOOD: Add max iterations
class State(TypedDict):
quality_score: int
iterations: int
def router(state):
if state["iterations"] >= 3:
return END
if state["quality_score"] < 8:
return "writer"
return END
2. Token Limit Explosions
Problem: AutoGen debates grow conversation history exponentially
# BAD: Full history sent every round
groupchat = GroupChat(agents=[...], max_round=20) # Can hit 100K+ tokens
# GOOD: Summarize history
from autogen import ConversableAgent
class SummarizingAgent(ConversableAgent):
def _process_received_message(self, message, sender, silent):
if len(self._oai_messages[sender]) > 10:
# Summarize old messages
summary = summarize_messages(self._oai_messages[sender][:10])
self._oai_messages[sender] = [summary] + self._oai_messages[sender][10:]
return super()._process_received_message(message, sender, silent)
3. Missing Error Boundaries
Problem: One agent failure crashes entire workflow
# BAD: No error handling
def researcher_node(state):
return {"results": api_call_that_might_fail()}
# GOOD: Graceful degradation
def researcher_node(state):
try:
results = api_call_that_might_fail()
return {"results": results, "error": None}
except Exception as e:
logger.error(f"Researcher failed: {e}")
return {"results": "ERROR: Could not retrieve data", "error": str(e)}
FAQs
Can I mix frameworks?
Yes! Use LangGraph for orchestration, CrewAI agents as nodes:
from crewai import Agent
from langgraph.graph import StateGraph
researcher_crew_agent = Agent(role="Researcher", tools=[...])
def researcher_node(state):
result = researcher_crew_agent.execute_task(state["task"])
return {"research": result}
workflow = StateGraph(...)
workflow.add_node("researcher", researcher_node)
How do I scale to 100+ agents?
Use hierarchical delegation (manager agents):
# CrewAI hierarchical
manager_crew = Crew(
agents=[manager_agent],
tasks=[delegate_task],
process=Process.hierarchical
)
# Manager spawns sub-crews for specialized tasks
What about RAG integration?
All frameworks support RAG via tools:
# LangGraph + Pinecone RAG
from langchain.tools import Tool
from langchain_pinecone import PineconeVectorStore
retriever = PineconeVectorStore(...).as_retriever()
rag_tool = Tool(
name="knowledge_base",
func=lambda q: retriever.get_relevant_documents(q),
description="Search company knowledge base"
)
def researcher_node(state):
llm_with_tools = llm.bind_tools([rag_tool])
response = llm_with_tools.invoke(state["query"])
return {"research": response.content}
How do I handle rate limits?
Implement token bucket or exponential backoff:
from ratelimit import limits, sleep_and_retry
@sleep_and_retry
@limits(calls=10, period=60) # 10 calls per minute
def call_llm(prompt):
return llm.invoke(prompt)
def researcher_node(state):
result = call_llm(state["query"])
return {"research": result}
Can agents use different models?
Yes! Assign different LLMs per agent:
# LangGraph: Different models per node
gpt4_agent = ChatOpenAI(model="gpt-4")
gpt35_agent = ChatOpenAI(model="gpt-3.5-turbo")
def researcher_node(state):
return {"research": gpt4_agent.invoke(state["query"])}
def writer_node(state):
return {"report": gpt35_agent.invoke(state["research"])}
Next Steps:
- Prototype with CrewAI (fastest time-to-value)
- Migrate to LangGraph for production (observability + checkpointing)
- Add per-agent cost tracking (Prometheus metrics)
- Implement retry logic and error boundaries
- Monitor LLM call counts (alert if > expected for task type)
Multi-agent systems are the future of AI applications—moving beyond single-shot prompts to collaborative, specialized workflows. LangGraph wins for production systems, CrewAI for rapid prototyping, and AutoGen for research-quality output when cost isn't a constraint. Start simple (3 agents), measure everything (latency, cost, quality), and iterate based on real usage patterns.
For more AI architecture guides, check out RAG Pipeline Comparison and Building Real-Time Collaborative Features.