Build a Multi-Agent Product Page Copy Generator with Google ADK and OpenAI
Introduction
Writing product page copy is a task most developers outsource to a human copywriter or a single prompt. Neither approach demonstrates what a multi-agent system can do differently: break the task into focused, independent specialists, run them in parallel, and recombine their output into something no single prompt would produce as reliably.
In this tutorial we build a product page copy generator using Google’s open-source Agent Development Kit (ADK). You provide a product name, rough description, target audience, price, and brand voice. Three specialist agents, each backed by GPT-4o-mini, run simultaneously and produce a complete set of copy: a headline with tagline variations, an 80-120 word description paragraph, and five benefit-led feature bullets. A fourth orchestrating agent fires them all in parallel and hands the combined result to a Streamlit UI.

What We Are Building
A four-agent system connected via the A2A (Agent-to-Agent) protocol, each running as its own FastAPI server. The workflow:
Submit a product brief through a Streamlit form
Route it to the copy director, which fires all three specialists in parallel
Headline writer produces a main headline and three tagline variations
Description writer produces an 80-120 word paragraph
Features writer produces five benefit-led bullet points
Combine all three results and render them as a finished product page section
Tech Stack
Component | Tool |
Agent framework | Google ADK (open-source) |
Model | OpenAI gpt-4o-mini via LiteLLM adapter |
Agent communication | A2A protocol over HTTP (FastAPI + httpx) |
UI | Streamlit |
Session management | ADK InMemorySessionService |
Pricing
Google ADK itself is completely free and open-source. The only cost comes from the OpenAI model: gpt-4o-mini is priced at $0.15 per million input tokens and $0.60 per million output tokens. One complete product brief (all three specialist calls) typically uses 500-800 tokens total and costs a fraction of a cent. Every call is logged to stats.json with prompt tokens, completion tokens, generation time, and computed cost.
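To make the "fraction of a cent" claim concrete, here is a minimal sketch of the arithmetic. The 500/300 split between input and output tokens is an assumption chosen for illustration, and call_cost is a hypothetical helper, not part of the project.

```python
# Rates quoted above, converted from "per million tokens" to "per token".
INPUT_RATE = 0.15 / 1_000_000
OUTPUT_RATE = 0.60 / 1_000_000


def call_cost(prompt_tokens: int, completion_tokens: int) -> float:
    """Return the USD cost of one model call at the gpt-4o-mini rates above."""
    return prompt_tokens * INPUT_RATE + completion_tokens * OUTPUT_RATE


# Assumed split of an 800-token brief: 500 input tokens, 300 output tokens.
cost = call_cost(500, 300)
print(f"${cost:.6f}")
```

Under that assumed split, the whole brief costs roughly three hundredths of a cent.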
Project Structure
adk_product_copy_agent/
├── agents/
│ ├── copy_director/ # orchestrator: fires all three specialists in parallel
│ │ ├── task_handler.py
│ │ ├── __main__.py
│ │ └── .well-known/agent.json
│ ├── headline_writer/ # specialist 1: headline and taglines
│ │ ├── agent.py
│ │ ├── task_handler.py
│ │ ├── __main__.py
│ │ └── .well-known/agent.json
│ ├── description_writer/ # specialist 2: 80-120 word paragraph
│ └── features_writer/ # specialist 3: 5 benefit-led bullets
├── common/
│ ├── agent_client.py # async HTTP helper for director → specialist calls
│ └── agent_server.py # FastAPI factory shared by every agent
├── shared/
│ ├── models.py # ProductSpec Pydantic schema
│ └── stats_tracker.py # per-call logging to stats.json
├── copy_ui.py # Streamlit interface
├── start_agents.py # single-command launcher for all four agents
├── requirements.txt
└── .env # API key, model name, per-token rates, port numbers
Setting Up
Create a file named requirements.txt in the project root:
google-adk
litellm
fastapi
uvicorn
httpx
pydantic
openai
streamlit
python-dotenv
requests
Create a file named .env in the project root with all configuration in one place:
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_MODEL=openai/gpt-4o-mini
GPT_4O_MINI_INPUT_COST=0.00000015
GPT_4O_MINI_OUTPUT_COST=0.00000060
PORT_COPY_DIRECTOR=9000
PORT_HEADLINE_WRITER=9001
PORT_DESCRIPTION_WRITER=9002
PORT_FEATURES_WRITER=9003
Every port number, model name, and cost rate lives in .env so nothing needs to change in code if you want to switch models or move ports.
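As a rough illustration of that pattern, the sketch below reads a port from the environment and falls back to a default when the variable is unset. specialist_url is a hypothetical helper introduced here for illustration; the project's own files read the same variables inline with os.environ.get.

```python
import os


def specialist_url(port_var: str, default_port: int) -> str:
    """Build a specialist's /run URL from a PORT_* environment variable."""
    port = int(os.environ.get(port_var, default_port))
    return f"http://127.0.0.1:{port}/run"


# Simulate what load_dotenv() does after reading PORT_HEADLINE_WRITER from .env.
os.environ["PORT_HEADLINE_WRITER"] = "9001"
print(specialist_url("PORT_HEADLINE_WRITER", 8001))

# A variable that is not set falls back to the default passed by the caller.
os.environ.pop("PORT_NOT_CONFIGURED", None)
print(specialist_url("PORT_NOT_CONFIGURED", 8002))
```

Changing a port then means editing one line in .env and restarting the agents, with no code change.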
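Create a virtual environment, activate it, and install the dependencies (the activate command below is for Windows; on macOS or Linux use source venv/bin/activate instead):
python -m venv venv
venv\Scripts\activate
pip install -r requirements.txt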
The Shared Schema
Before writing any agents, we define the single schema that every agent receives. This lives in shared/models.py and is imported by the Streamlit UI to validate form data before sending.
Create a file named shared/models.py:
from pydantic import BaseModel  # validated input schema shared by every agent in the pipeline
class ProductSpec(BaseModel):  # one instance is built from the Streamlit form and sent to every agent
    product_name: str  # the name of the product, e.g. "Apple AirPods Pro"
    rough_description: str  # the user's own words describing what the product does and its key specs
    target_audience: str  # who the product is for, e.g. "iPhone users who want premium audio on the go"
    price_usd: float  # retail price in USD, used to calibrate the tone of value messaging in the copy
    brand_voice: str  # e.g. "premium and aspirational", "friendly and approachable", "bold and energetic"
ProductSpec is a Pydantic model, so constructing it from the form data raises a validation error for any missing or wrongly typed field before the request reaches an agent. All five fields are then interpolated into the user prompt that each specialist sends to the model.
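A minimal sketch of that validation step, assuming the caller builds a ProductSpec from the raw form dictionary before sending anything. The schema is repeated so the snippet runs on its own, and the sample form values are made up for illustration.

```python
from pydantic import BaseModel, ValidationError


class ProductSpec(BaseModel):
    product_name: str
    rough_description: str
    target_audience: str
    price_usd: float
    brand_voice: str


form_data = {  # hypothetical form submission with price_usd missing
    "product_name": "Standing Desk",
    "rough_description": "Electric height-adjustable desk with memory presets.",
    "target_audience": "remote workers",
    "brand_voice": "friendly and approachable",
}

try:
    ProductSpec(**form_data)
    rejected = False
except ValidationError as exc:
    rejected = True
    print(f"Rejected before any agent was called: {len(exc.errors())} error(s)")

# Adding the missing field makes the same data pass validation.
valid_spec = ProductSpec(**{**form_data, "price_usd": 299.0})
```

The check only protects the agents if the caller actually constructs the model; a plain dict posted straight to an agent bypasses it.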
The Shared Infrastructure
Two small files in common/ are shared by every agent. One handles the HTTP call from the director to a specialist; the other wraps a FastAPI app around any handler.
Create a file named common/agent_client.py:
import httpx  # async HTTP client used by the director to call each specialist agent
async def dispatch_to_agent(url: str, payload: dict) -> dict:
    async with httpx.AsyncClient() as client:  # fresh client per call, avoids stale-connection issues
        response = await client.post(url, json=payload, timeout=60.0)  # 60s headroom for LLM inference latency
        response.raise_for_status()  # raise immediately on any 4xx/5xx from the specialist
        return response.json()  # the specialist's fully-written copy section as a dict
dispatch_to_agent is a plain async function. The director calls it three times concurrently via asyncio.gather, which is what makes the parallel execution possible.
Create a file named common/agent_server.py:
from fastapi import FastAPI  # each specialist and the director runs as its own FastAPI service
def create_agent_service(handler) -> FastAPI:
    app = FastAPI()  # one independent FastAPI instance per agent process, no shared state
    @app.post("/run")  # the single endpoint every agent in this project exposes
    async def run(payload: dict):  # receives the ProductSpec dict from the director or from the Streamlit UI
        return await handler.generate(payload)  # delegate to whichever agent-specific handler was injected
    return app  # caller binds this app to a port via uvicorn.run(app, port=...)
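Every agent exposes exactly one endpoint: POST /run. This is the project’s simplified, A2A-style convention: a fixed, predictable URL on each service that any other agent can call to get a result. The full A2A specification defines a richer JSON-RPC interface; this tutorial uses only the plain HTTP request and response pattern.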
Stats Tracking
We log every model call to stats.json in the project root. The file accumulates across all requests and all agents, never overwriting what is already there.
Create a file named shared/stats_tracker.py:
import json  # read and write the accumulated stats.json file
import os  # read per-token cost rates from the environment
from datetime import datetime  # timestamp every call record written to stats.json
from pathlib import Path  # resolve the project root regardless of the working directory
from dotenv import load_dotenv  # load .env before reading cost rates from os.environ
import litellm  # set global metadata that gets forwarded to every OpenAI API call
load_dotenv()  # must run before any os.environ.get() call below
PROJECT_ROOT = Path(__file__).resolve().parent.parent  # two levels up from this file (shared/ sits directly under the project root)
STATS_FILE = PROJECT_ROOT / "stats.json"  # the single accumulating log file for all agents
# Per-token USD rates loaded from .env so pricing can be updated without changing code
_INPUT_COST = float(os.environ.get("GPT_4O_MINI_INPUT_COST", 0.00000015))  # $0.15 per million input tokens
_OUTPUT_COST = float(os.environ.get("GPT_4O_MINI_OUTPUT_COST", 0.00000060))  # $0.60 per million output tokens
COST_RATES = {
    "gpt-4o-mini": {"input": _INPUT_COST, "output": _OUTPUT_COST},  # plain model name key
    "openai/gpt-4o-mini": {"input": _INPUT_COST, "output": _OUTPUT_COST},  # provider-prefixed key from litellm
}
OPENAI_METADATA = {  # forwarded to OpenAI's API, visible in the OpenAI usage dashboard
    "dev_name": "Ganesh",
    "project": "codex-test",
    "environment": "local",
    "purpose": "testing",
}
def configure_litellm() -> None:
    litellm.metadata = OPENAI_METADATA  # attaches to every LiteLLM call made by this process
def log_call(
    agent_name: str,  # which specialist produced this record, e.g. "headline_writer"
    model: str,  # the model id used, e.g. "openai/gpt-4o-mini"
    prompt: str,  # the full prompt sent to the model for this call
    response: str,  # the raw text the model returned
    generation_seconds: float,  # wall-clock time measured around runner.run_async()
    prompt_tokens: int = 0,  # input token count from ADK's event.usage_metadata, 0 if unavailable
    completion_tokens: int = 0,  # output token count from ADK's event.usage_metadata, 0 if unavailable
) -> None:
    model_key = model.replace("openai/", "")  # normalise to bare model name for lookup
    rates = COST_RATES.get(model_key, COST_RATES.get("gpt-4o-mini", {"input": 0, "output": 0}))  # fall back to gpt-4o-mini rates
    total_tokens = prompt_tokens + completion_tokens  # sum of input and output tokens
    input_cost = round(prompt_tokens * rates["input"], 7)  # USD cost of the prompt tokens
    output_cost = round(completion_tokens * rates["output"], 7)  # USD cost of the completion tokens
    record = {
        "timestamp": datetime.now().isoformat(),  # when this specific call was logged
        "agent": agent_name,  # which agent produced this output
        "model": model,  # which model was used
        "generation_seconds": round(generation_seconds, 3),  # how long the API call took in seconds
        "prompt": prompt[:2000],  # first 2000 chars to keep stats.json readable
        "response": response[:2000],  # first 2000 chars of the generated copy
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": round(input_cost + output_cost, 7),
    }
    try:
        existing = json.loads(STATS_FILE.read_text(encoding="utf-8")) if STATS_FILE.exists() else {"summary": {}, "calls": []}
    except (json.JSONDecodeError, OSError):
        existing = {"summary": {}, "calls": []}  # start fresh if the file is unreadable or corrupt
    existing["calls"].append(record)  # accumulate: never overwrite, always append
    calls = existing["calls"]  # reference to the full history for computing totals
    existing["summary"] = {
        "timestamp": datetime.now().isoformat(),  # when stats.json was last written
        "total_calls": len(calls),  # total number of agent calls ever recorded
        "total_generation_seconds": round(sum(c.get("generation_seconds", 0) for c in calls), 3),  # cumulative API time
        "total_prompt_tokens": sum(c["prompt_tokens"] for c in calls),
        "total_completion_tokens": sum(c["completion_tokens"] for c in calls),
        "total_tokens": sum(c["total_tokens"] for c in calls),
        "total_input_cost": round(sum(c.get("input_cost", 0) for c in calls), 6),
        "total_output_cost": round(sum(c.get("output_cost", 0) for c in calls), 6),
        "total_cost": round(sum(c["total_cost"] for c in calls), 6),  # lifetime cost across all agents and runs
    }
    STATS_FILE.write_text(json.dumps(existing, indent=2, ensure_ascii=False), encoding="utf-8")  # rewrites the whole file in place (not atomic)
    print(f"[stats] Logged {agent_name}: {total_tokens} tokens, ${record['total_cost']:.7f}, {round(generation_seconds, 3)}s")
log_call is called directly inside each agent’s generate() function right after the ADK runner finishes, so timing and output are captured reliably without depending on any callback system. The summary block is recomputed from the full call history on every write, so it always reflects the true lifetime totals.
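One caveat worth sketching: Path.write_text rewrites the file in place, so with three specialist processes logging at nearly the same moment, a reader could catch stats.json half-written. The snippet below is one possible hardening, not the project's code: write_json_atomically is a hypothetical helper that writes to a temporary file and swaps it in with os.replace. It avoids torn files but does not prevent one process overwriting another's record; that would additionally need a file lock.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomically(path: Path, data: dict) -> None:
    """Write data to a temp file in the same directory, then swap it into place."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(data, tmp, indent=2, ensure_ascii=False)
        os.replace(tmp_name, path)  # rename over the target on the same filesystem
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)  # clean up the temp file if anything went wrong
        raise


demo_dir = Path(tempfile.mkdtemp())  # throwaway directory for the demo
demo_file = demo_dir / "stats.json"
write_json_atomically(demo_file, {"summary": {}, "calls": []})
write_json_atomically(demo_file, {"summary": {"total_calls": 1}, "calls": [{"agent": "headline_writer"}]})
loaded = json.loads(demo_file.read_text(encoding="utf-8"))
print(loaded["summary"])
```

Keeping the temporary file in the same directory as the target matters, because a rename across filesystems is not a single-step operation.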
Building the Specialist Agents
All three specialists follow the same pattern: configure LiteLLM metadata, define the ADK Agent with a focused instruction, create a Runner, and expose a generate() function that creates a unique session per request, runs the model, logs the result, and returns structured JSON.
Headline Writer
This agent produces a main headline and three tagline variations. Create a file named agents/headline_writer/agent.py:
import os  # read the model name and other config from the environment
import json  # parse the JSON the model returns as its structured output
import time  # measure wall-clock time around the ADK runner call
import uuid  # generate a unique session id for every request
import sys  # modify the module search path so shared/ is importable
from pathlib import Path  # locate the project root relative to this file
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))  # make shared/ importable
from google.adk.agents import Agent  # ADK's base agent class, holds the model and instructions
from google.adk.models.lite_llm import LiteLlm  # adapts LiteLLM to ADK so non-Gemini models work
from google.adk.runners import Runner  # drives the conversation loop for one agent instance
from google.adk.sessions import InMemorySessionService  # holds per-request session state in memory
from google.genai import types  # Content and Part types for constructing the user message
from dotenv import load_dotenv  # load .env before reading OPENAI_MODEL from os.environ
from shared.stats_tracker import configure_litellm, log_call  # metadata setup and per-call logging
load_dotenv()  # must run before os.environ.get() reads OPENAI_MODEL
configure_litellm()  # attaches OPENAI_METADATA to every LiteLLM call this process makes
MODEL = os.environ.get("OPENAI_MODEL", "openai/gpt-4o-mini")  # read from .env; falls back to gpt-4o-mini
WRITER_ID = "writer_headline"  # fixed user id for this agent's in-memory session
headline_agent = Agent(
    name="headline_writer",  # agent's internal identifier, must be unique within the app
    model=LiteLlm(MODEL),  # wrap the LiteLLM adapter so ADK can call any OpenAI-compatible model
    description="Writes a punchy product headline and tagline variations for a product page.",
    instruction=(
        "You are an expert product copywriter. Given a product name, description, target audience, "
        "price, and brand voice, write: one main headline (max 10 words, attention-grabbing, benefit-led), "
        "and three tagline variations (max 12 words each, different angles: aspirational, practical, witty). "
        "Respond ONLY with valid JSON with the key 'headlines' containing: "
        "'main_headline' (string) and 'taglines' (list of 3 strings)."
    ),
)
session_service = InMemorySessionService()  # holds session state in memory; no persistence needed here
runner = Runner(agent=headline_agent, app_name="headline_app", session_service=session_service)  # drives the conversation
async def generate(request: dict) -> dict:
    session_id = uuid.uuid4().hex  # unique per request so ADK never sees a duplicate session id
    await session_service.create_session(app_name="headline_app", user_id=WRITER_ID, session_id=session_id)  # register it
    prompt = (  # build the prompt from the ProductSpec fields sent by the director
        f"Product: {request['product_name']}. "
        f"What it does: {request['rough_description']}. "
        f"Target audience: {request['target_audience']}. "
        f"Price: ${request.get('price_usd', 0)}. "
        f"Brand voice: {request.get('brand_voice', 'professional')}. "
        "Write a headline and three tagline variations."
    )
    message = types.Content(role="user", parts=[types.Part(text=prompt)])  # ADK requires this typed wrapper
    start = time.monotonic()  # start timing before the model call begins
    response_text = ""  # will hold the model's raw JSON string once it arrives
    prompt_tokens = 0  # default to 0; updated if ADK exposes usage_metadata
    completion_tokens = 0  # same; updated if the event carries token count fields
    async for event in runner.run_async(user_id=WRITER_ID, session_id=session_id, new_message=message):
        if event.is_final_response() and event.content and event.content.parts:  # skip intermediate and empty events
            response_text = event.content.parts[0].text or ""  # extract the raw text from the ADK content object
            usage = getattr(event, "usage_metadata", None) or getattr(event, "usage", None)  # try both field names
            if usage:
                prompt_tokens = getattr(usage, "prompt_token_count", 0) or getattr(usage, "prompt_tokens", 0) or 0
                completion_tokens = getattr(usage, "candidates_token_count", 0) or getattr(usage, "completion_tokens", 0) or 0
    elapsed = time.monotonic() - start  # total wall-clock seconds the model took to respond
    log_call("headline_writer", MODEL, prompt, response_text, elapsed, prompt_tokens, completion_tokens)  # write to stats.json
    try:
        parsed = json.loads(response_text)  # the model should have returned valid JSON
        return parsed.get("headlines", {"main_headline": response_text, "taglines": []})  # extract the nested dict
    except json.JSONDecodeError:
        return {"main_headline": response_text, "taglines": []}  # fallback: return raw text if JSON parsing fails
The instruction tells the model exactly what JSON shape to return. The uuid.uuid4().hex session ID ensures that submitting a second product never crashes with an AlreadyExistsError from the InMemorySessionService, which raises that error if you try to create a session that already exists.
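Even with an explicit instruction, a model can wrap its JSON in a Markdown code fence, which makes json.loads fail and triggers the raw-text fallback. The sketch below shows one way to tolerate that; parse_model_json is a hypothetical helper, not part of the project, and the sample strings are invented for the demo.

```python
import json
import re

# Matches an optional Markdown code fence (three backticks, optional "json" tag)
# wrapped around the whole payload.
_FENCE = re.compile(r"^`{3}(?:json)?\s*(.*?)\s*`{3}$", re.DOTALL)


def parse_model_json(raw: str):
    """Return the parsed JSON value, or None if the text is not valid JSON."""
    text = raw.strip()
    match = _FENCE.match(text)
    if match:
        text = match.group(1)  # keep only what sits between the fences
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None


fence = "`" * 3  # built at runtime so this snippet contains no literal fence
wrapped = f'{fence}json\n{{"headlines": {{"main_headline": "Stand Taller", "taglines": []}}}}\n{fence}'
print(parse_model_json(wrapped))
print(parse_model_json("not json at all"))
```

Returning None instead of raising lets the caller keep its existing fallback branch for genuinely unparseable text.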
Create a file named agents/headline_writer/task_handler.py:
from .agent import generate  # import the async generate function from this agent's agent.py
class HeadlineHandler:
    async def generate(self, payload: dict) -> dict:
        return await generate(payload)  # delegate to the module-level generate function; keeps the handler thin
Create a file named agents/headline_writer/__main__.py:
import os  # read the port number from the environment
from dotenv import load_dotenv  # load .env so PORT_HEADLINE_WRITER is available
from common.agent_server import create_agent_service  # shared FastAPI factory used by every agent
from .task_handler import HeadlineHandler  # this agent's handler class
load_dotenv()  # must run before os.environ.get() reads the port
app = create_agent_service(HeadlineHandler())  # wire the handler into the /run endpoint
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=int(os.environ.get("PORT_HEADLINE_WRITER", 8001)))  # bind to the port from .env
Create a file named agents/headline_writer/.well-known/agent.json:
{"name": "headline_writer", "description": "Writes a product headline and three tagline variations."}
Description Writer and Features Writer
The description and features agents follow exactly the same structure as the headline writer. Only the agent name, instruction text, expected JSON output key, and port differ.
Create a file named agents/description_writer/agent.py:
import os  # read the model name and other config from the environment
import json  # parse the JSON the model returns as its structured output
import time  # measure wall-clock time around the ADK runner call
import uuid  # generate a unique session id for every request
import sys  # modify the module search path so shared/ is importable
from pathlib import Path  # locate the project root relative to this file
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))  # add project root to sys.path
from google.adk.agents import Agent  # ADK's base agent class, holds the model and instructions
from google.adk.models.lite_llm import LiteLlm  # adapts LiteLLM to ADK so non-Gemini models work
from google.adk.runners import Runner  # drives the conversation loop for one agent instance
from google.adk.sessions import InMemorySessionService  # holds per-request session state in memory
from google.genai import types  # Content and Part types for constructing the user message
from dotenv import load_dotenv  # load .env before reading OPENAI_MODEL from os.environ
from shared.stats_tracker import configure_litellm, log_call  # metadata setup and per-call logging
load_dotenv()  # must run before os.environ.get() reads OPENAI_MODEL
configure_litellm()  # attaches OPENAI_METADATA to every LiteLLM call this process makes
MODEL = os.environ.get("OPENAI_MODEL", "openai/gpt-4o-mini")  # read from .env; falls back to gpt-4o-mini
WRITER_ID = "writer_description"  # fixed user id for this agent's in-memory session
description_agent = Agent(
    name="description_writer",  # agent's internal identifier, must be unique within the app
    model=LiteLlm(MODEL),  # wrap the LiteLLM adapter so ADK can call any OpenAI-compatible model
    description="Writes a compelling product description paragraph for a product page.",
    instruction=(
        "You are an expert product copywriter. Given a product name, description, target audience, "
        "price, and brand voice, write a compelling product description paragraph of 80-120 words. "
        "Lead with the customer's problem or desire, introduce the product as the solution, "
        "highlight the core benefit, and close with a sense of confidence or assurance. "
        "Respond ONLY with valid JSON with the key 'description' containing a single string."
    ),
)
session_service = InMemorySessionService()  # holds session state in memory; no persistence needed here
runner = Runner(agent=description_agent, app_name="description_app", session_service=session_service)  # drives the conversation
async def generate(request: dict) -> dict:
    session_id = uuid.uuid4().hex  # fresh unique id per request, never reused across calls
    await session_service.create_session(app_name="description_app", user_id=WRITER_ID, session_id=session_id)  # register the session
    prompt = (  # build the prompt from the ProductSpec fields sent by the director
        f"Product: {request['product_name']}. "
        f"What it does: {request['rough_description']}. "
        f"Target audience: {request['target_audience']}. "
        f"Price: ${request.get('price_usd', 0)}. "
        f"Brand voice: {request.get('brand_voice', 'professional')}. "
        "Write an 80-120 word product description paragraph."
    )
    message = types.Content(role="user", parts=[types.Part(text=prompt)])  # ADK requires this typed wrapper
    start = time.monotonic()  # start timing before the model call begins
    response_text = ""  # will hold the model's raw JSON string once it arrives
    prompt_tokens = 0  # default to 0; updated if ADK exposes usage_metadata
    completion_tokens = 0  # same; updated if the event carries token count fields
    async for event in runner.run_async(user_id=WRITER_ID, session_id=session_id, new_message=message):
        if event.is_final_response() and event.content and event.content.parts:  # skip intermediate and empty events
            response_text = event.content.parts[0].text or ""  # extract the raw text from the ADK content object
            usage = getattr(event, "usage_metadata", None) or getattr(event, "usage", None)  # try both field names
            if usage:
                prompt_tokens = getattr(usage, "prompt_token_count", 0) or getattr(usage, "prompt_tokens", 0) or 0
                completion_tokens = getattr(usage, "candidates_token_count", 0) or getattr(usage, "completion_tokens", 0) or 0
    elapsed = time.monotonic() - start  # total wall-clock seconds the model took to respond
    log_call("description_writer", MODEL, prompt, response_text, elapsed, prompt_tokens, completion_tokens)  # write to stats.json
    try:
        parsed = json.loads(response_text)  # the model should have returned valid JSON
        return {"description": parsed.get("description", response_text)}  # extract the string inside the JSON key
    except json.JSONDecodeError:
        return {"description": response_text}  # fallback: return raw text if the model skipped JSON wrapping
Create a file named agents/features_writer/agent.py:
import os  # read the model name and other config from the environment
import json  # parse the JSON the model returns as its structured output
import time  # measure wall-clock time around the ADK runner call
import uuid  # generate a unique session id for every request
import sys  # modify the module search path so shared/ is importable
from pathlib import Path  # locate the project root relative to this file
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))  # add project root to sys.path
from google.adk.agents import Agent  # ADK's base agent class, holds the model and instructions
from google.adk.models.lite_llm import LiteLlm  # adapts LiteLLM to ADK so non-Gemini models work
from google.adk.runners import Runner  # drives the conversation loop for one agent instance
from google.adk.sessions import InMemorySessionService  # holds per-request session state in memory
from google.genai import types  # Content and Part types for constructing the user message
from dotenv import load_dotenv  # load .env before reading OPENAI_MODEL from os.environ
from shared.stats_tracker import configure_litellm, log_call  # metadata setup and per-call logging
load_dotenv()  # must run before os.environ.get() reads OPENAI_MODEL
configure_litellm()  # attaches OPENAI_METADATA to every LiteLLM call this process makes
MODEL = os.environ.get("OPENAI_MODEL", "openai/gpt-4o-mini")  # read from .env; falls back to gpt-4o-mini
WRITER_ID = "writer_features"  # fixed user id for this agent's in-memory session
features_agent = Agent(
    name="features_writer",  # agent's internal identifier, must be unique within the app
    model=LiteLlm(MODEL),  # wrap the LiteLLM adapter so ADK can call any OpenAI-compatible model
    description="Writes a features and benefits bullet list for a product page.",
    instruction=(
        "You are an expert product copywriter. Given a product name, description, target audience, "
        "price, and brand voice, write 5 feature/benefit bullets. "
        "Each bullet should lead with the BENEFIT (what the customer gains) then briefly state the feature "
        "that delivers it. Format: 'Benefit — feature detail'. Keep each bullet under 15 words. "
        "Respond ONLY with valid JSON with the key 'features' containing a list of 5 strings."
    ),
)
session_service = InMemorySessionService()  # holds session state in memory; no persistence needed here
runner = Runner(agent=features_agent, app_name="features_app", session_service=session_service)  # drives the conversation
async def generate(request: dict) -> dict:
    session_id = uuid.uuid4().hex  # unique id prevents AlreadyExistsError on repeated requests
    await session_service.create_session(app_name="features_app", user_id=WRITER_ID, session_id=session_id)  # register the session
    prompt = (  # build the prompt from the ProductSpec fields sent by the director
        f"Product: {request['product_name']}. "
        f"What it does: {request['rough_description']}. "
        f"Target audience: {request['target_audience']}. "
        f"Price: ${request.get('price_usd', 0)}. "
        f"Brand voice: {request.get('brand_voice', 'professional')}. "
        "Write 5 benefit-led feature bullets."
    )
    message = types.Content(role="user", parts=[types.Part(text=prompt)])  # ADK requires this typed wrapper
    start = time.monotonic()  # start timing before the model call begins
    response_text = ""  # will hold the model's raw JSON string once it arrives
    prompt_tokens = 0  # default to 0; updated if ADK exposes usage_metadata
    completion_tokens = 0  # same; updated if the event carries token count fields
    async for event in runner.run_async(user_id=WRITER_ID, session_id=session_id, new_message=message):
        if event.is_final_response() and event.content and event.content.parts:  # only the final event carries the complete text
            response_text = event.content.parts[0].text or ""  # extract the raw text from the ADK content object
            usage = getattr(event, "usage_metadata", None) or getattr(event, "usage", None)  # try both field names
            if usage:
                prompt_tokens = getattr(usage, "prompt_token_count", 0) or getattr(usage, "prompt_tokens", 0) or 0
                completion_tokens = getattr(usage, "candidates_token_count", 0) or getattr(usage, "completion_tokens", 0) or 0
    elapsed = time.monotonic() - start  # total wall-clock seconds the model took to respond
    log_call("features_writer", MODEL, prompt, response_text, elapsed, prompt_tokens, completion_tokens)  # write to stats.json
    try:
        parsed = json.loads(response_text)  # the model should have returned valid JSON
        return {"features": parsed.get("features", [])}  # the list of 5 benefit-led bullet strings
    except json.JSONDecodeError:
        return {"features": [response_text]}  # fallback: keep the list shape by wrapping the plain text in a list
Each specialist also needs its own task_handler.py and __main__.py. These are structurally identical across all three agents, with only the handler class name, handler import, and port variable differing.
Create agents/description_writer/task_handler.py and agents/description_writer/__main__.py, and the same two files for features_writer, following the same pattern as headline_writer above and substituting DescriptionHandler/FeaturesHandler and PORT_DESCRIPTION_WRITER/PORT_FEATURES_WRITER accordingly.
Building the Copy Director
The director is the only agent that does not call a language model directly. Its only job is to fire all three specialists in parallel and combine their results.
Create a file named agents/copy_director/task_handler.py:
import asyncio  # run the three specialist calls concurrently via asyncio.gather
import os  # read specialist port numbers from the environment
from dotenv import load_dotenv  # load .env so the PORT_* variables are available
from common.agent_client import dispatch_to_agent  # shared async HTTP helper for calling each specialist
load_dotenv()  # must run before os.environ.get() reads the port variables
# Build each specialist's URL from .env so ports can be changed without touching code
HEADLINE_URL = f"http://127.0.0.1:{os.environ.get('PORT_HEADLINE_WRITER', 8001)}/run"
DESCRIPTION_URL = f"http://127.0.0.1:{os.environ.get('PORT_DESCRIPTION_WRITER', 8002)}/run"
FEATURES_URL = f"http://127.0.0.1:{os.environ.get('PORT_FEATURES_WRITER', 8003)}/run"
class DirectorHandler:
    async def generate(self, payload: dict) -> dict:
        # All three specialists are independent, so run them in parallel rather than sequentially:
        # the total wait time equals the slowest single agent, not the sum of all three.
        headline_task = dispatch_to_agent(HEADLINE_URL, payload)  # fire headline_writer
        description_task = dispatch_to_agent(DESCRIPTION_URL, payload)  # fire description_writer simultaneously
        features_task = dispatch_to_agent(FEATURES_URL, payload)  # fire features_writer simultaneously
        headlines, description, features = await asyncio.gather(  # wait for all three to finish
            headline_task, description_task, features_task
        )
        headlines = headlines if isinstance(headlines, dict) else {}  # guard against unexpected response shapes
        description = description if isinstance(description, dict) else {}
        features = features if isinstance(features, dict) else {}
        return {  # combined result sent back to the Streamlit UI
            "headlines": headlines,
            "description": description.get("description", ""),  # unwrap the nested "description" string key
            "features": features.get("features", []),  # unwrap the nested "features" list key
        }
asyncio.gather is the key line: it starts all three HTTP calls simultaneously and waits for all three to complete. Without it, the total time would be the sum of three separate model calls. With it, the total time is roughly equal to whichever single specialist takes longest.
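The timing claim can be checked without any model calls. In the sketch below, fake_specialist is a hypothetical stand-in for dispatch_to_agent that simply sleeps, and the delays are arbitrary values chosen for the demo.

```python
import asyncio
import time


async def fake_specialist(name: str, seconds: float) -> dict:
    """Stand-in for dispatch_to_agent: sleeps instead of calling a model."""
    await asyncio.sleep(seconds)
    return {"agent": name}


async def run_in_parallel() -> tuple:
    start = time.monotonic()
    results = await asyncio.gather(
        fake_specialist("headline_writer", 0.2),
        fake_specialist("description_writer", 0.3),
        fake_specialist("features_writer", 0.1),
    )
    return results, time.monotonic() - start


results, elapsed = asyncio.run(run_in_parallel())
print([r["agent"] for r in results])  # results keep the order the calls were passed in
print(f"elapsed: {elapsed:.2f}s")  # close to the slowest delay, well under the 0.6s sum
```

Note that gather returns results in argument order regardless of which call finishes first, which is why the director can unpack them positionally.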
Create a file named agents/copy_director/__main__.py:
import os  # read PORT_COPY_DIRECTOR from the environment
from dotenv import load_dotenv  # load .env before reading the port
from common.agent_server import create_agent_service  # shared FastAPI factory
from .task_handler import DirectorHandler  # the director's handler that orchestrates all three specialists
load_dotenv()  # must run before os.environ.get() below
app = create_agent_service(DirectorHandler())  # wire the director handler into the /run endpoint
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=int(os.environ.get("PORT_COPY_DIRECTOR", 9000)))  # port 9000 by default
The Streamlit UI
The UI form collects the five fields that match ProductSpec, calls the director’s /run endpoint, and renders the combined result as a formatted product page section.
Create a file named copy_ui.py in the project root:
import os # read PORT_COPY_DIRECTOR from the environment
import requests # synchronous HTTP to call the director's /run endpoint
import streamlit as st # the entire visual layer: form, layout, and result rendering
from dotenv import load_dotenv # load .env so PORT_COPY_DIRECTOR is available
load_dotenv() # must run before os.environ.get() reads the port below
st.set_page_config(page_title="Product Copy Generator", page_icon="🛒", layout="wide")
st.title("🛒 Product Page Copy Generator") # heading shown at the top of the app; the browser tab title comes from page_title above
with st.form("product_form"): # group all inputs into one form so submit fires a single event
    product_name = st.text_input("Product name", placeholder="e.g. Apple AirPods Pro")
    rough_description = st.text_area(
        "Describe the product in your own words",
        placeholder="e.g. An electric height-adjustable standing desk with memory presets and a quiet motor.",
        height=100,
    )
    col1, col2 = st.columns(2) # split the remaining inputs into two equal columns
    with col1:
        target_audience = st.text_input("Target audience", placeholder="e.g. remote workers aged 25-45")
        brand_voice = st.selectbox( # constrained dropdown prevents free-text typos in the tone field
            "Brand voice",
            ["premium and aspirational", "friendly and approachable", "technical and precise", "bold and energetic"],
        )
    with col2:
        price_usd = st.number_input("Price (USD)", min_value=1.0, value=299.0, step=10.0)
    submitted = st.form_submit_button("Generate Copy", type="primary", use_container_width=True) # full-width primary button
if submitted: # everything below only runs when the user clicks the button
    if not product_name or not rough_description or not target_audience:
        st.info("Please fill in product name, description, and target audience.")
        st.stop() # halt rendering; don't make the API call with missing fields
    payload = { # build the dict that matches the ProductSpec Pydantic schema
        "product_name": product_name,
        "rough_description": rough_description,
        "target_audience": target_audience,
        "price_usd": price_usd,
        "brand_voice": brand_voice,
    }
    with st.spinner("Writing headlines, description, and feature bullets in parallel..."):
        try:
            director_port = int(os.environ.get("PORT_COPY_DIRECTOR", 9000)) # read from .env, fall back to 9000
            response = requests.post(f"http://127.0.0.1:{director_port}/run", json=payload, timeout=90) # 90s for 3 parallel LLM calls
            response.raise_for_status() # raise on 4xx/5xx from the director
            data = response.json() # the combined result dict: headlines, description, features
        except requests.exceptions.RequestException as e:
            st.warning(f"Could not reach the copy director. Make sure all agents are running. ({e})")
            st.stop()
    headlines = data.get("headlines", {}) # dict with "main_headline" and "taglines"
    description = data.get("description", "") # the 80-120 word paragraph string
    features = data.get("features", []) # list of 5 benefit-led bullet strings
    st.success("Copy ready!") # green banner confirms the run completed
    st.divider() # visual separator between the form and the copy output
    st.subheader(headlines.get("main_headline", "")) # render the main headline as a subheading
    if headlines.get("taglines"):
        cols = st.columns(len(headlines["taglines"])) # one column per tagline for a clean side-by-side layout
        for col, tagline in zip(cols, headlines["taglines"]):
            col.caption(f"*{tagline}*") # italicised caption for each tagline variation
    st.divider()
    st.markdown(description) # render the description paragraph as markdown
    st.divider()
    st.subheader("Features & Benefits")
    if isinstance(features, list):
        for bullet in features:
            st.markdown(f"✓ {bullet}") # checkmark prefix makes each bullet visually scannable
    else:
        st.markdown(str(features)) # fallback if the model returned a raw string instead of a list
The form is wrapped in st.form() so that editing individual fields does not rerun the script; nothing happens until “Generate Copy” is clicked, and that click sends exactly one request. The timeout=90 on the HTTP call is intentionally generous: the director waits for all three parallel model calls to finish before responding, and the slowest one can take 20-30 seconds.
The Launcher
Rather than opening four separate terminals, a single script starts all four agents and watches for unexpected exits.
Create a file named start_agents.py in the project root:
import os # read agent ports from the environment
import subprocess # launch each agent as a child process
import sys # access the current Python interpreter path
import time # sleep between process health checks
import signal # catch Ctrl+C and SIGTERM for clean shutdown
from dotenv import load_dotenv # load .env so PORT_* variables are available
load_dotenv() # must run before os.environ.get() reads the port values
# One entry per agent: (module name under agents/, port number from .env)
AGENTS = [
    ("copy_director", int(os.environ.get("PORT_COPY_DIRECTOR", 9000))), # orchestrator
    ("headline_writer", int(os.environ.get("PORT_HEADLINE_WRITER", 9001))), # specialist 1
    ("description_writer", int(os.environ.get("PORT_DESCRIPTION_WRITER", 9002))), # specialist 2
    ("features_writer", int(os.environ.get("PORT_FEATURES_WRITER", 9003))), # specialist 3
]
processes = [] # keep references so shutdown() can terminate every one
def launch():
    print("Starting all agents...\n")
    for module, port in AGENTS:
        proc = subprocess.Popen(
            [sys.executable, "-m", f"agents.{module}"], # same Python that runs this script
            cwd=os.path.dirname(os.path.abspath(__file__)), # run from the project root so imports resolve
        )
        processes.append(proc) # track for later shutdown
        print(f" Started agents.{module} on port {port} (pid {proc.pid})")
    print(f"\nAll {len(AGENTS)} agents running.")
    print("Press Ctrl+C here to stop all agents.\n")
def shutdown(sig=None, frame=None):
    print("\nShutting down all agents...")
    for proc in processes:
        proc.terminate() # send SIGTERM to each child process
    for proc in processes:
        proc.wait() # block until every child has fully exited
    print("All agents stopped.")
    sys.exit(0)
signal.signal(signal.SIGINT, shutdown) # Ctrl+C triggers clean shutdown
signal.signal(signal.SIGTERM, shutdown) # SIGTERM is the "please stop cleanly" signal sent by task managers and cloud platforms
launch() # start all four processes
while True: # keep the parent alive and watch for unexpected child exits
    time.sleep(1)
    for proc in processes:
        if proc.poll() is not None: # poll() returns None while the process is running
            print(f"Agent (pid {proc.pid}) exited unexpectedly. Shutting down all.")
            shutdown()
subprocess.Popen with -m agents.module runs each agent the same way Python’s module runner does, which means relative imports inside each __main__.py resolve correctly. The while True loop at the bottom keeps the launcher alive and shuts down the whole group if any one process exits unexpectedly; it does not restart anything, so run python start_agents.py again after fixing the cause.
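The health check in the launcher rests on one behaviour of Popen.poll(): it returns None while the child is alive and the exit code once it has finished. The standalone sketch below shows that with two throwaway child processes; the inline -c scripts are hypothetical stand-ins for real agents.

```python
import subprocess
import sys

# Child that exits immediately with code 3, standing in for a crashed agent.
crashed = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(3)"])
# Child that sleeps, so it is still alive when it is checked below.
running = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])

crashed.wait()                    # make sure the first child has finished
crashed_status = crashed.poll()   # exit code of the finished child
running_status = running.poll()   # None, because the child is still running

print(crashed_status)  # 3
print(running_status)  # None

# Clean up the sleeping child the same way the launcher's shutdown() does.
running.terminate()
running.wait()
```

Note that the launcher's loop treats any non-None result as a failure, including a clean exit with code 0, which is reasonable for servers that are expected to run until stopped.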
Running the Application
Start all four agents in one terminal:
python start_agents.py
Then in a second terminal launch the Streamlit UI:
streamlit run copy_ui.py
Open http://localhost:8501 and fill in the product details. Results appear once all three specialists have finished. Whichever specialist takes longest determines the total wait, typically 10-25 seconds per product depending on network and model load.
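If the UI reports that it could not reach the copy director, it can help to confirm which agents are actually listening. The following is a minimal sketch using only the standard library, assuming the default ports from start_agents.py; is_listening and DEFAULT_PORTS are hypothetical names, not part of the project.

```python
import socket

# Default ports used in this tutorial; adjust if your .env overrides them.
DEFAULT_PORTS = {
    "copy_director": 9000,
    "headline_writer": 9001,
    "description_writer": 9002,
    "features_writer": 9003,
}

def is_listening(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or otherwise unreachable
        return False

if __name__ == "__main__":
    for name, port in DEFAULT_PORTS.items():
        state = "up" if is_listening(port) else "not reachable"
        print(f"{name:<20} port {port}: {state}")
```

A successful TCP connection only shows that the port is open. It does not prove that the /run endpoint behind it is healthy.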
stats.json in the project root accumulates a record for every model call, with prompt text, response text, token counts, cost, and timing per agent. The summary block at the top of the file reflects lifetime totals across every product and every agent since the file was created.
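The cost figure in each record follows directly from the token counts and the gpt-4o-mini prices quoted in the Pricing section. The exact stats.json schema is not reproduced here, so the sketch below works on plain numbers; estimate_cost_usd and the example token counts are hypothetical.

```python
INPUT_PRICE_PER_MILLION = 0.15   # USD per million input tokens, as quoted in this tutorial
OUTPUT_PRICE_PER_MILLION = 0.60  # USD per million output tokens, as quoted in this tutorial

def estimate_cost_usd(prompt_tokens: int, completion_tokens: int) -> float:
    """Cost of one model call in USD at the per-million-token prices above."""
    return (
        prompt_tokens * INPUT_PRICE_PER_MILLION
        + completion_tokens * OUTPUT_PRICE_PER_MILLION
    ) / 1_000_000

if __name__ == "__main__":
    # Hypothetical token counts for one brief; real numbers come from stats.json.
    cost = estimate_cost_usd(prompt_tokens=500, completion_tokens=300)
    print(f"${cost:.6f}")  # $0.000255
```

At these prices a brief of this size costs well under a tenth of a cent, in line with the "fraction of a cent" estimate given earlier.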
Who Can Benefit
Developers — Get a working example of a real multi-agent ADK system using the A2A protocol, parallel execution, and LiteLLM model routing.
Product teams — Use this as a starting point for any copywriting or content generation workflow that benefits from specialised agents working in parallel.
Students learning AI agent architecture — See how asyncio.gather, FastAPI services, and Google ADK combine into a production-style multi-agent pipeline.
E-commerce and SaaS teams — Adapt the specialist agents to produce SEO meta descriptions, email subject lines, or ad copy without changing the underlying architecture.
Engineers evaluating ADK — Get real cost, timing, and output data from a working deployment before committing the framework to a larger project.
How Codersarts Can Help
If you want to take this further, Codersarts offers hands-on support at every stage.
For learners: Live 1-to-1 sessions with an AI engineer who can walk through Google ADK’s agent architecture, the A2A communication protocol, and multi-agent parallel execution in detail.
For teams: End-to-end development of multi-agent content generation pipelines, including model selection, prompt engineering, cost optimisation, and reliability testing.
For enterprises: Architecture consulting for production agent deployments, including scaling individual agents independently, adding authentication, and integrating with existing content management systems.
Reach out at contact@codersarts.com or visit www.codersarts.com to get started.
Continue Your AI Learning Journey with Codersarts
If you enjoyed this article and would like to discover more about modern AI applications, production-ready LLM systems, and real-world RAG and MCP implementations, be sure to explore these other blogs from Codersarts:
Build a Cost-Efficient Writing Quality Checker with Tiered Model Routing and OpenAI
Build Your First A2A Agent: An Email Drafting Pipeline Using Python and OpenAI
Building an AI Interview Prep Agent with Qwen 3.7 Max and Streamlit: https://www.codersarts.com/post/building-an-ai-interview-prep-agent-with-qwen-3-7-max-and-streamlit
Academic Research Assistance and Literature Review Automation Using RAG
Clinical Decision Support Systems Using RAG: Intelligent Diagnostic Assistance for Healthcare
Financial Decision Making with RAG Powered Market Intelligence: https://www.codersarts.com/post/financial-decision-making-with-rag-powered-market-intelligence



