Tensorlake Applications support Map-Reduce for large-scale ETL of data. Map applies a function to each item of a list in parallel; Reduce aggregates the results of the map phase. The examples below map a list of numbers to their squares and reduce the results by summing the squares.

Tensorlake automatically parallelizes function calls across multiple function containers when you map a function over a list. The reducer function is applied to the mapped values sequentially, in their original order in the list. Depending on how the application code is structured, Tensorlake can run the reduce calls without waiting for all map calls to complete.
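For intuition, here is the same squares-and-sum computation written with Python's built-in map and functools.reduce, which the Tensorlake primitives mirror. This is a purely sequential sketch; Tensorlake runs the equivalent calls in parallel across function containers.

from functools import reduce

# Map phase: square each number (0, 1, 4, 9, 16 for total_numbers=5).
squares = map(lambda number: number ** 2, range(5))
# Reduce phase: fold the squares into a running sum, starting from 0.
total = reduce(lambda acc, number: acc + number, squares, 0)
print(total)  # 30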

Blocking Map-Reduce

In the following code example, we calculate the square of each number and, once all the squares are available, sum them.
from typing import List

from pydantic import BaseModel
from tensorlake.applications import application, function

class TotalSum(BaseModel):
    value: int = 0

@application()
@function()
def sum_squares(total_numbers: int) -> TotalSum:
    # Blocks until all map calls complete.
    # function.map behaves like Python's built-in map, except that the calls run in parallel.
    squares: List[int] = square.map(list(range(total_numbers)))
    # Blocks until all reduce calls complete.
    # function.reduce behaves like Python's functools.reduce, applied to the mapped values
    # sequentially in their original order.
    total: TotalSum = sum_total.reduce(squares, TotalSum(value=0))
    return total

@function()
def square(number: int) -> int:
    return number ** 2

@function()
def sum_total(total: TotalSum, number: int) -> TotalSum:
    total.value += number
    # This value will be passed to the next sum_total call as the first argument.
    # Unless this is the last call, in which case it will be returned as the final
    # result of the reduce operation.
    return total
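To make the accumulator threading concrete, here is a plain-Python trace of the sequential reduce chain for total_numbers=4, reusing the TotalSum model from the example above and inlining the body of sum_total (a local simulation, not the Tensorlake runtime):

# Mapped values for total_numbers=4: [0, 1, 4, 9].
total = TotalSum(value=0)
for number in [0, 1, 4, 9]:
    total.value += number  # each sum_total call receives the previous call's result
print(total.value)  # 14 (0 + 1 + 4 + 9)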

Non-blocking Map-Reduce

In the following code example, we calculate the square of each number and add each square to the running sum as soon as it is available. This is achieved with awaitables and tail calls, which reduces the overall duration of the Map-Reduce operation. The reduce function is still called sequentially, in the original order of the list.
from pydantic import BaseModel
from tensorlake.applications import application, function

class TotalSum(BaseModel):
    value: int = 0

@application()
@function()
def sum_squares(total_numbers: int) -> TotalSum:
    # Defines the map function calls but doesn't run them yet.
    squares_awaitable = square.awaitable.map(list(range(total_numbers)))
    # Defines the reduce function calls, which will run as soon as each mapped value is available.
    # Returns the reduce operation definition as a tail call; Tensorlake takes care of running it.
    # The final value of the reduce operation becomes the request output, exactly as if this
    # function had returned it here.
    return sum_total.awaitable.reduce(squares_awaitable, TotalSum(value=0))

@function()
def square(number: int) -> int:
    return number ** 2

@function()
def sum_total(total: TotalSum, number: int) -> TotalSum:
    total.value += number
    # This value will be passed to the next sum_total call as the first argument.
    # Unless this is the last call, in which case it will be returned as the final
    # result of the reduce operation.
    return total
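For intuition about the non-blocking pattern, here is an asyncio analogue (an illustrative sketch, not the Tensorlake runtime): all squares are computed concurrently, and the accumulator folds each result in the original list order as soon as that result is ready, without first collecting the whole list.

import asyncio
import random

async def square(number: int) -> int:
    # Simulate variable map-call latency.
    await asyncio.sleep(random.random())
    return number ** 2

async def sum_squares(total_numbers: int) -> int:
    # Start all map calls concurrently (analogous to square.awaitable.map).
    tasks = [asyncio.create_task(square(i)) for i in range(total_numbers)]
    total = 0
    # Fold each result in the original list order as soon as it is available,
    # analogous to sum_total.awaitable.reduce.
    for task in tasks:
        total += await task
    return total

print(asyncio.run(sum_squares(5)))  # 30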