Async Pipeline
Running Haystack Pipelines in Asynchronous Environments
Notebook by Madeeswaran Kannan & Mathis Lucka
In this notebook, you'll learn how to use the AsyncPipeline and async-enabled components to build and execute a Haystack pipeline in an asynchronous environment. It's based on this short Haystack tutorial, so it would be a good idea to familiarize yourself with it before we begin. A further prerequisite is working knowledge of cooperative scheduling and async programming in Python.
Motivation
By default, the Pipeline class in Haystack is a regular Python class that exposes synchronous methods to add and connect components and to execute the pipeline logic. It can be used in async environments, but doing so is not optimal because it executes its logic in a blocking fashion: once the Pipeline.run method is invoked, it must run to completion and return the outputs before the next statement can execute [1]. In a typical async environment, this prevents the active event loop from scheduling other coroutines, which reduces throughput. To mitigate this bottleneck, we introduce async-enabled Haystack components and an AsyncPipeline class that cooperatively schedules the execution of both async and non-async components.
[1] This is a simplification, as the Python runtime can potentially schedule another thread, but it's a detail we can ignore here.
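The difference between blocking and cooperative execution can be seen without Haystack at all. The sketch below is a standard-library illustration, not Haystack code: `time.sleep` stands in for a blocking call such as a synchronous Pipeline.run, and `asyncio.sleep` stands in for an I/O-bound step that yields to the event loop. The helper names and the 0.2-second delay are assumptions chosen for illustration.

```python
import asyncio
import time


async def blocking_step(seconds: float) -> None:
    # time.sleep never yields, so the event loop cannot run anything else meanwhile
    time.sleep(seconds)


async def cooperative_step(seconds: float) -> None:
    # await asyncio.sleep suspends this coroutine and lets the loop schedule others
    await asyncio.sleep(seconds)


async def run_three(step) -> float:
    """Schedule three copies of `step` together and return the elapsed wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(step(0.2) for _ in range(3)))
    return time.perf_counter() - start


blocking_time = asyncio.run(run_three(blocking_step))
cooperative_time = asyncio.run(run_three(cooperative_step))
print(f"blocking: {blocking_time:.2f}s, cooperative: {cooperative_time:.2f}s")
```

Even though both variants are scheduled with `asyncio.gather`, the blocking one takes roughly the sum of the three delays, while the cooperative one takes roughly a single delay.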
AsyncPipeline Benefits
- Execute components concurrently to speed up pipeline execution.
- Execute components step by step to debug your pipelines.
- Increase throughput in async environments, e.g. when executing pipelines behind a FastAPI endpoint.
- Allow individual components to opt into async support.
  - Not all components benefit from being async-enabled; I/O-bound components are the most suitable candidates.
- Provide a backward-compatible way to execute Haystack pipelines containing both async and non-async components.
Let's now see what it takes to add async support to the original tutorial, starting with installing Haystack and the requisite dependencies.
Development Environment
Provide an OpenAI API key so that the LLM generator can query the OpenAI API.
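One common way to do this in a notebook is to put the key in the `OPENAI_API_KEY` environment variable, which OpenAIChatGenerator reads by default. The helper below is a hypothetical convenience function, not part of Haystack. It only prompts when the variable is missing, and the prompt function is a parameter so it can be replaced when testing.

```python
import os
from getpass import getpass


def ensure_openai_api_key(prompt=getpass) -> str:
    """Return OPENAI_API_KEY, asking for it interactively only if it is not set yet."""
    if not os.environ.get("OPENAI_API_KEY"):
        # getpass hides the typed key instead of echoing it into the notebook
        os.environ["OPENAI_API_KEY"] = prompt("Enter your OpenAI API key: ")
    return os.environ["OPENAI_API_KEY"]
```

In the notebook you would simply call `ensure_openai_api_key()` once before building the pipeline.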
Creating an AsyncPipeline
Fetching and Indexing Documents
Initialize a DocumentStore to index your documents.
Fetch the data and convert it into Haystack Documents.
To store your data in the DocumentStore with embeddings, initialize a SentenceTransformersDocumentEmbedder.
Then, run the embedder to calculate the embeddings of the docs and write the documents to the document store.
Indexed 151 documents
The next step is to build a RAG pipeline that generates answers to a user query. It uses hybrid retrieval, which consists of two retrieval branches that can run concurrently.
Initialize a text embedder to create an embedding for the user query, plus an InMemoryEmbeddingRetriever and an InMemoryBM25Retriever to use with the InMemoryDocumentStore you initialized earlier. We feed the results of both retrievers into a DocumentJoiner, which uses reciprocal rank fusion to produce the final ranking of the documents.
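To make reciprocal rank fusion concrete, here is a minimal pure-Python sketch of the idea: each retriever's ranking contributes `1 / (k + rank)` per document, and the contributions are summed. The constant `k = 61`, the zero-based ranks, equal weights for both retrievers, and the normalization to a maximum of 1.0 are assumptions made for this sketch; check the DocumentJoiner documentation for the exact constants and weighting Haystack uses. The document IDs are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def reciprocal_rank_fusion(ranked_lists: List[List[str]], k: int = 61) -> List[Tuple[str, float]]:
    """Fuse several ranked lists of document IDs into one list sorted by fused score.

    Each list adds 1 / (k + rank) for every document it contains (rank starts at 0).
    Scores are divided by the best achievable score (rank 0 in every list),
    so a document ranked first by every retriever gets exactly 1.0.
    """
    scores: Dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] += 1.0 / (k + rank)
    max_score = len(ranked_lists) / k
    fused = {doc_id: score / max_score for doc_id, score in scores.items()}
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)


bm25_ranking = ["doc_a", "doc_b", "doc_c"]
embedding_ranking = ["doc_c", "doc_a", "doc_d"]
for doc_id, score in reciprocal_rank_fusion([bm25_ranking, embedding_ranking]):
    print(f"{doc_id}: {score:.4f}")
```

Because only ranks matter, the very different score scales of BM25 (unbounded) and embedding similarity (roughly 0 to 1) do not need to be calibrated against each other; `doc_a`, which both retrievers rank highly, ends up first.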
Create a custom prompt to use with the ChatPromptBuilder and initialize an OpenAIChatGenerator to consume the builder's output.
We finally get to the creation of the pipeline instance. Instead of using the Pipeline class, we use the AsyncPipeline class.
The rest of the process, i.e., adding components and connecting them with each other, remains the same as with the original Pipeline class.
Now, we create a coroutine that queries the pipeline with a question.
We use the run_async_generator method to execute the AsyncPipeline. run_async_generator returns an AsyncIterator that we need to iterate over to make progress in the pipeline's execution.
Essentially, this allows us to step through the pipeline execution component by component, which is useful for debugging a pipeline or when you want to run custom logic upon any component's completion.
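The step-by-step pattern can be mimicked with a plain async generator. The sketch below is a standard-library stand-in, not the AsyncPipeline implementation: `fake_component` and `run_step_by_step` are hypothetical names, and the component names simply mirror the hybrid RAG pipeline. Two independent branches run concurrently, each output is yielded as soon as it is ready, and the downstream joiner runs only after both branches finish.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def fake_component(name: str, delay: float) -> Dict[str, Any]:
    # Stand-in for an I/O-bound component such as a retriever or an LLM call
    await asyncio.sleep(delay)
    return {name: {"documents": [f"doc from {name}"]}}


async def run_step_by_step() -> AsyncIterator[Dict[str, Any]]:
    """Yield each component's output as soon as that component finishes."""
    tasks = [
        asyncio.create_task(fake_component("bm25_retriever", 0.05)),
        asyncio.create_task(fake_component("embedding_retriever", 0.10)),
    ]
    for finished in asyncio.as_completed(tasks):
        yield await finished
    # The downstream component only starts once both branches are done
    yield await fake_component("joiner", 0.01)


async def main() -> List[str]:
    seen: List[str] = []
    async for partial_output in run_step_by_step():
        # Per-component custom logic (logging, debugging, early checks) goes here
        print(partial_output)
        seen.append(next(iter(partial_output)))
    return seen


order = asyncio.run(main())
```

The caller drives progress: nothing after a `yield` runs until the `async for` loop asks for the next item, which is what makes this style convenient for debugging.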
The AsyncPipeline also exposes:
- a run_async method that executes the full pipeline before returning the final outputs;
- a run method that can be called from non-async environments but still executes components concurrently. The run method is a drop-in replacement for Pipeline.run.
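How the three entry points can relate to each other is sketched below with a hypothetical `TinyAsyncPipeline` class; it is an illustration under stated assumptions, not Haystack's implementation. `run_async` drains the step-by-step generator, and `run` wraps `run_async` in `asyncio.run` so synchronous code can call it. In this sketch `run_async` returns every component's output; the real method's return value (by default, the outputs of the final components) may be shaped differently.

```python
import asyncio
from typing import Any, AsyncIterator, Dict


class TinyAsyncPipeline:
    """Hypothetical stand-in showing how the three entry points can build on each other."""

    async def run_async_generator(self, query: str) -> AsyncIterator[Dict[str, Any]]:
        async def branch(name: str) -> Dict[str, Any]:
            await asyncio.sleep(0.01)  # simulated I/O
            return {name: f"{name} result for {query!r}"}

        # Two independent branches run concurrently, then a final step combines them
        tasks = [asyncio.create_task(branch("left")), asyncio.create_task(branch("right"))]
        for finished in asyncio.as_completed(tasks):
            yield await finished
        yield {"final": f"answer to {query!r}"}

    async def run_async(self, query: str) -> Dict[str, Any]:
        # Drain the generator and hand back all outputs at once
        outputs: Dict[str, Any] = {}
        async for partial in self.run_async_generator(query):
            outputs.update(partial)
        return outputs

    def run(self, query: str) -> Dict[str, Any]:
        # Synchronous entry point: asyncio.run starts a fresh event loop, so this
        # must not be called from code that is already running inside a loop
        return asyncio.run(self.run_async(query))


result = TinyAsyncPipeline().run("Where were the Hanging Gardens?")
print(result["final"])
```

Note the caveat in `run`: inside a notebook or a FastAPI handler an event loop is usually already running, which is exactly where `await pipeline.run_async(...)` is the better choice.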
We iterate over the AsyncIterator and print intermediate outputs from the retrievers and the joiner.
Outputs from `bm25_retriever`.
Score: 13.520340633994273
Content: [21] However, the gardens were said to still exist at the time that later writers described them, and some of these accounts are regarded as deriving from people who had visited Babylon.[2] Herodotus, who describes Babylon in his Histories, does not mention the Hanging Gardens,[22] although it could be that the gardens were not yet well known to the Greeks at the time of his visit.[2] To date, no archaeological evidence has been found at Babylon for the Hanging Gardens.[6] It is possible that ev...
------------
Outputs from `embedding_retriever`.
Score: 0.6933103186685945
Content: The construction of the Hanging Gardens has also been attributed to the legendary queen Semiramis[4] and they have been called the Hanging Gardens of Semiramis as an alternative name.[5] The Hanging Gardens are the only one of the Seven Wonders for which the location has not been definitively established.[6] There are no extant Babylonian texts that mention the gardens, and no definitive archaeological evidence has been found in Babylon.[7][8] Three theories have been suggested to account for th...
------------
Outputs from `joiner`.
Score: 0.9919354838709679
Content: [21] However, the gardens were said to still exist at the time that later writers described them, and some of these accounts are regarded as deriving from people who had visited Babylon.[2] Herodotus, who describes Babylon in his Histories, does not mention the Hanging Gardens,[22] although it could be that the gardens were not yet well known to the Greeks at the time of his visit.[2] To date, no archaeological evidence has been found at Babylon for the Hanging Gardens.[6] It is possible that ev...
------------
LLM Response:
The Hanging Gardens of Babylon are said to have been located in the ancient city of Babylon, near present-day Hillah in the Babil province of Iraq.
However, the exact location has not been definitively established, and there is no archaeological evidence confirming their existence in Babylon. Some theories suggest that they could have been confused with the gardens built by the Assyrian king Sennacherib in his capital city of Nineveh, near modern-day Mosul.
Sequential vs Concurrent Execution
Now, let's compare sequential and concurrent execution of multiple queries. We create two utility functions that run a list of questions. Both use the AsyncPipeline, but only one of them runs the questions concurrently as separate coroutines.
We run the pipeline with 3 examples.
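The two utility functions follow a standard asyncio pattern, sketched here with a hypothetical `fake_pipeline_run` coroutine in place of the real AsyncPipeline call (assumed to be dominated by I/O wait, such as the LLM request). The sequential version awaits one question at a time; the concurrent version hands all of them to `asyncio.gather`, which returns the answers in input order.

```python
import asyncio
import time
from typing import Awaitable, Callable, List, Tuple


async def fake_pipeline_run(question: str) -> str:
    # Hypothetical stand-in for awaiting the AsyncPipeline on one question
    await asyncio.sleep(0.2)
    return f"answer to: {question}"


async def run_sequentially(questions: List[str]) -> List[str]:
    # Each question starts only after the previous one has finished
    return [await fake_pipeline_run(q) for q in questions]


async def run_concurrently(questions: List[str]) -> List[str]:
    # All questions are scheduled at once; gather keeps results in input order
    return list(await asyncio.gather(*(fake_pipeline_run(q) for q in questions)))


async def timed(runner: Callable[[List[str]], Awaitable[List[str]]], questions: List[str]) -> Tuple[List[str], float]:
    start = time.perf_counter()
    answers = await runner(questions)
    return answers, time.perf_counter() - start


questions = ["question 1", "question 2", "question 3"]
seq_answers, seq_time = asyncio.run(timed(run_sequentially, questions))
conc_answers, conc_time = asyncio.run(timed(run_concurrently, questions))
print(f"sequential: {seq_time:.2f}s, concurrent: {conc_time:.2f}s")
```

With purely simulated waits, the concurrent run takes about one delay instead of three; real pipelines also contain non-overlapping work (for example CPU-bound embedding), so the speed-up is usually smaller.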
Let's run the questions sequentially first.
All tasks completed in 8.48 seconds
Let's check how long it takes if we run questions concurrently.
All tasks completed in 3.57 seconds
You can see that concurrent execution (3.57 seconds) is more than twice as fast as sequential execution (8.48 seconds).
Concurrent Component Execution
In the example above, the retriever components already run concurrently. Any components that can run concurrently, typically because they sit on parallel branches of the pipeline, are automatically scheduled to run concurrently by the AsyncPipeline's run logic.
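A simplified way to picture dependency-driven scheduling is sketched below: every component starts as soon as all of its upstream components have finished, so independent branches overlap automatically. This is an illustrative standard-library sketch, not how AsyncPipeline is implemented internally; the graph loosely mirrors the hybrid RAG pipeline (prompt builder and LLM omitted), and `make_component`, `run_graph` and the delays are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# Hypothetical graph shaped like the hybrid retrieval part: component -> dependencies
GRAPH: Dict[str, List[str]] = {
    "text_embedder": [],
    "bm25_retriever": [],
    "embedding_retriever": ["text_embedder"],
    "joiner": ["bm25_retriever", "embedding_retriever"],
}

ComponentFn = Callable[[], Awaitable[str]]


def make_component(name: str, delay: float) -> ComponentFn:
    """Build a fake I/O-bound component that sleeps for `delay` seconds."""
    async def component() -> str:
        await asyncio.sleep(delay)
        return name
    return component


async def run_graph(graph: Dict[str, List[str]], components: Dict[str, ComponentFn]) -> List[str]:
    """Start each component once all its dependencies are done; return finish order."""
    done: set = set()
    finished_order: List[str] = []
    running: Dict[asyncio.Task, str] = {}
    while len(done) < len(graph):
        for name, deps in graph.items():
            ready = all(dep in done for dep in deps)
            if ready and name not in done and name not in running.values():
                running[asyncio.create_task(components[name]())] = name
        if not running:
            raise ValueError("Graph has a cycle or a missing dependency")
        # Wait only for the first completion, then re-check which components became ready
        completed, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
        for task in completed:
            name = running.pop(task)
            done.add(name)
            finished_order.append(name)
    return finished_order


components = {
    "text_embedder": make_component("text_embedder", 0.05),
    "bm25_retriever": make_component("bm25_retriever", 0.12),
    "embedding_retriever": make_component("embedding_retriever", 0.15),
    "joiner": make_component("joiner", 0.01),
}
order = asyncio.run(run_graph(GRAPH, components))
print(order)
```

Here bm25_retriever runs alongside the text_embedder → embedding_retriever branch, and the joiner waits for both inputs, so the total time is close to the longest branch rather than the sum of all delays.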
Let's create a small example with a custom component to illustrate concurrent execution in more detail.
You can see that this pipeline has 3 parallel branches. Let's run this pipeline to see how it executes components concurrently.
{'wait_1': {'message': 'Message from wait_1'}}
{'wait_3': {'message': 'Message from wait_3'}}
{'wait_2': {'message': 'Message from wait_2'}}
{'wait_5': {'message': 'Message from wait_5'}}
{'wait_4': {'message': 'Message from wait_4'}}
Custom Asynchronous Components
Individual components can opt into async by implementing a run_async coroutine that has the same signature (i.e., input parameters and outputs) as the run method. This constraint ensures that pipeline connections are the same whether or not a component supports async execution, allowing plug-and-play backward compatibility with existing pipelines.
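The shape of such a component is sketched below as a plain class, loosely modeled on the `{'message': ...}` outputs shown earlier. `WaitComponent` and `signatures_match` are hypothetical names. In an actual Haystack component the class would also carry the `@component` decorator and declare its outputs with `@component.output_types(message=str)`; those are left out so the sketch runs without Haystack installed.

```python
import asyncio
import inspect
import time
from typing import Dict


class WaitComponent:
    """Hypothetical component exposing both a sync and an async entry point."""

    def __init__(self, name: str) -> None:
        self.name = name

    def run(self, seconds: float) -> Dict[str, str]:
        # Blocking variant, used when the component is executed synchronously
        time.sleep(seconds)
        return {"message": f"Message from {self.name}"}

    async def run_async(self, seconds: float) -> Dict[str, str]:
        # Non-blocking variant: same inputs, same output keys, but yields to the loop
        await asyncio.sleep(seconds)
        return {"message": f"Message from {self.name}"}


def signatures_match(component: object) -> bool:
    """Check that run and run_async accept the same parameters and return annotation."""
    return inspect.signature(component.run) == inspect.signature(component.run_async)


wait_1 = WaitComponent("wait_1")
print(signatures_match(wait_1), asyncio.run(wait_1.run_async(0.01)))
```

Keeping both methods identical in inputs and outputs is what lets a pipeline call either one without any change to how the component is connected.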