Pattern 5: Parallelization

Learn the fan-out/fan-in pattern to run multiple AI agents concurrently, dramatically reducing latency while maintaining quality

Learning Objectives

By the end of this tutorial, you will be able to:

  • Understand the fan-out/fan-in pattern and when to apply it
  • Implement parallel agent execution with WorkflowBuilder
  • Create the FanOutExecutor and FanInExecutor components
  • Build specialized agents with tool access
  • Aggregate results from multiple agents into a cohesive output

Prerequisites

Before starting this tutorial, ensure you have:

What You’ll Build

In this tutorial, you’ll build a parallel travel planning system where multiple specialized agents research different aspects of a trip simultaneously, then aggregate their findings into a comprehensive travel plan.

The Scenario

Imagine planning a trip to Paris. Instead of a single agent handling all research sequentially, you deploy a team of specialists:

flowchart TB
  Q["Plan a 5-day trip to Paris"] --> FO[Fan-Out]
  FO --> H["Hotels Agent<br/>(with Google Search)"]
  FO --> T["Transport Agent<br/>(with Google Search)"]
  FO --> A["Activities Agent<br/>(with Google Search)"]
  H --> FI[Fan-In]
  T --> FI
  A --> FI
  FI --> R["Comprehensive<br/>Travel Plan"]
  style Q fill:#8b5cf6,color:#fff
  style R fill:#3b82f6,color:#fff
  style FO fill:#14b8a6,color:#fff
  style FI fill:#14b8a6,color:#fff

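Each agent searches the web in real time, and the aggregator combines their research into a cohesive itinerary. The key insight: all three agents run simultaneously, so the research phase takes about as long as the slowest agent instead of the sum of all three. With three agents of similar speed, that is close to a 3x reduction for the research phase; the final synthesis step still runs afterward.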

Key Learning Points

Building this system will teach you:

  1. Fan-Out Pattern - Broadcasting a query to multiple agents simultaneously
  2. Fan-In Pattern - Collecting and aggregating results from parallel tasks
  3. WorkflowBuilder API - Using AddFanOutEdge and AddFanInEdge for parallel orchestration
  4. Executor Components - Creating custom executors to control workflow behavior
  5. Latency Optimization - When parallel execution provides significant benefits

Pattern Overview

What is Parallelization?

Parallelization is running multiple agent tasks concurrently rather than sequentially. The pattern consists of two phases:

  1. Fan-Out: A single input is broadcast to multiple agents that execute in parallel
  2. Fan-In: Results from all parallel agents are collected and synthesized

This is particularly powerful when:

  • Tasks are independent (no dependencies between agents)
  • Each task involves I/O-bound operations (API calls, LLM inference)
  • Total latency matters more than individual task latency
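The two phases map directly onto plain .NET tasks. The sketch below uses only the base class library and models each agent as a hypothetical `Func<string, Task<string>>` rather than a real `ChatClientAgent`; it illustrates the idea and is not how the workflow engine implements it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutFanIn
{
    // Fan-out: start every agent on the same query before awaiting any of them,
    // so their waits on I/O overlap instead of adding up.
    // Fan-in: await them all, then combine the answers into one result.
    public static async Task<string> RunAsync(
        string query,
        IReadOnlyDictionary<string, Func<string, Task<string>>> agents)
    {
        // ToList() forces every agent call to start now (fan-out).
        var pending = agents
            .Select(async pair => (Name: pair.Key, Answer: await pair.Value(query)))
            .ToList();

        // Completes when the slowest agent finishes (fan-in).
        var answers = await Task.WhenAll(pending);

        // Combine in a stable order (by agent name), independent of completion order.
        return string.Join("\n", answers
            .OrderBy(a => a.Name, StringComparer.Ordinal)
            .Select(a => $"[{a.Name}] {a.Answer}"));
    }
}
```

Because all calls are started before the first `await`, the total wait is governed by the slowest agent, which is exactly the property the workflow engine gives you without writing this code yourself.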

When to Use Parallelization

Use parallel execution for:

  • Multi-source research (querying multiple data sources simultaneously)
  • Data aggregation (collecting and merging results from independent tasks)
  • Parallel API calls (calling multiple external services at once)
  • Independent subtasks (splitting work that doesn’t depend on each other)

Avoid parallel execution when:

  • Tasks have dependencies (Task B needs output from Task A)
  • Resources are constrained (shared rate limits, memory pressure)
  • Order matters (sequential processing is semantically required)

Sequential vs Parallel Comparison

flowchart LR
  subgraph Sequential ["Sequential (AddEdge)"]
      S1[Hotels] --> S2[Transport] --> S3[Activities] --> S4[Aggregate]
  end

  subgraph Parallel ["Parallel (AddFanOutEdge)"]
      P0[Fan-Out] --> P1[Hotels]
      P0 --> P2[Transport]
      P0 --> P3[Activities]
      P1 --> P4[Fan-In]
      P2 --> P4
      P3 --> P4
  end

With sequential execution, the research phase takes the sum of all three agent latencies. With parallel execution, it takes roughly as long as the slowest agent, plus some orchestration overhead. In both cases the aggregation step runs only after the research is complete.
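The arithmetic behind this comparison is small enough to write down. This is a back-of-the-envelope model, not a measurement: the overhead term is a hypothetical constant, and real agent latencies vary from run to run.

```csharp
using System.Linq;

public static class LatencyEstimate
{
    // Sequential: each research agent waits for the previous one, then the aggregator runs.
    public static double Sequential(double[] agentSeconds, double aggregatorSeconds) =>
        agentSeconds.Sum() + aggregatorSeconds;

    // Parallel: the research phase lasts as long as the slowest agent, plus any
    // orchestration overhead; the aggregator still runs afterwards.
    public static double Parallel(double[] agentSeconds, double aggregatorSeconds,
                                  double overheadSeconds = 0) =>
        agentSeconds.Max() + overheadSeconds + aggregatorSeconds;
}
```

With illustrative numbers of 4 s, 3 s, and 5 s for the three research agents and 2 s for synthesis, the sequential estimate is 14 s and the parallel estimate is 7 s: the research phase shrinks from 12 s to 5 s, while the synthesis step is unchanged.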

Step-by-Step Implementation

Step 1: Get the Code

git clone https://github.com/dotnetagents/patterns.git
cd patterns/05-parallelization/src/Parallelization

Step 2: Explore the Project Structure

Parallelization/
├── Services/
│   ├── IGoogleSearchService.cs    # Search abstraction
│   ├── GoogleSearchService.cs     # Real Google API
│   └── MockGoogleSearchService.cs # Mock for testing
├── Tools/
│   └── GoogleSearchTool.cs        # LLM-callable search tool
├── UseCases/
│   └── TravelPlanning/
│       ├── TravelPlanningConfig.cs    # Configuration
│       ├── TravelAgentFactory.cs      # Creates specialized agents
│       ├── FanOutExecutor.cs          # Broadcasts to parallel agents
│       ├── FanInExecutor.cs           # Collects and synthesizes
│       ├── ParallelTravelPipeline.cs  # Parallel workflow
│       ├── SequentialTravelPipeline.cs # Baseline comparison
│       └── ParallelizationBenchmarks.cs
└── Program.cs

Step 3: Create Specialized Agents

Each parallel agent has a focused role and access to Google Search. The TravelAgentFactory creates these agents:

public static class TravelAgentFactory
{
    public const string HotelsAgentName = "HotelsAgent";
    public const string TransportAgentName = "TransportAgent";
    public const string ActivitiesAgentName = "ActivitiesAgent";

    public static readonly string[] ExpectedAgentNames =
        [HotelsAgentName, TransportAgentName, ActivitiesAgentName];

    private const string HotelsSystemPrompt = """
        You are a specialized travel agent focused on finding the best hotels.

        IMPORTANT: You MUST use the Search tool to find current hotel information.

        When given a travel query:
        1. FIRST, use the Search tool to find hotels in the destination
        2. Search for specific hotel types based on the traveler's needs
        3. Use multiple searches if needed for different hotel categories

        After searching, provide recommendations including:
        - 3-5 specific hotel recommendations with names and prices
        - Key features and amenities
        - Location benefits
        - Booking tips with links from search results
        """;

    // Transport and Activities agents use similar prompts and matching
    // CreateTransportAgent / CreateActivitiesAgent methods (omitted here)

    public static ChatClientAgent CreateHotelsAgent(
        string provider, string model, IGoogleSearchService searchService)
        => Create(provider, model, HotelsSystemPrompt, HotelsAgentName, searchService);
}

Step 4: Wire Up Tools

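Each agent gets the Google Search tool via AIFunctionFactory. The Create helper in TravelAgentFactory builds the tool list and a chat client that invokes tools automatically: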

internal static ChatClientAgent Create(
    string provider,
    string model,
    string instructions,
    string name,
    IGoogleSearchService searchService)
{
    // Create the search tool
    var searchTool = new GoogleSearchTool(searchService);
    var tools = new List<AITool> { AIFunctionFactory.Create(searchTool.Search) };

    // Build client with automatic function invocation
    var chatClient = new ChatClientBuilder(ChatClientFactory.Create(provider, model))
        .UseFunctionInvocation()
        .Build();

    return new ChatClientAgent(chatClient, instructions, name, tools: tools);
}

The GoogleSearchTool wraps the search service as an LLM-callable function:

using System.ComponentModel; // for [Description]

public class GoogleSearchTool(IGoogleSearchService searchService)
{
    [Description("Search the web for information using Google. Returns relevant " +
                 "search results with titles, descriptions, and links.")]
    public async Task<string> Search(
        [Description("The search query to find relevant information")] string query,
        [Description("Maximum number of results (default: 5, max: 10)")] int maxResults = 5)
    {
        // Keep the model-supplied count between 1 and 10
        var results = await searchService.SearchAsync(query, Math.Clamp(maxResults, 1, 10));

        if (results.Count == 0)
            return "No search results found for the query.";

        var formattedResults = results.Select((r, i) =>
            $"{i + 1}. **{r.Title}**\n   {r.Snippet}\n   Link: {r.Link}");

        return string.Join("\n\n", formattedResults);
    }
}
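Pulling the formatting into a pure function makes it testable without calling Google, and shows exactly what text the model receives as the tool result. This is a hypothetical refactoring: `SearchResult` is an assumed record that mirrors the `Title`, `Snippet`, and `Link` properties used above, and `SearchFormatting` is not part of the repository.

```csharp
using System.Collections.Generic;
using System.Linq;

// Assumed result shape; mirrors the properties GoogleSearchTool reads.
public sealed record SearchResult(string Title, string Snippet, string Link);

public static class SearchFormatting
{
    // Same output format as GoogleSearchTool.Search: a numbered, Markdown-bold
    // title, then the snippet and the link indented on their own lines.
    public static string Format(IReadOnlyList<SearchResult> results)
    {
        if (results.Count == 0)
            return "No search results found for the query.";

        return string.Join("\n\n", results.Select((r, i) =>
            $"{i + 1}. **{r.Title}**\n   {r.Snippet}\n   Link: {r.Link}"));
    }
}
```

Search could then simply return `SearchFormatting.Format(results)`, keeping the tool method focused on calling the service.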

Step 5: Build the FanOutExecutor

The FanOutExecutor receives the user’s query and broadcasts it to all connected parallel agents:

internal sealed class FanOutExecutor() : Executor<ChatMessage>("FanOutExecutor")
{
    public override async ValueTask HandleAsync(
        ChatMessage message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // Broadcast the query to all connected agents (Hotels, Transport, Activities)
        await context.SendMessageAsync(message, cancellationToken);

        // Send turn token to kick off all parallel agents simultaneously
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
    }
}

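Agent executors cache the messages they receive and only start processing when a TurnToken arrives. Sending the query first and the TurnToken second over the same fan-out edge therefore lets all three agents start their turns together.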
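The "deliver first, then start" idea can be illustrated with a toy model. Nothing below is framework code: `BufferingWorker` and `ToyFanOut` are hypothetical classes that only mimic the caching behavior described above, with `StartTurnAsync` playing the role of the TurnToken.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Toy model, not framework code: a worker that caches incoming messages and only
// starts a turn when told to, similar to an agent waiting for a TurnToken.
public sealed class BufferingWorker(string name)
{
    private readonly List<string> _inbox = new();

    public string Name { get; } = name;

    // Receiving a message only caches it; no work starts yet.
    public void Receive(string message) => _inbox.Add(message);

    // Handles everything cached so far as one turn, then empties the cache.
    public async Task<string> StartTurnAsync()
    {
        var batch = string.Join(" | ", _inbox);
        _inbox.Clear();
        await Task.Yield(); // stand-in for the agent's LLM and tool calls
        return $"{Name} handled: {batch}";
    }
}

public static class ToyFanOut
{
    // Step 1 delivers the query to every worker; step 2 (the "turn token")
    // starts all turns together, and the results are awaited as a group.
    public static Task<string[]> BroadcastThenStartAsync(
        string query, IReadOnlyList<BufferingWorker> workers)
    {
        foreach (var worker in workers)
            worker.Receive(query);

        return Task.WhenAll(workers.Select(w => w.StartTurnAsync()));
    }
}
```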

Step 6: Build the FanInExecutor

The FanInExecutor collects results from all parallel agents and synthesizes them:

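using Microsoft.Agents.AI.Workflows; // Executor, IWorkflowContext
using Microsoft.Extensions.AI;       // ChatMessage, IChatClient, FunctionCallContent

internal sealed class FanInExecutor : Executor<List<ChatMessage>, ChatMessage>
{
    private readonly Dictionary<string, ChatMessage> _agentResults = new();
    private readonly IChatClient _chatClient;
    private readonly HashSet<string> _expectedAgents;

    // 'model' names the synthesis model; chatClient has already been created for it
    public FanInExecutor(
        IChatClient chatClient, string model, IEnumerable<string> expectedAgents)
        : base("FanInExecutor")
    {
        _chatClient = chatClient;
        _expectedAgents = new HashSet<string>(expectedAgents);
    }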

    private const string SynthesisSystemPrompt = """
        You are an expert travel planner who creates comprehensive travel itineraries.

        You will receive research from three specialized agents:
        1. Hotels Agent - with accommodation recommendations
        2. Transport Agent - with flight and transportation options
        3. Activities Agent - with things to do and day plans

        Synthesize all research into a single, well-organized travel plan that:
        - Presents a coherent narrative, not just bullet points
        - Resolves any conflicts between recommendations
        - Creates a realistic, day-by-day itinerary
        - Includes practical logistics
        - Provides a budget estimate
        """;

    public override async ValueTask<ChatMessage> HandleAsync(
        List<ChatMessage> messages,
        IWorkflowContext context,
        CancellationToken cancellationToken = default)
    {
        // Collect results by agent name, filtering out tool call messages
        foreach (var msg in messages)
        {
            var agentName = msg.AuthorName;
            var hasToolContent = msg.Contents
                .Any(c => c is FunctionCallContent or FunctionResultContent);

            // Skip tool call messages and empty responses
            if (string.IsNullOrEmpty(agentName) || hasToolContent ||
                string.IsNullOrEmpty(msg.Text))
                continue;

            if (_expectedAgents.Contains(agentName))
                _agentResults[agentName] = msg;
        }

        // Wait until ALL expected agents have responded
        if (!_expectedAgents.All(a => _agentResults.ContainsKey(a)))
            return null!;  // Keep waiting for more results

        // All agents responded - synthesize into final plan
        var researchSummary = string.Join("\n\n---\n\n",
            _agentResults.Values.Select(m =>
                $"## Research from {m.AuthorName}\n\n{m.Text}"));

        // Reset so a reused executor does not mix these results into the next run
        _agentResults.Clear();

        var synthesisPrompt = $"""
            Synthesize this travel research into a comprehensive travel plan:

            {researchSummary}

            Create a cohesive itinerary combining the best recommendations.
            """;

        var chatMessages = new List<ChatMessage>
        {
            new(ChatRole.System, SynthesisSystemPrompt),
            new(ChatRole.User, synthesisPrompt)
        };

        var response = await _chatClient.GetResponseAsync(
            chatMessages, cancellationToken: cancellationToken);

        return new ChatMessage(ChatRole.Assistant, response.Text ?? string.Empty)
        {
            AuthorName = "Aggregator"
        };
    }
}

Key design decisions:

  • Filtering tool messages: Tool call/result messages are skipped; only final agent outputs are collected
  • Waiting for all agents: The executor returns null until all expected agents have responded
  • Synthesis prompt: Combines all research and asks the aggregator LLM to create a unified plan
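Because the collection logic in HandleAsync does not depend on the workflow engine, it can be pulled into a plain class and unit-tested. The sketch below is a hypothetical refactoring, not part of the repository: `AgentMessage` stands in for `ChatMessage` (its `IsToolMessage` flag plays the role of the `FunctionCallContent`/`FunctionResultContent` check), and results are ordered by agent name so the combined text is deterministic.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for ChatMessage: author, text, and whether it is tool traffic.
public sealed record AgentMessage(string? Author, string? Text, bool IsToolMessage = false);

public sealed class FanInCollector(IEnumerable<string> expectedAgents)
{
    private readonly HashSet<string> _expected = new(expectedAgents);
    private readonly Dictionary<string, string> _results = new();

    // Adds a batch of messages. Returns the combined research once every expected
    // agent has produced a final text answer; otherwise returns null (keep waiting).
    public string? Add(IEnumerable<AgentMessage> batch)
    {
        foreach (var msg in batch)
        {
            // Skip tool traffic, anonymous or empty messages, and unknown authors.
            if (msg.IsToolMessage || string.IsNullOrEmpty(msg.Author) ||
                string.IsNullOrEmpty(msg.Text) || !_expected.Contains(msg.Author))
                continue;

            _results[msg.Author] = msg.Text; // the latest final answer wins
        }

        if (!_expected.All(_results.ContainsKey))
            return null;

        var combined = string.Join("\n\n---\n\n", _expected
            .OrderBy(a => a, StringComparer.Ordinal)
            .Select(a => $"## Research from {a}\n\n{_results[a]}"));

        _results.Clear(); // ready for the next run
        return combined;
    }
}
```

In the executor, HandleAsync would then only need to translate ChatMessage into this shape and call the synthesis model when Add returns a non-null string.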

Step 7: Assemble the Parallel Workflow

The ParallelTravelPipeline wires everything together using WorkflowBuilder:

public static class ParallelTravelPipeline
{
    public static (Workflow Workflow, Dictionary<string, string> AgentModels) Create(
        TravelPlanningConfig config,
        IGoogleSearchService searchService)
    {
        // Create specialized agents
        var hotelsAgent = TravelAgentFactory.CreateHotelsAgent(
            config.Provider, config.HotelsModel, searchService);
        var transportAgent = TravelAgentFactory.CreateTransportAgent(
            config.Provider, config.TransportModel, searchService);
        var activitiesAgent = TravelAgentFactory.CreateActivitiesAgent(
            config.Provider, config.ActivitiesModel, searchService);

        // Create executors
        var fanOutExecutor = new FanOutExecutor();
        var fanInExecutor = new FanInExecutor(
            ChatClientFactory.Create(config.Provider, config.AggregatorModel),
            config.AggregatorModel,
            TravelAgentFactory.ExpectedAgentNames);

        // Build workflow with Fan-Out/Fan-In pattern
        var workflow = new WorkflowBuilder(fanOutExecutor)
            .AddFanOutEdge(fanOutExecutor,
                targets: [hotelsAgent, transportAgent, activitiesAgent])
            .AddFanInEdge(
                [hotelsAgent, transportAgent, activitiesAgent],
                fanInExecutor)
            .WithOutputFrom(fanInExecutor)
            .Build();

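        // Map each agent name to the model it uses
        var agentModels = new Dictionary<string, string>
        {
            [TravelAgentFactory.HotelsAgentName] = config.HotelsModel,
            [TravelAgentFactory.TransportAgentName] = config.TransportModel,
            [TravelAgentFactory.ActivitiesAgentName] = config.ActivitiesModel,
            ["Aggregator"] = config.AggregatorModel
        };

        return (workflow, agentModels);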
    }
}

The key API methods:

  • AddFanOutEdge(source, targets) - Routes output from source to multiple targets in parallel
  • AddFanInEdge(sources, target) - Collects outputs from multiple sources into a single target
  • WithOutputFrom(executor) - Designates which executor produces the final workflow output

Step 8: Run the Travel Planner

# Configure environment (see Setting Up the Agent Environment)
# Set AZURE_OPENAI_API_KEY and AZURE_OPENAI_ENDPOINT

# Run interactively
dotnet run

# Or run benchmarks
dotnet run -- --benchmark

Try queries like:

  • “Plan a 5-day trip to Paris in June”
  • “Plan a budget weekend getaway to Barcelona”
  • “Plan a family vacation to Tokyo with kids”

How It Works

When you submit “Plan a 5-day trip to Paris”, here’s the execution flow:

  1. FanOutExecutor receives the query and broadcasts it to all three agents
  2. Hotels, Transport, and Activities agents execute in parallel, each:
    • Calling Google Search multiple times
    • Analyzing results
    • Generating recommendations
  3. FanInExecutor collects results as they arrive, waiting for all three
  4. Once all agents respond, FanInExecutor synthesizes a comprehensive travel plan
  5. The final plan is returned to the user

The parallelization happens automatically via the workflow engine - you don’t manage threads or async coordination manually.

Sequential Baseline

For comparison, here’s how the sequential pipeline is structured:

// Sequential: Hotels → Transport → Activities → Aggregator
var workflow = new WorkflowBuilder(hotelsAgent)
    .AddEdge(hotelsAgent, transportAgent)
    .AddEdge(transportAgent, activitiesAgent)
    .AddEdge(activitiesAgent, aggregationExecutor)
    .WithOutputFrom(aggregationExecutor)
    .Build();

Each agent waits for the previous one to complete before starting. This is simpler, but for I/O-bound agents of similar speed the research phase takes roughly three times as long as in the parallel version.
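Stripped of the workflow API, the sequential baseline is a loop that awaits each stage in turn. In this simplified, hypothetical model each stage receives only the previous stage's output, which is what forces the stages to run one after another.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SequentialChain
{
    // Each stage starts only after the previous one finishes and receives its
    // output, so total latency is the sum of the stage latencies.
    public static async Task<string> RunAsync(
        string query, IEnumerable<Func<string, Task<string>>> stages)
    {
        var current = query;
        foreach (var stage in stages)
            current = await stage(current);
        return current;
    }
}
```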

Common Pitfalls

1. Not Waiting for All Agents

The FanInExecutor must wait until all expected agents have responded before synthesizing. Returning early produces incomplete results.

// Correct: Wait for all agents
if (!_expectedAgents.All(a => _agentResults.ContainsKey(a)))
    return null!;  // Keep waiting

2. Forgetting to Filter Tool Messages

Agents using tools produce multiple messages (function calls, function results, final response). Only the final text response should be aggregated.

var hasToolContent = msg.Contents
    .Any(c => c is FunctionCallContent or FunctionResultContent);

if (hasToolContent)
    continue;  // Skip tool messages

3. Shared State Between Parallel Agents

Each parallel agent should be independent. Avoid sharing mutable state between agents: they run concurrently, so their execution order is non-deterministic and unsynchronized writes can race.
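A safe default is to keep each task's working state local and return its result, instead of having tasks append to a shared `List<string>` (`List<T>` is not thread-safe, so concurrent `Add` calls can lose items or throw). A minimal sketch with plain tasks; the topics and note strings are placeholders:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class IndependentResearch
{
    // Each parallel task keeps its working state in locals and returns its result.
    // Nothing is written to shared collections while the tasks run; the results are
    // combined only after Task.WhenAll, on a single flow of control.
    public static async Task<IReadOnlyList<string>> RunAsync(IEnumerable<string> topics)
    {
        var tasks = topics.Select(async topic =>
        {
            var notes = new List<string> { $"searching {topic}" }; // local to this task
            await Task.Yield();                                      // stand-in for I/O
            notes.Add($"summary of {topic}");
            return string.Join("; ", notes);
        });

        return await Task.WhenAll(tasks); // results keep the order of 'topics'
    }
}
```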

4. Over-Parallelization

More parallel agents aren’t always better. Consider:

  • API rate limits (all agents hitting the same service)
  • Token costs (more agents = more LLM calls)
  • Aggregation complexity (synthesizing the output of 10 agents is harder than synthesizing 3)
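One way to keep several agents while respecting a shared rate limit is to cap how many calls run at once. This sketch uses `SemaphoreSlim` as a gate around a hypothetical `work` delegate; where such a limit would live in this project (for example, around the search service) is an open design choice, not something the repository is claimed to do.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedFanOut
{
    // Runs every work item, but never more than maxConcurrency at once,
    // e.g. to stay under a shared API rate limit. Results keep the input order.
    public static async Task<TResult[]> RunAsync<TInput, TResult>(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TResult>> work,
        int maxConcurrency)
    {
        using var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        var tasks = inputs.Select(async input =>
        {
            await gate.WaitAsync();           // wait for a free slot
            try { return await work(input); }
            finally { gate.Release(); }       // free the slot even if work throws
        }).ToList();

        return await Task.WhenAll(tasks);
    }
}
```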

5. Using Executors as Fan-In Sources

Here, AddFanInEdge is used with ChatClientAgent instances as its sources, not custom Executor classes. If you use an Executor as a fan-in source instead, the workflow may not collect its results properly.

// WRONG: Executors don't work as fan-in sources
// (builder is a WorkflowBuilder; MyExecutor stands for any custom Executor)
var executor1 = new MyExecutor();
var executor2 = new MyExecutor();
builder.AddFanInEdge([executor1, executor2], fanInExecutor);  // Won't work!

// CORRECT: Use ChatClientAgent instances
var agent1 = new ChatClientAgent(client, instructions, "Agent1");
var agent2 = new ChatClientAgent(client, instructions, "Agent2");
builder.AddFanInEdge([agent1, agent2], fanInExecutor);  // Works!

This is why the travel planning example uses ChatClientAgent for all parallel tasks (Hotels, Transport, Activities) rather than custom Executor classes.

Try It Yourself

  1. Add a Budget Agent: Create a 4th agent that researches costs and provides budget breakdowns
  2. Implement Timeout Handling: Add a timeout to FanInExecutor that proceeds with partial results if an agent is slow
  3. Try Different Model Combinations: Use GPT-4o-mini for research agents and GPT-4 only for aggregation
  4. Add Error Recovery: Handle cases where an agent fails (e.g., search API down)
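For exercises 2 and 4, one possible shape is sketched below. It is framework-agnostic: each agent is a hypothetical `Func<CancellationToken, Task<string>>` rather than a workflow executor, so the deadline logic would still need to be adapted to FanInExecutor.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PartialFanIn
{
    // Starts every agent, waits until all finish or the timeout elapses (whichever
    // comes first), then returns only the agents that completed successfully.
    public static async Task<Dictionary<string, string>> CollectAsync(
        IReadOnlyDictionary<string, Func<CancellationToken, Task<string>>> agents,
        TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource();

        // Invoking each delegate starts that agent (fan-out).
        var running = agents.ToDictionary(a => a.Key, a => a.Value(cts.Token));

        // WhenAny never throws, so failed agents do not abort the collection.
        await Task.WhenAny(Task.WhenAll(running.Values), Task.Delay(timeout));

        cts.Cancel(); // ask stragglers to stop; we do not wait for them

        // Faulted tasks (the agent threw) and unfinished ones are simply left out.
        return running
            .Where(r => r.Value.IsCompletedSuccessfully)
            .ToDictionary(r => r.Key, r => r.Value.Result);
    }
}
```

The aggregator would then synthesize from whatever subset came back, ideally telling the model which research areas are missing.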

Summary

You learned how to:

  • Implement the fan-out/fan-in pattern for parallel agent execution
  • Build FanOutExecutor to broadcast queries to multiple agents
  • Build FanInExecutor to collect and synthesize results
  • Use AddFanOutEdge and AddFanInEdge with WorkflowBuilder
  • Create specialized agents with tool access for real-time research

The parallelization pattern is essential for I/O-bound multi-agent systems where latency matters.

Next Steps

Resources
