Data workflows involve multiple steps: fetching, transforming, validating, enriching, and loading data. Tensorlake lets you define these pipelines as composed functions that automatically run in parallel where possible, with built-in durability and resource optimization. Your workflows are exposed as HTTP endpoints that can be called on demand; they scale up when called and scale down when idle.

Your First Workflow

Workflows in Tensorlake use awaitables to define function calls without executing them immediately. This lets Tensorlake optimize execution by running independent steps in parallel. When you return an awaitable from a function (called a tail call), the function completes immediately without blocking, and Tensorlake orchestrates the remaining work. Here’s a simple workflow that fetches data from multiple sources in parallel and merges the results:
from tensorlake.applications import application, function

@application()
@function()
def enrich_record(record_id: str) -> dict:
    # Create awaitables - these don't run yet, just define the function calls
    profile = fetch_profile.awaitable(record_id)
    history = fetch_history.awaitable(record_id)

    # Return a tail call - enrich_record() completes immediately without blocking
    # Tensorlake then automatically:
    # 1. Runs fetch_profile() and fetch_history() in parallel (no dependencies between them)
    # 2. Once both complete, runs merge_data() with their results
    # 3. Uses merge_data()'s return value as enrich_record()'s final result
    return merge_data.awaitable(profile, history)


@function()
def fetch_profile(record_id: str) -> dict:
    # Fetch from profile service
    return {"id": record_id, "name": "Example Corp", "tier": "enterprise"}


@function()
def fetch_history(record_id: str) -> list:
    # Fetch transaction history
    return [{"date": "2024-01-15", "amount": 5000}]


@function()
def merge_data(profile: dict, history: list) -> dict:
    return {"profile": profile, "transactions": history}
What happens when you call this workflow:
curl https://api.tensorlake.ai/applications/enrich_record \
  -H "Authorization: Bearer $TENSORLAKE_API_KEY" \
  --json '"rec_123"'
  1. enrich_record starts and immediately returns (doesn’t block)
  2. fetch_profile("rec_123") and fetch_history("rec_123") run in parallel
  3. When both complete, merge_data runs with both results
  4. Final response contains the merged data (sketched below)
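Given the placeholder values returned by fetch_profile, fetch_history, and merge_data above, the merged result looks like this (the exact response envelope may differ; this is only the payload the workflow produces):
{
  "profile": {"id": "rec_123", "name": "Example Corp", "tier": "enterprise"},
  "transactions": [{"date": "2024-01-15", "amount": 5000}]
}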
Key benefits:
  • Parallel execution where possible (lower latency)
  • No blocking — the orchestrator container is freed immediately
  • Automatic dependency tracking — no manual coordination needed
  • Built-in durability — failures resume from checkpoints
For a deep dive on awaitables and tail calls, see Awaitables & Tail Calls.
Each function in your workflow can be configured with retry policies. If a step fails, Tensorlake automatically retries it based on your retry configuration.
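As a rough sketch only, a retry policy is attached where the function is declared. The decorator parameter below is hypothetical and only illustrates the idea; see the retry documentation for the actual configuration API.
# Hypothetical sketch: 'retries' is an illustrative parameter name,
# not the confirmed Tensorlake decorator signature.
@function(retries=3)
def fetch_profile(record_id: str) -> dict:
    # A transient failure (e.g. an upstream timeout) would be retried
    # automatically according to this configuration.
    return {"id": record_id, "name": "Example Corp", "tier": "enterprise"}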

Best Practices

Design for Parallelism

Identify steps that can run independently:
# Sequential — slow
@function()
def slow_pipeline(data: str) -> str:
    result1 = step1(data)
    result2 = step2(data)  # Could have run in parallel
    return combine(result1, result2)

# Parallel — fast
@function()
def fast_pipeline(data: str) -> str:
    result1 = step1.awaitable(data)
    result2 = step2.awaitable(data)  # Runs in parallel with step1
    return combine.awaitable(result1, result2)

Use Tail Calls for Efficiency

Return awaitables instead of blocking. When you return an awaitable as a tail call, the current function’s container is freed immediately — you’re not paying for idle containers waiting for downstream results.
# Blocks container unnecessarily
@function()
def inefficient(data: str) -> str:
    result = expensive_operation(data)  # Container blocked here
    return result

# Frees container immediately
@function()
def efficient(data: str) -> str:
    return expensive_operation.awaitable(data)  # Container freed right away

Process Lists with Map-Reduce

For workflows that process collections of items, use map-reduce operations to parallelize the work:
from pydantic import BaseModel
from tensorlake.applications import application, function

class ProcessingResult(BaseModel):
    total_processed: int = 0
    total_value: float = 0.0

@application()
@function()
def process_batch(record_ids: list[str]) -> ProcessingResult:
    # Map: process each record in parallel
    results = process_record.awaitable.map(record_ids)
    # Reduce: aggregate results as they complete
    return aggregate_results.awaitable.reduce(results, ProcessingResult())

@function()
def process_record(record_id: str) -> dict:
    # Each record processed in its own container
    return {"id": record_id, "value": 100.0}

@function()
def aggregate_results(summary: ProcessingResult, record: dict) -> ProcessingResult:
    summary.total_processed += 1
    summary.total_value += record["value"]
    return summary
Map-reduce operations automatically run in parallel and scale to handle large datasets efficiently. See Map-Reduce for more details.
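Assuming the same HTTP endpoint pattern as the earlier example (application name in the path, JSON-encoded input as the body), the batch application could be invoked like this:
curl https://api.tensorlake.ai/applications/process_batch \
  -H "Authorization: Bearer $TENSORLAKE_API_KEY" \
  --json '["rec_001", "rec_002", "rec_003"]'
Each record ID in the list is then processed in its own container by process_record, and aggregate_results folds the results into a single ProcessingResult.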

Learn More