Every agent framework has converged on the same pattern: break a complex task into independent subtasks, run specialist agents on each subtask in parallel, and synthesize the results. LangGraph does this with Send and @task futures. OpenAI Agents SDK uses asyncio.gather and agent.as_tool(). Claude Agent SDK spawns subagents via the Task tool. Deep Agents dispatches parallel task tool calls. On Tensorlake, you get the same fan-out/fan-in pattern — but each sub-agent runs in its own container with dedicated resources, independent retries, and durable checkpointing. No asyncio plumbing, no graph DSL, no shared memory coordination.

Basic Pattern: Fan-Out and Combine

Define each sub-agent as a @function(), create awaitables for each, and pass them to a combiner function as a tail call:
from tensorlake.applications import application, function, Image

research_image = Image().run("pip install openai requests")

@application()
@function()
def analyze_company(company_name: str) -> dict:
    # Fan out to specialist agents — all run in parallel
    financials = financial_agent.awaitable(company_name)
    market = market_agent.awaitable(company_name)
    sentiment = sentiment_agent.awaitable(company_name)

    # Combine results — runs after all agents complete
    return compile_report.awaitable(financials, market, sentiment, company_name)


@function(image=research_image, timeout=600)
def financial_agent(company: str) -> dict:
    """Analyze financial data for a company."""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Analyze financials for {company}"}]
    )
    return {"analysis": response.choices[0].message.content}


@function(image=research_image, timeout=600)
def market_agent(company: str) -> dict:
    """Analyze market position and competitors."""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Analyze market position for {company}"}]
    )
    return {"analysis": response.choices[0].message.content}


@function(image=research_image, timeout=600)
def sentiment_agent(company: str) -> dict:
    """Analyze public sentiment."""
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Analyze sentiment for {company}"}]
    )
    return {"analysis": response.choices[0].message.content}


@function()
def compile_report(financials: dict, market: dict, sentiment: dict, company: str) -> dict:
    return {
        "company": company,
        "financials": financials,
        "market": market,
        "sentiment": sentiment
    }

How It Works

  1. The orchestrator function creates awaitables for each sub-agent — this defines the calls without running them
  2. Awaitables are passed as arguments to the combiner function, which is returned as a tail call
  3. Tensorlake detects that the awaitable arguments have no dependencies on each other and runs all sub-agents in parallel
  4. When all sub-agents complete, the combiner runs with their results
  5. The orchestrator’s container is freed immediately after returning the tail call

You’re not paying for an idle container while sub-agents work.
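
The fan-out doesn’t have to be a fixed set of three agents. A list of awaitables can be passed to the combiner as a single argument (the research example below does exactly this), so the degree of parallelism can be decided at runtime. A minimal sketch, with hypothetical score_item and summarize_scores functions:
from tensorlake.applications import application, function

@function(timeout=300)
def score_item(item: str) -> dict:
    """Hypothetical per-item worker; one container per item."""
    return {"item": item, "score": len(item)}

@function()
def summarize_scores(scores: list[dict]) -> dict:
    """Hypothetical combiner; receives every worker's result."""
    return {"count": len(scores), "scores": scores}

@application()
@function()
def score_all(items: list[str]) -> dict:
    # One awaitable per input item; all run in parallel
    pending = [score_item.awaitable(item) for item in items]
    # Tail call: the combiner runs once every worker completes
    return summarize_scores.awaitable(pending)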

Real-World Patterns

These patterns are inspired by what teams are building in production with LangGraph, OpenAI Agents SDK, Claude Agent SDK, and Deep Agents — reimplemented on Tensorlake with container isolation, independent scaling, and durable execution.

Parallel Research with Synthesis

The most common multi-agent pattern across every framework: decompose a research question into subtopics, investigate each in parallel, and synthesize the findings. This is the pattern behind GPT Researcher, Exa’s web research system, and Anthropic’s multi-agent research system.
from tensorlake.applications import application, function, Image

research_image = Image().run("pip install openai requests beautifulsoup4")
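
# Hypothetical helper, not part of the Tensorlake API: run the generated
# queries against a search provider and return extracted page text.
# Stubbed here; the image above installs requests and beautifulsoup4 for it.
def search_and_read(queries: str) -> list[str]:
    # e.g., call your search API per query, fetch each hit with requests,
    # and strip it to readable text with BeautifulSoup
    return []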

@function(image=research_image, timeout=900, retries=2)
def research_subtopic(topic: str, subtopic: str) -> dict:
    """Each researcher runs in its own container, searches the web,
    reads sources, and produces a structured summary."""
    from openai import OpenAI
    client = OpenAI(max_retries=0)

    # Step 1: Generate search queries for this subtopic
    queries = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Generate 3 search queries to research '{subtopic}' in the context of '{topic}'."}],
    ).choices[0].message.content

    # Step 2: Search and gather sources
    sources = search_and_read(queries)

    # Step 3: Analyze and summarize
    analysis = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Summarize research findings with citations."},
            {"role": "user", "content": f"Topic: {subtopic}\n\nSources:\n{sources}"},
        ],
    ).choices[0].message.content

    return {"subtopic": subtopic, "analysis": analysis, "source_count": len(sources)}


@function(image=research_image, timeout=300)
def synthesize_research(results: list[dict], topic: str) -> dict:
    """Combine all parallel research into a cohesive report."""
    from openai import OpenAI
    combined = "\n\n---\n\n".join(
        f"## {r['subtopic']}\n{r['analysis']}" for r in results
    )
    report = OpenAI(max_retries=0).chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Synthesize research findings into a cohesive report. Resolve contradictions and highlight consensus."},
            {"role": "user", "content": f"Topic: {topic}\n\nFindings:\n{combined}"},
        ],
    ).choices[0].message.content

    return {"topic": topic, "report": report, "sections": len(results)}


@application()
@function(image=research_image, timeout=120)
def deep_research(topic: str) -> dict:
    """Orchestrator: decompose, fan out, synthesize."""
    from openai import OpenAI
    import json

    # Plan the research
    plan = OpenAI(max_retries=0).chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Break this topic into 3-5 independent research subtopics: {topic}"}],
        response_format={"type": "json_object"},
    ).choices[0].message.content
    subtopics = json.loads(plan)["subtopics"]

    # Fan out — each subtopic researched in parallel
    findings = [research_subtopic.awaitable(topic, sub) for sub in subtopics]

    # Synthesize — runs after all research completes
    return synthesize_research.awaitable(findings, topic)
Each researcher runs in its own container with its own 15-minute timeout and 2 retries. If one subtopic’s research fails (rate limit, network error), only that subtopic is retried — the other researchers’ work is preserved.

Multi-Perspective Analysis

Multiple specialist agents examine the same input from different analytical perspectives — a pattern used in production for investment analysis, proposal review, and compliance checks.
from pydantic import BaseModel
from tensorlake.applications import application, function, Image

analyst_image = Image().run("pip install anthropic")


class AnalystReport(BaseModel):
    perspective: str
    assessment: str
    risk_score: float
    key_findings: list[str]
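
# Minimal sketch of the parse_report helper the analysts below call; its real
# implementation is elided in this guide. Assumption: wrap the model's free-text
# reply into an AnalystReport with placeholder scoring. Use structured outputs
# in production.
def parse_report(perspective: str, text: str) -> AnalystReport:
    return AnalystReport(
        perspective=perspective,
        assessment=text,
        risk_score=0.5,  # placeholder; derive from the model's reply
        key_findings=[ln.lstrip("- ") for ln in text.splitlines() if ln.startswith("-")],
    )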


@function(image=analyst_image, timeout=600, retries=2)
def growth_analyst(company_data: dict) -> AnalystReport:
    """Evaluate revenue growth, market expansion, and competitive moats."""
    import anthropic
    client = anthropic.Anthropic()
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": f"As a growth analyst, evaluate:\n{company_data}"}],
    )
    return parse_report("growth", response.content[0].text)


@function(image=analyst_image, timeout=600, retries=2)
def value_analyst(company_data: dict) -> AnalystReport:
    """Evaluate cash flow, margins, and intrinsic value."""
    import anthropic
    client = anthropic.Anthropic()
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": f"As a value analyst, evaluate:\n{company_data}"}],
    )
    return parse_report("value", response.content[0].text)


@function(image=analyst_image, timeout=600, retries=2)
def risk_analyst(company_data: dict) -> AnalystReport:
    """Evaluate regulatory risk, market volatility, and operational risk."""
    import anthropic
    client = anthropic.Anthropic()
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": f"As a risk analyst, evaluate:\n{company_data}"}],
    )
    return parse_report("risk", response.content[0].text)


@function(image=analyst_image, timeout=300)
def investment_committee(growth: AnalystReport, value: AnalystReport, risk: AnalystReport) -> dict:
    """Weigh all perspectives and produce a final recommendation."""
    import anthropic
    client = anthropic.Anthropic()
    combined = f"Growth: {growth.model_dump()}\nValue: {value.model_dump()}\nRisk: {risk.model_dump()}"
    response = client.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2000,
        messages=[{"role": "user", "content": f"As an investment committee, synthesize these analyst reports into a buy/hold/sell recommendation:\n{combined}"}],
    )
    return {"recommendation": response.content[0].text, "analyst_reports": [growth.model_dump(), value.model_dump(), risk.model_dump()]}


@application()
@function()
def analyze_investment(company_data: dict) -> dict:
    growth = growth_analyst.awaitable(company_data)
    value = value_analyst.awaitable(company_data)
    risk = risk_analyst.awaitable(company_data)
    return investment_committee.awaitable(growth, value, risk)
This mirrors the multi-agent portfolio collaboration pattern from the OpenAI Agents SDK cookbook — but each analyst runs in an isolated container with its own timeout and retry policy.

Document Processing Pipeline

Process a batch of documents through parallel specialist agents — a common pattern for intake automation in insurance, legal, and financial services.
from tensorlake.applications import application, function, Image

ocr_image = Image().run("pip install pytesseract pillow pdf2image")
llm_image = Image().run("pip install openai")
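
# Hypothetical helper, elided in this guide: download the document and OCR it.
# The ocr_image above installs pytesseract, pillow, and pdf2image for this;
# the container also needs poppler for pdf2image to work.
def download_and_ocr(doc_url: str) -> str:
    import requests
    import pytesseract
    from pdf2image import convert_from_bytes
    pdf_bytes = requests.get(doc_url, timeout=30).content
    pages = convert_from_bytes(pdf_bytes)
    return "\n".join(pytesseract.image_to_string(page) for page in pages)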

@function(image=ocr_image, cpu=2, memory=4, timeout=120)
def extract_text(doc_url: str) -> dict:
    """OCR and text extraction — needs CPU for image processing."""
    content = download_and_ocr(doc_url)
    return {"url": doc_url, "text": content}

@function(image=llm_image, timeout=300, retries=2)
def classify_document(doc: dict) -> dict:
    """Determine document type and extract key fields."""
    from openai import OpenAI
    response = OpenAI(max_retries=0).chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Classify this document and extract key fields:\n{doc['text'][:4000]}"}],
        response_format={"type": "json_object"},
    )
    return {**doc, "classification": response.choices[0].message.content}

@function(image=llm_image, timeout=300, retries=2)
def check_compliance(doc: dict) -> dict:
    """Check for missing signatures, dates, required fields."""
    from openai import OpenAI
    response = OpenAI(max_retries=0).chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Check this document for compliance issues:\n{doc['text'][:4000]}"}],
        response_format={"type": "json_object"},
    )
    return {**doc, "compliance": response.choices[0].message.content}

@function(timeout=60)
def merge_results(classified: dict, compliance: dict) -> dict:
    return {
        "url": classified["url"],
        "classification": classified["classification"],
        "compliance": compliance["compliance"],
    }

@application()
@function()
def process_document(doc_url: str) -> dict:
    extracted = extract_text.awaitable(doc_url)
    classified = classify_document.awaitable(extracted)
    compliance = check_compliance.awaitable(extracted)
    return merge_results.awaitable(classified, compliance)
After extraction, classification and compliance checking run in parallel — they both depend on the extracted text but not on each other. If the compliance check hits a rate limit, it retries independently without re-running OCR or classification.
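
The single-document diamond extends naturally to batches. A sketch, building on the functions above and reusing the list fan-out shown in the research example, with a hypothetical collect_batch combiner:
@function(timeout=60)
def collect_batch(results: list[dict]) -> list[dict]:
    """Hypothetical combiner: gather every document's merged result."""
    return results

@application()
@function()
def process_batch(doc_urls: list[str]) -> list[dict]:
    merged = []
    for url in doc_urls:
        # Each document gets its own extract -> (classify || compliance) -> merge chain
        extracted = extract_text.awaitable(url)
        merged.append(merge_results.awaitable(
            classify_document.awaitable(extracted),
            check_compliance.awaitable(extracted),
        ))
    return collect_batch.awaitable(merged)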

Use Any Agent Framework

Each sub-agent can use whatever framework you want internally. The @function() boundary is a container boundary — what runs inside is up to you. Define each specialist as a focused function using its framework, then fan them out with .awaitable().
from tensorlake.applications import application, function, Image

# Each framework gets its own container image with its own dependencies
langgraph_image = Image().run("pip install langgraph langchain-openai langchain-community tavily-python")
openai_image = Image().run("pip install openai-agents")
claude_image = Image().run("pip install claude-agent-sdk")
deep_image = Image().run("pip install deepagents langchain-openai")


@function(image=langgraph_image, timeout=600)
def market_researcher(company: str) -> str:
    """Market research using a LangGraph ReAct agent with web search."""
    from langgraph.prebuilt import create_react_agent
    from langchain_openai import ChatOpenAI
    from langchain_community.tools import TavilySearchResults

    agent = create_react_agent(
        ChatOpenAI(model="gpt-4o"),
        tools=[TavilySearchResults(max_results=5)],
    )
    result = agent.invoke({"messages": [
        ("human", f"Research the market position, competitors, and recent news for {company}.")
    ]})
    return result["messages"][-1].content


@function(image=openai_image, timeout=600)
def financial_analyst(company: str) -> str:
    """Financial analysis using an OpenAI Agents SDK agent with tool use."""
    from agents import Agent, Runner, WebSearchTool

    agent = Agent(
        name="FinancialAnalyst",
        instructions=(
            "You are a financial analyst. Analyze revenue, margins, cash flow, "
            "and valuation metrics. Use web search to find the latest filings."
        ),
        tools=[WebSearchTool()],
    )
    result = Runner.run_sync(agent, f"Analyze the financials for {company}")
    return result.final_output


@function(image=claude_image, timeout=900, ephemeral_disk=4)
def risk_assessor(company: str) -> str:
    """Risk assessment using a Claude agent with deep reasoning."""
    import asyncio
    from claude_agent_sdk import query, ClaudeAgentOptions

    async def run():
        result = ""
        async for message in query(
            prompt=f"Assess regulatory, operational, and market risks for {company}.",
            options=ClaudeAgentOptions(
                system_prompt="You are a risk analyst. Identify and score key risks.",
                permission_mode="acceptEdits",
                cwd="/tmp/workspace",
            ),
        ):
            result = str(message)  # keep only the final message from the stream
        return result

    return asyncio.run(run())


@function(image=deep_image, timeout=900)
def technical_reviewer(company: str) -> str:
    """Technical deep-dive using a Deep Agent with planning and web search."""
    from deepagents import create_deep_agent

    agent = create_deep_agent(
        model="openai:gpt-4o",
        system_prompt="Evaluate the company's technology stack, patents, and engineering culture.",
    )
    result = agent.invoke({
        "messages": [{"role": "user", "content": f"Technical review of {company}"}]
    })
    return result["messages"][-1].content


@function(timeout=300)
def compile_analysis(market: str, financials: str, risks: str, technical: str, company: str) -> dict:
    """Combine all analyst reports into a final recommendation."""
    return {
        "company": company,
        "market_research": market,
        "financial_analysis": financials,
        "risk_assessment": risks,
        "technical_review": technical,
    }


@application()
@function()
def analyze_company(company: str) -> dict:
    # Four frameworks, four containers, all running in parallel
    market = market_researcher.awaitable(company)
    financials = financial_analyst.awaitable(company)
    risks = risk_assessor.awaitable(company)
    technical = technical_reviewer.awaitable(company)

    return compile_analysis.awaitable(market, financials, risks, technical, company)
Each agent runs in its own container with its own dependencies — no version conflicts, no shared memory, no asyncio event loop contention. If the risk assessment takes longer than the others, the completed agents’ results are checkpointed and preserved.

Different Resources Per Agent

Each sub-agent can have its own container configuration:
from tensorlake.applications import application, function, Image

gpu_image = Image().run("pip install torch transformers")

@function(cpu=1, memory=2, timeout=300)
def text_agent(prompt: str) -> str:
    """Lightweight text analysis."""
    ...

@function(image=gpu_image, cpu=4, memory=16, gpu="T4", timeout=600)
def vision_agent(image_url: str) -> dict:
    """GPU-heavy image analysis."""
    ...

@function(cpu=2, memory=4, timeout=900)
def data_agent(query: str) -> list:
    """Medium resources for data fetching."""
    ...

@application()
@function()
def multimodal_analysis(prompt: str, image_url: str) -> dict:
    text_result = text_agent.awaitable(prompt)
    vision_result = vision_agent.awaitable(image_url)
    data_result = data_agent.awaitable(prompt)
    return combine_results.awaitable(text_result, vision_result, data_result)
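
# Hypothetical combiner, elided above: gather the three parallel results.
@function(timeout=60)
def combine_results(text_result: str, vision_result: dict, data_result: list) -> dict:
    return {"text": text_result, "vision": vision_result, "data": data_result}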

Chaining Parallel Stages

You can chain stages where each stage fans out in parallel:
@application()
@function()
def pipeline(query: str) -> dict:
    # Stage 1: Gather data in parallel
    web = search_web.awaitable(query)
    papers = search_papers.awaitable(query)
    news = search_news.awaitable(query)

    # Stage 2: Analyze each source (runs after stage 1)
    analysis = analyze_sources.awaitable(web, papers, news)

    # Stage 3: Generate final output
    return generate_report.awaitable(analysis, query)
Each stage waits for its dependencies automatically. Stages without dependencies run in parallel.
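
A later stage can itself fan out. A sketch, reusing the hypothetical search functions above and adding a hypothetical per-source analyze_source agent and a list-taking generate_report variant:
@application()
@function()
def pipeline_fanned(query: str) -> dict:
    # Stage 1: gather data in parallel
    web = search_web.awaitable(query)
    papers = search_papers.awaitable(query)
    news = search_news.awaitable(query)

    # Stage 2: each analyzer waits only on its own source, so all three
    # analyses also run in parallel
    analyses = [
        analyze_source.awaitable(web),
        analyze_source.awaitable(papers),
        analyze_source.awaitable(news),
    ]

    # Stage 3: synthesize once every analysis completes
    return generate_report.awaitable(analyses, query)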

Using Futures for More Control

When you need to do work in the orchestrator while sub-agents run, use Futures instead of tail calls:
from tensorlake.applications import application, function, Future, RETURN_WHEN

@application()
@function(timeout=1800)
def interactive_analysis(query: str) -> dict:
    # Start sub-agents
    agent_a: Future = agent_a_work.awaitable(query).run()
    agent_b: Future = agent_b_work.awaitable(query).run()

    # Do local work while agents run
    local_context = prepare_context(query)

    # Wait for both agents
    Future.wait([agent_a, agent_b], return_when=RETURN_WHEN.ALL_COMPLETED)

    return {
        "context": local_context,
        "agent_a": agent_a.result(),
        "agent_b": agent_b.result()
    }
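
If a sub-agent can still fail after its retries, guard each result() call. A sketch, assuming a failed sub-agent's exception surfaces from result() as it does in concurrent.futures (check the SDK's error-handling docs for the exact behavior):
@application()
@function(timeout=1800)
def tolerant_analysis(query: str) -> dict:
    agent_a: Future = agent_a_work.awaitable(query).run()
    agent_b: Future = agent_b_work.awaitable(query).run()
    Future.wait([agent_a, agent_b], return_when=RETURN_WHEN.ALL_COMPLETED)

    results = {}
    for name, fut in (("agent_a", agent_a), ("agent_b", agent_b)):
        try:
            results[name] = fut.result()
        except Exception as exc:  # assumption: a failed sub-agent raises here
            results[name] = {"error": str(exc)}
    return results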

Learn More