Tensorlake Applications support Map-Reduce for large-scale ETL of data. Map applies a function to each item of a list in parallel; Reduce aggregates the results of the map phase. The example below maps a list of numbers to their squares, then reduces the results by summing the squares.

When you map a function over a list, Tensorlake automatically parallelizes the function calls across multiple function containers. The reducer function is applied to the mapped values sequentially, in their original order in the list, and Tensorlake runs each reduce call as soon as its input values are available.
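For intuition, the same squares-and-sum computation can be written with Python's built-in map and functools.reduce, which the distributed operations below mirror:

```python
from functools import reduce

numbers = [1, 2, 3, 4]
# Map: apply a function to each item of the list.
squares = list(map(lambda n: n ** 2, numbers))  # [1, 4, 9, 16]
# Reduce: fold the mapped values into a single result.
total = reduce(lambda acc, n: acc + n, squares, 0)  # 30
```

The Tensorlake versions keep these signatures but run the map calls in parallel across containers and the reduce calls as a distributed sequential fold.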

Blocking Map-Reduce

In the following code example, we calculate the square of each number and, once all the squares are available, sum them.
from pydantic import BaseModel
from tensorlake.applications import application, function

class TotalSum(BaseModel):
    value: int = 0

@application()
@function()
def sum_squares(total_numbers: int) -> TotalSum:
    # Blocks until all map calls complete.
    # The behavior and signature of function.map is very similar to Python's built-in map except it's distributed and parallel.
    squares: list[int] = square.map([i for i in range(total_numbers)])
    # Blocks until all reduce calls complete.
    # The behavior and signature of function.reduce is very similar to Python's functools.reduce except it's distributed.
    total: TotalSum = sum_total.reduce(squares, TotalSum(value=0))
    return total

@function()
def square(number: int) -> int:
    return number ** 2

@function()
def sum_total(total: TotalSum, number: int) -> TotalSum:
    total.value += number
    # This value will be passed to the next sum_total call as the first argument.
    # Unless this is the last call, in which case it will be returned as the final
    # result of the reduce operation.
    return total

Non-blocking Map-Reduce

In the following code example, we calculate the square of each number and add each square to the running sum as soon as it becomes available. This is achieved using futures and tail calls, and it reduces the overall duration of the Map-Reduce operation. The reduce function is still called sequentially, in the original order of the list.
from pydantic import BaseModel
from tensorlake.applications import application, function, Future

class TotalSum(BaseModel):
    value: int = 0

@application()
@function()
def sum_squares(total_numbers: int) -> TotalSum:
    # Defines map function calls but doesn't run them.
    squares: Future = square.future.map([i for i in range(total_numbers)])
    # Defines reduce function calls that will run as soon as each mapped value is available.
    # Returns the reduce operation definition as a tail call. Tensorlake will take care of running it.
    # The final value of the reduce operation will be assigned as the request output like if this function
    # returns it here.
    return sum_total.future.reduce(squares, TotalSum(value=0))

@function()
def square(number: int) -> int:
    return number ** 2

@function()
def sum_total(total: TotalSum, number: int) -> TotalSum:
    total.value += number
    # This value will be passed to the next sum_total call as the first argument.
    # Unless this is the last call, in which case it will be returned as the final
    # result of the reduce operation.
    return total
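Conceptually, the scheduling above resembles this plain-Python sketch using concurrent.futures (an analogy, not the Tensorlake API): squares are computed in parallel, and the reducer consumes each result in list order as soon as it is ready.

```python
import concurrent.futures

def square(n: int) -> int:
    return n ** 2

def sum_total(total: int, n: int) -> int:
    return total + n

with concurrent.futures.ThreadPoolExecutor() as pool:
    # Map phase: all squares are computed concurrently.
    futures = [pool.submit(square, i) for i in range(10)]
    # Reduce phase: fold results sequentially in original list order;
    # each reduce step runs as soon as its input square is available.
    total = 0
    for f in futures:
        total = sum_total(total, f.result())

print(total)  # 285
```

In Tensorlake the map calls run in separate containers rather than threads, and the orchestrator, not your code, drives the fold.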

Inputs

List

Both map and reduce operations accept a list as their input. Each item in the list can be a value, a Future, a Tensorlake coroutine, or an asyncio.Task object. Tensorlake recognizes these objects, runs them automatically, and uses their results as the input values for the operation.
from tensorlake.applications import application, function, Future

@function()
def double(number: int) -> int:
    return number * 2

@function()
def sum(a: int, b: int) -> int:
    return a + b

@application()
@function()
def sum_doubled(numbers: list[int]) -> int:
    # Reduce operation input is a list of Futures.
    doubled: list[Future] = [double.future(number) for number in numbers]
    # sum is called on each pair of doubled values as soon as they are available.
    return sum.reduce(doubled, 0)

Future / Coroutine / Task

Map and reduce operations also accept a single Future/coroutine/asyncio.Task object as their input. The object must resolve to a list of items; Tensorlake automatically waits for it to complete and uses the returned list as the operation input. This is useful when the input list is produced by another Tensorlake function.
from tensorlake.applications import application, function, Future

@function()
def generate_numbers(count: int) -> list[int]:
    return list(range(count))

@function()
def square(number: int) -> int:
    return number ** 2

@function()
def sum(a: int, b: int) -> int:
    return a + b

@application()
@function()
def sum_of_squares(count: int) -> int:
    # generate_numbers returns a list[int] when it completes.
    numbers_future: Future = generate_numbers.future(count)
    # Pass the Future as input to map. Tensorlake waits for generate_numbers
    # to complete and maps square over the returned list.
    squares: Future = square.future.map(numbers_future)
    # Pass the map Future as input to reduce operation.
    return sum.reduce(squares)

Tail calls

Map and reduce operation Futures can be returned from functions as tail calls. The returning function completes immediately and frees its container, while Tensorlake orchestrates the map-reduce operation. This allows Tensorlake to optimize resource usage and reduce overall application latency.
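The mechanism is analogous to a trampoline in plain Python (a conceptual sketch, not the Tensorlake API): a function returns a description of the next call instead of a final value, its own frame is freed immediately, and an outer loop carries the computation forward.

```python
# Toy trampoline: the "orchestrator" keeps running whatever pending
# call a function returns, instead of a final value.
def trampoline(call):
    result = call
    while callable(result):
        # The previous function's frame is already gone by now,
        # just as a Tensorlake container is freed after a tail call.
        result = result()
    return result

def finish() -> int:
    return 42

def start():
    # Returning the next call (not its result) ends this function
    # immediately, like returning a reduce Future as a tail call.
    return finish

print(trampoline(start))  # 42
```

In `sum_squares` above, returning `sum_total.future.reduce(...)` plays the role of `return finish`: the function exits at once, and Tensorlake runs the remaining work and assigns its final value as the request output.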