Tensorlake functions are the building blocks of workflows. They are Python functions decorated with the @tensorlake_function decorator.
from tensorlake import Image, tensorlake_function

image = (
    Image()
    .run("pip install transformers")
    .build()
)

@tensorlake_function(image=image, input_encoder="json")
def my_function(data: str) -> int:
    return len(data)

Functions

The @tensorlake_function decorator allows you to specify the following attributes:
  1. image - The image to use for the function container. A basic Debian-based image by default. See Images.
  2. input_encoder - The serializer to use for the function's input. json by default. See Input and output serialization.
  3. output_encoder - The serializer to use for the function's output. json by default. See Input and output serialization.
  4. secrets - The secrets available to the function as environment variables. No secrets by default. See Secrets.
  5. next - Functions called with the outputs of this function. This allows chaining functions together into a workflow graph. See Dynamic routing.
  6. name - The name of the function in the workflow. By default, the name of the Python function.
  7. description - A description of the function in the workflow. Visible when viewing workflow details.
  8. retries - The retry policy for the function. By default, failed functions are not retried. See Retries.
  9. timeout - The timeout for the function in seconds. The default is 5 minutes. See Timeouts.
  10. use_ctx - If True, the request context is passed to the function. False by default. See Request Context.
  11. accumulate - If not None, turns the function into a reducer. None by default. See Map-Reduce.
  12. cacheable - If True, reusing previous function outputs is allowed. False by default. See Caching.
  13. cpu - The number of CPUs available to the function. The default is 1.0 CPU. See CPU.
  14. memory - The memory available to the function, in GB. The default is 1.0 GB. See Memory.
  15. ephemeral_disk - The ephemeral /tmp disk space available to the function, in GB. The default is 2.0 GB. See Ephemeral Disk.
  16. gpu - The GPU model and count available to the function. The default is None (no GPU). Please contact support@tensorlake.ai to enable GPU support.
The following code snippet shows an example of all the function attributes set to custom values.
from tensorlake import tensorlake_function, Image, GraphRequestContext, Retries


@tensorlake_function()
def quadruple(value: int) -> int:
    return value * 4


@tensorlake_function(
    # Use Ubuntu as a base image instead of the default Debian
    image=Image().base_image("ubuntu:latest"),
    # Use cloudpickle to deserialize function inputs
    input_encoder="cloudpickle",
    # Use cloudpickle to serialize function outputs
    output_encoder="cloudpickle",
    # Make my_secret available to the function as an environment variable
    secrets=["my_secret"],
    # Call quadruple with the output of this function
    next=quadruple,
    # Name of the function in the workflow
    name="measure string",
    # Description of the function in the workflow
    description="Measures the string using its length",
    # Retry the function twice if it fails
    retries=Retries(max_retries=2),
    # Fail the function if it runs for more than 30 seconds without reporting progress
    timeout=30,
    # Pass the request context to the function as its first argument
    use_ctx=True,
    # Reuse previous function outputs if the function gets called with the same inputs
    cacheable=True,
    # 2 CPUs are available to the function
    cpu=2,
    # 4 GB of memory is available to the function
    memory=4,
    # 2 GB of ephemeral /tmp disk space is available to the function
    ephemeral_disk=2,
)
def string_length(ctx: GraphRequestContext, s: str) -> int:
    return len(s)

Classes

Sometimes a function needs expensive initialization, like loading a large model into memory. You can define a function as a class that inherits from TensorlakeCompute and use its __init__(self) constructor to run initialization code once on function container startup. Under the hood, all functions defined with the @tensorlake_function() decorator are converted into TensorlakeCompute instances.
from large_model import load_large_model
from tensorlake import TensorlakeCompute

class MyCompute(TensorlakeCompute):
    # The same attributes as for @tensorlake_function() decorator
    image = ...
    input_encoder = ...

    def __init__(self):
        super().__init__()
        # Run initialization code once on function container startup
        self.model = load_large_model()

    def run(self, data: str) -> int:
        return self.model.run(data)

Input and Output Serialization

Function inputs and outputs are serialized and deserialized as JSON by default. This is a good default because workflows are exposed as HTTP endpoints, which makes it possible to call them from any programming language. You can also change the serialization format to cloudpickle if you want to pass complex Python objects between functions, such as Pandas dataframes, PyTorch tensors, PIL images, etc. Note that cloudpickle requires objects to be serialized and deserialized on the same Python version, so all function containers in the workflow must use the same Python version. The serialization format is changed using the input_encoder and output_encoder attributes; a short sketch follows the list below. Currently supported formats are:
  • json - JSON serialization
  • cloudpickle - Cloudpickle serialization
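For illustration, here is a minimal sketch of passing a Pandas dataframe between two functions with cloudpickle. It assumes pandas is installed in the function images; the function names and bodies are hypothetical.
import pandas as pd

from tensorlake import tensorlake_function

@tensorlake_function(input_encoder="cloudpickle")
def row_count(frame: pd.DataFrame) -> int:
    return len(frame)

# Hand the dataframe to row_count via cloudpickle instead of JSON.
@tensorlake_function(output_encoder="cloudpickle", next=row_count)
def load_frame() -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})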

Timeouts

When a function runs longer than its timeout, it is terminated and marked as failed. The timeout in seconds is set using the timeout attribute. The default timeout is 300 (5 minutes). Minimum is 1, maximum is 172800 (48 hours). Progress updates can be sent by the function to extend the timeout. See Request Context.
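As a quick illustration (the function name and body are hypothetical), a longer-running function can raise its timeout like this:
from tensorlake import tensorlake_function

# Allow up to 10 minutes instead of the default 5.
@tensorlake_function(timeout=600)
def slow_function(data: str) -> int:
    ...  # long-running work elided
    return len(data)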

Retries

When a function fails by raising an exception or timing out, it gets retried according to its retry policy. The default retry policy is to not retry the function. You can specify a custom retry policy using the retries attribute.
from tensorlake import tensorlake_function, Retries

# Retry the function once if it fails
@tensorlake_function(retries=Retries(max_retries=1))
def my_function() -> int:
    raise Exception("Something went wrong")
You can set a default retry policy for all the functions in a workflow. See Default retries.

Request Context

If the use_ctx function attribute is True, the function receives a request context as its first parameter, named ctx. The context carries information about the current request and provides access to Tensorlake APIs for that request. By default, the request context is not passed to the function.
from tensorlake import GraphRequestContext, tensorlake_function

@tensorlake_function(use_ctx=True)
def my_function(ctx: GraphRequestContext, data: str) -> int:
    # Set request progress to 1 out of 100.
    ctx.update_progress(1, 100)
    # Print the request information.
    print(f"Request ID: {ctx.request_id}, Graph name: {ctx.graph_name}, Graph version: {ctx.graph_version}")
    # Set a request key-value pair. It's available within the same request.
    ctx.request_state.set("my_function_data", data)
    # Set request progress to 100 out of 100.
    ctx.update_progress(100, 100)
    # Return the length of the previously stored string.
    return len(ctx.request_state.get("my_function_data"))
When a function publishes a progress update for its request, the function's timeout restarts. For example, if a function has a 4-minute timeout and calls ctx.update_progress after 2 minutes of execution, the timeout is reset at that point, allowing the function to run for another 4 minutes.
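Here is a minimal sketch of using progress updates to keep a long-running function alive; process_item is a hypothetical helper:
from tensorlake import GraphRequestContext, tensorlake_function

@tensorlake_function(use_ctx=True, timeout=240)
def process_all(ctx: GraphRequestContext, items: list) -> int:
    for done, item in enumerate(items, start=1):
        process_item(item)  # hypothetical per-item work
        # Each update restarts the 4-minute timeout.
        ctx.update_progress(done, len(items))
    return len(items)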

Caching

If the cacheable function attribute is True, Tensorlake assumes that the function returns the same outputs for the same inputs. This allows Tensorlake to cache the outputs of the function and reuse them when the function is called with the same inputs again. When cached outputs are used, the function is not executed, which speeds up requests and makes them cheaper to run. The size of the cache and the caching duration are controlled by the Tensorlake Platform.
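As a sketch, a deterministic function like the hypothetical word_count below is a good candidate for caching:
from tensorlake import tensorlake_function

# Same input always yields the same output, so Tensorlake may serve
# cached results instead of re-running the function.
@tensorlake_function(cacheable=True)
def word_count(text: str) -> int:
    return len(text.split())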

CPU

The number of CPUs available to the function is set using the cpu attribute. Minimum is 1.0, maximum is 8.0. The default is 1.0. This is usually sufficient for functions that only call external APIs and do simple data processing. Adding more CPUs is recommended for functions that do complex data processing or work with large datasets. If a function consumes or produces multi-gigabyte inputs or outputs, at least 3 CPUs are recommended; this results in the fastest download and upload speeds for the data.

Memory

The memory available to the function, in GB, is set using the memory attribute. Minimum is 1.0, maximum is 32.0. The default is 1.0. This is usually sufficient for functions that only call external APIs and do simple data processing. Adding more memory is recommended for functions that do complex data processing or work with large datasets. It's recommended to set memory to at least 2x the size of the function's largest inputs and outputs, because both the serialized and deserialized representations are kept in memory while inputs and outputs are being deserialized and serialized.

Ephemeral disk

Ephemeral disk space is temporary storage available to functions at the /tmp path. It is erased when the function container is terminated, which makes it ideal for temporary files that are not needed after the function finishes. Ephemeral disks are backed by fast SSD drives; using other filesystem paths like /home/ubuntu for temporary files results in slower performance. Temporary files created with Python modules like tempfile are stored in ephemeral disk space under /tmp. The ephemeral disk space available to the function, in GB, is set using the ephemeral_disk attribute. Minimum is 2.0, maximum is 50.0. The default is 2.0 GB. This is usually sufficient for functions that only call external APIs and do simple data processing. If the function needs to temporarily store large files or datasets on disk, increase ephemeral_disk accordingly.
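Pulling the three resource attributes together, here is a minimal sketch of a data-heavy function; transcode and its processing step are hypothetical:
import tempfile

from tensorlake import tensorlake_function

# Extra CPUs for faster data transfer, memory at roughly 2x the input size,
# and enough /tmp space for intermediate files.
@tensorlake_function(cpu=3, memory=8, ephemeral_disk=10)
def transcode(video: bytes) -> int:
    # tempfile writes under /tmp, which is backed by fast SSD ephemeral disk.
    with tempfile.NamedTemporaryFile(suffix=".mp4") as tmp:
        tmp.write(video)
        tmp.flush()
        ...  # process the temporary file here
    return len(video)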

Graphs

You can string together multiple functions to form a workflow.
from tensorlake import Graph

g = Graph(start_node=my_function, name="my_workflow", description="My workflow")
g.add_edge(my_function, my_function1)
g.add_edge(my_function, my_function2)
In the above example, my_function is the start node of the workflow, and the input to the workflow is passed to it. Workflows are exposed as HTTP endpoints; the body of the HTTP request is passed to the start node, in this case my_function.
curl -X POST https://api.tensorlake.com/workflows/my_workflow \
  -H "Content-Type: application/json" \
  -d '{"data": "Hello, world!"}'

Retrieving Output

Tensorlake workflows allow retrieving the outputs of any function in the workflow.
from tensorlake import RemoteGraph

g = RemoteGraph(name="my_workflow")

# Fetch the outputs produced by my_function1 in this workflow.
g.outputs(my_function1)

Streaming Progress

You can stream the progress of your requests for interactive use cases, for example to notify users about the progress of a request.
curl -N -X POST https://api.tensorlake.com/workflows/my_workflow/stream \
  -H "Content-Type: text/event-stream"

Default retries

You can set a default retry policy for all the functions in a workflow using the workflow's retries attribute. Each function can override the default by setting its own retries attribute. See Retries.
from tensorlake import tensorlake_function, Graph, Retries

# `retries` attribute is not set, uses default workflow retry policy.
@tensorlake_function()
def my_function() -> int:
    raise Exception("Something went wrong")

# Set default workflow retry policy to retry all its functions twice.
# If not set, the default workflow retry policy is to not retry the functions.
g = Graph(start_node=my_function, name="my_workflow", retries=Retries(max_retries=2))