Graphs

Workflows are created by connecting multiple functions together in a Graph.

Graph contains:

  • Node: Represents a function that operates on data.
  • Start Node: which is the first function that is executed when the graph is invoked.
  • Edges: Represents data flow between functions.
  • Conditional Edge: Evaluates input data from the previous function and decide which edges to take. They are like if-else statements in programming.

Graphs are workflows that has functions that can be executed in parallel, while Pipelines are linear workflows that execute functions serially.

Functions

They are regular Python functions, decorated with @tensorlake_function() decorator.

Function can be executed in a distributed manner, and the output is stored so that if downstream functions fail, they can be resumed from the output of the function.

There are various other parameters, in the decorator that can be used to configure retry behavior, placement constraints, and more.

Programming Model

Pipeline

Transforming the input of the graph so that every node transforms the output of the previous node until reaching the end node.

@tensorlake_function()
def node1(input: int) -> int:
    return input + 1

@tensorlake_function()
def node2(input2: int) -> int:
    return input2 + 2

@tensorlake_function()
def node3(input3: int) -> int:
    return input3 + 3

graph = Graph(name="pipeline", start_node=node1)
graph.add_edge(node1, node2)
graph.add_edge(node2, node3)

Use Cases: Transforming a video into text by first extracting the audio, and then doing Automatic Speech Recognition (ASR) on the extracted audio.

Parallel Branching

Generating more than one graph output for the same graph input in parallel.

@tensorlake_function()
def start_node(input: int) -> int:
    return input + 1

@tensorlake_function()
def add_two(input: int) -> int:
    return input + 2

@tensorlake_function()
def is_even(input: int) -> int:
    return input % 2 == 0

graph = Graph(name="pipeline", start_node=start_node)
graph.add_edge(start_node, add_two)
graph.add_edge(start_node, is_even)

Use Cases: Extracting embeddings and structured data from the same unstructured data.

Map

Automatically parallelize functions across multiple machines when a function returns a sequence and the downstream function accepts only a single element of that sequence.

@tensorlake_function()
def fetch_urls() -> list[str]:
    return [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
    ]

# scrape_page is called in parallel for every element of fetch_url across
# many machines in a cluster or across many worker processes in a machine
@tensorlake_function()
def scrape_page(url: str) -> str:
    content = requests.get(url).text
    return content

Use Cases: Generating Embedding from every single chunk of a document.

Map Reduce - Reducing/Accumulating from Sequences

Reduce functions in Tensorlake Serverless aggregate outputs from one or more functions that return sequences. They operate with the following characteristics:

  • Lazy Evaluation: Reduce functions are invoked incrementally as elements become available for aggregation. This allows for efficient processing of large datasets or streams of data.
  • Stateful Aggregation: The aggregated value is persisted between invocations. Each time the Reduce function is called, it receives the current aggregated state along with the new element to be processed.
@tensorlake_function()
def fetch_numbers() -> list[int]:
    return [1, 2, 3, 4, 5]

class Total(BaseModel):
    value: int = 0

@tensorlake_function(accumulate=Total)
def accumulate_total(total: Total, number: int) -> Total:
    total.value += number
    return total

Use Cases: Aggregating a summary from hundreds of web pages.

Dynamic Routing

Functions can route data to different nodes based on custom logic, enabling dynamic branching.

@tensorlake_function()
def handle_error(text: str):
    # Logic to handle error messages
    pass

@tensorlake_function()
def handle_normal(text: str):
    # Logic to process normal text
    pass

# The function routes data into the handle_error and handle_normal based on the
# logic of the function.
@tensorlake_router()
def analyze_text(text: str) -> List[Union[handle_error, handle_normal]]:
    if 'error' in text.lower():
        return [handle_error]
    else:
        return [handle_normal]

Use Cases: Processing outputs differently based on classification results.