A Complete Guide to Creating a Multi-Agent Book Writing System - Part 4
- ganesh90
- Jun 9
- 19 min read
Updated: Jun 13
Prerequisite: This is a continuation of the blog Part 3: A Complete Guide to Creating a Multi-Agent Book Writing System

🚀 Chapter Writing Assistant in a Parallel Universe
Imagine you're the manager of a publishing house, and you want to get an entire book written fast. Instead of one writer doing all the chapters back-to-back, you assign each chapter to a different writer working in parallel. That’s exactly what this function helps you do!
The Parallel Writer Function
```python
def write_chapter_parallel(chapter_data, model_name, device_id=None):
    """Standalone function for parallel chapter writing."""
    # Import inside the function for process isolation
    import torch
    import logging
    from transformers import AutoTokenizer, AutoModelForCausalLM

    # Each worker gets its own agent instance
    writer = ParallelWriterAgent(model_name=model_name, device_id=device_id)
    return writer.process_chapter(chapter_data)
```
Why Imports Are Inside: 🔄 Each spawned process needs its own imports. It's like making sure each construction worker has their own copy of the blueprint!
Let’s break it down together:
```python
def write_chapter_parallel(chapter_data, model_name, device_id=None):
    """Standalone function for parallel chapter writing."""
```
This is a standalone function, which means it’s meant to run independently in a separate process. That’s important because when we’re using Python’s multiprocessing, we need everything to be clean, self-contained, and pickle-friendly.
🛠️ Creating a Personal Writing Agent for Each Chapter
writer = ParallelWriterAgent(model_name=model_name, device_id=device_id)
Here, we spin up a brand-new writer agent for this specific chapter. This writer:
Knows which model to use (model_name)
Gets assigned a device (like a GPU), if one is available
Each worker (or process) has its own writer — no fighting over resources!
✍️ Let the Writing Begin
return writer.process_chapter(chapter_data)
We feed the writer the chapter’s info and supporting research. The writer does its thing (writes the content, adds references), and hands it back to us.
✅ Pro Tip: This function is a perfect drop-in for multiprocessing.Pool.map() or concurrent.futures.ProcessPoolExecutor.map() — so you can write 5, 10, or even 50 chapters at once!
write_chapter_parallel is a smart helper that lets you write chapters in parallel.
It keeps each worker self-contained, import-safe, and device-aware.
Perfect for speeding up long-form writing tasks with large models.
🧠 Meet Your Multitasking Writing Assistant: ParallelWriterAgent
```python
class ParallelWriterAgent(WriterAgent):
    """A parallel version of the WriterAgent for multiprocessing."""

    def __init__(self, model_name=LLM_MODEL, device_id=None):
        """Initialize with specific device targeting."""
        super().__init__(model_name)
        # Target a specific GPU if provided
        if device_id is not None:
            self.device = f"cuda:{device_id}" if torch.cuda.is_available() else "cpu"
            self.model.to(self.device)
            logging.info(f"Writer agent initialized on device {self.device}")

    def process_chapter(self, chapter_data):
        """Process a single chapter for parallel execution."""
        chapter_info, research_docs = chapter_data
        title = chapter_info["title"]
        logging.info(f"Writing chapter in parallel: {title}")

        # Do the actual writing
        chapter_content, references = self.write_chapter(chapter_info, research_docs)
        logging.info(f"Completed writing chapter: {title}")
        return (title, chapter_content, references)
```
Ever wish you could clone yourself to finish multiple tasks at once? Well, that’s kind of what ParallelWriterAgent does — it's your regular WriterAgent, but supercharged to work in parallel using Python’s multiprocessing magic.
Let’s break it down together 👇
```python
class ParallelWriterAgent(WriterAgent):
    """A parallel version of the WriterAgent for multiprocessing."""
```
Think of WriterAgent as your original, solo writer. Now, imagine you're starting a writer's team — each member gets their own job and can work independently. That’s what ParallelWriterAgent sets up!
🛠️ Custom Setup with GPU
```python
def __init__(self, model_name=LLM_MODEL, device_id=None):
    """Initialize with specific device targeting."""
    super().__init__(model_name)
```
This constructor kicks things off by calling the original writer's setup (super().__init__). But here's where things get cool:
```python
if device_id is not None:
    self.device = f"cuda:{device_id}" if torch.cuda.is_available() else "cpu"
    self.model.to(self.device)
    logging.info(f"Writer agent initialized on device {self.device}")
```
📌 What’s Happening Here?
If you give it a device_id, it tries to assign a GPU like a personal workstation for that agent.
No GPU? No problem. It defaults to the CPU, just like using pen and paper instead of a laptop.
It logs which device it’s using — great for debugging or just geeking out over your setup!
✍️ Writing a Chapter — The Parallel Way
```python
def process_chapter(self, chapter_data):
    """Process a single chapter for parallel execution."""
    chapter_info, research_docs = chapter_data
```
Imagine someone hands this agent a folder labeled “Chapter 3: AI in Education” along with a stack of research papers. Here’s how the agent handles it:
title = chapter_info["title"]
logging.info(f"Writing chapter in parallel: {title}")
✅ It notes the chapter title and logs that it's getting started.
chapter_content, references = self.write_chapter(chapter_info, research_docs)
✍️ Then it writes the chapter (using the magical write_chapter method, inherited from the base class) — drawing from the research docs like a diligent essayist.
logging.info(f"Completed writing chapter: {title}")
return (title, chapter_content, references)
🎉 It logs completion and returns the goods — title, written content, and references.
Multi-GPU Brilliance: 🎯
```python
if device_id is not None:
    self.device = f"cuda:{device_id}" if torch.cuda.is_available() else "cpu"
    self.model.to(self.device)
```
This allows targeting specific GPUs. If you have multiple GPUs, different workers can use different ones simultaneously! It's like having multiple kitchens so several chefs can cook different dishes at the same time.
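The chapter-to-GPU mapping can be sketched as a tiny standalone helper (the `assign_devices` name is hypothetical, not part of the project code) — chapters cycle round-robin across the available GPU ids:

```python
def assign_devices(num_chapters, num_gpus):
    """Cycle chapter indices across GPU ids; None lets workers use the default device."""
    if num_gpus <= 1:
        return [None] * num_chapters
    return [i % num_gpus for i in range(num_chapters)]

print(assign_devices(5, 2))  # → [0, 1, 0, 1, 0]
print(assign_devices(3, 1))  # → [None, None, None]
```

With 2 GPUs and 5 chapters, GPUs 0 and 1 alternate, so no single GPU hoards all the work.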
The Main BookProject Class - The Master Conductor
This BookProject class is like having a symphony conductor who can coordinate an entire orchestra of AI specialists to create beautiful music together. Except instead of violins and cellos, we're coordinating researchers, writers, and editors to create an amazing book!
Picture this: You walk into the most efficient publishing house ever, and there's this one person who knows EXACTLY what everyone should be doing, when they should do it, and how to make sure everything comes together perfectly. That's our BookProject class! 🎭
```python
import multiprocessing as mp
from functools import partial
import time

class BookProject:
    """Main project class that coordinates all agents."""

    def __init__(self):
        """Initialize the book project with all agents."""
        logging.info("Initializing book project")

        # Create output directory
        os.makedirs(OUTPUT_DIR, exist_ok=True)

        # Initialize the RAG system
        self.rag = RAGSystem()

        # Load documents and build vector store
        self.rag.load_documents()
        self.rag.build_vector_store()

        # Initialize all our specialized agents
        self.outline_agent = OutlineAgent()
        self.researcher_agent = ResearcherAgent(self.rag)
        self.writer_agent = WriterAgent()
        self.editor_agent = EditorAgent()

    def save_research_results_to_csv(self, research_results, chapter_title, output_path):
        """Save research results to CSV for audit trail."""
        with open(output_path, 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            # Write header if file is new
            if csvfile.tell() == 0:
                writer.writerow(['Chapter', 'Document Number', 'Source', 'Content'])
            # Write research results
            for i, doc in enumerate(research_results, 1):
                writer.writerow([
                    chapter_title,
                    i,
                    doc.metadata.get('source', 'Unknown'),
                    doc.page_content
                ])
        logging.info(f"Research results for '{chapter_title}' saved to {output_path}")

    def _post_process_headings(self, content: str) -> str:
        """Ensure consistent Markdown heading format."""
        logging.info("Post-processing for heading consistency")
        lines = content.split('\n')
        processed_lines = []
        for line in lines:
            # Convert bold headings to proper Markdown
            if line.strip().startswith('**') and line.strip().endswith('**') and len(line.strip()) < 100:
                heading_text = line.strip()[2:-2]  # Remove ** from both ends
                processed_lines.append(f"## {heading_text}")
            # Handle section headings with colons
            elif '**' in line and ':' in line and line.strip().startswith('**'):
                parts = line.split(':', 1)
                heading_part = parts[0].strip()
                if heading_part.startswith('**') and heading_part.endswith('**'):
                    heading_text = heading_part[2:-2]
                    processed_lines.append(f"### {heading_text}")
                    # Add content after colon if it exists
                    if len(parts) > 1 and parts[1].strip():
                        processed_lines.append(parts[1].strip())
                else:
                    processed_lines.append(line)
            # Handle ALL CAPS headings
            elif (not line.strip().startswith('#') and
                  not line.strip().startswith('**') and
                  line.strip().isupper() and
                  3 < len(line.strip()) < 50):
                processed_lines.append(f"### {line.strip().title()}")
            # Leave properly formatted content unchanged
            else:
                processed_lines.append(line)
        return '\n'.join(processed_lines)

    def run(self):
        """Execute the complete book writing workflow."""
        logging.info("Starting the book writing process")

        # Step 1: Create the outline
        outline = self.outline_agent.create_outline()
        logging.info(f"Book outline created with {len(outline)} chapters:")
        for chapter in outline:
            logging.info(f"- {chapter['title']}: {chapter['description'][:50]}...")

        # Step 2: Research all chapters and save results
        all_research = {}
        research_csv_path = os.path.join(OUTPUT_DIR, "research_results.csv")

        # Initialize CSV with headers
        with open(research_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Chapter', 'Document Number', 'Source', 'Content'])

        # Research each chapter
        for chapter_info in outline:
            logging.info(f"Researching for chapter: {chapter_info['title']}")
            research_docs = self.researcher_agent.research(chapter_info)
            all_research[chapter_info['title']] = research_docs
            self.save_research_results_to_csv(research_docs, chapter_info['title'], research_csv_path)

        # Step 3: Write each chapter
        chapters = []
        for i, chapter_info in enumerate(outline):
            chapter_num = i + 1
            logging.info(f"Writing Chapter {chapter_num}: {chapter_info['title']}")
            # Use stored research results
            research_docs = all_research[chapter_info['title']]
            # Write the chapter
            chapter_content, references = self.writer_agent.write_chapter(chapter_info, research_docs)
            # Post-process for consistency
            chapter_content = self._post_process_headings(chapter_content)
            chapters.append((chapter_info["title"], chapter_content, references))

        # Step 4: Compile the final book
        logging.info("Compiling all chapters into final book")
        book_content = self.editor_agent.compile_book(chapters, outline)

        # Final post-processing
        book_content = self._post_process_headings(book_content)

        # Save the masterpiece
        output_path = os.path.join(OUTPUT_DIR, "machine_learning_book.md")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(book_content)

        logging.info(f"Complete book saved to {output_path}")
        return output_path, research_csv_path
```
The Complete Workflow Explained: 🎭
🏗️ The Foundation: Assembling Your Dream Team
import multiprocessing as mp
from functools import partial
import time
class BookProject:
"""Main project class that coordinates all agents."""
Think of this class as the executive producer of a blockbuster movie. They don't act, direct, or operate cameras, but they make sure all the talented specialists work together to create something amazing!
Why this design rocks:
Single point of coordination (no chaos, no confusion)
Clear separation of responsibilities (everyone knows their job)
Easy to understand and debug (follow the workflow step by step)
🚀 The Initialization: Setting Up the Publishing Empire
```python
def __init__(self):
    """Initialize the book project with all agents."""
    logging.info("Initializing book project")

    # Create output directory
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Initialize the RAG system
    self.rag = RAGSystem()

    # Load documents and build vector store
    self.rag.load_documents()
    self.rag.build_vector_store()

    # Initialize all our specialized agents
    self.outline_agent = OutlineAgent()
    self.researcher_agent = ResearcherAgent(self.rag)
    self.writer_agent = WriterAgent()
    self.editor_agent = EditorAgent()
```
This initialization is like opening day at your new publishing company! Let's watch the magic unfold step by step...
Step 1: Setting Up the Office Space 🏢
os.makedirs(OUTPUT_DIR, exist_ok=True)
Like the first thing any good business owner does - make sure you have a place to put your finished products! The exist_ok=True is like saying "If the office already exists, that's totally fine - we'll just use it!"
Gotcha Alert: 🚨 Without exist_ok=True, running this twice would crash with "Directory already exists!" Nobody wants that kind of drama on day one!
Step 2: Hiring Your Super-Smart Research Assistant 🧠
self.rag = RAGSystem()
self.rag.load_documents()
self.rag.build_vector_store()
What's happening here is MIND-BLOWING: 🤯
Like hiring an assistant who:
Reads every document in your entire company library
Memorizes everything with perfect recall
Creates a lightning-fast index so they can find ANY information in milliseconds
Never forgets anything and never gets tired!
The Three-Step Process:
Create the system (RAGSystem()) - hire the assistant
Load all documents (.load_documents()) - give them the library to read
Build the search index (.build_vector_store()) - let them organize everything for instant access
Pro Tip: 💡 This is where the magic happens! Your RAG system is literally reading every PDF, every text file, understanding the meaning, and creating a searchable knowledge base. It's like having Google, but for YOUR specific documents!
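To make that load → index → search shape concrete, here is a deliberately toy stand-in — this is NOT the real RAGSystem (which uses embeddings and a vector store), just a bag-of-words sketch with the same three-step interface:

```python
import math
from collections import Counter

class ToyRAG:
    """Toy stand-in for RAGSystem: bag-of-words vectors + cosine similarity."""

    def __init__(self):
        self.docs = []
        self.vectors = []

    def load_documents(self, docs):
        # Step 2 of the real system: hand the assistant the library
        self.docs = docs

    def build_vector_store(self):
        # Step 3: turn each document into a searchable vector (here, word counts)
        self.vectors = [Counter(d.lower().split()) for d in self.docs]

    def search(self, query, k=1):
        q = Counter(query.lower().split())

        def cos(a, b):
            dot = sum(a[w] * b[w] for w in a)
            na = math.sqrt(sum(v * v for v in a.values()))
            nb = math.sqrt(sum(v * v for v in b.values()))
            return dot / (na * nb) if na and nb else 0.0

        ranked = sorted(range(len(self.docs)),
                        key=lambda i: cos(q, self.vectors[i]), reverse=True)
        return [self.docs[i] for i in ranked[:k]]

rag = ToyRAG()
rag.load_documents(["gradient descent optimizes loss", "transformers use attention"])
rag.build_vector_store()
print(rag.search("what is attention in transformers"))
# → ['transformers use attention']
```

Swap the word-count vectors for learned embeddings and the list scan for an index like FAISS, and you have the real thing's architecture.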
Step 3: Assembling Your Specialist Dream Team 👥
self.outline_agent = OutlineAgent()
self.researcher_agent = ResearcherAgent(self.rag)
self.writer_agent = WriterAgent()
self.editor_agent = EditorAgent()
Like assembling the perfect magazine team:
📋 Outline Agent: The editorial planner who designs the magazine structure
🕵️ Researcher Agent: The investigative journalist who finds all the facts (and gets access to that amazing RAG assistant!)
✍️ Writer Agent: The feature writer who crafts compelling articles
✨ Editor Agent: The copy editor who makes everything shine
Notice something cool? Only the Researcher Agent gets access to the RAG system! That's because research is their specialty - they need the super-powered search capabilities, while the others focus on their own strengths.
Like a perfectly organized kitchen where:
The sous chef plans the menu (Outline Agent)
The prep cook gathers all ingredients (Researcher Agent with special access to the pantry)
The line cook prepares the dishes (Writer Agent)
The head chef does final plating (Editor Agent)
📊 The Audit Trail Creator: Your Academic Integrity Guardian
```python
def save_research_results_to_csv(self, research_results, chapter_title, output_path):
    """Save research results to CSV for audit trail."""
    with open(output_path, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        # Write header if file is new
        if csvfile.tell() == 0:
            writer.writerow(['Chapter', 'Document Number', 'Source', 'Content'])
        # Write research results
        for i, doc in enumerate(research_results, 1):
            writer.writerow([
                chapter_title,
                i,
                doc.metadata.get('source', 'Unknown'),
                doc.page_content
            ])
    logging.info(f"Research results for '{chapter_title}' saved to {output_path}")
```
This function is like having the world's most meticulous research librarian who creates a perfect paper trail! 📚
The File Opening Magic: Smart and Safe 🔒
with open(output_path, 'a', newline='', encoding='utf-8') as csvfile:
What each parameter does:
'a' mode: Append mode - adds new info without erasing old stuff (like adding pages to a notebook)
newline='': Prevents weird empty lines in CSV files (technical but important!)
encoding='utf-8': Handles international characters properly (so names like José or 北京 work perfectly)
with statement: Automatically closes the file when done (like having a responsible assistant who always locks up!)
Like a bank vault that automatically locks when you're done, never loses your deposits, and can handle documents in any language!
The Smart Header Detection: Pure Genius 🧠
```python
if csvfile.tell() == 0:
    writer.writerow(['Chapter', 'Document Number', 'Source', 'Content'])
```
This line is BRILLIANT! ✨ csvfile.tell() returns the current position in the file. If it's 0, the file is empty and needs headers!
Like checking if a notebook is blank before deciding whether to write column headers on the first page. Simple logic, elegant solution!
Pro Tip: 💡 This prevents the classic "headers on every append" problem that would make your CSV look like:
Chapter,Document Number,Source,Content
Chapter 1,1,doc1,content1
Chapter,Document Number,Source,Content ← Annoying duplicate headers!
Chapter 2,1,doc2,content2
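You can watch the header-once behavior in a self-contained sketch (the `append_rows` helper and file path are made up for this demo):

```python
import csv
import os
import tempfile

def append_rows(path, rows):
    """Append rows, writing the header only when the file is empty."""
    with open(path, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        # tell() == 0 in append mode means the file is brand new (or empty)
        if csvfile.tell() == 0:
            writer.writerow(['Chapter', 'Document Number', 'Source', 'Content'])
        writer.writerows(rows)

path = os.path.join(tempfile.mkdtemp(), 'research.csv')
append_rows(path, [['Ch 1', 1, 'doc1.pdf', 'text']])
append_rows(path, [['Ch 2', 1, 'doc2.pdf', 'text']])  # second call: no duplicate header

with open(path, newline='', encoding='utf-8') as f:
    lines = f.read().splitlines()
print(lines[0])  # → Chapter,Document Number,Source,Content
```

Two appends, three lines in the file: one header, two data rows.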
The Documentation Loop: Transparency at Its Finest 📋
for i, doc in enumerate(research_results, 1):
writer.writerow([
chapter_title,
i,
doc.metadata.get('source', 'Unknown'),
doc.page_content
])
Like a PhD student who meticulously documents EVERY source they use, with:
Which chapter it's for
Document number (1, 2, 3...)
Original source (or 'Unknown' if metadata is missing)
Actual content that was used
Why this is amazing for trust:
Complete transparency - see exactly what sources were used
Academic integrity - perfect for scholarly work
Debugging gold - if something seems off, trace it back to the source
Legal compliance - some industries require this level of documentation
Fun Fact: 🤓 This approach would make any university professor weep tears of joy - it's like automatic bibliography generation!
🎨 The Formatting Wizard: Making Everything Beautiful
```python
def _post_process_headings(self, content: str) -> str:
    """Ensure consistent Markdown heading format."""
    logging.info("Post-processing for heading consistency")
    lines = content.split('\n')
    processed_lines = []
    for line in lines:
        # Convert bold headings to proper Markdown
        if line.strip().startswith('**') and line.strip().endswith('**') and len(line.strip()) < 100:
            heading_text = line.strip()[2:-2]  # Remove ** from both ends
            processed_lines.append(f"## {heading_text}")
        # Handle section headings with colons
        elif '**' in line and ':' in line and line.strip().startswith('**'):
            parts = line.split(':', 1)
            heading_part = parts[0].strip()
            if heading_part.startswith('**') and heading_part.endswith('**'):
                heading_text = heading_part[2:-2]
                processed_lines.append(f"### {heading_text}")
                # Add content after colon if it exists
                if len(parts) > 1 and parts[1].strip():
                    processed_lines.append(parts[1].strip())
            else:
                processed_lines.append(line)
        # Handle ALL CAPS headings
        elif (not line.strip().startswith('#') and
              not line.strip().startswith('**') and
              line.strip().isupper() and
              3 < len(line.strip()) < 50):
            processed_lines.append(f"### {line.strip().title()}")
        # Leave properly formatted content unchanged
        else:
            processed_lines.append(line)
    return '\n'.join(processed_lines)
```
This function is like having a professional copy editor who specializes in making documents look absolutely perfect! ✨
Pattern 1: The Bold-to-Heading Transformer 💫
```python
if line.strip().startswith('**') and line.strip().endswith('**') and len(line.strip()) < 100:
    heading_text = line.strip()[2:-2]  # Remove ** from both ends
    processed_lines.append(f"## {heading_text}")
```
What it catches: lines like `**Introduction to Machine Learning**`. What it becomes: `## Introduction to Machine Learning`
Like a stylist who says "That bold text WANTS to be a heading! Let me fix that for you!" The length check (< 100) prevents converting long bold sentences that aren't actually headings.
Gotcha Prevention: 🚨 Without the length check, you might accidentally convert things like `**This is a really long sentence that happens to be bold but definitely isn't a heading and would look ridiculous as one**` into a heading!
Pattern 2: The Colon-Splitting Ninja 🥷
```python
elif '**' in line and ':' in line and line.strip().startswith('**'):
    parts = line.split(':', 1)
    # ...
```
What it catches: lines like `**Key Concepts**: These are the important ideas we'll cover` What it becomes:
### Key Concepts
These are the important ideas we'll cover
The split(':', 1) Magic: Only splits on the FIRST colon! So if your content has more colons (like time stamps), they're preserved.
Like smartly separating "Re: Meeting Tomorrow" into subject and body, but keeping any colons that appear in the body text intact!
Pattern 3: The ALL CAPS Whisperer 📢
```python
elif (not line.strip().startswith('#') and
      not line.strip().startswith('**') and
      line.strip().isupper() and
      3 < len(line.strip()) < 50):
    processed_lines.append(f"### {line.strip().title()}")
```
What it catches: Lines like MACHINE LEARNING FUNDAMENTALS
What it becomes: ### Machine Learning Fundamentals
Like taking someone who's SHOUTING IN ALL CAPS and gently teaching them to speak normally while preserving their enthusiasm!
Smart Filtering Logic:
Not already a heading (not line.strip().startswith('#'))
Not already bold (not line.strip().startswith('**'))
Actually in all caps (line.strip().isupper())
Reasonable length (3 < len(line.strip()) < 50)
Pro Tip: 💡 The length limits prevent converting things like single letters ("A") or entire paragraphs that happen to be in caps!
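Here's a minimal, self-contained sketch of just these three rules applied to single lines (simplified from the full method — it handles one line at a time and omits the method's edge cases):

```python
def normalize_heading(line):
    """Simplified version of the three heading rules described above."""
    s = line.strip()
    # Rule 1: a short, fully bold line becomes a ## heading
    if s.startswith('**') and s.endswith('**') and len(s) < 100 and ':' not in s:
        return f"## {s[2:-2]}"
    # Rule 2: a bold label followed by a colon becomes a ### heading plus body text
    if s.startswith('**') and ':' in s:
        head, _, rest = s.partition(':')
        if head.endswith('**'):
            return f"### {head[2:-2]}" + (f"\n{rest.strip()}" if rest.strip() else "")
    # Rule 3: short ALL-CAPS lines become ### headings in Title Case
    if not s.startswith(('#', '**')) and s.isupper() and 3 < len(s) < 50:
        return f"### {s.title()}"
    return line

print(normalize_heading('**Introduction**'))                    # → ## Introduction
print(normalize_heading('**Key Concepts**: the important ideas'))
print(normalize_heading('MACHINE LEARNING FUNDAMENTALS'))       # → ### Machine Learning Fundamentals
```

Anything that matches none of the rules passes through untouched, which is what keeps the method safe to run over an entire book.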
🎭 The Master Workflow: The Four-Act Symphony
```python
def run(self):
    """Execute the complete book writing workflow."""
    logging.info("Starting the book writing process")

    # Step 1: Create the outline
    outline = self.outline_agent.create_outline()
    logging.info(f"Book outline created with {len(outline)} chapters:")
    for chapter in outline:
        logging.info(f"- {chapter['title']}: {chapter['description'][:50]}...")
```
This run() method is like watching a perfectly choreographed Broadway show where every actor knows their cue and every scene flows seamlessly into the next! 🎪
Act 1: The Planning Phase 📋
What's happening: The Outline Agent creates the book structure
Like the first act where the story setup happens - you need to know the plot before the action can begin!
The Logging Magic:
```python
for chapter in outline:
    logging.info(f"- {chapter['title']}: {chapter['description'][:50]}...")
```
Why [:50]...? Truncates long descriptions to keep logs readable! It's like showing movie trailers instead of full movies in the credits.
Act 2: The Research Marathon 🔍
```python
# Step 2: Research all chapters and save results
all_research = {}
research_csv_path = os.path.join(OUTPUT_DIR, "research_results.csv")

# Initialize CSV with headers
with open(research_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['Chapter', 'Document Number', 'Source', 'Content'])

# Research each chapter
for chapter_info in outline:
    logging.info(f"Researching for chapter: {chapter_info['title']}")
    research_docs = self.researcher_agent.research(chapter_info)
    all_research[chapter_info['title']] = research_docs
    self.save_research_results_to_csv(research_docs, chapter_info['title'], research_csv_path)
```
Like a documentary team that researches every topic BEFORE filming begins, creates detailed notes, and organizes everything perfectly for the production team!
The Brilliant Storage Strategy:
all_research = {}
all_research[chapter_info['title']] = research_docs
Why this is genius: Instead of researching multiple times, we do it ONCE and store everything in a dictionary. It's like doing all your grocery shopping at once instead of making separate trips for each meal!
Dictionary Structure:
```python
all_research = {
    "What is Machine Learning": [doc1, doc2, doc3],
    "Supervised Learning": [doc4, doc5, doc6],
    "Unsupervised Learning": [doc7, doc8, doc9]
}
```
Pro Tip: 💡 This approach prevents duplicate work AND ensures consistency - the same research gets used for writing, so there are no surprises!
Act 3: The Writing Intensive ✍️
```python
# Step 3: Write each chapter
chapters = []
for i, chapter_info in enumerate(outline):
    chapter_num = i + 1
    logging.info(f"Writing Chapter {chapter_num}: {chapter_info['title']}")
    # Use stored research results
    research_docs = all_research[chapter_info['title']]
    # Write the chapter
    chapter_content, references = self.writer_agent.write_chapter(chapter_info, research_docs)
    # Post-process for consistency
    chapter_content = self._post_process_headings(chapter_content)
    chapters.append((chapter_info["title"], chapter_content, references))
```
Like a structured writing retreat where each author works on their assigned chapter, using carefully prepared research materials, with immediate quality control!
The Sequential Beauty:
Chapter numbering with enumerate(outline) for friendly logging
Smart data retrieval from our research dictionary
Immediate quality control with post-processing
Organized storage as tuples: (title, content, references)
Why enumerate() is elegant:
```python
for i, chapter_info in enumerate(outline):
    chapter_num = i + 1  # Human-friendly numbering (1, 2, 3...)
```
Like a restaurant that prepares each course in order, using ingredients prepped earlier, with immediate quality checks before serving!
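As a side note, `enumerate` can produce the 1-based numbers directly via its `start` argument, which would let you drop the manual `i + 1`. A tiny sketch with hypothetical outline data:

```python
outline = [{"title": "What is ML"}, {"title": "Supervised Learning"}]

# enumerate(outline, start=1) yields human-friendly chapter numbers directly
labels = [f"Chapter {n}: {ch['title']}" for n, ch in enumerate(outline, start=1)]
print(labels)  # → ['Chapter 1: What is ML', 'Chapter 2: Supervised Learning']
```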
Act 4: The Grand Finale 🎬
```python
# Step 4: Compile the final book
logging.info("Compiling all chapters into final book")
book_content = self.editor_agent.compile_book(chapters, outline)

# Final post-processing
book_content = self._post_process_headings(book_content)

# Save the masterpiece
output_path = os.path.join(OUTPUT_DIR, "machine_learning_book.md")
with open(output_path, "w", encoding="utf-8") as f:
    f.write(book_content)

logging.info(f"Complete book saved to {output_path}")
return output_path, research_csv_path
```
🎬 Like the final editing phase where all the individually perfect scenes get assembled into a cohesive movie, with final color correction and sound mixing!
The Double Quality Check:
Individual chapter post-processing (during writing)
Final book post-processing (after compilation)
Why double-check? The Editor Agent might introduce new formatting quirks during compilation, so we clean up one more time!
The Victory Moment:
return output_path, research_csv_path
The Parallel Processing Powerhouse
```python
class ParallelBookProject(BookProject):
    """Enhanced BookProject with parallel processing capabilities."""

    def __init__(self, num_gpus=1):
        """Initialize with GPU count specification."""
        super().__init__()
        self.num_gpus = min(num_gpus, torch.cuda.device_count() if torch.cuda.is_available() else 1)
        logging.info(f"Initialized ParallelBookProject with {self.num_gpus} GPU(s)")

    def run(self):
        """Execute with parallel chapter writing."""
        logging.info("Starting parallel book writing process")

        # Steps 1-2: Same as sequential (outline and research)
        outline = self.outline_agent.create_outline()

        # Research phase (same as sequential)
        all_research = {}
        research_csv_path = os.path.join(OUTPUT_DIR, "research_results.csv")
        with open(research_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Chapter', 'Document Number', 'Source', 'Content'])
        for chapter_info in outline:
            logging.info(f"Researching for chapter: {chapter_info['title']}")
            research_docs = self.researcher_agent.research(chapter_info)
            all_research[chapter_info['title']] = research_docs
            self.save_research_results_to_csv(research_docs, chapter_info['title'], research_csv_path)

        # Step 3: Parallel chapter writing magic!
        chapter_data = []
        for chapter_info in outline:
            research_docs = all_research[chapter_info['title']]
            chapter_data.append((chapter_info, research_docs))

        logging.info("Starting parallel chapter writing")

        # Configure multiprocessing for CUDA compatibility
        mp.set_start_method('spawn', force=True)

        # Distribute work across GPUs (round-robin)
        device_assignments = []
        for i in range(len(chapter_data)):
            device_id = i % self.num_gpus if self.num_gpus > 1 else None
            device_assignments.append(device_id)

        # Create worker pool and execute in parallel
        with mp.Pool(processes=max(3, self.num_gpus)) as pool:
            args = [(data, LLM_MODEL, device_id) for data, device_id in zip(chapter_data, device_assignments)]
            parallel_results = pool.starmap(write_chapter_parallel, args)

        # Sort results back into outline order (workers may finish out of order)
        title_order = {ch['title']: i for i, ch in enumerate(outline)}
        chapters_draft = sorted(parallel_results, key=lambda x: title_order[x[0]])

        # Step 4: Post-processing and compilation
        chapters = []
        for title, content, references in chapters_draft:
            content = self._post_process_headings(content)
            chapters.append((title, content, references))

        # Final compilation
        logging.info("Compiling all chapters into final book")
        book_content = self.editor_agent.compile_book(chapters, outline)
        book_content = self._post_process_headings(book_content)

        # Save the final masterpiece
        output_path = os.path.join(OUTPUT_DIR, "ml_book_parallel.md")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(book_content)

        logging.info(f"Parallel book creation complete! Saved to {output_path}")
        return output_path, research_csv_path
```
Parallel Processing Explained:
If the regular BookProject is like having one incredibly skilled chef carefully preparing each dish, then ParallelBookProject is like transforming that kitchen into a coordinated restaurant brigade where multiple master chefs work simultaneously - each with their own station, their own equipment, but all creating the same incredible meal!
Buckle up, because we're about to turbocharge your AI publishing empire! 🚀
🎭 The Inheritance Magic: Standing on the Shoulders of Giants
```python
class ParallelBookProject(BookProject):
    """Enhanced BookProject with parallel processing capabilities."""
```
Imagine you inherited your family's successful restaurant. You love everything about how it works, but you have ONE brilliant idea: "What if we could serve multiple tables simultaneously instead of one at a time?"
That's exactly what inheritance does here! We're saying:
"Keep everything that works" (all the methods from BookProject)
"But let me add my secret sauce" (parallel processing superpowers)
"Same great taste, just faster service!" ✨
Pro Tip: 💡 This is why inheritance is so powerful - you get to be lazy in the best way possible! Instead of rewriting hundreds of lines of working code, you just say "like that, but with this cool upgrade!"
Gotcha Alert: 🚨 The beauty of this approach is that if someone improves the base BookProject class (maybe they make the research faster), our parallel version automatically gets those improvements too. It's like having a restaurant that automatically gets better whenever the head chef learns a new technique!
🚀 Smart GPU Initialization: The Hardware Whisperer
```python
def __init__(self, num_gpus=1):
    """Initialize with GPU count specification."""
    super().__init__()
    self.num_gpus = min(num_gpus, torch.cuda.device_count() if torch.cuda.is_available() else 1)
    logging.info(f"Initialized ParallelBookProject with {self.num_gpus} GPU(s)")
```
The Super() Call: Respecting Your Elders 🙏
super().__init__()
What's happening: We're calling the parent class's initialization first, like saying "Hey mom and dad, set up everything you know how to do, then I'll add my special touches!"
Why it's brilliant: This ensures we get ALL the foundational stuff (RAG system, agents, etc.) before we start messing with parallel processing. It's like inheriting a fully operational restaurant before you start rearranging the kitchen for maximum efficiency!
Like moving into your parents' house and saying "Keep all the furniture, I just want to add a gaming room!"
The GPU Reality Check: Never Promise What You Can't Deliver 🎯
self.num_gpus = min(num_gpus, torch.cuda.device_count() if torch.cuda.is_available() else 1)
This line is pure genius wrapped in simplicity! Let's unwrap it like a present:
Step 1: Do we even have GPU support?
torch.cuda.is_available()
Like calling a pizza place and asking "Do you guys actually deliver?" before placing an order!
Step 2: How many GPUs are actually there?
torch.cuda.device_count() if torch.cuda.is_available() else 1
"If you have lanes available, count them. If you don't have a bowling alley, let's just say you have 1 (we'll figure it out later)."
Step 3: The Politeness Protocol
min(num_gpus, available_count)
You call and say "Table for 10 please!" But they only have space for 6. A good restaurant says "I can seat 6 of you right now!" instead of hanging up. Same energy here!
Fun Examples:
The Optimist: "I want 8 GPUs!" → Reality: 2 GPUs → Gets: 2 GPUs ✅
The Realist: "I want 1 GPU" → Reality: 4 GPUs → Gets: 1 GPU ✅
The Dreamer: "I want 4 GPUs!" → Reality: 0 GPUs → Gets: 1 (when CUDA isn't available, the code falls back to a count of 1, so it degrades gracefully instead of crashing) ✅
Pro Tip: 💡 Notice how this NEVER crashes? That's the hallmark of production-ready code - it adapts to reality instead of demanding reality adapt to it!
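You can exercise this clamping logic without any GPUs at all. Here's a torch-free sketch (`clamp_gpus` is a hypothetical helper that mirrors the one-liner above, with the hardware facts passed in as plain arguments):

```python
def clamp_gpus(requested, detected, cuda_available):
    """Mirror: min(requested, torch.cuda.device_count() if available else 1)."""
    available = detected if cuda_available else 1
    return min(requested, available)

print(clamp_gpus(8, 2, True))   # The Optimist -> 2
print(clamp_gpus(1, 4, True))   # The Realist  -> 1
print(clamp_gpus(4, 0, False))  # The Dreamer  -> 1 (graceful CPU-era fallback)
```

The key point: the function never returns more devices than exist, and never returns zero, so downstream code always has at least one device to work with.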
🎪 The Run Method: Where the Magic Happens
def run(self):
    """Execute with parallel chapter writing."""
    logging.info("Starting parallel book writing process")
    # Steps 1-2: Same as sequential (outline and research)
    outline = self.outline_agent.create_outline()
Smart Design Decision Alert: 🧠 Notice how we keep the outline and research phases sequential? That's not laziness - that's strategic brilliance!
Why keep some parts sequential?
Outline creation: Super fast anyway, and everything else depends on it
Research benefits from shared context: Finding related info across all chapters is more efficient together
Avoiding premature optimization: Don't parallelize what doesn't need it!
Like building a house - you need the blueprint first (outline), then materials delivered (research), THEN multiple crews can work on different rooms simultaneously!
The Research Phase: Same Recipe, Same Great Results 📚
# Research phase (same as sequential)
all_research = {}
research_csv_path = os.path.join(OUTPUT_DIR, "research_results.csv")
with open(research_csv_path, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['Chapter', 'Document Number', 'Source', 'Content'])
for chapter_info in outline:
    logging.info(f"Researching for chapter: {chapter_info['title']}")
    research_docs = self.researcher_agent.research(chapter_info)
    all_research[chapter_info['title']] = research_docs
    self.save_research_results_to_csv(research_docs, chapter_info['title'], research_csv_path)
Like a master librarian who gathers ALL the books for a research project at once, organizes them perfectly, and creates a detailed catalog. The parallel magic comes AFTER all the prep work is done!
Why this stays sequential:
Research is I/O-bound (reading files), not compute-bound (AI generation)
Shared context is valuable - finding related documents across chapters
The audit trail (CSV) stays perfectly organized
Memory efficiency - load research once, use multiple times
Trivia Time: 🤓 This is a classic example of "optimization where it matters" - we parallelize the slow, compute-heavy stuff (AI generation) but keep the fast, I/O stuff sequential for simplicity!
⚡ The Parallel Processing
Step 1: Data Prep for the Big Show 🎬
# Parallel chapter writing magic!
chapter_data = []
for chapter_info in outline:
    research_docs = all_research[chapter_info['title']]
    chapter_data.append((chapter_info, research_docs))
Like preparing individual meal kits for each chef - every package contains exactly what they need to create their assigned dish, with no confusion about ingredients!
Data Structure Genius: Each tuple contains:
The recipe (chapter_info - title and description)
The ingredients (research_docs - all the source materials)
Why tuples? They're immutable and serializable - perfect for sending across process boundaries! It's like vacuum-sealing each meal kit so nothing gets contaminated during transport!
Example structure:
chapter_data = [
    ({'title': 'What is ML', 'description': '...'}, [doc1, doc2, doc3]),
    ({'title': 'Supervised Learning', 'description': '...'}, [doc4, doc5, doc6]),
    ({'title': 'Unsupervised Learning', 'description': '...'}, [doc7, doc8, doc9])
]
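You can verify the "vacuum-sealed meal kit" claim with the standard library: a plain (dict, list) tuple survives a pickle round-trip unchanged, which is exactly what multiprocessing needs to ship it to a worker. (The literal below is a simplified stand-in for the real chapter data.)

```python
import pickle

# A simplified work item: (chapter_info, research_docs)
chapter_data = ({'title': 'What is ML', 'description': '...'}, ['doc1', 'doc2', 'doc3'])

# multiprocessing pickles arguments to send them across process boundaries;
# plain dicts, lists, strings, and tuples all make the trip intact
roundtrip = pickle.loads(pickle.dumps(chapter_data))
print(roundtrip == chapter_data)  # True
```

This is also why the worker function is standalone: anything you send to a worker (arguments and the function itself) must be picklable.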
Step 2: The CUDA Compatibility Spell 🪄
logging.info("Starting parallel chapter writing")
# Configure multiprocessing for CUDA compatibility
mp.set_start_method('spawn', force=True)
This line is ABSOLUTELY CRITICAL! 🚨 Let me tell you why...
The Problem: GPU memory is like a very territorial cat 🐱 - it doesn't like sharing with strangers!
Two Process Creation Methods:
'fork' (default on Linux/Mac): Like making a photocopy of your entire workspace - fast but messy with GPUs. Like cloning a worker mid-task - they remember everything but might conflict over machines
'spawn' (default on Windows): Like giving each worker a fresh, clean workspace - slower to start but GPU-safe. Like hiring fresh workers and training them from scratch - clean slate, no conflicts
Why force=True? Sometimes the start method has already been set elsewhere in the program. force=True is like saying "I don't care what was decided before, THIS is how we're doing it now!" 💪
Without this line, you'll get mysterious errors like "CUDA context already initialized" or random crashes that make NO sense. This one line has saved countless hours of debugging!
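A minimal sketch of the pattern (stdlib only, no CUDA required). In real scripts you'd do this inside the `if __name__ == "__main__":` guard so spawned children don't re-run it on import:

```python
import multiprocessing as mp

# force=True wins even if a library imported earlier already picked 'fork'
mp.set_start_method('spawn', force=True)

print(mp.get_start_method())  # 'spawn'
```

Note the trade-off: 'spawn' restarts the Python interpreter for each worker, so startup is slower, but each child begins with a clean slate and initializes its own CUDA context from scratch.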
Step 3: The GPU Round-Robin Dance 💃
# Distribute work across GPUs
device_assignments = []
for i in range(len(chapter_data)):
device_id = i % self.num_gpus if self.num_gpus > 1 else None
device_assignments.append(device_id)
This is like being a super-fair kindergarten teacher dividing kids into groups for activities! 👩🏫
How the round-robin works:
Chapter 0: 0 % 2 = 0 → GPU 0
Chapter 1: 1 % 2 = 1 → GPU 1
Chapter 2: 2 % 2 = 0 → GPU 0 (back to the beginning!)
Chapter 3: 3 % 2 = 1 → GPU 1
Like having 2 washing machines and 4 loads of laundry - you alternate between machines so they're both always busy!
The if self.num_gpus > 1 else None Logic:
Multiple GPUs: Assign specific IDs for perfect distribution
Single GPU: Use None and let the system auto-assign (simpler and safer)
Pro Tip: 💡 This prevents the classic "GPU 0 is overloaded while GPU 1 sits idle" problem. Fair is fast!
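Here's the round-robin as a small standalone function you can poke at (`assign_devices` is a hypothetical helper that mirrors the loop above):

```python
def assign_devices(num_chapters, num_gpus):
    # Chapter i goes to GPU i % num_gpus; on a single-GPU (or CPU) box,
    # return None so the system auto-assigns instead
    if num_gpus > 1:
        return [i % num_gpus for i in range(num_chapters)]
    return [None] * num_chapters

print(assign_devices(4, 2))  # [0, 1, 0, 1]
print(assign_devices(3, 1))  # [None, None, None]
```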
Step 4: The Worker Pool Magic Show 🎪
# Create worker pool and execute in parallel
with mp.Pool(processes=max(3, self.num_gpus)) as pool:
    args = [(data, LLM_MODEL, device_id) for data, device_id in zip(chapter_data, device_assignments)]
    parallel_results = pool.starmap(write_chapter_parallel, args)
This is where the rubber meets the road! 🏎️
Pool Size Logic: The Goldilocks Principle
processes=max(3, self.num_gpus)
Why max(3, self.num_gpus)? It's like having a minimum crew size policy!
1 GPU system: Gets 3 processes (some can share GPU efficiently during I/O waits)
4 GPU system: Gets 4 processes (one per GPU - perfect!)
8 GPU system: Gets 8 processes (maximum parallelism!)
Even if you only have 1 espresso machine, you still want 3 baristas - one making coffee, one taking orders, one handling pastries!
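The "minimum crew" rule is one line of arithmetic; pulled out as a hypothetical helper for clarity:

```python
def pool_size(num_gpus):
    # Never fewer than 3 workers (so I/O waits overlap even on one GPU),
    # and one worker per GPU once you have more than 3
    return max(3, num_gpus)

print([pool_size(n) for n in (1, 4, 8)])  # [3, 4, 8]
```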
Argument Preparation: The Assembly Line
args = [(data, LLM_MODEL, device_id) for data, device_id in zip(chapter_data, device_assignments)]
What zip() does: Like having two conveyor belts moving in sync - one with chapter data, one with GPU assignments - and you're packaging them together!
Example result:
args = [
    ((chapter1_info, research1_docs), "TinyLlama/TinyLlama-1.1B-Chat-v1.0", 0),
    ((chapter2_info, research2_docs), "TinyLlama/TinyLlama-1.1B-Chat-v1.0", 1),
    ((chapter3_info, research3_docs), "TinyLlama/TinyLlama-1.1B-Chat-v1.0", 0)
]
The Parallel Execution Moment
parallel_results = pool.starmap(write_chapter_parallel, args)
starmap vs map:
map: Calls function with ONE argument per call
starmap: Unpacks each tuple as SEPARATE arguments
🎬 Like having multiple film crews shooting different scenes simultaneously - each crew has their script, equipment, and assigned sound stage!
What's happening behind the scenes:
# Process 1: write_chapter_parallel((chapter1_info, docs), model, gpu_0)
# Process 2: write_chapter_parallel((chapter2_info, docs), model, gpu_1)
# Process 3: write_chapter_parallel((chapter3_info, docs), model, gpu_0)
# All running AT THE SAME TIME! 🤯
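You can see the starmap-vs-map unpacking difference without spawning any processes: `itertools.starmap` has the same argument semantics as `pool.starmap`, just sequentially. (`write_stub` is a hypothetical stand-in for `write_chapter_parallel`.)

```python
from itertools import starmap

def write_stub(chapter, model, device):
    # stand-in for write_chapter_parallel -- just echoes its arguments
    return f"{chapter} via {model} on device {device}"

args = [("ch1", "tiny", 0), ("ch2", "tiny", 1)]

# map(write_stub, args) would pass each TUPLE as one argument -> TypeError
# starmap unpacks it: write_stub("ch1", "tiny", 0)
print(list(starmap(write_stub, args)))
# ['ch1 via tiny on device 0', 'ch2 via tiny on device 1']
```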
🔄 The Results Wrangling: Herding Cats 🐱
The Ordering Challenge: When Speed Creates Chaos
# Sort results to maintain chapter order
chapters_draft = sorted(parallel_results,
    key=lambda x: outline.index(next(ch for ch in outline if ch['title'] == x[0])))
The Problem: Parallel processes finish in random order! Chapter 3 might finish before Chapter 1, creating chaos! 😵
🏁 Imagine a race where runners finish in different order, but you need to organize the results by their starting lane numbers for the official standings!
The Solution Breakdown:
x[0] gets the chapter title from each result
next(ch for ch in outline if ch['title'] == x[0]) finds the matching chapter in original outline
outline.index(...) gets the position in the original order
sorted(..., key=...) reorganizes everything correctly
Example chaos → order:
# Parallel results (random finish order):
[("Unsupervised Learning", content3), ("What is ML", content1), ("Supervised Learning", content2)]
# After sorting (logical order restored):
[("What is ML", content1), ("Supervised Learning", content2), ("Unsupervised Learning", content3)]
Trivia Time: 🤓 This sorting logic is so elegant that senior developers often use it as an interview question! You just learned a pattern that impresses hiring managers!
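Here's the same sort running on toy data (results shortened to (title, content) pairs; the key only ever looks at `x[0]`, so it works the same on the real triples):

```python
outline = [
    {'title': 'What is ML'},
    {'title': 'Supervised Learning'},
    {'title': 'Unsupervised Learning'},
]

# Workers finished in random order:
parallel_results = [
    ('Unsupervised Learning', 'c3'),
    ('What is ML', 'c1'),
    ('Supervised Learning', 'c2'),
]

# Same key as above: look up each result's position in the original outline
chapters_draft = sorted(parallel_results,
    key=lambda x: outline.index(next(ch for ch in outline if ch['title'] == x[0])))

print([title for title, _ in chapters_draft])
# ['What is ML', 'Supervised Learning', 'Unsupervised Learning']
```

A small design note: because `outline.index(...)` scans the list for every result, this is O(n²) in the number of chapters; a dict of `{title: position}` built once would be O(n), but for a book-sized outline the difference is negligible.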
Quality Control: No Shortcuts on Excellence ✨
# Step 4: Post-processing and compilation
chapters = []
for title, content, references in chapters_draft:
    content = self._post_process_headings(content)
    chapters.append((title, content, references))
The Quality Promise: Even though we're going fast, we never compromise on quality! Every chapter gets the same careful post-processing as the sequential version.
🍽️ Like having a head chef who always checks every dish before it goes out, regardless of whether one cook made it or five cooks worked on different parts!
🏆 The Grand Finale: Assembly and Victory
# Final compilation
logging.info("Compiling all chapters into final book")
book_content = self.editor_agent.compile_book(chapters, outline)
book_content = self._post_process_headings(book_content)

# Save the final masterpiece
output_path = os.path.join(OUTPUT_DIR, "ml_book_parallel.md")
with open(output_path, "w", encoding="utf-8") as f:
    f.write(book_content)

logging.info(f"Parallel book creation complete! Saved to {output_path}")
return output_path, research_csv_path
The Victory Lap: 🎉 After all that parallel complexity, the ending is beautifully simple - same compilation, same quality checks, same great results!
Notice the filename: "ml_book_parallel.md" - Different from the sequential version so you can race them against each other and see the speed difference! 🏁
Part 5 is available at: https://www.codersarts.com/post/a-complete-guide-to-creating-a-multi-agent-book-writing-system-part-5
Transform Your Projects with Codersarts
Whether you're looking to implement RAG systems for your organization, need help with complex AI projects, or want to build custom multi-agent systems, the experts at Codersarts are here to help. From academic assignments to enterprise-level AI solutions, we provide:
Custom RAG Implementation: Tailored document processing and retrieval systems
Multi-Agent System Development: Complex AI workflows for your specific needs
AI Training & Consulting: Learn to build and deploy production-ready AI systems
Research Support: Get help with cutting-edge AI research and development
Don't let complex AI implementations slow down your innovation. Connect with Codersarts today and turn your AI ideas into reality!
Ready to get started? Visit Codersarts.com or reach out to our team to discuss your next AI project. The future of intelligent automation is here – let's build it together!
