Build Your First A2A Agent: An Email Drafting Pipeline Using Python and OpenAI
Introduction
Most AI email tools work as a single prompt: paste your draft, get a rewrite. The problem is that rewriting well requires two very different tasks: understanding what is wrong with the email, and then knowing how to fix it. Combining both into one prompt tends to produce mediocre results, much as asking one person to act as critic and writer at the same time tends to produce weaker work.
In this tutorial, we build an A2A Multi-Agent Email Drafting Pipeline where those two tasks are handled by two independent, specialised agents that communicate over HTTP using the Agent-to-Agent (A2A) protocol. Agent 1 classifies the email — intent, tone, recipient type, and specific issues. Agent 2 receives that classification alongside the original draft and rewrites the email professionally, fixing every identified problem.
The key concept here is horizontal agent communication: neither agent knows about the other's internals. Each one exposes a standard A2A interface, publishes an agent card describing its skills, and exchanges typed messages with the orchestrator. Larger multi-agent systems are built on the same pattern.

What We’re Building
The pipeline follows a two-step sequential workflow:
| Step | Agent | What Happens |
|------|-------|--------------|
| 1 | Classifier Agent (port 8001) | Reads the raw draft, returns intent, tone, recipient type, and a list of specific issues |
| 2 | Rewriter Agent (port 8002) | Receives the original draft plus the classification, returns a subject line, professional body, and list of improvements |
The steps are sequential by design — Step 2 depends on Step 1’s output. The rewriter cannot fix issues it has not identified, so both pieces of context (original email and classification) are combined into the rewriter’s input.
The entry point is a terminal app where the user pastes a rough draft, and the pipeline returns the full classification, the rewritten email, and a token-level cost breakdown.
What is the A2A Protocol?
A2A (Agent-to-Agent) is an open protocol for horizontal communication between independent agents. Unlike tool calls (where an LLM calls a function within the same process), A2A agents run as separate HTTP services and communicate through a standardised message format.
Each agent exposes:
- An agent card at /.well-known/agent-card.json: a JSON description of its capabilities, input/output formats, and skills.
- A JSON-RPC message endpoint (the message/send method) that accepts a SendMessageRequest and returns either a Task or a Message.
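The orchestrator sends messages to each agent with the A2A client. Agent cards make runtime discovery possible (a client can fetch a card, read the agent's skills and URL, and then connect), but to keep things simple the orchestrator in this tutorial uses hardcoded agent URLs. Agents never call each other directly; all routing goes through the orchestrator.
This design means each agent can be developed, tested, and deployed independently. Swapping the rewriter agent for a different implementation requires no changes to the classifier or orchestrator, as long as the replacement listens at the same URL and returns the same JSON fields (subject, body, improvements).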
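To make the message endpoint concrete, the sketch below builds by hand roughly the JSON-RPC 2.0 body that the A2A client POSTs to an agent's URL. It assumes the field names of the A2A 0.3 specification (message/send, messageId, kind); build_message_send_payload is a hypothetical helper, and in the pipeline the SDK's SendMessageRequest model does this serialisation for you.

```python
import json
from uuid import uuid4


def build_message_send_payload(text: str) -> dict:
    """Hand-built JSON-RPC 2.0 body for A2A's message/send (sketch, A2A 0.3 field names assumed)."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid4()),            # request ID: lets the client match the response to this call
        "method": "message/send",      # non-streaming send
        "params": {
            "message": {
                "kind": "message",
                "messageId": str(uuid4()),  # unique, sender-generated message ID
                "role": "user",             # sent by the client (here, the orchestrator)
                "parts": [{"kind": "text", "text": text}],  # one text part carrying the email
            }
        },
    }


payload = build_message_send_payload("hey can u send me that file asap??")
print(json.dumps(payload, indent=2))
```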
Tech Stack
| Component | Tool |
|-----------|------|
| Agent Protocol | A2A SDK (a2a-sdk==0.3.26) |
| AI Model | GPT-4o-mini (both agents) |
| API Client | openai Python SDK |
| HTTP Server | Uvicorn + Starlette (via A2A SDK) |
| HTTP Client | httpx (async) |
| Terminal UI | rich |
| Env Management | python-dotenv |
Why pin a2a-sdk==0.3.26? The SDK’s API changed significantly between minor versions. v0.3.26 uses Pydantic models (A2AClient, SendMessageRequest, Part(root=TextPart(...))) while later versions switched to a protobuf-based API. Pinning ensures the code runs reliably.
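If you would rather fail loudly than debug a confusing import error when a different version is installed, a startup check is one option. This sketch uses only the standard library's importlib.metadata; check_pinned is a hypothetical helper, not part of the tutorial's code, that you could call at the top of app.py.

```python
from importlib.metadata import PackageNotFoundError, version


def check_pinned(package: str, expected: str) -> bool:
    """Return True if `package` is installed at exactly `expected`, else print why not."""
    try:
        installed = version(package)  # reads the installed distribution's metadata
    except PackageNotFoundError:
        print(f"{package} is not installed; this tutorial expects {expected}")
        return False
    if installed != expected:
        print(f"{package} {installed} is installed; this tutorial expects {expected}")
        return False
    return True


check_pinned("a2a-sdk", "0.3.26")
```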
Project Structure
a2a_email_pipeline/
├── classifier_agent/
│ ├── __init__.py
│ ├── executor.py # ClassifierExecutor — OpenAI call + A2A response
│ └── server.py # Agent card + Starlette server on port 8001
├── rewriter_agent/
│ ├── __init__.py
│ ├── executor.py # RewriterExecutor — OpenAI call + A2A response
│ └── server.py # Agent card + Starlette server on port 8002
├── orchestrator.py # EmailOrchestrator — coordinates both agents sequentially
├── app.py # Terminal entry point — launches agents, runs input loop
├── requirements.txt
└── .env
Setting Up
1. Install Dependencies
pip install "a2a-sdk[http-server]==0.3.26" openai python-dotenv rich httpx uvicorn
2. Configure Environment
Create a .env file in the project folder:
# OpenAI API key — get yours at https://platform.openai.com/api-keys
OPENAI_API_KEY=your_openai_api_key_here
# Model used by the Email Classifier Agent (default: gpt-4o-mini)
CLASSIFIER_MODEL=gpt-4o-mini
# Model used by the Email Rewriter Agent (default: gpt-4o-mini)
REWRITER_MODEL=gpt-4o-mini
# Port for the Email Classifier Agent
CLASSIFIER_AGENT_PORT=8001
# Port for the Email Rewriter Agent
REWRITER_AGENT_PORT=8002
Building the Classifier Agent
The classifier agent is responsible for one thing: reading a raw email draft and returning a structured JSON analysis. It has no knowledge of the rewriter.
Executor — classifier_agent/executor.py
The executor is the brain of the agent. It implements the AgentExecutor interface required by the A2A SDK — two methods, execute() and cancel().
import json  # standard library; parses the OpenAI JSON response and serialises the result dict
import os  # reads OPENAI_API_KEY and CLASSIFIER_MODEL from the environment after load_dotenv()
from uuid import uuid4  # generates unique IDs for A2A messages and events, which the protocol requires
from openai import AsyncOpenAI  # async OpenAI client; needed because the executor runs inside an async event loop
from a2a.server.agent_execution import AgentExecutor, RequestContext  # AgentExecutor is the interface all A2A executors implement; RequestContext carries the incoming message
from a2a.server.events import EventQueue  # the queue the executor writes results into; the server reads from it to build the HTTP response
from a2a.types import (
    TaskStatusUpdateEvent, TaskState, TaskStatus,  # used to signal task completion or cancellation back to the A2A framework
    Message, Role, Part, TextPart,  # A2A message building blocks: TextPart holds the text payload inside a Part inside a Message
)
from dotenv import load_dotenv  # reads .env and injects its values into os.environ
load_dotenv()  # call before any os.getenv() so .env values are visible to this process
_CALL_METADATA = {
    "dev_name": "Ganesh",  # identifies who made the API call; attached to every OpenAI request for dashboard tracking
    "project": "codex-test",  # project label for grouping calls in the OpenAI usage dashboard
    "environment": "local",  # marks calls as coming from a local dev machine, not production
    "purpose": "testing",  # intent label; useful when reviewing API usage logs
}
# Analyses the raw email and returns intent, tone, recipient type, and specific issues
_CLASSIFY_PROMPT = (
    "You are an email tone and intent classifier. "
    "Analyse the given email draft and return ONLY JSON: "
    '{"intent": "request|complaint|follow-up|introduction|apology|announcement", '
    '"tone": "too casual|aggressive|too wordy|unprofessional|appropriate", '
    '"recipient_type": "colleague|client|manager|stranger", '
    '"issues": ["list of specific tone, grammar, or clarity problems found in the email"]}.'
)
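The prompt constrains the model to return only the JSON structure, with no markdown and no explanation. response_format={"type": "json_object"} (JSON mode) backs this up at the API level, but only partially: it guarantees syntactically valid JSON, not that the keys and labels match the template. JSON mode also requires the word "JSON" to appear in the messages, which _CLASSIFY_PROMPT satisfies. If the output is cut off by max_tokens, the JSON can still be incomplete, which is one reason the executor wraps parsing in try/except.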
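Because JSON mode checks syntax but not content, the model can still return a label outside the allowed set, or a string where a list is expected. A minimal post-parse check is sketched below, assuming you want to flag such results before they reach the rewriter; validate_classification and ALLOWED are hypothetical names whose labels are copied from _CLASSIFY_PROMPT. You could call it right after json.loads(raw) in execute() and add any problems to the result dict.

```python
# Allowed labels, copied from the template in _CLASSIFY_PROMPT
ALLOWED = {
    "intent": {"request", "complaint", "follow-up", "introduction", "apology", "announcement"},
    "tone": {"too casual", "aggressive", "too wordy", "unprofessional", "appropriate"},
    "recipient_type": {"colleague", "client", "manager", "stranger"},
}


def validate_classification(result: dict) -> list:
    """Return a list of problems with a parsed classifier result (empty means valid)."""
    problems = []
    for field, allowed in ALLOWED.items():
        value = result.get(field)  # a missing field shows up as None
        if not isinstance(value, str) or value not in allowed:
            problems.append(f"{field}: unexpected value {value!r}")
    issues = result.get("issues")
    if not isinstance(issues, list) or not all(isinstance(i, str) for i in issues):
        problems.append("issues: expected a list of strings")
    return problems


good = {"intent": "request", "tone": "too casual", "recipient_type": "colleague",
        "issues": ["uses 'u' instead of 'you'"]}
print(validate_classification(good))  # → []
bad = {"intent": "question", "tone": "too casual", "recipient_type": "colleague", "issues": "none"}
print(validate_classification(bad))  # → ["intent: unexpected value 'question'", 'issues: expected a list of strings']
```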
class ClassifierExecutor(AgentExecutor):
    """A2A executor: reads the raw email draft and classifies its intent, tone, and issues."""
    def __init__(self):
        self.llm = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))  # async client; the executor runs inside an async event loop
        self.model = os.getenv("CLASSIFIER_MODEL", "gpt-4o-mini")  # read from .env so the model can change without touching source code
    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        raw_email = context.get_user_input()  # SDK utility: extracts and joins all text parts from the incoming A2A message
        if not raw_email:  # guard against empty messages: send an error result rather than calling the API with nothing
            await self._complete(event_queue, context, {"error": "No email text received"})
            return
        try:
            response = await self.llm.chat.completions.create(
                model=self.model,  # resolved from CLASSIFIER_MODEL in .env; gpt-4o-mini by default
                messages=[
                    {"role": "system", "content": _CLASSIFY_PROMPT},  # sets the model's persona and output format for this call
                    {"role": "user", "content": f"Email draft:\n{raw_email}"},  # the actual email text the user submitted
                ],
                response_format={"type": "json_object"},  # JSON mode: syntactically valid JSON, no markdown fence, no explanation text
                max_tokens=250,  # classification output is short; 250 tokens is enough for the JSON response
                metadata=_CALL_METADATA,  # attached to the API call for dashboard tracking; does not affect model output
            )
            raw = response.choices[0].message.content or "{}"  # choices[0] is the first completion; or "{}" guards against a None content field
            result = json.loads(raw)  # parse into a dict; can still raise if max_tokens truncated the JSON (handled below)
            # Token counts are passed back so the orchestrator can aggregate cost across both agents
            result["prompt_tokens"] = response.usage.prompt_tokens  # number of tokens in the system + user messages
            result["completion_tokens"] = response.usage.completion_tokens  # number of tokens in the model's response
        except Exception as exc:
            result = {"error": str(exc), "prompt_tokens": 0, "completion_tokens": 0}  # on any failure, return a safe error dict with zero token counts
        await self._complete(event_queue, context, result)  # enqueue the result as a completed task event; the server sends it back to the orchestrator
    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                context_id=context.context_id or str(uuid4()),  # context_id is required; fall back to a new UUID if the context has none
                task_id=context.task_id or str(uuid4()),  # task_id is required; fall back to a new UUID if the task has none
                status=TaskStatus(state=TaskState.canceled),  # signals the task was cancelled rather than completed
                final=True,  # no more events will follow for this task
            )
        )
    async def _complete(self, event_queue: EventQueue, context: RequestContext, result: dict) -> None:
        """Wrap result in an A2A completed-task event and enqueue it."""
        msg = Message(
            message_id=str(uuid4()),  # required by the A2A spec: every message must have a unique sender-generated ID
            role=Role.agent,  # Role.agent marks this as the agent's reply
            parts=[Part(root=TextPart(text=json.dumps(result)))],  # serialise the result dict to a JSON string; the orchestrator parses it back out
        )
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                context_id=context.context_id or str(uuid4()),  # required field: identifies the conversation context this task belongs to
                task_id=context.task_id or str(uuid4()),  # required field: identifies the specific task being completed
                status=TaskStatus(state=TaskState.completed, message=msg),  # completed state plus the message carrying the result JSON
                final=True,  # last event: the task is done and the response can be sent
            )
        )
Three things worth noting here:
- context.get_user_input(): the A2A SDK provides this utility to extract all text from the incoming message's parts. This is preferred over manually walking context.message.parts.
- context_id and task_id on TaskStatusUpdateEvent: both are required fields in this SDK version. Omitting either causes a Pydantic ValidationError that the server swallows silently, returning an empty response to the orchestrator.
- Token counts in the result dict: the executor appends prompt_tokens and completion_tokens to the JSON result so the orchestrator can aggregate them across both agents for the cost display.
Server — classifier_agent/server.py
The server wraps the executor in an A2A-compliant HTTP application. It defines an agent card that describes the agent's capabilities. The card is served at /.well-known/agent-card.json, where any A2A client can fetch it to discover the agent; the orchestrator in this tutorial skips that step and uses a hardcoded URL instead.
import os  # reads CLASSIFIER_AGENT_PORT from the environment after load_dotenv()
import uvicorn  # ASGI server; runs the Starlette app built by A2AStarletteApplication
from dotenv import load_dotenv  # reads .env so os.getenv can access CLASSIFIER_AGENT_PORT
from a2a.types import AgentCard, AgentCapabilities, AgentSkill  # A2A types that describe the agent's identity and capabilities
from a2a.server.apps import A2AStarletteApplication  # builds the ASGI app; handles A2A routing, agent card serving, and message dispatch
from a2a.server.request_handlers import DefaultRequestHandler  # wires the executor and task store together; handles the full request lifecycle
from a2a.server.tasks import InMemoryTaskStore  # stores task state in memory; sufficient for dev since tasks are short-lived
from classifier_agent.executor import ClassifierExecutor  # the executor class that does the actual classification work
load_dotenv()  # loads .env before reading the port; must be called before os.getenv
_PORT = int(os.getenv("CLASSIFIER_AGENT_PORT", "8001"))  # port read from .env, defaulting to 8001; int() converts the string env value
# Describes this agent's capability; read by clients via agent card discovery
classify_skill = AgentSkill(
    id="email_classification",  # unique identifier for this skill
    name="Email Intent and Tone Classifier",  # human-readable name shown in agent discovery tools
    description="Classifies a draft email's intent, tone, recipient type, and lists specific issues",  # what the skill does; helps clients choose the right agent
    tags=["email", "classification", "tone", "intent"],  # searchable tags that help clients find agents by capability keyword
    examples=[
        "hey can u send me that file asap??",  # example of too-casual tone, illustrating the kind of input this agent handles
        "i am writing to you because your service was absolutely terrible",  # example of aggressive tone
    ],
    inputModes=["text/plain"],  # this agent accepts plain text; the orchestrator sends raw email drafts
    outputModes=["application/json"],  # this agent returns structured JSON: intent, tone, recipient_type, issues
)
# Agent card is served at /.well-known/agent-card.json for A2A discovery
classifier_card = AgentCard(
    name="Email Classifier Agent",  # display name shown in A2A discovery tools and dashboards
    description="Specialised A2A agent that classifies email drafts by intent, tone, and recipient type",
    url=f"http://localhost:{_PORT}/",  # base URL where this agent is reachable; clients that read the card send messages here
    version="1.0.0",  # semantic version; increment when the agent's interface changes
    capabilities=AgentCapabilities(streaming=False),  # streaming=False: this agent returns a single complete response, not a stream
    defaultInputModes=["text/plain"],  # default input format: plain text email drafts
    defaultOutputModes=["application/json"],  # default output format: structured JSON classification result
    skills=[classify_skill],  # skills this agent can perform; clients read these during discovery
)
_cls_handler = DefaultRequestHandler(
    agent_executor=ClassifierExecutor(),  # the executor instance that handles each incoming A2A request
    task_store=InMemoryTaskStore(),  # stores task state in memory; fine for dev since tasks complete quickly
)
app = A2AStarletteApplication(
    agent_card=classifier_card,  # served at /.well-known/agent-card.json so clients can discover this agent
    http_handler=_cls_handler,  # the request handler that routes incoming messages to the executor
)
if __name__ == "__main__":
    uvicorn.run(app.build(), host="0.0.0.0", port=_PORT, log_level="warning")  # app.build() returns the ASGI callable; 0.0.0.0 binds to all interfaces; "warning" suppresses access logs
DefaultRequestHandler wires together the executor and the task store — it handles incoming A2A requests, routes them to ClassifierExecutor.execute(), and manages task lifecycle. InMemoryTaskStore stores task state in memory, which is fine for development since tasks are short-lived.
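Agent card discovery boils down to fetching each card and choosing an agent by its advertised skills. The orchestrator here skips that step, but the selection logic is small. This sketch works on card JSON that has already been fetched and parsed into dicts; find_agent_url is a hypothetical helper, and the rewriter's skill id and tags are assumptions because its card is not shown.

```python
from typing import Optional


def find_agent_url(cards: list, tag: str) -> Optional[str]:
    """Return the URL of the first agent whose card lists a skill tagged `tag`."""
    for card in cards:
        for skill in card.get("skills", []):
            if tag in skill.get("tags", []):
                return card["url"]
    return None


# Abbreviated card JSON, as a client might receive it from /.well-known/agent-card.json.
# The rewriter's skill id and tags are assumed; its card is not shown in this tutorial.
cards = [
    {
        "name": "Email Classifier Agent",
        "url": "http://localhost:8001/",
        "skills": [{"id": "email_classification",
                    "tags": ["email", "classification", "tone", "intent"]}],
    },
    {
        "name": "Email Rewriter Agent",
        "url": "http://localhost:8002/",
        "skills": [{"id": "email_rewrite", "tags": ["email", "rewrite"]}],
    },
]

print(find_agent_url(cards, "rewrite"))  # → http://localhost:8002/
```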
Building the Rewriter Agent
The rewriter agent receives two pieces of input from the orchestrator: the original email and the classification produced by the classifier. It combines them into a structured rewrite.
Executor — rewriter_agent/executor.py
The structure mirrors the classifier executor. The key difference is the prompt — the rewriter is told not to use generic openers and to fix the specific issues identified in the classification.
# Same imports, load_dotenv() call, and _CALL_METADATA as classifier_agent/executor.py (omitted here)
# Receives original email + classification context and produces a professional rewrite
_REWRITE_PROMPT = (
    "You are a professional email writing assistant. "
    "You will receive an original draft email along with its classification (intent, tone, issues). "
    "Rewrite the email professionally, fixing all identified issues while preserving the original message. "
    "Do NOT use filler openers like 'I hope this message finds you well' or 'I hope you are doing well'. "  # explicit prohibition; without it the model defaults to this cliché opener on almost every rewrite
    "Get straight to the point after the greeting. "
    "Return ONLY JSON: "
    '{"subject": "suggested email subject line", '
    '"body": "the professionally rewritten email body", '
    '"improvements": ["list of specific improvements made to the email"]}.'
)
class RewriterExecutor(AgentExecutor):
    """A2A executor: receives original email + classification, returns a professional rewrite."""
    def __init__(self):
        self.llm = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))  # async client; required because the executor runs inside an async event loop
        self.model = os.getenv("REWRITER_MODEL", "gpt-4o-mini")  # read from .env; if you switch models, update _PRICING in orchestrator.py to match
    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        # The orchestrator sends the original email + classification as a single combined message
        combined_input = context.get_user_input()  # SDK utility: extracts and joins all text parts into the full combined string
        if not combined_input:  # guard: send an error result rather than calling the API with an empty input
            await self._complete(event_queue, context, {"error": "No input received"})
            return
        try:
            response = await self.llm.chat.completions.create(
                model=self.model,  # resolved from REWRITER_MODEL in .env
                messages=[
                    {"role": "system", "content": _REWRITE_PROMPT},  # sets the rewriter persona and enforces the JSON output format
                    {"role": "user", "content": combined_input},  # the combined original email + classification string from the orchestrator
                ],
                response_format={"type": "json_object"},  # JSON mode: valid JSON output, no prose, no markdown, no explanation
                max_tokens=500,  # rewrites can be longer than the original; 500 allows a full professional email body
                metadata=_CALL_METADATA,  # attached for dashboard tracking; does not affect model output
            )
            raw = response.choices[0].message.content or "{}"  # choices[0] is the first completion; or "{}" guards against a None content field
            result = json.loads(raw)  # parse the JSON string returned by the model into a Python dict
            result["prompt_tokens"] = response.usage.prompt_tokens  # token count for the combined input (original email + classification)
            result["completion_tokens"] = response.usage.completion_tokens  # token count for the rewritten email response
        except Exception as exc:
            result = {"error": str(exc), "prompt_tokens": 0, "completion_tokens": 0}  # safe fallback; zero token counts so cost stays at $0
        await self._complete(event_queue, context, result)  # enqueue the result as a completed task event
    # cancel() and _complete() are identical to ClassifierExecutor's and are omitted here
The rewriter server (rewriter_agent/server.py) follows the same pattern as the classifier server — same imports, same structure, different port (8002), different skill and card definitions.
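A prompt instruction lowers the odds of the cliché opener but does not rule it out. If you want to catch regressions when you change models or prompts, a cheap check on the returned body is one option. find_filler_openers and its phrase list are hypothetical and not part of the pipeline as written.

```python
import re

# Phrases _REWRITE_PROMPT tells the model to avoid (assumed list; extend as needed)
FILLER_OPENERS = [
    r"i hope this (?:message|email) finds you well",
    r"i hope you(?: are|'re) doing well",
]


def find_filler_openers(body: str) -> list:
    """Return every filler pattern found in the rewritten body (case-insensitive)."""
    text = body.lower()
    return [pattern for pattern in FILLER_OPENERS if re.search(pattern, text)]


draft = "Hi Sam,\n\nI hope this email finds you well. Could you send the Q3 report by Friday?"
print(find_filler_openers(draft))  # → ['i hope this (?:message|email) finds you well']
```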
Building the Orchestrator — orchestrator.py
The orchestrator is the only component that knows both agents exist. It coordinates the two-step pipeline, constructs the combined input for the rewriter, aggregates token counts, and saves stats.
Agent Communication
import json  # parses the JSON text extracted from A2A response parts
import os  # reads the agent ports from the environment
import time  # measures total pipeline elapsed time from start to finish
from datetime import datetime  # generates ISO 8601 timestamps for stats records
from pathlib import Path  # creates the stats/ directory and constructs the output file path
from uuid import uuid4  # generates unique IDs for A2A request and message objects, which the protocol requires
import httpx  # async HTTP client used to make requests to both agent servers
from dotenv import load_dotenv  # loads .env so the agent URLs below use the same ports as the servers
from a2a.client import A2AClient  # A2A client: sends SendMessageRequest objects to agents and returns SendMessageResponse
from a2a.types import (
    MessageSendParams, SendMessageRequest,  # envelope types that wrap the outgoing message for A2A transport
    Message, Role, Part, TextPart,  # A2A message building blocks: TextPart holds the text inside Part inside Message
)
load_dotenv()  # app.py imports this module before its own load_dotenv() call, so load .env here as well
_PRICING = {"input": 0.150, "output": 0.600}  # GPT-4o-mini rates in USD per 1M tokens; assumes both agents use gpt-4o-mini
AGENT_URLS = {
    "classifier": f"http://localhost:{os.getenv('CLASSIFIER_AGENT_PORT', '8001')}/",  # classifier agent: intent, tone, recipient type
    "rewriter": f"http://localhost:{os.getenv('REWRITER_AGENT_PORT', '8002')}/",  # rewriter agent: professional rewrite
}
Saving Stats
STATS_DIR = Path("stats")  # directory where all run records are stored
STATS_DIR.mkdir(exist_ok=True)  # create stats/ if it does not exist; exist_ok=True prevents an error on subsequent runs
def save_stats(record: dict) -> None:
    """Append one pipeline record to stats/total_summary.json."""
    path = STATS_DIR / "total_summary.json"  # single file accumulates all pipeline runs; each run is one entry in the JSON array
    history = []
    if path.exists():
        with open(path, encoding="utf-8") as f:
            try:
                history = json.load(f)  # load the existing records; starts empty on the first run
            except json.JSONDecodeError:
                history = []  # if the file is corrupted or empty, start fresh rather than crashing
    history.append(record)  # add the new record to the end of the list
    with open(path, "w", encoding="utf-8") as f:
        json.dump(history, f, indent=2, ensure_ascii=False)  # write the full list back; indent=2 keeps the file human-readable
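One weakness of save_stats: opening the file with "w" truncates it before json.dump runs, so a crash mid-write can leave a partial file. On the next run the JSONDecodeError branch treats that as empty history and overwrites it, silently discarding earlier records. A common remedy is to write to a temporary file in the same directory and swap it in with os.replace. The sketch below, with the hypothetical name append_record_atomically, shows the idea; it still assumes only one process writes the file at a time.

```python
import json
import os
import tempfile
from pathlib import Path


def append_record_atomically(path: Path, record: dict) -> None:
    """Append record to the JSON array in path, replacing the file in one step."""
    history = []
    if path.exists():
        try:
            history = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            history = []  # same fallback as save_stats
    history.append(record)
    # Write the new contents to a temp file next to the target...
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
        # ...then swap it into place (atomic on POSIX when both paths share a filesystem)
        os.replace(tmp_name, path)
    except BaseException:
        os.unlink(tmp_name)  # remove the temp file if anything failed
        raise


stats_dir = Path(tempfile.mkdtemp())  # throwaway directory for the demo
target = stats_dir / "total_summary.json"
append_record_atomically(target, {"total_tokens": 900})
append_record_atomically(target, {"total_tokens": 450})
print(json.loads(target.read_text(encoding="utf-8")))  # → [{'total_tokens': 900}, {'total_tokens': 450}]
```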
Wrapping and Unwrapping Messages
def _make_request(text: str) -> SendMessageRequest:
    """Wrap plain text in an A2A SendMessageRequest with a unique ID."""
    return SendMessageRequest(
        id=str(uuid4()),  # JSON-RPC request ID, unique per call; used for request/response correlation
        params=MessageSendParams(
            message=Message(
                message_id=str(uuid4()),  # required by the A2A spec: every message must carry a unique sender-generated ID
                role=Role.user,  # Role.user marks this as a message from the client (orchestrator) to the agent
                parts=[Part(root=TextPart(text=text))],  # TextPart carries the plain text payload; Part wraps it as a discriminated union type
            )
        ),
    )
def _extract_result(response) -> dict:
    """Pull the JSON result dict out of an A2A agent response."""
    try:
        task = response.root.result  # on success, response.root is SendMessageSuccessResponse and .result is Task | Message; an error response has no .result
        if hasattr(task, "status") and task.status.message:  # task.status.message is the Message the executor placed in TaskStatus
            for part in task.status.message.parts:  # walk the parts; our executors emit exactly one TextPart
                if hasattr(part.root, "text"):  # check that this part is a TextPart (not FilePart or DataPart)
                    return json.loads(part.root.text)  # parse the JSON string the executor serialised into the TextPart
    except Exception:
        pass  # swallow any structural error; return {} below so the orchestrator can handle missing data gracefully
    return {}
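When debugging without the SDK (for example, with curl), it helps to know where the payload sits in the raw JSON. The sketch below performs the same walk as _extract_result on a plain dict, assuming the A2A 0.3 JSON field names (result.status.message.parts, kind "text"); the response body is a hand-written illustration, not captured output, and extract_result_from_json is hypothetical.

```python
import json


def extract_result_from_json(body: dict) -> dict:
    """Dict-based version of _extract_result for a raw JSON-RPC response body."""
    result = body.get("result")
    if not isinstance(result, dict):  # JSON-RPC error responses carry "error" instead of "result"
        return {}
    message = (result.get("status") or {}).get("message") or {}
    for part in message.get("parts", []):
        if part.get("kind") == "text":  # skip file and data parts
            try:
                return json.loads(part["text"])
            except (KeyError, json.JSONDecodeError):
                return {}
    return {}


# Hand-written example of a completed-task response (illustrative IDs)
response_body = {
    "jsonrpc": "2.0",
    "id": "1",
    "result": {
        "kind": "task",
        "id": "task-123",
        "contextId": "ctx-456",
        "status": {
            "state": "completed",
            "message": {
                "kind": "message",
                "messageId": "msg-789",
                "role": "agent",
                "parts": [{"kind": "text", "text": '{"intent": "request", "tone": "too casual"}'}],
            },
        },
    },
}
print(extract_result_from_json(response_body))  # → {'intent': 'request', 'tone': 'too casual'}
```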
The Combined Input
The most important design decision in the orchestrator is how it formats the rewriter’s input. Rather than sending just the classification JSON, it combines both the original email and the classification into a single readable message:
def _build_rewriter_input(raw_email: str, classification: dict) -> str:
    """
    Combine the original email and its classification into a single message
    for the rewriter agent so it has full context for the rewrite.
    """
    issues_text = "; ".join(classification.get("issues", [])) or "none identified"  # join multiple issues into one line; fall back to "none identified" if the list is empty
    return (
        f"Original email:\n{raw_email}\n\n"  # include the full original draft so the rewriter stays faithful to the message's intent
        f"Classification:\n"
        f"- Intent : {classification.get('intent', 'unknown')}\n"  # intent tells the rewriter what the email is trying to accomplish
        f"- Tone : {classification.get('tone', 'unknown')}\n"  # tone tells the rewriter what the main register problem is
        f"- Recipient : {classification.get('recipient_type', 'unknown')}\n"  # recipient type influences the level of formality in the rewrite
        f"- Issues found : {issues_text}\n\n"  # specific issues the rewriter must address, passed as plain text rather than raw JSON
        "Rewrite this email professionally, fixing all identified issues."  # explicit closing instruction ties all the context together
    )
This combined message is important for two reasons. First, the rewriter needs the original email to produce a faithful rewrite — not just a generic professional email in the same category. Second, making the classification explicit in plain text (rather than passing raw JSON) keeps the rewriter’s system prompt simple.
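One consequence of this design is that a failed classification does not stop Step 2. If the classifier call fails, _extract_result returns {} (or the executor returns an error dict), _build_rewriter_input fills every field with "unknown", and the rewriter runs without real context. If you would rather stop early or show the error, a gate like the hypothetical classification_is_usable below could be checked before Step 2.

```python
REQUIRED_FIELDS = ("intent", "tone", "recipient_type")


def classification_is_usable(classification: dict) -> bool:
    """True if Step 1 produced real labels rather than an empty or error result."""
    if not classification or "error" in classification:
        return False
    return all(classification.get(field) for field in REQUIRED_FIELDS)


print(classification_is_usable({}))  # → False
print(classification_is_usable({"error": "Request timed out", "prompt_tokens": 0, "completion_tokens": 0}))  # → False
print(classification_is_usable({"intent": "request", "tone": "too casual",
                                "recipient_type": "colleague", "issues": []}))  # → True
```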
Sequential Execution
class EmailOrchestrator:
    """
    Coordinates the two-step email pipeline using the A2A protocol.
    Step 1, Classifier Agent: reads the raw email, returns intent/tone/issues.
    Step 2, Rewriter Agent: receives original email + classification context,
    returns a polished subject line, body, and improvement list.
    The two steps are sequential: Step 2 depends on Step 1's output.
    """
    async def process_email(self, raw_email: str) -> dict:
        start_time = time.time()  # capture start time before any network calls; used to compute total elapsed time at the end
        # One shared httpx client for both agent calls, closed automatically when the block exits.
        # timeout=60.0 because httpx's default 5-second timeout can be too short for an LLM call.
        async with httpx.AsyncClient(timeout=60.0) as http:
            # ── Step 1: classify ──────────────────────────────────────────────────────
            classifier_client = A2AClient(http, url=AGENT_URLS["classifier"])  # A2AClient takes the httpx client + the agent's base URL; no async factory method in this SDK version
            cls_response = await classifier_client.send_message(_make_request(raw_email))  # send the raw email to the classifier and wait for the Task response
            classification = _extract_result(cls_response)  # pull the intent/tone/issues dict from Task.status.message.parts
            # ── Step 2: rewrite (uses Step 1's output) ────────────────────────────────
            rewriter_client = A2AClient(http, url=AGENT_URLS["rewriter"])  # new A2AClient pointing at the rewriter; same httpx client, different URL
            rewriter_input = _build_rewriter_input(raw_email, classification)  # combine original email + classification into one readable message string
            rw_response = await rewriter_client.send_message(_make_request(rewriter_input))  # send the combined context to the rewriter and wait for the Task response
            rewrite = _extract_result(rw_response)  # pull the subject, body, improvements dict from the response
        elapsed = round(time.time() - start_time, 2)  # total pipeline time in seconds, including both network round-trips
        # pop() removes the token fields from the classification/rewrite dicts so they do not appear in the output display
        total_prompt = classification.pop("prompt_tokens", 0) + rewrite.pop("prompt_tokens", 0)  # sum prompt tokens from both agents
        total_completion = classification.pop("completion_tokens", 0) + rewrite.pop("completion_tokens", 0)  # sum completion tokens from both agents
        input_cost = round((total_prompt / 1_000_000) * _PRICING["input"], 6)  # GPT-4o-mini input rate is $0.150 per 1M tokens
        output_cost = round((total_completion / 1_000_000) * _PRICING["output"], 6)  # GPT-4o-mini output rate is $0.600 per 1M tokens
        stats_record = {
            "timestamp": datetime.now().isoformat(),  # ISO 8601 timestamp; used to order records in total_summary.json
            "raw_email": raw_email,  # the original input, stored so each record is self-contained
            "classification": classification,  # intent, tone, recipient_type, issues (token fields already removed by pop)
            "rewrite": rewrite,  # subject, body, improvements (token fields already removed by pop)
            "prompt_tokens": total_prompt,  # combined prompt tokens from both agents
            "completion_tokens": total_completion,  # combined completion tokens from both agents
            "total_tokens": total_prompt + total_completion,  # grand total for the run
            "input_cost": input_cost,  # cost of the input tokens in USD
            "output_cost": output_cost,  # cost of the output tokens in USD
            "total_cost": round(input_cost + output_cost, 6),  # combined cost rounded to 6 decimal places
            "total_time_seconds": elapsed,  # wall-clock time for the full pipeline run
        }
        save_stats(stats_record)  # append this record to stats/total_summary.json, building up a history of all pipeline runs
        return {
            "classification": classification,  # displayed in the Classification section of the terminal output
            "rewrite": rewrite,  # displayed in the Rewritten Email section
            "stats": stats_record,  # displayed in the Stats section and also written to disk
        }
Token counts are popped out of the classification and rewrite dicts before returning: they are aggregated into stats_record and removed from the per-agent dicts so they do not appear in the display.
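As a worked example of the cost arithmetic: at $0.150 per 1M input tokens, 600 prompt tokens cost 600 / 1,000,000 × 0.150 = $0.00009. The function below reproduces the orchestrator's calculation so you can check figures in isolation; run_cost and the token counts are illustrative. Note that _PRICING assumes both agents use GPT-4o-mini; if you change CLASSIFIER_MODEL or REWRITER_MODEL, that agent's tokens need that model's rates.

```python
# USD per 1M tokens, matching _PRICING in orchestrator.py (GPT-4o-mini)
PRICING = {"input": 0.150, "output": 0.600}


def run_cost(prompt_tokens: int, completion_tokens: int) -> dict:
    """Same arithmetic as process_email(): tokens / 1M * rate, rounded to 6 places."""
    input_cost = round((prompt_tokens / 1_000_000) * PRICING["input"], 6)
    output_cost = round((completion_tokens / 1_000_000) * PRICING["output"], 6)
    return {
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": round(input_cost + output_cost, 6),
    }


# Illustrative token counts, not measured values
print(run_cost(600, 300))  # → {'input_cost': 9e-05, 'output_cost': 0.00018, 'total_cost': 0.00027}
```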
Building the Terminal App — app.py
The entry point launches both agent servers as subprocesses, waits a fixed three seconds for them to bind to their ports, then runs the input loop.
Imports and Styles
import asyncio  # runs the async run_loop() from synchronous main() using asyncio.run()
import subprocess  # launches both agent servers as background child processes
import sys  # provides sys.executable, the same Python interpreter that is running app.py
import time  # used for time.sleep(3) to wait for uvicorn to bind before the first request
from dotenv import load_dotenv  # loads .env into this process; the agent subprocesses inherit its environment
from rich.console import Console  # rich console: renders coloured text, rules, and panels in the terminal
from rich.panel import Panel  # renders the rewritten email body inside a bordered box
from orchestrator import EmailOrchestrator  # the orchestrator class that runs the two-agent pipeline
load_dotenv()  # call before the subprocesses are launched so they inherit the loaded variables (each server also calls load_dotenv() itself)
console = Console()  # single Console instance shared across all display functions
orchestrator = EmailOrchestrator()  # single orchestrator instance reused for every email in the session
INTENT_STYLE = {
    "request": "bold cyan",  # requests are neutral; cyan is informational without urgency
    "complaint": "bold red",  # complaints carry urgency; red signals attention needed
    "follow-up": "bold yellow",  # follow-ups are moderately important; yellow is a mild alert
    "introduction": "bold green",  # introductions are positive; green signals a new connection
    "apology": "bold magenta",  # apologies are sensitive; magenta distinguishes them visually
    "announcement": "bold blue",  # announcements are informational; blue is calm and factual
}
TONE_STYLE = {
    "too casual": "yellow",  # casual tone needs attention but is not severe; yellow warns without alarming
    "aggressive": "bold red",  # aggressive tone is the most serious issue; bold red makes it immediately visible
    "too wordy": "yellow",  # wordy tone is a quality issue, with the same visual weight as too casual
    "unprofessional": "red",  # unprofessional tone is concerning; red without bold is slightly less severe than aggressive
    "appropriate": "green",  # appropriate tone is the ideal outcome; green confirms no issues detected
}
Launching Agent Servers
def launch_agents() -> list:
    """Start classifier and rewriter agent servers as background subprocesses."""
    procs = [
        subprocess.Popen(
            [sys.executable, "-m", "classifier_agent.server"],  # sys.executable uses the same venv Python, so package imports resolve correctly
            stdout=subprocess.DEVNULL,  # suppress uvicorn's startup output to keep the terminal clean
            stderr=subprocess.DEVNULL,  # suppress server error output; agent errors surface in the orchestrator's exception messages instead
        ),
        subprocess.Popen(
            [sys.executable, "-m", "rewriter_agent.server"],  # the -m flag runs the module, which is required for package-relative imports
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        ),
    ]
    return procs  # returned so main() can terminate both processes on exit
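Sending stderr to subprocess.DEVNULL keeps the terminal tidy, but if an agent crashes during startup (an import error, for example) its traceback is discarded and the first visible symptom is a connection error from the orchestrator. One alternative, sketched here on the assumption that per-agent log files are acceptable, is to write each server's output to a file. launch_module and the log file names are hypothetical; the helper is shown with the standard-library `this` module rather than a real agent server.

```python
import subprocess
import sys
from pathlib import Path


def launch_module(module: str, log_path: Path) -> subprocess.Popen:
    """Run `python -m <module>` in the background, writing stdout and stderr to log_path."""
    with open(log_path, "w") as log_file:
        # The child receives its own duplicate of the file descriptor, so the
        # parent's handle can be closed as soon as Popen returns.
        return subprocess.Popen(
            [sys.executable, "-m", module],  # same interpreter, as in launch_agents()
            stdout=log_file,
            stderr=subprocess.STDOUT,  # merge tracebacks into the same log file
        )
```

Inside launch_agents() this would look like launch_module("classifier_agent.server", Path("classifier.log")); the terminal stays just as clean, but a failed start leaves a readable traceback behind.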
Main Function
def main():
    console.print(" [dim]Starting agent servers...[/dim]")
    procs = launch_agents()  # start both agent servers as background subprocesses
    time.sleep(3)  # wait for uvicorn to bind to its ports; without this the first request can arrive before the servers are ready
    try:
        asyncio.run(run_loop())  # run the async input loop; blocks until the user types exit or quit
    finally:
        for proc in procs:
            proc.terminate()  # stop both agent subprocesses; runs even after an exception or KeyboardInterrupt
        console.print(" [dim]Agent servers stopped.[/dim]")
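time.sleep(3) works when both servers start quickly, but it is a fixed guess: on a slow machine the first request can still arrive too early, and on a fast one the user waits longer than needed. A common alternative is to poll each port until it accepts a TCP connection. wait_for_port below is a hypothetical helper built on the standard library; it assumes the agents are reachable on 127.0.0.1, and an open port only shows that something is listening, not that the agent behind it is fully initialised.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 15.0, interval: float = 0.2) -> bool:
    """Return True once host:port accepts a TCP connection, or False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means some process is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)  # refused or timed out: not ready yet, so retry
    return False
```

In main(), a check such as `if not all(wait_for_port("127.0.0.1", p) for p in (8001, 8002)): raise RuntimeError("agent servers did not start")` could replace the sleep. It belongs inside the try block, so that the finally clause still terminates both servers when the check fails.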
The Input Loop
async def run_loop():
    print_header()  # prints the pipeline title and agent port info
    console.print(" Type or paste your rough email draft and press Enter.")
    console.print(" Type [bold]exit[/bold] or [bold]quit[/bold] to stop.\n")
    while True:
        try:
            text = input(" Draft: ").strip()  # .strip() removes leading/trailing whitespace before processing
        except (KeyboardInterrupt, EOFError):
            console.print("\n Goodbye.")
            break  # Ctrl+C or end of piped input: exit cleanly
        if not text:
            continue  # ignore empty input and return to the prompt
        if text.lower() in ("exit", "quit"):
            console.print(" Goodbye.")
            break  # user typed exit or quit: stop the loop
        console.print("\n [dim]Classifying email...[/dim]")  # shown immediately so the user sees activity during the ~7s pipeline run
        try:
            result = await orchestrator.process_email(text)  # run both agents sequentially and wait for the combined result
        except Exception as exc:
            console.print(f" [red]Error: {exc}[/red]")
            continue  # on error, skip the display and return to the prompt
        print_result(result)  # render the classification and rewritten email to the terminal
        console.rule()
        console.print()
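input() returns a single line, so a draft pasted with line breaks is split across several prompts and each line is sent through the pipeline as a separate email. If multi-line drafts matter, one option is to keep reading until a blank line. read_draft is a hypothetical helper, not part of the tutorial; taking the line reader as a parameter is a design assumption that makes it testable without a terminal.

```python
from typing import Callable


def read_draft(read_line: Callable[[str], str] = input) -> str:
    """Collect lines until a blank line, so a pasted multi-line draft arrives as one string.

    EOFError and KeyboardInterrupt propagate unchanged, so run_loop's existing handler still applies.
    """
    lines: list[str] = []
    prompt = " Draft: "
    while True:
        line = read_line(prompt)
        if not line.strip():  # a blank line ends the draft
            break
        lines.append(line.rstrip())
        prompt = " ...... "  # continuation prompt for the following lines
    return "\n".join(lines).strip()
```

In run_loop, `text = read_draft()` would replace the input() call. A blank first line still yields an empty string, which the existing `if not text` check skips; exit and quit keep working but now need a second press of Enter.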
Display
The print_result function renders the classification and rewrite output with colour-coded fields. Intent is shown in uppercase, and tone and recipient type in title case. Because the values come directly from the LLM's JSON output, .upper() and .title() are applied only at display time:
def print_result(result: dict):
    classification = result.get("classification", {})
    rewrite = result.get("rewrite", {})
    stats = result.get("stats", {})  # token and cost figures; not rendered by this function
    intent = classification.get("intent", "unknown")  # one of: request, complaint, follow-up, introduction, apology, announcement
    tone = classification.get("tone", "unknown")  # one of: too casual, aggressive, too wordy, unprofessional, appropriate
    rcpt = classification.get("recipient_type", "unknown")  # one of: colleague, client, manager, stranger
    issues = classification.get("issues", [])  # list of specific problems found; may be empty if the tone is appropriate
    subject = rewrite.get("subject", "")  # suggested email subject line from the rewriter
    body = rewrite.get("body", "")  # professionally rewritten email body
    imprvmts = rewrite.get("improvements", [])  # list of changes made; explains what the rewriter fixed and why
    intent_style = INTENT_STYLE.get(intent, "white")  # rich style for this intent; falls back to white for unexpected values
    tone_style = TONE_STYLE.get(tone, "white")  # rich style for this tone; falls back to white for unexpected values
    console.print(f" Intent : [{intent_style}]{intent.upper()}[/{intent_style}]")  # uppercase intent is bolder and easier to scan
    console.print(f" Tone : [{tone_style}]{tone.title()}[/{tone_style}]")  # title case: "too casual" becomes "Too Casual"
    console.print(f" Recipient type : {rcpt.title()}")  # title case: "colleague" becomes "Colleague"
    if issues:
        console.print("\n [bold]Issues found:[/bold]")
        for issue in issues:
            console.print(f" [red]x[/red] {issue}")  # red x bullet visually marks each problem found
    if subject:
        console.print(f"\n [bold]Subject:[/bold] {subject}")  # show the rewriter's suggested subject line above the body
    if body:
        console.print(Panel(
            body,
            title="[bold]Professional Rewrite[/bold]",
            border_style="green",  # green border signals positive output, distinct from the red issue markers above
            padding=(1, 2),  # one line of vertical padding and two characters of horizontal padding
        ))
    if imprvmts:
        console.print("\n [bold]Improvements made:[/bold]")
        for item in imprvmts:
            console.print(f" [green]✓[/green] {item}")  # green checkmark mirrors the red x above to show resolution
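print_result looks styles up with INTENT_STYLE.get(intent, "white") and TONE_STYLE.get(tone, "white"), which only match when the model returns exactly the lowercase key. A label such as "Too Casual" would silently fall back to white. The sample output in this article is lowercase, but if you do see mismatches, normalising the label before the lookup is one option. normalise_label and style_for are hypothetical helpers, and the rules they apply (trim, lowercase, collapse internal whitespace) are assumptions about the kind of variation a model might produce.

```python
def normalise_label(label: object) -> str:
    """Trim, lowercase, and collapse internal whitespace in a model-produced label."""
    return " ".join(str(label).split()).lower()


def style_for(label: object, styles: dict[str, str], default: str = "white") -> str:
    """Look up a rich style string, tolerating case and spacing differences in the label."""
    return styles.get(normalise_label(label), default)


# A subset of TONE_STYLE, copied here so the snippet runs on its own.
DEMO_TONE_STYLE = {"too casual": "yellow", "aggressive": "bold red", "appropriate": "green"}

if __name__ == "__main__":
    print(style_for("  Too   Casual ", DEMO_TONE_STYLE))  # yellow
    print(style_for("Passive-aggressive", DEMO_TONE_STYLE))  # white: not a known tone
```

Normalising once when reading the field, e.g. `tone = normalise_label(classification.get("tone", "unknown"))`, keeps the dictionary lookup and the .title() display consistent.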
Running the Pipeline
python app.py
The terminal prints:
Starting agent servers...
────── Email Drafting Pipeline — A2A Multi-Agent ──────
Classifier Agent (port 8001) · Rewriter Agent (port 8002)
Type or paste your rough email draft and press Enter.
Type exit or quit to stop.
Draft:
Output and What to Expect
For a draft like "hey john, can u send me the report asap? need it for tmrw meeting. thanks", the pipeline produces:
Classification:
- Intent: REQUEST
- Tone: Too Casual
- Recipient: Colleague
- Issues: abbreviated words (u, tmrw, asap), informal greeting, no professional sign-off

Rewritten Email:
- Subject: Request for Report — Tomorrow's Meeting
- Body: a concise, professional request with a proper salutation and sign-off
- Improvements: 5 bullet points listing exactly what was changed and why

Stats:
- ~330 prompt tokens, ~235 completion tokens
- Total cost: ~$0.0002 per email
- Total time: ~7s (including server startup on first run)
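The per-email cost is a linear function of the two token counts. The article does not state the model or its prices, so the rates below are an assumption: $0.15 per million prompt tokens and $0.60 per million completion tokens reproduce the sample summary entry exactly (362 prompt tokens give an input_cost of 5.4e-05, 224 completion tokens give an output_cost of 0.000134, for a total of 0.000188). cost_breakdown is a hypothetical helper; check your provider's current price list before relying on these numbers.

```python
# Assumed prices in USD per one million tokens (chosen because they reproduce the sample summary entry).
INPUT_PRICE_PER_M = 0.15
OUTPUT_PRICE_PER_M = 0.60


def cost_breakdown(prompt_tokens: int, completion_tokens: int) -> dict[str, float]:
    """Return token totals and USD costs, rounded to 6 decimal places like the summary file."""
    input_cost = round(prompt_tokens * INPUT_PRICE_PER_M / 1_000_000, 6)
    output_cost = round(completion_tokens * OUTPUT_PRICE_PER_M / 1_000_000, 6)
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": round(input_cost + output_cost, 6),
    }


if __name__ == "__main__":
    stats = cost_breakdown(362, 224)
    print(f"{stats['total_tokens']} tokens, ${stats['total_cost']:.6f}")  # 586 tokens, $0.000188
```

At these assumed rates, 1,000 emails of similar length would cost roughly $0.19, which is why the per-email figure rounds to ~$0.0002.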




Sample contents of the total_summary.json file:
[
  {
    "timestamp": "2026-06-11T15:31:28.810398",
    "raw_email": "hey john, can u send me the report asap? need it for tmrw meeting. thanks",
    "classification": {
      "intent": "request",
      "tone": "too casual",
      "recipient_type": "colleague",
      "issues": [
        "Use of informal greeting ('hey')",
        "Abbreviation ('u' instead of 'you')",
        "Use of shorthand ('tmrw' instead of 'tomorrow')",
        "Lack of formalities (no proper closing or sign-off)",
        "Urgent tone could be softened for clarity"
      ]
    },
    "rewrite": {
      "subject": "Request for Report",
      "body": "Hi John,\n\nCould you please send me the report at your earliest convenience? I need it for the meeting tomorrow.\n\nThank you in advance.\n\nBest regards,\n[Your Name]",
      "improvements": [
        "Changed greeting from 'hey' to 'Hi' for formality.",
        "Replaced 'u' with 'you' for professionalism.",
        "Changed 'tmrw' to 'tomorrow' for clarity.",
        "Added a proper closing and sign-off for a professional touch.",
        "Soften the urgent tone while maintaining the request."
      ]
    },
    "prompt_tokens": 362,
    "completion_tokens": 224,
    "total_tokens": 586,
    "input_cost": 5.4e-05,
    "output_cost": 0.000134,
    "total_cost": 0.000188,
    "total_time_seconds": 8.2
  },
  ....
]

Who Can Benefit
Professionals who write many external emails and want a quick tone and clarity check
Non-native English speakers who want help achieving the right level of formality
Developers learning how to build multi-agent pipelines with real inter-agent HTTP communication
Teams who want to understand A2A architecture before scaling to production deployments
How Codersarts Can Help
Building production multi-agent systems requires a solid understanding of agent communication protocols, async Python, API integration, and system design. If you need help building a custom A2A pipeline for your use case (email automation, document processing, customer support, or something entirely different), Codersarts offers end-to-end development and mentorship support.
Custom multi-agent system development tailored to your use case
One-on-one mentorship and code reviews
Project-based learning with real-world applications
Get in touch: codersarts.com | contact@codersarts.com



