# Parallel Sub-Agents

> Fan out work to specialist agents that run in parallel

Agent frameworks have largely converged on the same pattern: break a complex task into independent subtasks, run a specialist agent on each subtask in parallel, and synthesize the results. LangGraph does this with `Send` and `@task` futures. OpenAI Agents SDK uses `asyncio.gather` and `agent.as_tool()`. Claude Agent SDK spawns subagents via the `Task` tool. Deep Agents dispatches parallel `task` tool calls.

On Tensorlake, you get the same fan-out/fan-in pattern — but each sub-agent runs in its own container with dedicated resources, independent retries, and durable checkpointing. No `asyncio` plumbing, no graph DSL, no shared memory coordination.

## Basic Pattern: Fan-Out and Combine

Define each sub-agent as a `@function()`, create futures for each, and pass them to a combiner function as a tail call:

```python theme={null}
from tensorlake.applications import application, function, Image

research_image = Image().run("pip install openai requests")

@application()
@function()
def analyze_company(company_name: str) -> dict:
    # Fan out to specialist agents — all run in parallel
    financials = financial_agent.future(company_name)
    market = market_agent.future(company_name)
    sentiment = sentiment_agent.future(company_name)

    # Combine results — runs after all agents complete
    return compile_report.future(financials, market, sentiment, company_name)


@function(image=research_image, timeout=600)
def financial_agent(company: str) -> dict:
    """Analyze financial data for a company."""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Analyze financials for {company}"}]
    )
    return {"analysis": response.choices[0].message.content}


@function(image=research_image, timeout=600)
def market_agent(company: str) -> dict:
    """Analyze market position and competitors."""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Analyze market position for {company}"}]
    )
    return {"analysis": response.choices[0].message.content}


@function(image=research_image, timeout=600)
def sentiment_agent(company: str) -> dict:
    """Analyze public sentiment."""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Analyze sentiment for {company}"}]
    )
    return {"analysis": response.choices[0].message.content}


@function()
def compile_report(financials: dict, market: dict, sentiment: dict, company: str) -> dict:
    return {
        "company": company,
        "financials": financials,
        "market": market,
        "sentiment": sentiment
    }
```

**Execution flow:**

```mermaid theme={null}
graph LR
    A["analyze_company()"] --> B["financial_agent()"]
    A --> C["market_agent()"]
    A --> D["sentiment_agent()"]
    B --> E["compile_report()"]
    C --> E
    D --> E
    E --> F["result"]
```

## How It Works

1. The orchestrator function creates futures for each sub-agent — this defines the calls without running them
2. Futures are passed as arguments to the combiner function, which is returned as a **tail call**
3. Tensorlake detects that the future arguments have no dependencies on each other and runs all sub-agents **in parallel**
4. When all sub-agents complete, the combiner runs with their results
5. The orchestrator's container is freed immediately after returning the tail call

<Tip>Because the orchestrator returns instead of waiting on its sub-agents, you're not paying for an idle container while they work.</Tip>
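The steps above can be modeled with a toy, in-process sketch. It is only an illustration of the idea (record calls first, run independent arguments concurrently, then run the combiner) and says nothing about how Tensorlake is implemented. `Deferred`, `defer`, and `resolve` are hypothetical names invented for this sketch, and threads stand in for containers.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Deferred:
    """A recorded call. Creating one runs nothing."""
    fn: Callable[..., Any]
    args: tuple


def defer(fn: Callable[..., Any], *args: Any) -> Deferred:
    """Record fn(*args) for later, like creating a future in step 1."""
    return Deferred(fn, args)


def resolve(node: Any) -> Any:
    """Run a Deferred once all of its arguments are resolved."""
    if not isinstance(node, Deferred):
        return node  # plain value: nothing to run
    if not node.args:
        return node.fn()
    # The arguments do not depend on each other, so resolve them concurrently.
    with ThreadPoolExecutor(max_workers=len(node.args)) as pool:
        values = list(pool.map(resolve, node.args))
    return node.fn(*values)


calls: list[str] = []  # records which toy agents actually ran


def financial_agent(company: str) -> dict:
    calls.append("financial")
    return {"analysis": f"financials for {company}"}


def market_agent(company: str) -> dict:
    calls.append("market")
    return {"analysis": f"market for {company}"}


def compile_report(financials: dict, market: dict, company: str) -> dict:
    return {"company": company, "financials": financials, "market": market}


# Steps 1-2: record the calls and hand them to the combiner
root = defer(
    compile_report,
    defer(financial_agent, "Acme"),
    defer(market_agent, "Acme"),
    "Acme",
)
ran_before_resolve = list(calls)  # still empty: nothing has executed yet

# Steps 3-4: the toy agents run concurrently, then the combiner runs
report = resolve(root)
print(ran_before_resolve, sorted(calls), report["company"])
```

The key property is that building `root` executes none of the agents; only resolving it does.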

## Real-World Patterns

These patterns are inspired by what teams are building in production with LangGraph, OpenAI Agents SDK, Claude Agent SDK, and Deep Agents — reimplemented on Tensorlake with container isolation, independent scaling, and durable execution.

### Parallel Research with Synthesis

The most common multi-agent pattern across every framework: decompose a research question into subtopics, investigate each in parallel, and synthesize the findings. This is the pattern behind GPT Researcher, Exa's web research system, and Anthropic's multi-agent research system.

```python theme={null}
from tensorlake.applications import application, function, Image

research_image = Image().run("pip install openai requests beautifulsoup4")

@function(image=research_image, timeout=900, retries=2)
def research_subtopic(topic: str, subtopic: str) -> dict:
    """Each researcher runs in its own container, searches the web,
    reads sources, and produces a structured summary."""
    from openai import OpenAI
    client = OpenAI(max_retries=0)

    # Step 1: Generate search queries for this subtopic
    queries = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Generate 3 search queries to research '{subtopic}' in the context of '{topic}'."}],
    ).choices[0].message.content

    # Step 2: Search and gather sources (search_and_read is a helper you supply; it is not defined here)
    sources = search_and_read(queries)

    # Step 3: Analyze and summarize
    analysis = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Summarize research findings with citations."},
            {"role": "user", "content": f"Topic: {subtopic}\n\nSources:\n{sources}"},
        ],
    ).choices[0].message.content

    return {"subtopic": subtopic, "analysis": analysis, "source_count": len(sources)}


@function(image=research_image, timeout=300)
def synthesize_research(results: list[dict], topic: str) -> dict:
    """Combine all parallel research into a cohesive report."""
    from openai import OpenAI
    combined = "\n\n---\n\n".join(
        f"## {r['subtopic']}\n{r['analysis']}" for r in results
    )
    report = OpenAI(max_retries=0).chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Synthesize research findings into a cohesive report. Resolve contradictions and highlight consensus."},
            {"role": "user", "content": f"Topic: {topic}\n\nFindings:\n{combined}"},
        ],
    ).choices[0].message.content

    return {"topic": topic, "report": report, "sections": len(results)}


@application()
@function(image=research_image, timeout=120)
def deep_research(topic: str) -> dict:
    """Orchestrator: decompose, fan out, synthesize."""
    from openai import OpenAI
    import json

    # Plan the research
    plan = OpenAI(max_retries=0).chat.completions.create(
        model="gpt-4o",
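        messages=[{
            "role": "user",
            # JSON mode requires the prompt to mention JSON; also pin the key parsed below
            "content": (
                f"Break this topic into 3-5 independent research subtopics: {topic}\n"
                'Respond with a JSON object of the form {"subtopics": ["..."]}.'
            ),
        }],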
        response_format={"type": "json_object"},
    ).choices[0].message.content
    subtopics = json.loads(plan)["subtopics"]

    # Fan out — each subtopic researched in parallel
    findings = [research_subtopic.future(topic, sub) for sub in subtopics]

    # Synthesize — runs after all research completes
    return synthesize_research.future(findings, topic)
```

Each researcher runs in its own container with its own 15-minute timeout and 2 retries. If one subtopic's research fails (rate limit, network error), only that subtopic is retried — the other researchers' work is preserved.
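The example above calls `search_and_read` without defining it. One possible shape is sketched below, under assumptions: the model returns one query per line (possibly numbered or quoted), and you pass in your own search backend. The `parse_queries` helper, the `search_fn` parameter, and `fake_search` are all hypothetical and exist only so the sketch runs offline.

```python
import re
from typing import Callable


def parse_queries(raw: str) -> list[str]:
    """Turn an LLM reply such as '1. foo' / '2. "bar"' into clean query strings."""
    queries = []
    for line in raw.splitlines():
        # Drop list markers ("1.", "2)", "-", "*") and surrounding quotes
        cleaned = re.sub(r"^\s*(?:\d+[.)]|[-*•])\s*", "", line).strip().strip("\"'")
        if cleaned:
            queries.append(cleaned)
    return queries


def search_and_read(raw_queries: str, search_fn: Callable[[str], list[str]]) -> list[str]:
    """Run every query through search_fn and return de-duplicated source texts."""
    sources: list[str] = []
    seen: set[str] = set()
    for query in parse_queries(raw_queries):
        for text in search_fn(query):
            if text not in seen:
                seen.add(text)
                sources.append(text)
    return sources


def fake_search(query: str) -> list[str]:
    """Stand-in backend; replace with a call to a real search client."""
    return [f"source about {query}", "shared background source"]


raw = '1. solid-state battery cost\n2. "battery recycling policy"\n'
sources = search_and_read(raw, fake_search)
print(len(sources))
```

Returning a list keeps `len(sources)` in `research_subtopic` meaningful. To keep the one-argument call used above, you could bind the backend with `functools.partial`.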

### Multi-Perspective Analysis

Multiple specialist agents examine the same input from different analytical perspectives — a pattern used in production for investment analysis, proposal review, and compliance checks.

```python theme={null}
from pydantic import BaseModel
from tensorlake.applications import application, function, Image

analyst_image = Image().run("pip install anthropic")


class AnalystReport(BaseModel):
    perspective: str
    assessment: str
    risk_score: float
    key_findings: list[str]


@function(image=analyst_image, timeout=600, retries=2)
def growth_analyst(company_data: dict) -> AnalystReport:
    """Evaluate revenue growth, market expansion, and competitive moats."""
    import anthropic
    client = anthropic.Anthropic()
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": f"As a growth analyst, evaluate:\n{company_data}"}],
    )
    # parse_report is a helper you supply: it turns the model's text into an AnalystReport
    return parse_report("growth", response.content[0].text)


@function(image=analyst_image, timeout=600, retries=2)
def value_analyst(company_data: dict) -> AnalystReport:
    """Evaluate cash flow, margins, and intrinsic value."""
    import anthropic
    client = anthropic.Anthropic()
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": f"As a value analyst, evaluate:\n{company_data}"}],
    )
    return parse_report("value", response.content[0].text)


@function(image=analyst_image, timeout=600, retries=2)
def risk_analyst(company_data: dict) -> AnalystReport:
    """Evaluate regulatory risk, market volatility, and operational risk."""
    import anthropic
    client = anthropic.Anthropic()
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": f"As a risk analyst, evaluate:\n{company_data}"}],
    )
    return parse_report("risk", response.content[0].text)


@function(image=analyst_image, timeout=300)
def investment_committee(growth: AnalystReport, value: AnalystReport, risk: AnalystReport) -> dict:
    """Weigh all perspectives and produce a final recommendation."""
    import anthropic
    client = anthropic.Anthropic()
    combined = f"Growth: {growth.model_dump()}\nValue: {value.model_dump()}\nRisk: {risk.model_dump()}"
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": f"As an investment committee, synthesize these analyst reports into a buy/hold/sell recommendation:\n{combined}"}],
    )
    return {"recommendation": response.content[0].text, "analyst_reports": [growth.model_dump(), value.model_dump(), risk.model_dump()]}


@application()
@function()
def analyze_investment(company_data: dict) -> dict:
    growth = growth_analyst.future(company_data)
    value = value_analyst.future(company_data)
    risk = risk_analyst.future(company_data)
    return investment_committee.future(growth, value, risk)
```

This mirrors the multi-agent portfolio collaboration pattern from the OpenAI Agents SDK cookbook — but each analyst runs in an isolated container with its own timeout and retry policy.
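The analysts above rely on a `parse_report` helper that is not shown. A minimal sketch follows, assuming you also ask the model to reply with a JSON object containing `assessment`, `risk_score`, and `key_findings` (the prompts above do not request this yet), and assuming a 0 to 1 risk scale. A dataclass with the same fields stands in for the pydantic `AnalystReport` so the snippet has no dependencies.

```python
import json
from dataclasses import dataclass, field


@dataclass
class AnalystReport:  # stand-in with the same fields as the pydantic model above
    perspective: str
    assessment: str
    risk_score: float
    key_findings: list[str] = field(default_factory=list)


def parse_report(perspective: str, text: str) -> AnalystReport:
    """Build a report from model output, tolerating prose around the JSON."""
    data: dict = {}
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        try:
            parsed = json.loads(text[start:end + 1])
            if isinstance(parsed, dict):
                data = parsed
        except json.JSONDecodeError:
            pass  # fall through to the raw-text fallback below

    try:
        score = float(data.get("risk_score", 0.0))
    except (TypeError, ValueError):
        score = 0.0

    findings = data.get("key_findings", [])
    if not isinstance(findings, list):
        findings = []

    return AnalystReport(
        perspective=perspective,
        # Fall back to the raw text when no structured assessment was found
        assessment=str(data.get("assessment", text.strip())),
        risk_score=min(max(score, 0.0), 1.0),  # clamp to the assumed 0-1 scale
        key_findings=[str(item) for item in findings],
    )


reply = 'Here is my view:\n{"assessment": "Strong moat", "risk_score": 1.4, "key_findings": ["Recurring revenue is growing"]}'
report = parse_report("growth", reply)
fallback = parse_report("risk", "No JSON here")
print(report.risk_score, fallback.assessment)
```

Falling back to the raw text keeps a malformed reply from failing the whole analyst. Whether you prefer that or a raised exception (which would trigger the function's retries) is a design choice.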

### Document Processing Pipeline

Run each incoming document through parallel specialist agents, a common pattern for intake automation in insurance, legal, and financial services. The application below handles one document per invocation.

```python theme={null}
from tensorlake.applications import application, function, Image

ocr_image = Image().run("pip install pytesseract pillow pdf2image")
llm_image = Image().run("pip install openai")

@function(image=ocr_image, cpu=2, memory=4, timeout=120)
def extract_text(doc_url: str) -> dict:
    """OCR and text extraction — needs CPU for image processing."""
    # download_and_ocr is a helper you supply (fetch the file, then run OCR on it)
    content = download_and_ocr(doc_url)
    return {"url": doc_url, "text": content}

@function(image=llm_image, timeout=300, retries=2)
def classify_document(doc: dict) -> dict:
    """Determine document type and extract key fields."""
    from openai import OpenAI
    response = OpenAI(max_retries=0).chat.completions.create(
        model="gpt-4o",
        # JSON mode requires the word "JSON" to appear in the prompt
        messages=[{"role": "user", "content": f"Classify this document and extract key fields. Respond with a JSON object.\n{doc['text'][:4000]}"}],
        response_format={"type": "json_object"},
    )
    return {**doc, "classification": response.choices[0].message.content}

@function(image=llm_image, timeout=300, retries=2)
def check_compliance(doc: dict) -> dict:
    """Check for missing signatures, dates, required fields."""
    from openai import OpenAI
    response = OpenAI(max_retries=0).chat.completions.create(
        model="gpt-4o",
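        # JSON mode requires the word "JSON" to appear in the prompt
        messages=[{"role": "user", "content": f"Check this document for compliance issues. Respond with a JSON object listing them.\n{doc['text'][:4000]}"}],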
        response_format={"type": "json_object"},
    )
    return {**doc, "compliance": response.choices[0].message.content}

@function(timeout=60)
def merge_results(classified: dict, compliance: dict) -> dict:
    return {
        "url": classified["url"],
        "classification": classified["classification"],
        "compliance": compliance["compliance"],
    }

@application()
@function()
def process_document(doc_url: str) -> dict:
    extracted = extract_text.future(doc_url)
    classified = classify_document.future(extracted)
    compliance = check_compliance.future(extracted)
    return merge_results.future(classified, compliance)
```

```mermaid theme={null}
graph LR
    A["process_document()"] --> B["extract_text()"]
    B --> C["classify_document()"]
    B --> D["check_compliance()"]
    C --> E["merge_results()"]
    D --> E
    E --> F["result"]
```

After extraction, classification and compliance checking run in parallel — they both depend on the extracted text but not on each other. If the compliance check hits a rate limit, it retries independently without re-running OCR or classification.

## Use Any Agent Framework

Each sub-agent can use whatever framework you want internally. The `@function()` boundary is a container boundary — what runs inside is up to you. Define each specialist as a focused function using its framework, then fan them out with `.future()`.

```python theme={null}
from tensorlake.applications import application, function, Image

# Each framework gets its own container image with its own dependencies
langgraph_image = Image().run("pip install langgraph langchain-openai langchain-community tavily-python")
openai_image = Image().run("pip install openai-agents")
claude_image = Image().run("pip install claude-agent-sdk")
deep_image = Image().run("pip install deepagents langchain-openai")


@function(image=langgraph_image, timeout=600)
def market_researcher(company: str) -> str:
    """Market research using a LangGraph ReAct agent with web search."""
    from langgraph.prebuilt import create_react_agent
    from langchain_openai import ChatOpenAI
    from langchain_community.tools import TavilySearchResults

    agent = create_react_agent(
        ChatOpenAI(model="gpt-4o"),
        tools=[TavilySearchResults(max_results=5)],
    )
    result = agent.invoke({"messages": [
        ("human", f"Research the market position, competitors, and recent news for {company}.")
    ]})
    return result["messages"][-1].content


@function(image=openai_image, timeout=600)
def financial_analyst(company: str) -> str:
    """Financial analysis using an OpenAI Agents SDK agent with tool use."""
    from agents import Agent, Runner, WebSearchTool

    agent = Agent(
        name="FinancialAnalyst",
        instructions=(
            "You are a financial analyst. Analyze revenue, margins, cash flow, "
            "and valuation metrics. Use web search to find the latest filings."
        ),
        tools=[WebSearchTool()],
    )
    result = Runner.run_sync(agent, f"Analyze the financials for {company}")
    return result.final_output


@function(image=claude_image, timeout=900, ephemeral_disk=4)
def risk_assessor(company: str) -> str:
    """Risk assessment using a Claude agent with deep reasoning."""
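    import asyncio
    import os
    from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

    # Make sure the agent's working directory exists before starting
    os.makedirs("/tmp/workspace", exist_ok=True)

    async def run():
        result = ""
        async for message in query(
            prompt=f"Assess regulatory, operational, and market risks for {company}.",
            options=ClaudeAgentOptions(
                system_prompt="You are a risk analyst. Identify and score key risks.",
                permission_mode="acceptEdits",
                cwd="/tmp/workspace",
            ),
        ):
            # The final ResultMessage carries the agent's answer as plain text
            if isinstance(message, ResultMessage) and message.result:
                result = message.result
        return result

    return asyncio.run(run())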


@function(image=deep_image, timeout=900)
def technical_reviewer(company: str) -> str:
    """Technical deep-dive using a Deep Agent with planning and web search."""
    from deepagents import create_deep_agent

    agent = create_deep_agent(
        model="openai:gpt-4o",
        system_prompt="Evaluate the company's technology stack, patents, and engineering culture.",
    )
    result = agent.invoke({
        "messages": [{"role": "user", "content": f"Technical review of {company}"}]
    })
    return result["messages"][-1].content


@function(timeout=300)
def compile_analysis(market: str, financials: str, risks: str, technical: str, company: str) -> dict:
    """Combine all analyst reports into a final recommendation."""
    return {
        "company": company,
        "market_research": market,
        "financial_analysis": financials,
        "risk_assessment": risks,
        "technical_review": technical,
    }


@application()
@function()
def analyze_company(company: str) -> dict:
    # Four frameworks, four containers, all running in parallel
    market = market_researcher.future(company)
    financials = financial_analyst.future(company)
    risks = risk_assessor.future(company)
    technical = technical_reviewer.future(company)

    return compile_analysis.future(market, financials, risks, technical, company)
```

```mermaid theme={null}
graph LR
    A["analyze_company()"] --> B["market_researcher()\nLangGraph"]
    A --> C["financial_analyst()\nOpenAI Agents SDK"]
    A --> D["risk_assessor()\nClaude Agent SDK"]
    A --> E["technical_reviewer()\nDeep Agents"]
    B --> F["compile_analysis()"]
    C --> F
    D --> F
    E --> F
    F --> G["result"]
```

Each agent runs in its own container with its own dependencies — no version conflicts, no shared memory, no `asyncio` event loop contention. If the risk assessment takes longer than the others, the completed agents' results are checkpointed and preserved.

## Different Resources Per Agent

Each sub-agent can have its own container configuration:

```python theme={null}
from tensorlake.applications import application, function, Image

gpu_image = Image().run("pip install torch transformers")

@function(cpu=1, memory=2, timeout=300)
def text_agent(prompt: str) -> str:
    """Lightweight text analysis."""
    ...

@function(image=gpu_image, cpu=4, memory=16, gpu="T4", timeout=600)
def vision_agent(image_url: str) -> dict:
    """GPU-heavy image analysis."""
    ...

@function(cpu=2, memory=4, timeout=900)
def data_agent(query: str) -> list:
    """Medium resources for data fetching."""
    ...

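@function()
def combine_results(text: str, vision: dict, data: list) -> dict:
    """Merge the three agents' outputs into one result."""
    return {"text": text, "vision": vision, "data": data}

@application()
@function()
def multimodal_analysis(prompt: str, image_url: str) -> dict:
    text_result = text_agent.future(prompt)
    vision_result = vision_agent.future(image_url)
    data_result = data_agent.future(prompt)
    return combine_results.future(text_result, vision_result, data_result)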
```

## Chaining Parallel Stages

You can chain stages where each stage fans out in parallel:

```python theme={null}
@application()
@function()
def pipeline(query: str) -> dict:
    # Stage 1: Gather data in parallel
    web = search_web.future(query)
    papers = search_papers.future(query)
    news = search_news.future(query)

    # Stage 2: Analyze all sources together (runs after stage 1 completes)
    analysis = analyze_sources.future(web, papers, news)

    # Stage 3: Generate final output
    return generate_report.future(analysis, query)
```

Each stage waits for its dependencies automatically. Stages without dependencies run in parallel.
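To see why the pipeline above resolves into three stages, you can compute the ordering locally with Python's standard `graphlib`. This only illustrates dependency ordering; it is not a description of how Tensorlake schedules work internally. The `dependencies` mapping is written by hand from the function names in the example.

```python
from graphlib import TopologicalSorter

# Each call maps to the calls whose results it consumes
dependencies = {
    "search_web": set(),
    "search_papers": set(),
    "search_news": set(),
    "analyze_sources": {"search_web", "search_papers", "search_news"},
    "generate_report": {"analyze_sources"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

stages = []
while sorter.is_active():
    # Everything returned together has no unmet dependencies,
    # so these calls could all run in parallel.
    ready = sorted(sorter.get_ready())
    stages.append(ready)
    sorter.done(*ready)

for number, stage in enumerate(stages, start=1):
    print(f"stage {number}: {', '.join(stage)}")
```

The three searches land in the first stage together because none of them consumes another's result.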

## Using Futures for More Control

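When the orchestrator needs to do its own work while sub-agents run, start each future explicitly with `.run()` and wait on the results using [Futures](/applications/futures) instead of returning a tail call. Unlike a tail call, the orchestrator keeps running while it waits, so give it a timeout long enough to cover the sub-agents (1800 seconds in this example):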

```python theme={null}
from tensorlake.applications import application, function, Future, RETURN_WHEN

@application()
@function(timeout=1800)
def interactive_analysis(query: str) -> dict:
    # Start sub-agents
    agent_a: Future = agent_a_work.future(query).run()
    agent_b: Future = agent_b_work.future(query).run()

    # Do local work while agents run
    local_context = prepare_context(query)

    # Wait for both agents
    Future.wait([agent_a, agent_b], return_when=RETURN_WHEN.ALL_COMPLETED)

    return {
        "context": local_context,
        "agent_a": agent_a.result(),
        "agent_b": agent_b.result()
    }
```
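If you want to try the start, work locally, then wait shape on your own machine first, the same control flow can be written with Python's standard `concurrent.futures`. This is a local analogy only, not the Tensorlake API: threads stand in for containers, and the bodies of `agent_a_work`, `agent_b_work`, and `prepare_context` are placeholders made up for the sketch.

```python
import time
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


def agent_a_work(query: str) -> str:
    time.sleep(0.1)  # stands in for a slow sub-agent
    return f"A analyzed {query}"


def agent_b_work(query: str) -> str:
    time.sleep(0.1)
    return f"B analyzed {query}"


def prepare_context(query: str) -> dict:
    return {"query": query, "word_count": len(query.split())}


def interactive_analysis(query: str) -> dict:
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Start both workers right away
        agent_a = pool.submit(agent_a_work, query)
        agent_b = pool.submit(agent_b_work, query)

        # Local work overlaps with the running workers
        local_context = prepare_context(query)

        # Block until both workers have finished
        wait([agent_a, agent_b], return_when=ALL_COMPLETED)

        return {
            "context": local_context,
            "agent_a": agent_a.result(),
            "agent_b": agent_b.result(),
        }


result = interactive_analysis("battery market")
print(result)
```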

## Learn More

<CardGroup cols={2}>
  <Card title="Futures" icon="shuffle" href="/applications/futures">
    Deep dive on futures, tail calls, and parallel execution.
  </Card>

  <Card title="Async Functions" icon="shuffle" href="/applications/async-functions">
    Use Python async/await for parallel workflows.
  </Card>

  <Card title="Map-Reduce" icon="diagram-project" href="/applications/map-reduce">
    Parallel processing over lists of data.
  </Card>
</CardGroup>
