Hybrid RAG Pipeline with Breakpoints

This notebook demonstrates how to set up breakpoints in a Haystack pipeline. In this case, we set breakpoints in a hybrid retrieval-augmented generation (RAG) pipeline. The pipeline combines BM25 and embedding-based retrieval, then uses a transformer-based reranker and an LLM to generate answers.

Install packages

[ ]
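The install cell is not reproduced on this page; a minimal sketch of what the rest of the notebook needs, assuming it installs Haystack together with the sentence-transformers and transformers packages used by the embedders and the ranker:

pip install -U haystack-ai sentence-transformers "transformers[torch]"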

Set up the OpenAI API key

[1]
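The key-setup cell is not shown above; a minimal sketch, prompting for the key only if it is not already in the environment (OpenAIChatGenerator reads OPENAI_API_KEY by default):

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")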

Import Required Libraries

First, let's import all the necessary components from Haystack.

[ ]
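The import cell is omitted on this page; a sketch of the imports the rest of the notebook relies on. The paths below are the standard Haystack 2.x locations, and the Breakpoint dataclass is assumed to live in haystack.dataclasses.breakpoints (check your Haystack version if the import fails):

from haystack import Document, Pipeline
from haystack.components.builders import AnswerBuilder, ChatPromptBuilder
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder,
)
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.retrievers.in_memory import (
    InMemoryBM25Retriever,
    InMemoryEmbeddingRetriever,
)
from haystack.dataclasses import ChatMessage
from haystack.dataclasses.breakpoints import Breakpoint
from haystack.document_stores.in_memory import InMemoryDocumentStore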

Document Store Initialization

Let's create a simple document store with some sample documents and their embeddings.

[3]
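The cell contents are not shown here; a minimal sketch, assuming a few short illustrative documents consistent with the answers produced later in the notebook (Mark's city and the two temperatures), embedded and written to an InMemoryDocumentStore:

document_store = InMemoryDocumentStore()

documents = [
    Document(content="Mark lives in Berlin."),
    Document(content="Jean lives in Paris."),
    Document(content="The temperature in Berlin is 15°C."),
    Document(content="The temperature in Rome is 30°C."),
]

# Embed the documents so the embedding retriever can search them
doc_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
doc_embedder.warm_up()
document_store.write_documents(doc_embedder.run(documents)["documents"])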

A Hybrid Retrieval Pipeline

Now let's build a hybrid RAG pipeline.

[4]
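The construction cell is not reproduced here; a sketch of a hybrid pipeline wired the way the component names in the breakpoint traces suggest. query_embedder, bm25_retriever, prompt_builder, and answer_builder appear in the traces below; the remaining names (embedding_retriever, doc_joiner, ranker, llm), the prompt template, and the model choices are illustrative assumptions:

# Prompt template with the `documents` and `query` variables used in the inputs below
template = [
    ChatMessage.from_user(
        "Given these documents, answer the question.\n"
        "Documents:\n{% for doc in documents %}{{ doc.content }}\n{% endfor %}\n"
        "Question: {{ query }}\nAnswer:"
    )
]

pipeline = Pipeline()
pipeline.add_component("query_embedder", SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"))
pipeline.add_component("embedding_retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipeline.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("doc_joiner", DocumentJoiner())
pipeline.add_component("ranker", TransformersSimilarityRanker())
pipeline.add_component("prompt_builder", ChatPromptBuilder(template=template, required_variables=["documents", "query"]))
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini"))
pipeline.add_component("answer_builder", AnswerBuilder())

# Dense branch: text embedding -> embedding retriever; sparse branch: BM25 retriever
pipeline.connect("query_embedder.embedding", "embedding_retriever.query_embedding")
pipeline.connect("embedding_retriever.documents", "doc_joiner.documents")
pipeline.connect("bm25_retriever.documents", "doc_joiner.documents")
# Join, rerank, build the prompt, generate, and wrap the reply as an answer
pipeline.connect("doc_joiner.documents", "ranker.documents")
pipeline.connect("ranker.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "llm.messages")
pipeline.connect("llm.replies", "answer_builder.replies")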

Running the pipeline with breakpoints

Now we demonstrate how to set breakpoints in a Haystack pipeline to inspect and debug its execution at specific points. Breakpoints allow you to pause execution, save the current state of the pipeline, and later resume from where you left off.

We'll run the pipeline with a breakpoint set at the query_embedder component. This saves the pipeline state before executing the query_embedder and raises a BreakpointException to stop execution.

[5]
[6]
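The cell that produced the trace below looks roughly like this. The Breakpoint arguments mirror the prompt_builder example later in the notebook, the question is the one visible in the trace, and the two data entries elided in the trace are assumed to feed the ranker and prompt_builder; the "Indexing documents..." line comes from the notebook's indexing step:

# Pause right before query_embedder runs and write a snapshot into "snapshots/"
break_point = Breakpoint(component_name="query_embedder", visit_count=0, snapshot_file_path="snapshots/")

question = "Where does Mark live?"
data = {
    "query_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "ranker": {"query": question},          # assumed, elided in the trace
    "prompt_builder": {"query": question},  # assumed, elided in the trace
    "answer_builder": {"query": question},
}

pipeline.run(data, break_point=break_point)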
Indexing documents...
TransformersSimilarityRanker is considered legacy and will no longer receive updates. It may be deprecated in a future release, with removal following after a deprecation period. Consider using SentenceTransformersSimilarityRanker instead, which provides the same functionality along with additional features.
---------------------------------------------------------------------------
BreakpointException                       Traceback (most recent call last)
Cell In[6], line 15
      6 question = "Where does Mark live?"
      7 data = {
      8     "query_embedder": {"text": question},
      9     "bm25_retriever": {"query": question},
   (...)
     12     "answer_builder": {"query": question},
     13 }
---> 15 pipeline.run(data, break_point=break_point)

File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:378, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
    376         # trigger the breakpoint if needed
    377         if should_trigger_breakpoint:
--> 378             _trigger_break_point(
    379                 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
    380             )
    382 component_outputs = self._run_component(
    383     component_name=component_name,
    384     component=component,
   (...)
    387     parent_span=span,
    388 )
    390 # Updates global input state with component outputs and returns outputs that should go to
    391 # pipeline outputs.

File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:299, in _trigger_break_point(pipeline_snapshot, pipeline_outputs)
    297 component_visits = pipeline_snapshot.pipeline_state.component_visits
    298 msg = f"Breaking at component {component_name} at visit count {component_visits[component_name]}"
--> 299 raise BreakpointException(
    300     message=msg, component=component_name, inputs=pipeline_snapshot.pipeline_state.inputs, results=pipeline_outputs
    301 )

BreakpointException: Breaking at component query_embedder at visit count 0

This run is interrupted with a BreakpointException: Breaking at component query_embedder at visit count 0 - and it generates a JSON file in the "snapshots" directory containing a snapshot of the pipeline state captured before the query_embedder component ran.

The snapshot files, named after the component associated with the breakpoint, can be inspected and edited, and later injected back into a pipeline to resume execution from the point where the breakpoint was triggered.

[ ]
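For example, you can list what the breakpoint wrote; the file names combine the component name with a timestamp, as in the path printed further below:

import os
print(sorted(os.listdir("snapshots")))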

Resuming from a breakpoint

We can then resume a pipeline from its saved pipeline_snapshot by passing it to the Pipeline.run() method. This will run the pipeline to the end.

[10]
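A sketch of the resume step. It assumes the load_pipeline_snapshot helper from haystack.core.pipeline.breakpoint (the module shown in the traceback above; verify the import against your Haystack version) and picks the latest query_embedder snapshot:

from pathlib import Path

# Assumed helper for deserializing a snapshot file; adjust to your Haystack version
from haystack.core.pipeline.breakpoint import load_pipeline_snapshot

snapshot_file = sorted(Path("snapshots").glob("query_embedder_*.json"))[-1]
snapshot = load_pipeline_snapshot(str(snapshot_file))

# When resuming, the inputs are restored from the snapshot, so `data` can stay empty
result = pipeline.run(data={}, pipeline_snapshot=snapshot)

answer = result["answer_builder"]["answers"][0]
print(answer.data)
print(answer.meta)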
Mark lives in Berlin.
{'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 5, 'prompt_tokens': 124, 'total_tokens': 129, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'all_messages': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>, _content=[TextContent(text='Mark lives in Berlin.')], _name=None, _meta={'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 5, 'prompt_tokens': 124, 'total_tokens': 129, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}})]}

Advanced Use Cases for Pipeline Breakpoints

Here are some advanced scenarios where pipeline breakpoints can be particularly valuable:

  1. Set a breakpoint at the LLM to test different prompts and iterate on them in real time.

  2. Place a breakpoint after the document retriever to examine and modify retrieved documents.

  3. Set a breakpoint before a component to inject gold-standard inputs and isolate whether issues stem from input quality or downstream logic.

To demonstrate the use case stated in point 1, we reuse the same query pipeline with a new question. First, we run the pipeline with the prompt that we originally passed to the prompt_builder. Then, we define a breakpoint at the prompt_builder to try an alternative prompt. This allows us to compare the results generated by different prompts without running the whole pipeline again.

[11]
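The cell behind this trace follows the same pattern as the earlier run, now with the breakpoint on prompt_builder; the question wording below is illustrative (the answer later in the notebook concerns the Rome and Berlin temperatures):

question = "What is the temperature difference between the warmest and the coldest city?"  # illustrative wording
data = {
    "query_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "ranker": {"query": question},
    "prompt_builder": {"query": question},
    "answer_builder": {"query": question},
}

break_point = Breakpoint(component_name="prompt_builder", visit_count=0, snapshot_file_path="snapshots/")
pipeline.run(data, break_point=break_point)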
TransformersSimilarityRanker is considered legacy and will no longer receive updates. It may be deprecated in a future release, with removal following after a deprecation period. Consider using SentenceTransformersSimilarityRanker instead, which provides the same functionality along with additional features.
Indexing documents...
---------------------------------------------------------------------------
BreakpointException                       Traceback (most recent call last)
Cell In[11], line 18
      7 data = {
      8     "query_embedder": {"text": question},
      9     "bm25_retriever": {"query": question},
   (...)
     12     "answer_builder": {"query": question},
     13 }
     16 break_point = Breakpoint(component_name="prompt_builder", visit_count=0, snapshot_file_path="snapshots/")
---> 18 pipeline.run(data, break_point=break_point)

File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:378, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
    376         # trigger the breakpoint if needed
    377         if should_trigger_breakpoint:
--> 378             _trigger_break_point(
    379                 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
    380             )
    382 component_outputs = self._run_component(
    383     component_name=component_name,
    384     component=component,
   (...)
    387     parent_span=span,
    388 )
    390 # Updates global input state with component outputs and returns outputs that should go to
    391 # pipeline outputs.

File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:299, in _trigger_break_point(pipeline_snapshot, pipeline_outputs)
    297 component_visits = pipeline_snapshot.pipeline_state.component_visits
    298 msg = f"Breaking at component {component_name} at visit count {component_visits[component_name]}"
--> 299 raise BreakpointException(
    300     message=msg, component=component_name, inputs=pipeline_snapshot.pipeline_state.inputs, results=pipeline_outputs
    301 )

BreakpointException: Breaking at component prompt_builder at visit count 0

Now we can manually insert a different template into the prompt_builder and inspect the results. To do this, we update the template input of the prompt_builder component in the snapshot file.

[12]
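A sketch of one way to do this, editing the JSON file directly. The exact nesting of the serialized prompt_builder inputs depends on the snapshot schema of your Haystack version (the trace above reads pipeline_snapshot.pipeline_state.inputs, so a pipeline_state -> inputs -> prompt_builder path is a reasonable place to look), so inspect the file before changing it:

import json
from pathlib import Path

# Most recent prompt_builder snapshot written by the breakpoint above
snapshot_path = sorted(Path("snapshots").glob("prompt_builder_*.json"))[-1]
state = json.loads(snapshot_path.read_text())

# Inspect the structure to locate the serialized prompt_builder template
print(json.dumps(state, indent=2)[:1500])

# ...edit the prompt_builder "template" entry (here or in a text editor), then save:
snapshot_path.write_text(json.dumps(state, indent=2))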

Now we just load the snapshot file and resume the pipeline with the updated snapshot.

[27]
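A sketch of that step, reusing the assumed load_pipeline_snapshot helper; the path printed in the output below is the prompt_builder snapshot, and the final cell prints the generated answer:

snapshot_path = sorted(Path("snapshots").glob("prompt_builder_*.json"))[-1]
print(snapshot_path)

snapshot = load_pipeline_snapshot(str(snapshot_path))
result = pipeline.run(data={}, pipeline_snapshot=snapshot)
print(result["answer_builder"]["answers"][0].data)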
snapshots/prompt_builder_2025_07_26_13_01_23.json
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
To disable this warning, you can either:
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
[16]
The temperature in Rome is 30°C and in Berlin is 15°C. The temperature difference between the warmest (Rome) and the coldest (Berlin) city is 30°C - 15°C = 15°C.