
Build a Multi-Agent Product Page Copy Generator with Google ADK and OpenAI

  • 1 hour ago
  • 20 min read


Introduction


Writing product page copy is a task most developers outsource to a human copywriter or a single prompt. Neither approach demonstrates what a multi-agent system can do differently: break the task into focused, independent specialists, run them in parallel, and recombine their output into something no single prompt would produce as reliably.


In this tutorial we build a product page copy generator using Google’s open-source Agent Development Kit (ADK). You provide a product name, rough description, target audience, price, and brand voice. Three specialist agents, each backed by GPT-4o-mini, run simultaneously and produce a complete set of copy: a headline with tagline variations, an 80-120 word description paragraph, and five benefit-led feature bullets. A fourth orchestrating agent fires them all in parallel and hands the combined result to a Streamlit UI.






What We Are Building


We are building a four-agent system connected via the A2A (Agent-to-Agent) protocol, with each agent running as its own FastAPI server. The workflow:


  1. Submit a product brief through a Streamlit form

  2. Route it to the copy director, which fires all three specialists in parallel

  3. Headline writer produces a main headline and three tagline variations

  4. Description writer produces an 80-120 word paragraph

  5. Features writer produces five benefit-led bullet points

  6. Combine all three results and render them as a finished product page section




Tech Stack


| Component | Tool |
| --- | --- |
| Agent framework | Google ADK (open-source) |
| Model | OpenAI gpt-4o-mini via LiteLLM adapter |
| Agent communication | A2A protocol over HTTP (FastAPI + httpx) |
| UI | Streamlit |
| Session management | ADK InMemorySessionService |




Pricing


Google ADK itself is completely free and open-source. The only cost comes from the OpenAI model: gpt-4o-mini is priced at $0.15 per million input tokens and $0.60 per million output tokens. One complete product brief (all three specialist calls) typically uses 500-800 tokens total and costs a fraction of a cent. Every call is logged to stats.json with prompt tokens, completion tokens, generation time, and computed cost.
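To make the arithmetic concrete, here is a minimal sketch using the two rates quoted above. The helper name estimate_cost and the 500/300 token split are illustrative assumptions, not measurements from the project.

```python
# Per-token USD rates for gpt-4o-mini, as quoted in the text above.
INPUT_RATE = 0.15 / 1_000_000   # $0.15 per million input tokens
OUTPUT_RATE = 0.60 / 1_000_000  # $0.60 per million output tokens


def estimate_cost(prompt_tokens: int, completion_tokens: int) -> float:
    """Return the estimated USD cost of model usage (hypothetical helper)."""
    return prompt_tokens * INPUT_RATE + completion_tokens * OUTPUT_RATE


# Illustrative split of an 800-token brief: 500 input tokens, 300 output tokens.
cost = estimate_cost(500, 300)
print(f"${cost:.6f}")  # about $0.000255
```

Output tokens cost four times as much as input tokens, so the completion length matters more to the bill than the prompt length.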




Project Structure




adk_product_copy_agent/
├── agents/
│   ├── copy_director/       # orchestrator: fires all three specialists in parallel
│   │   ├── task_handler.py
│   │   ├── __main__.py
│   │   └── .well-known/agent.json
│   ├── headline_writer/     # specialist 1: headline and taglines
│   │   ├── agent.py
│   │   ├── task_handler.py
│   │   ├── __main__.py
│   │   └── .well-known/agent.json
│   ├── description_writer/  # specialist 2: 80-120 word paragraph
│   └── features_writer/     # specialist 3: 5 benefit-led bullets
├── common/
│   ├── agent_client.py      # async HTTP helper for director → specialist calls
│   └── agent_server.py      # FastAPI factory shared by every agent
├── shared/
│   ├── models.py            # ProductSpec Pydantic schema
│   └── stats_tracker.py     # per-call logging to stats.json
├── copy_ui.py               # Streamlit interface
├── start_agents.py          # single-command launcher for all four agents
├── requirements.txt
└── .env                     # API key, model name, per-token rates, port numbers




Setting Up


Create a file named requirements.txt in the project root:



google-adk
litellm
fastapi
uvicorn
httpx
pydantic
openai
streamlit
python-dotenv


Create a file named .env in the project root with all configuration in one place:



OPENAI_API_KEY=your_openai_api_key_here
OPENAI_MODEL=openai/gpt-4o-mini
GPT_4O_MINI_INPUT_COST=0.00000015
GPT_4O_MINI_OUTPUT_COST=0.00000060
PORT_COPY_DIRECTOR=9000
PORT_HEADLINE_WRITER=9001
PORT_DESCRIPTION_WRITER=9002
PORT_FEATURES_WRITER=9003


Every port number, model name, and cost rate lives in .env so nothing needs to change in code if you want to switch models or move ports.
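As a rough illustration of how these values can be read in one place, the sketch below parses the same variable names from an environment mapping. The load_settings helper is hypothetical (the project reads each variable where it is needed), and the fallback values simply mirror the .env file above.

```python
import os
from typing import Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Read model, rates, and ports from an environment mapping (hypothetical helper)."""
    return {
        "model": env.get("OPENAI_MODEL", "openai/gpt-4o-mini"),
        "input_cost": float(env.get("GPT_4O_MINI_INPUT_COST", "0.00000015")),
        "output_cost": float(env.get("GPT_4O_MINI_OUTPUT_COST", "0.00000060")),
        "ports": {
            "copy_director": int(env.get("PORT_COPY_DIRECTOR", "9000")),
            "headline_writer": int(env.get("PORT_HEADLINE_WRITER", "9001")),
            "description_writer": int(env.get("PORT_DESCRIPTION_WRITER", "9002")),
            "features_writer": int(env.get("PORT_FEATURES_WRITER", "9003")),
        },
    }


# Passing a plain dict makes the override behaviour easy to see without touching the real environment.
settings = load_settings({"PORT_HEADLINE_WRITER": "9101"})
print(settings["ports"]["headline_writer"])  # 9101
```

Environment values always arrive as strings, which is why the ports and rates are converted explicitly.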


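Create a virtual environment, activate it, and install the dependencies. The activation command shown is for Windows; on macOS or Linux, run source venv/bin/activate instead: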



python -m venv venv
venv\Scripts\activate
pip install -r requirements.txt


The Shared Schema


Before writing any agents, we define the single schema that every agent receives. This lives in shared/models.py and is imported by the Streamlit UI to validate form data before sending.


Create a file named shared/models.py:



from pydantic import BaseModel    # validated input schema shared by every agent in the pipeline


class ProductSpec(BaseModel):                               # one instance is built from the Streamlit form and sent to every agent
    product_name: str             # the name of the product, e.g. "Apple AirPods Pro"
    rough_description: str        # the user's own words describing what the product does and its key specs
    target_audience: str          # who the product is for, e.g. "iPhone users who want premium audio on the go"
    price_usd: float              # retail price in USD, used to calibrate the tone of value messaging in the copy
    brand_voice: str              # e.g. "premium and aspirational", "friendly and approachable", "bold and energetic"


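ProductSpec is a Pydantic model, so building it from the form data raises a validation error if any field is missing or has the wrong type, which lets the UI reject a bad brief before it reaches any agent. The agents themselves receive the spec as a plain dict, and each specialist interpolates all five fields into the prompt it sends to the model.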




The Shared Infrastructure


Two small files in common/ are shared by every agent. One handles the HTTP call from the director to a specialist; the other wraps a FastAPI app around any handler.


Create a file named common/agent_client.py:

import httpx                          # async HTTP client used by the director to call each specialist agent


async def dispatch_to_agent(url: str, payload: dict) -> dict:
    async with httpx.AsyncClient() as client:              # fresh client per call, avoids stale-connection issues
        response = await client.post(url, json=payload, timeout=60.0)  # 60s headroom for LLM inference latency
        response.raise_for_status()                         # raise immediately on any 4xx/5xx from the specialist
        return response.json()                              # the specialist's fully-written copy section as a dict

dispatch_to_agent is a plain async function. The director calls it three times concurrently via asyncio.gather, which is what makes the parallel execution possible.
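The effect of asyncio.gather can be seen without any network calls. In the sketch below, fake_specialist is a hypothetical stand-in that sleeps instead of calling a model, and the delays are arbitrary; the point is only that the total time tracks the slowest call rather than the sum.

```python
import asyncio
import time


async def fake_specialist(name: str, delay: float) -> dict:
    """Stand-in for one specialist call; sleeps instead of calling a model."""
    await asyncio.sleep(delay)
    return {"agent": name}


async def run_all() -> tuple:
    start = time.monotonic()
    # gather schedules all three coroutines at once and returns results in argument order.
    results = await asyncio.gather(
        fake_specialist("headline_writer", 0.1),
        fake_specialist("description_writer", 0.2),
        fake_specialist("features_writer", 0.3),
    )
    return list(results), time.monotonic() - start


results, elapsed = asyncio.run(run_all())
print([r["agent"] for r in results], f"{elapsed:.2f}s")
```

Run sequentially, the same three calls would take roughly 0.6 seconds; gathered, they finish in roughly 0.3 seconds.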


Create a file named common/agent_server.py:



from fastapi import FastAPI           # each specialist and the director runs as its own FastAPI service


def create_agent_service(handler) -> FastAPI:
    app = FastAPI()                   # one independent FastAPI instance per agent process, no shared state

    @app.post("/run")                 # the one endpoint every agent in this project exposes
    async def run(payload: dict):     # receives the ProductSpec dict from the director or from the Streamlit UI
        return await handler.generate(payload)   # delegate to whichever agent-specific handler was injected

    return app                        # caller binds this app to a port via uvicorn.run(app, port=...)


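Every agent exposes exactly one endpoint: POST /run. This is a simplified take on the A2A idea: a fixed, predictable URL on each service that any other agent can call to get a result. The full A2A specification defines a richer JSON-RPC interface, which this tutorial does not implement.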




Stats Tracking


We log every model call to stats.json in the project root. The file accumulates across all requests and all agents, never overwriting what is already there.


Create a file named shared/stats_tracker.py:



import json                          # read and write the accumulated stats.json file
import os                            # read per-token cost rates from the environment
from datetime import datetime        # timestamp every call record written to stats.json
from pathlib import Path             # resolve the project root regardless of the working directory

from dotenv import load_dotenv       # load .env before reading cost rates from os.environ
import litellm                       # set global metadata that gets forwarded to every OpenAI API call

load_dotenv()                        # must run before any os.environ.get() call below

PROJECT_ROOT = Path(__file__).resolve().parent.parent   # two levels up from shared/ gives the project root
STATS_FILE = PROJECT_ROOT / "stats.json"                 # the single accumulating log file for all agents

# Per-token USD rates loaded from .env so pricing can be updated without changing code
_INPUT_COST = float(os.environ.get("GPT_4O_MINI_INPUT_COST", 0.00000015))   # $0.15 per million input tokens
_OUTPUT_COST = float(os.environ.get("GPT_4O_MINI_OUTPUT_COST", 0.00000060)) # $0.60 per million output tokens

COST_RATES = {
    "gpt-4o-mini": {"input": _INPUT_COST, "output": _OUTPUT_COST},           # plain model name key
    "openai/gpt-4o-mini": {"input": _INPUT_COST, "output": _OUTPUT_COST},    # provider-prefixed key from litellm
}

OPENAI_METADATA = {                  # forwarded to OpenAI's API, visible in the OpenAI usage dashboard
    "dev_name": "Ganesh",
    "project": "codex-test",
    "environment": "local",
    "purpose": "testing",
}


def configure_litellm() -> None:
    litellm.metadata = OPENAI_METADATA  # attaches to every LiteLLM call made by this process


def log_call(
    agent_name: str,                 # which specialist produced this record, e.g. "headline_writer"
    model: str,                      # the model id used, e.g. "openai/gpt-4o-mini"
    prompt: str,                     # the full prompt sent to the model for this call
    response: str,                   # the raw text the model returned
    generation_seconds: float,       # wall-clock time measured around runner.run_async()
    prompt_tokens: int = 0,          # input token count from ADK's event.usage_metadata, 0 if unavailable
    completion_tokens: int = 0,      # output token count from ADK's event.usage_metadata, 0 if unavailable
) -> None:
    model_key = model.replace("openai/", "")                                  # normalise to bare model name for lookup
    rates = COST_RATES.get(model_key, COST_RATES.get("gpt-4o-mini", {"input": 0, "output": 0}))  # fall back to gpt-4o-mini rates
    total_tokens = prompt_tokens + completion_tokens                           # sum of input and output tokens
    input_cost = round(prompt_tokens * rates["input"], 7)                     # USD cost of the prompt tokens
    output_cost = round(completion_tokens * rates["output"], 7)               # USD cost of the completion tokens

    record = {
        "timestamp": datetime.now().isoformat(),          # when this specific call was logged
        "agent": agent_name,                              # which agent produced this output
        "model": model,                                   # which model was used
        "generation_seconds": round(generation_seconds, 3),  # how long the API call took in seconds
        "prompt": prompt[:2000],                          # first 2000 chars to keep stats.json readable
        "response": response[:2000],                      # first 2000 chars of the generated copy
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": round(input_cost + output_cost, 7),
    }

    try:
        existing = json.loads(STATS_FILE.read_text(encoding="utf-8")) if STATS_FILE.exists() else {"summary": {}, "calls": []}
    except (json.JSONDecodeError, OSError):
        existing = {"summary": {}, "calls": []}           # start fresh if the file is missing or corrupt

    existing["calls"].append(record)                      # accumulate: never overwrite, always append
    calls = existing["calls"]                             # reference to the full history for computing totals
    existing["summary"] = {
        "timestamp": datetime.now().isoformat(),          # when stats.json was last written
        "total_calls": len(calls),                        # total number of agent calls ever recorded
        "total_generation_seconds": round(sum(c.get("generation_seconds", 0) for c in calls), 3),  # cumulative API time
        "total_prompt_tokens": sum(c["prompt_tokens"] for c in calls),
        "total_completion_tokens": sum(c["completion_tokens"] for c in calls),
        "total_tokens": sum(c["total_tokens"] for c in calls),
        "total_input_cost": round(sum(c.get("input_cost", 0) for c in calls), 6),
        "total_output_cost": round(sum(c.get("output_cost", 0) for c in calls), 6),
        "total_cost": round(sum(c["total_cost"] for c in calls), 6),  # lifetime cost across all agents and runs
    }
    STATS_FILE.write_text(json.dumps(existing, indent=2, ensure_ascii=False), encoding="utf-8")   # rewrite the whole file; not atomic, so concurrent writers can race
    print(f"[stats] Logged {agent_name}: {total_tokens} tokens, ${record['total_cost']:.7f}, {round(generation_seconds,3)}s")


log_call is called directly inside each agent’s generate() function right after the ADK runner finishes, so timing and output are captured reliably without depending on any callback system. The summary block is recomputed from the full call history on every write, so it always reflects the true lifetime totals.
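Because the three specialists run as separate processes and all write to the same stats.json, an interrupted write could leave a truncated file behind. One possible hardening step, sketched here as an assumption rather than something the project does, is to write to a temporary file and swap it into place with os.replace. The helper name atomic_write_json is hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data: dict) -> None:
    """Write data as JSON so readers never see a half-written file (hypothetical helper)."""
    # Create the temp file next to the target so os.replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(data, tmp, indent=2, ensure_ascii=False)
        os.replace(tmp_name, path)  # the rename is atomic on POSIX systems
    except BaseException:
        os.unlink(tmp_name)  # remove the temp file if writing or renaming failed
        raise


# Demonstration in a throwaway directory rather than the real project root.
with tempfile.TemporaryDirectory() as demo_dir:
    target = Path(demo_dir) / "stats.json"
    atomic_write_json(target, {"summary": {}, "calls": []})
    loaded = json.loads(target.read_text(encoding="utf-8"))
    print(loaded)
```

This protects against partial files only. Two processes that read, append, and write at the same moment can still lose a record, which would need a file lock or a single writer process to prevent.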




Building the Specialist Agents


All three specialists follow the same pattern: configure LiteLLM metadata, define the ADK Agent with a focused instruction, create a Runner, and expose a generate() function that creates a unique session per request, runs the model, logs the result, and returns structured JSON.



Headline Writer


This agent produces a main headline and three tagline variations. Create a file named agents/headline_writer/agent.py:



import os                            # read the model name and other config from the environment
import json                         # parse the JSON the model returns as its structured output
import time                         # measure wall-clock time around the ADK runner call
import uuid                         # generate a unique session id for every request
import sys                          # modify the module search path so shared/ is importable
sys.path.insert(0, str(__import__('pathlib').Path(__file__).resolve().parent.parent.parent))  # make shared/ importable

from google.adk.agents import Agent              # ADK's base agent class, holds the model and instructions
from google.adk.models.lite_llm import LiteLlm  # adapts LiteLLM to ADK so non-Gemini models work
from google.adk.runners import Runner            # drives the conversation loop for one agent instance
from google.adk.sessions import InMemorySessionService  # holds per-request session state in memory
from google.genai import types                   # Content and Part types for constructing the user message
from dotenv import load_dotenv                   # load .env before reading OPENAI_MODEL from os.environ
from shared.stats_tracker import configure_litellm, log_call  # metadata setup and per-call logging

load_dotenv()                                    # must run before os.environ.get() reads OPENAI_MODEL
configure_litellm()                              # attaches OPENAI_METADATA to every LiteLLM call this process makes

MODEL = os.environ.get("OPENAI_MODEL", "openai/gpt-4o-mini")  # read from .env; falls back to gpt-4o-mini
WRITER_ID = "writer_headline"                    # fixed user id for this agent's in-memory session

headline_agent = Agent(
    name="headline_writer",                      # agent's internal identifier, must be unique within the app
    model=LiteLlm(MODEL),                        # wrap the LiteLLM adapter so ADK can call any OpenAI-compatible model
    description="Writes a punchy product headline and tagline variations for a product page.",
    instruction=(
        "You are an expert product copywriter. Given a product name, description, target audience, "
        "price, and brand voice, write: one main headline (max 10 words, attention-grabbing, benefit-led), "
        "and three tagline variations (max 12 words each, different angles: aspirational, practical, witty). "
        "Respond ONLY with valid JSON with the key 'headlines' containing: "
        "'main_headline' (string) and 'taglines' (list of 3 strings)."
    ),
)

session_service = InMemorySessionService()       # holds session state in memory; no persistence needed here
runner = Runner(agent=headline_agent, app_name="headline_app", session_service=session_service)  # drives the conversation


async def generate(request: dict) -> dict:
    session_id = uuid.uuid4().hex                # unique per request so ADK never sees a duplicate session id
    await session_service.create_session(app_name="headline_app", user_id=WRITER_ID, session_id=session_id)  # register it
    prompt = (                                   # build the prompt from the ProductSpec fields sent by the director
        f"Product: {request['product_name']}. "
        f"What it does: {request['rough_description']}. "
        f"Target audience: {request['target_audience']}. "
        f"Price: ${request.get('price_usd', 0)}. "
        f"Brand voice: {request.get('brand_voice', 'professional')}. "
        "Write a headline and three tagline variations."
    )
    message = types.Content(role="user", parts=[types.Part(text=prompt)])  # ADK requires this typed wrapper
    start = time.monotonic()                     # start timing before the model call begins
    response_text = ""                           # will hold the model's raw JSON string once it arrives
    prompt_tokens = 0                            # default to 0; updated if ADK exposes usage_metadata
    completion_tokens = 0                        # same; updated if the event carries token count fields
    async for event in runner.run_async(user_id=WRITER_ID, session_id=session_id, new_message=message):
        if event.is_final_response():            # skip intermediate streaming events, only process the last one
            response_text = event.content.parts[0].text  # extract the raw text from the ADK content object
            usage = getattr(event, "usage_metadata", None) or getattr(event, "usage", None)  # try both field names
            if usage:
                prompt_tokens = getattr(usage, "prompt_token_count", 0) or getattr(usage, "prompt_tokens", 0) or 0
                completion_tokens = getattr(usage, "candidates_token_count", 0) or getattr(usage, "completion_tokens", 0) or 0
    elapsed = time.monotonic() - start           # total wall-clock seconds the model took to respond
    log_call("headline_writer", MODEL, prompt, response_text, elapsed, prompt_tokens, completion_tokens)  # write to stats.json
    try:
        parsed = json.loads(response_text)       # the model should have returned valid JSON
        return parsed.get("headlines", {"main_headline": response_text, "taglines": []})  # extract the nested dict
    except json.JSONDecodeError:
        return {"main_headline": response_text, "taglines": []}  # fallback: return raw text if JSON parsing fails


The instruction tells the model exactly what JSON shape to return. The uuid.uuid4().hex session ID ensures that submitting a second product never crashes with an AlreadyExistsError from the InMemorySessionService, which raises that error if you try to create a session that already exists.
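The json.loads fallback above treats any reply that is not bare JSON as raw text. Chat models sometimes wrap their JSON in a Markdown code fence despite the instruction, so a more tolerant parser can be useful. The sketch below is one way to do that; parse_model_json is a hypothetical helper, not part of ADK or of the project's code.

```python
import json
from typing import Optional

FENCE = "`" * 3  # Markdown code-fence marker


def parse_model_json(text: str) -> Optional[dict]:
    """Parse a model reply as a JSON object, tolerating a surrounding code fence."""
    cleaned = text.strip()
    if cleaned.startswith(FENCE):
        # Drop the opening fence line, which may carry a language tag such as "json".
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
        # Drop the closing fence if present.
        if cleaned.rstrip().endswith(FENCE):
            cleaned = cleaned.rstrip()[: -len(FENCE)]
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        return None  # caller decides how to fall back
    # Only a JSON object is useful here; lists or bare strings count as failures.
    return parsed if isinstance(parsed, dict) else None


fenced_reply = FENCE + 'json\n{"headlines": {"main_headline": "Hear more", "taglines": []}}\n' + FENCE
print(parse_model_json(fenced_reply))
```

Returning None instead of raising keeps the calling code's fallback branch in one place.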



Create a file named agents/headline_writer/task_handler.py:

from .agent import generate          # import the async generate function from this agent's agent.py


class HeadlineHandler:
    async def generate(self, payload: dict) -> dict:
        return await generate(payload)   # delegate to the module-level generate function; keeps the handler thin


Create a file named agents/headline_writer/__main__.py:



import os                                       # read the port number from the environment
from dotenv import load_dotenv                 # load .env so PORT_HEADLINE_WRITER is available
from common.agent_server import create_agent_service  # shared FastAPI factory used by every agent
from .task_handler import HeadlineHandler      # this agent's handler class

load_dotenv()                                  # must run before os.environ.get() reads the port
app = create_agent_service(HeadlineHandler())  # wire the handler into the /run endpoint

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=int(os.environ.get("PORT_HEADLINE_WRITER", 8001)))  # bind to the port from .env


Create a file named agents/headline_writer/.well-known/agent.json:



{"name": "headline_writer", "description": "Writes a product headline and three tagline variations."}



Description Writer and Features Writer


The description and features agents follow exactly the same structure as the headline writer. Only the agent name, instruction text, expected JSON output key, and port differ.


Create a file named agents/description_writer/agent.py:



import os                            # read the model name and other config from the environment
import json                         # parse the JSON the model returns as its structured output
import time                         # measure wall-clock time around the ADK runner call
import uuid                         # generate a unique session id for every request
import sys                          # modify the module search path so shared/ is importable
sys.path.insert(0, str(__import__('pathlib').Path(__file__).resolve().parent.parent.parent))  # add project root to sys.path

from google.adk.agents import Agent              # ADK's base agent class, holds the model and instructions
from google.adk.models.lite_llm import LiteLlm  # adapts LiteLLM to ADK so non-Gemini models work
from google.adk.runners import Runner            # drives the conversation loop for one agent instance
from google.adk.sessions import InMemorySessionService  # holds per-request session state in memory
from google.genai import types                   # Content and Part types for constructing the user message
from dotenv import load_dotenv                   # load .env before reading OPENAI_MODEL from os.environ
from shared.stats_tracker import configure_litellm, log_call  # metadata setup and per-call logging

load_dotenv()                                    # must run before os.environ.get() reads OPENAI_MODEL
configure_litellm()                              # attaches OPENAI_METADATA to every LiteLLM call this process makes

MODEL = os.environ.get("OPENAI_MODEL", "openai/gpt-4o-mini")  # read from .env; falls back to gpt-4o-mini
WRITER_ID = "writer_description"                 # fixed user id for this agent's in-memory session

description_agent = Agent(
    name="description_writer",                   # agent's internal identifier, must be unique within the app
    model=LiteLlm(MODEL),                        # wrap the LiteLLM adapter so ADK can call any OpenAI-compatible model
    description="Writes a compelling product description paragraph for a product page.",
    instruction=(
        "You are an expert product copywriter. Given a product name, description, target audience, "
        "price, and brand voice, write a compelling product description paragraph of 80-120 words. "
        "Lead with the customer's problem or desire, introduce the product as the solution, "
        "highlight the core benefit, and close with a sense of confidence or assurance. "
        "Respond ONLY with valid JSON with the key 'description' containing a single string."
    ),
)

session_service = InMemorySessionService()       # holds session state in memory; no persistence needed here
runner = Runner(agent=description_agent, app_name="description_app", session_service=session_service)  # drives the conversation


async def generate(request: dict) -> dict:
    session_id = uuid.uuid4().hex                # fresh unique id per request, never reused across calls
    await session_service.create_session(app_name="description_app", user_id=WRITER_ID, session_id=session_id)  # register the session
    prompt = (                                   # build the prompt from the ProductSpec fields sent by the director
        f"Product: {request['product_name']}. "
        f"What it does: {request['rough_description']}. "
        f"Target audience: {request['target_audience']}. "
        f"Price: ${request.get('price_usd', 0)}. "
        f"Brand voice: {request.get('brand_voice', 'professional')}. "
        "Write an 80-120 word product description paragraph."
    )
    message = types.Content(role="user", parts=[types.Part(text=prompt)])  # ADK requires this typed wrapper
    start = time.monotonic()                     # start timing before the model call begins
    response_text = ""                           # will hold the model's raw JSON string once it arrives
    prompt_tokens = 0                            # default to 0; updated if ADK exposes usage_metadata
    completion_tokens = 0                        # same; updated if the event carries token count fields
    async for event in runner.run_async(user_id=WRITER_ID, session_id=session_id, new_message=message):
        if event.is_final_response():            # ignore streaming intermediate events, only process the last one
            response_text = event.content.parts[0].text  # extract the raw text from the ADK content object
            usage = getattr(event, "usage_metadata", None) or getattr(event, "usage", None)  # try both field names
            if usage:
                prompt_tokens = getattr(usage, "prompt_token_count", 0) or getattr(usage, "prompt_tokens", 0) or 0
                completion_tokens = getattr(usage, "candidates_token_count", 0) or getattr(usage, "completion_tokens", 0) or 0
    elapsed = time.monotonic() - start           # total wall-clock seconds the model took to respond
    log_call("description_writer", MODEL, prompt, response_text, elapsed, prompt_tokens, completion_tokens)  # write to stats.json
    try:
        parsed = json.loads(response_text)       # the model should have returned valid JSON
        return {"description": parsed.get("description", response_text)}  # extract the string inside the JSON key
    except json.JSONDecodeError:
        return {"description": response_text}    # fallback: return raw text if the model skipped JSON wrapping


Create a file named agents/features_writer/agent.py:



import os                            # read the model name and other config from the environment
import json                         # parse the JSON the model returns as its structured output
import time                         # measure wall-clock time around the ADK runner call
import uuid                         # generate a unique session id for every request
import sys                          # modify the module search path so shared/ is importable
sys.path.insert(0, str(__import__('pathlib').Path(__file__).resolve().parent.parent.parent))  # add project root to sys.path

from google.adk.agents import Agent              # ADK's base agent class, holds the model and instructions
from google.adk.models.lite_llm import LiteLlm  # adapts LiteLLM to ADK so non-Gemini models work
from google.adk.runners import Runner            # drives the conversation loop for one agent instance
from google.adk.sessions import InMemorySessionService  # holds per-request session state in memory
from google.genai import types                   # Content and Part types for constructing the user message
from dotenv import load_dotenv                   # load .env before reading OPENAI_MODEL from os.environ
from shared.stats_tracker import configure_litellm, log_call  # metadata setup and per-call logging

load_dotenv()                                    # must run before os.environ.get() reads OPENAI_MODEL
configure_litellm()                              # attaches OPENAI_METADATA to every LiteLLM call this process makes

MODEL = os.environ.get("OPENAI_MODEL", "openai/gpt-4o-mini")  # read from .env; falls back to gpt-4o-mini
WRITER_ID = "writer_features"                    # fixed user id for this agent's in-memory session

features_agent = Agent(
    name="features_writer",                      # agent's internal identifier, must be unique within the app
    model=LiteLlm(MODEL),                        # wrap the LiteLLM adapter so ADK can call any OpenAI-compatible model
    description="Writes a features and benefits bullet list for a product page.",
    instruction=(
        "You are an expert product copywriter. Given a product name, description, target audience, "
        "price, and brand voice, write 5 feature/benefit bullets. "
        "Each bullet should lead with the BENEFIT (what the customer gains) then briefly state the feature "
        "that delivers it. Format: 'Benefit — feature detail'. Keep each bullet under 15 words. "
        "Respond ONLY with valid JSON with the key 'features' containing a list of 5 strings."
    ),
)

session_service = InMemorySessionService()       # holds session state in memory; no persistence needed here
runner = Runner(agent=features_agent, app_name="features_app", session_service=session_service)  # drives the conversation


async def generate(request: dict) -> dict:
    session_id = uuid.uuid4().hex                # unique id prevents AlreadyExistsError on repeated requests
    await session_service.create_session(app_name="features_app", user_id=WRITER_ID, session_id=session_id)  # register the session
    prompt = (                                   # build the prompt from the ProductSpec fields sent by the director
        f"Product: {request['product_name']}. "
        f"What it does: {request['rough_description']}. "
        f"Target audience: {request['target_audience']}. "
        f"Price: ${request.get('price_usd', 0)}. "
        f"Brand voice: {request.get('brand_voice', 'professional')}. "
        "Write 5 benefit-led feature bullets."
    )
    message = types.Content(role="user", parts=[types.Part(text=prompt)])  # ADK requires this typed wrapper
    start = time.monotonic()                     # start timing before the model call begins
    response_text = ""                           # will hold the model's raw JSON string once it arrives
    prompt_tokens = 0                            # default to 0; updated if ADK exposes usage_metadata
    completion_tokens = 0                        # same; updated if the event carries token count fields
    async for event in runner.run_async(user_id=WRITER_ID, session_id=session_id, new_message=message):
        if event.is_final_response():            # only the final event carries the complete generated text
            response_text = event.content.parts[0].text  # extract the raw text from the ADK content object
            usage = getattr(event, "usage_metadata", None) or getattr(event, "usage", None)  # try both field names
            if usage:
                prompt_tokens = getattr(usage, "prompt_token_count", 0) or getattr(usage, "prompt_tokens", 0) or 0
                completion_tokens = getattr(usage, "candidates_token_count", 0) or getattr(usage, "completion_tokens", 0) or 0
    elapsed = time.monotonic() - start           # total wall-clock seconds the model took to respond
    log_call("features_writer", MODEL, prompt, response_text, elapsed, prompt_tokens, completion_tokens)  # write to stats.json
    try:
        parsed = json.loads(response_text)       # the model should have returned valid JSON
        return {"features": parsed.get("features", [])}  # the list of 5 benefit-led bullet strings
    except json.JSONDecodeError:
        return {"features": [response_text] if response_text else []}  # fallback: wrap plain text in a list so "features" is always a list


Each specialist also needs its own task_handler.py and __main__.py. These are structurally identical across all three agents; only the handler class name and the port variable differ.


Create agents/description_writer/task_handler.py and agents/description_writer/__main__.py, plus the same two files for features_writer, following the same pattern as headline_writer above. Substitute DescriptionHandler or FeaturesHandler for the class name, and PORT_DESCRIPTION_WRITER or PORT_FEATURES_WRITER for the port variable.




Building the Copy Director


The director is the only agent that does not call a language model directly. Its only job is to fire all three specialists in parallel and combine their results.


Create a file named agents/copy_director/task_handler.py:



import asyncio                                   # run the three specialist calls concurrently via asyncio.gather
import os                                        # read specialist port numbers from the environment
from dotenv import load_dotenv                  # load .env so the PORT_* variables are available
from common.agent_client import dispatch_to_agent  # shared async HTTP helper for calling each specialist

load_dotenv()                                    # must run before os.environ.get() reads the port variables

# Build each specialist's URL from .env so ports can be changed without touching code.
# The fallback ports match the defaults used in start_agents.py (9001-9003).
HEADLINE_URL = f"http://127.0.0.1:{os.environ.get('PORT_HEADLINE_WRITER', 9001)}/run"
DESCRIPTION_URL = f"http://127.0.0.1:{os.environ.get('PORT_DESCRIPTION_WRITER', 9002)}/run"
FEATURES_URL = f"http://127.0.0.1:{os.environ.get('PORT_FEATURES_WRITER', 9003)}/run"


class DirectorHandler:
    async def generate(self, payload: dict) -> dict:
        # The three specialists are independent, so run them concurrently rather than sequentially:
        # the total wait is then roughly the slowest single agent, not the sum of all three.
        headline_task = dispatch_to_agent(HEADLINE_URL, payload)       # coroutine for headline_writer (not started yet)
        description_task = dispatch_to_agent(DESCRIPTION_URL, payload) # coroutine for description_writer
        features_task = dispatch_to_agent(FEATURES_URL, payload)       # coroutine for features_writer

        headlines, description, features = await asyncio.gather(       # schedule all three together and wait for them to finish
            headline_task, description_task, features_task
        )

        headlines = headlines if isinstance(headlines, dict) else {}    # guard against unexpected response shapes
        description = description if isinstance(description, dict) else {}
        features = features if isinstance(features, dict) else {}

        return {                                                        # combined result sent back to the Streamlit UI
            "headlines": headlines,
            "description": description.get("description", ""),         # unwrap the nested "description" string key
            "features": features.get("features", []),                   # unwrap the nested "features" list key
        }


asyncio.gather is the key line: it starts all three HTTP calls simultaneously and waits for all three to complete. Without it, the total time would be the sum of three separate model calls. With it, the total time is roughly equal to whichever single specialist takes longest.
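
The difference is easy to see in isolation. The sketch below replaces the real HTTP calls with asyncio.sleep, so fake_specialist and the latencies in DELAYS are stand-ins invented for illustration, not measurements from the actual agents:

```python
import asyncio
import time


async def fake_specialist(name: str, delay: float) -> dict:
    """Stand-in for dispatch_to_agent: sleeps instead of making an HTTP call."""
    await asyncio.sleep(delay)
    return {"agent": name}


async def run_sequential(delays: dict) -> float:
    start = time.monotonic()
    for name, delay in delays.items():
        await fake_specialist(name, delay)      # each call waits for the previous one
    return time.monotonic() - start


async def run_parallel(delays: dict) -> float:
    start = time.monotonic()
    # gather schedules every coroutine at once and waits for all of them
    await asyncio.gather(*(fake_specialist(n, d) for n, d in delays.items()))
    return time.monotonic() - start


DELAYS = {"headline": 0.10, "description": 0.30, "features": 0.20}  # made-up latencies in seconds

sequential = asyncio.run(run_sequential(DELAYS))
parallel = asyncio.run(run_parallel(DELAYS))
print(f"sequential: {sequential:.2f}s, parallel: {parallel:.2f}s")
```

The sequential run takes roughly the sum of the three delays, while the parallel run takes roughly the longest one. Note that asyncio.gather, as called in the director, propagates the first exception raised by any of the three calls, so one failing specialist fails the whole request unless dispatch_to_agent already handles errors internally. Passing return_exceptions=True would instead return the exception objects in the result list, which the isinstance guards in the handler would then turn into empty sections.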


Create a file named agents/copy_director/__main__.py:



import os                                       # read PORT_COPY_DIRECTOR from the environment
from dotenv import load_dotenv                 # load .env before reading the port
from common.agent_server import create_agent_service  # shared FastAPI factory
from .task_handler import DirectorHandler      # the director's handler that orchestrates all three specialists

load_dotenv()                                  # must run before os.environ.get() below
app = create_agent_service(DirectorHandler())  # wire the director handler into the /run endpoint

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=int(os.environ.get("PORT_COPY_DIRECTOR", 9000)))  # port 9000 by default




The Streamlit UI


The UI form collects the five fields that match ProductSpec, calls the director’s /run endpoint, and renders the combined result as a formatted product page section.


Create a file named copy_ui.py in the project root:



import os                            # read PORT_COPY_DIRECTOR from the environment
import requests                     # synchronous HTTP to call the director's /run endpoint
import streamlit as st              # the entire visual layer: form, layout, and result rendering
from dotenv import load_dotenv      # load .env so PORT_COPY_DIRECTOR is available

load_dotenv()                       # must run before os.environ.get() reads the port below

st.set_page_config(page_title="Product Copy Generator", page_icon="🛒", layout="wide")
st.title("🛒 Product Page Copy Generator")   # heading shown at the top of the app; the browser tab title comes from page_title above

with st.form("product_form"):                # group all inputs into one form so submit fires a single event
    product_name = st.text_input("Product name", placeholder="e.g. Apple AirPods Pro")
    rough_description = st.text_area(
        "Describe the product in your own words",
        placeholder="e.g. An electric height-adjustable standing desk with memory presets and a quiet motor.",
        height=100,
    )
    col1, col2 = st.columns(2)               # split the remaining inputs into two equal columns
    with col1:
        target_audience = st.text_input("Target audience", placeholder="e.g. remote workers aged 25-45")
        brand_voice = st.selectbox(          # constrained dropdown prevents free-text typos in the tone field
            "Brand voice",
            ["premium and aspirational", "friendly and approachable", "technical and precise", "bold and energetic"],
        )
    with col2:
        price_usd = st.number_input("Price (USD)", min_value=1.0, value=299.0, step=10.0)

    submitted = st.form_submit_button("Generate Copy", type="primary", use_container_width=True)  # full-width primary button

if submitted:                               # everything below only runs when the user clicks the button
    if not product_name or not rough_description or not target_audience:
        st.info("Please fill in product name, description, and target audience.")
        st.stop()                           # halt rendering; don't make the API call with missing fields

    payload = {                             # build the dict that matches the ProductSpec Pydantic schema
        "product_name": product_name,
        "rough_description": rough_description,
        "target_audience": target_audience,
        "price_usd": price_usd,
        "brand_voice": brand_voice,
    }

    with st.spinner("Writing headlines, description, and feature bullets in parallel..."):
        try:
            director_port = int(os.environ.get("PORT_COPY_DIRECTOR", 9000))  # read from .env, fall back to 9000
            response = requests.post(f"http://127.0.0.1:{director_port}/run", json=payload, timeout=90)  # 90s for 3 parallel LLM calls
            response.raise_for_status()     # raise on 4xx/5xx from the director
            data = response.json()          # the combined result dict: headlines, description, features
        except requests.exceptions.RequestException as e:
            st.warning(f"Could not reach the copy director. Make sure all agents are running. ({e})")
            st.stop()

    headlines = data.get("headlines", {})   # dict with "main_headline" and "taglines"
    description = data.get("description", "")  # the 80-120 word paragraph string
    features = data.get("features", [])     # list of 5 benefit-led bullet strings

    st.success("Copy ready!")               # green banner confirms the run completed
    st.divider()                            # visual separator between the form and the copy output

    st.subheader(headlines.get("main_headline", ""))   # render the main headline as a subheading

    if headlines.get("taglines"):
        cols = st.columns(len(headlines["taglines"]))   # one column per tagline for a clean side-by-side layout
        for col, tagline in zip(cols, headlines["taglines"]):
            col.caption(f"*{tagline}*")     # italicised caption for each tagline variation

    st.divider()
    st.markdown(description)               # render the description paragraph as markdown
    st.divider()

    st.subheader("Features & Benefits")
    if isinstance(features, list):
        for bullet in features:
            st.markdown(f"✓ {bullet}")     # checkmark prefix makes each bullet visually scannable
    else:
        st.markdown(str(features))         # fallback if the model returned a raw string instead of a list


The form is wrapped in st.form() so that filling in multiple fields and then clicking “Generate Copy” fires exactly one request, not one per field change. The timeout=90 on the HTTP call is intentionally generous: the director waits for all three parallel model calls to finish before responding, and the slowest one can take 20-30 seconds.




The Launcher


Rather than opening four separate terminals, a single script starts all four agents and watches for unexpected exits.


Create a file named start_agents.py in the project root:



import os                            # read agent ports from the environment
import subprocess                   # launch each agent as a child process
import sys                          # access the current Python interpreter path
import time                         # sleep between process health checks
import signal                       # catch Ctrl+C and SIGTERM for clean shutdown
from dotenv import load_dotenv      # load .env so PORT_* variables are available

load_dotenv()                       # must run before os.environ.get() reads the port values

# One entry per agent: (module name under agents/, port number from .env)
AGENTS = [
    ("copy_director",      int(os.environ.get("PORT_COPY_DIRECTOR",     9000))),  # orchestrator
    ("headline_writer",    int(os.environ.get("PORT_HEADLINE_WRITER",   9001))),  # specialist 1
    ("description_writer", int(os.environ.get("PORT_DESCRIPTION_WRITER",9002))),  # specialist 2
    ("features_writer",    int(os.environ.get("PORT_FEATURES_WRITER",   9003))),  # specialist 3
]

processes = []                      # keep references so shutdown() can terminate every one


def launch():
    print("Starting all agents...\n")
    for module, port in AGENTS:
        proc = subprocess.Popen(
            [sys.executable, "-m", f"agents.{module}"],          # same Python that runs this script
            cwd=os.path.dirname(os.path.abspath(__file__)),       # run from the project root so imports resolve
        )
        processes.append(proc)                                    # track for later shutdown
        print(f"  Started agents.{module} on port {port} (pid {proc.pid})")
    print(f"\nAll {len(AGENTS)} agents running.")
    print("Press Ctrl+C here to stop all agents.\n")


def shutdown(sig=None, frame=None):
    print("\nShutting down all agents...")
    for proc in processes:
        proc.terminate()            # send SIGTERM to each child process
    for proc in processes:
        proc.wait()                 # block until every child has fully exited
    print("All agents stopped.")
    sys.exit(0)


signal.signal(signal.SIGINT, shutdown)   # Ctrl+C triggers clean shutdown
signal.signal(signal.SIGTERM, shutdown)  # SIGTERM is the "please stop cleanly" signal sent by task managers and cloud platforms

launch()                            # start all four processes

while True:                         # keep the parent alive and watch for unexpected child exits
    time.sleep(1)
    for proc in processes:
        if proc.poll() is not None:                              # poll() returns None while the process is running
            print(f"Agent (pid {proc.pid}) exited unexpectedly. Shutting down all.")
            shutdown()


subprocess.Popen with -m agents.module starts each agent exactly as python -m would from the project root, which means relative imports inside each __main__.py resolve correctly. The while True loop at the bottom keeps the launcher alive and polls the child processes once per second; if any one of them exits unexpectedly, it shuts the whole group down rather than leaving a partial system running.
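
One limitation of the launcher as written: it prints "All 4 agents running" as soon as the processes are spawned, before any server has actually bound its port. A minimal sketch of a readiness check that could close that gap is shown below; wait_for_port is a hypothetical helper, not part of the original script, and the timeout values are arbitrary:

```python
import socket
import time


def wait_for_port(port: int, host: str = "127.0.0.1", timeout: float = 15.0) -> bool:
    """Hypothetical helper: True once host:port accepts TCP connections, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True                  # something is listening on the port
        except OSError:
            time.sleep(0.2)                  # not up yet; retry shortly
    return False
```

Inside launch(), calling wait_for_port(port) for each entry in AGENTS after the Popen loop, and reporting any agent that returns False, would make the "running" message reflect servers that are actually accepting requests.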




Running the Application


Start all four agents in one terminal:



python start_agents.py


Then in a second terminal launch the Streamlit UI:



streamlit run copy_ui.py


Open http://localhost:8501 and fill in the product details. Results appear once all three specialists have finished. Whichever specialist takes longest determines the total wait, typically 10-25 seconds per product depending on network and model load.

stats.json in the project root accumulates a record for every model call, with prompt text, response text, token counts, cost, and timing per agent. The summary block at the top of the file reflects lifetime totals across every product and every agent since the file was created.
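
The cost figure follows directly from the token counts and the gpt-4o-mini prices quoted in the Pricing section ($0.15 per million input tokens, $0.60 per million output tokens). A minimal sketch of that arithmetic follows; call_cost_usd and the example token counts are illustrative, and the actual log_call implementation may round or store the value differently:

```python
INPUT_USD_PER_MILLION = 0.15    # gpt-4o-mini input price quoted in the Pricing section
OUTPUT_USD_PER_MILLION = 0.60   # gpt-4o-mini output price quoted in the Pricing section


def call_cost_usd(prompt_tokens: int, completion_tokens: int) -> float:
    """Hypothetical helper: dollar cost of one model call from its token counts."""
    return (
        prompt_tokens * INPUT_USD_PER_MILLION
        + completion_tokens * OUTPUT_USD_PER_MILLION
    ) / 1_000_000


# Illustrative numbers only: 500 prompt tokens and 300 completion tokens.
cost = call_cost_usd(500, 300)
print(f"${cost:.6f}")
```

For those example counts the result is about $0.000255, which is consistent with the "fraction of a cent" estimate for one product brief.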




Who Can Benefit


  • Developers — Get a working example of a real multi-agent ADK system using the A2A protocol, parallel execution, and LiteLLM model routing.

  • Product teams — Use this as a starting point for any copywriting or content generation workflow that benefits from specialised agents working in parallel.

  • Students learning AI agent architecture — See how asyncio.gather, FastAPI services, and Google ADK combine into a production-style multi-agent pipeline.

  • E-commerce and SaaS teams — Adapt the specialist agents to produce SEO meta descriptions, email subject lines, or ad copy without changing the underlying architecture.

  • Engineers evaluating ADK — Get real cost, timing, and output data from a working deployment before committing the framework to a larger project.




How Codersarts Can Help


If you want to take this further, Codersarts offers hands-on support at every stage.


  • For learners: Live 1-to-1 sessions with an AI engineer who can walk through Google ADK’s agent architecture, the A2A communication protocol, and multi-agent parallel execution in detail.

  • For teams: End-to-end development of multi-agent content generation pipelines, including model selection, prompt engineering, cost optimisation, and reliability testing.

  • For enterprises: Architecture consulting for production agent deployments, including scaling individual agents independently, adding authentication, and integrating with existing content management systems.


Reach out at contact@codersarts.com or visit www.codersarts.com to get started.



