Migrating to Indexify is easy! Most multi-stage application code can be ported with minimal changes. Here is an example -

Original Code

The following code reads a list of documents, chunks them, embeds the chunks, and writes the embeddings to a database.

from typing import List

def chunk(doc: str) -> List[str]:
    return doc.split("\n")

def embed_chunk(chunk: str) -> List[float]:
    # embed() is the embedding model call, defined elsewhere
    return embed(chunk)


### Indexing code
def myapp(docs: List[str]) -> None:
    # chunk() returns a list of chunks per document, so flatten first
    chunks = [c for doc in docs for c in chunk(doc)]
    embeddings = [embed_chunk(c) for c in chunks]
    for embedding, c in zip(embeddings, chunks):
        vectordb.write(c, embedding)

This is a typical pattern in many applications. While it works in notebooks and prototypes, it has the following limitations -

  1. The code is synchronous and runs on a single machine. It can’t scale to large datasets.
  2. If any step fails, you have to re-run the entire process, e.g. re-run chunking and embedding if the database write fails.
  3. You have to build a solution for re-indexing data if you change the embedding model.
  4. You have to build a solution for managing access control and namespaces for different data sources.
  5. You have to wrap it with FastAPI or another server framework to make it callable from other applications, as shown in the sketch below.
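
To make the last point concrete, here is a minimal sketch of the server code the original pipeline would otherwise need. It assumes FastAPI and the myapp function above; none of it is required once the workflow runs on Indexify.

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List

app = FastAPI()

class IndexRequest(BaseModel):
    docs: List[str]

# A hand-rolled endpoint around myapp(); the whole pipeline runs
# synchronously inside the request handler, so a failure anywhere
# forces the client to retry everything
@app.post("/index")
def index(request: IndexRequest) -> dict:
    myapp(request.docs)
    return {"status": "ok"}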

Migration to Indexify

Decorate functions which are units of work

from typing import List
from pydantic import BaseModel
from indexify import indexify_function

@indexify_function()
def chunk(doc: str) -> List[str]:
    return doc.split("\n")

class ChunkEmbeddings(BaseModel):
    chunk: str
    embeddings: List[float]

@indexify_function()
def embed_chunk(chunk: str) -> ChunkEmbeddings:
    embeddings = embed(chunk)
    return ChunkEmbeddings(chunk=chunk, embeddings=embeddings)

@indexify_function()
def db_writer(chunk_embedding: ChunkEmbeddings) -> None:
    # Persists one chunk/embedding pair; runs once per ChunkEmbeddings
    # produced by embed_chunk upstream in the graph
    vectordb.write(chunk_embedding.chunk, chunk_embedding.embeddings)
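
The embed call above is left abstract on purpose. Here is a minimal sketch of one possible implementation, assuming sentence-transformers; Indexify does not prescribe an embedding library:

from typing import List
from sentence_transformers import SentenceTransformer

# An assumed model choice for illustration; use whatever embedding model you prefer
model = SentenceTransformer("all-MiniLM-L6-v2")

def embed(chunk: str) -> List[float]:
    # encode() returns a numpy array; convert it to a plain list of floats
    return model.encode(chunk).tolist()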

Define and Deploy the Graph

This is the main change. We define a graph that connects these functions. The edges of the graph define the data flow across the functions.

from indexify import RemoteGraph, Graph

g = Graph(name="chunk_and_embed", start_node=chunk)
g.add_edge(chunk, embed_chunk)
g.add_edge(embed_chunk, db_writer)

# Creates a remote endpoint for the graph
# This is the "deployment" step
graph = RemoteGraph.deploy(g)

At this point, your graph is deployed as a remote API. You can call it from any application code.

Call the Graph from Applications

docs = ["doc1", "doc2", "doc3"]
for doc in docs:
    invocation_id = graph.run(block_until_done=True, doc=doc)
    chunk_embeddings: List[ChunkEmbeddings] = graph.output(invocation_id, "embed_chunk")
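
graph.output returns the outputs produced by the named function during that invocation, here one ChunkEmbeddings object per chunk. A short sketch of consuming them:

for ce in chunk_embeddings:
    # Each entry pairs a chunk with its vector; the dimension depends on the embedding model
    print(ce.chunk, len(ce.embeddings))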

Benefits

  1. You get a remote API for your workflow without writing any server code.
  2. Invocations of the graph are automatically load balanced and parallelized.
  3. If any step fails, only that step is retried, without re-running the entire process.
  4. Graphs are automatically versioned, and when you roll out a new model you can re-run the graphs on older data.
  5. The embedding function can run on GPUs while chunking and database writes run on CPUs. Every second of GPU time is spent on the embedding function, and you are not paying for the GPU while the database write happens.
