Pipelines

Flexibility powered by DAGs

In order to build modern search pipelines, you need two things: powerful building blocks and an easy way to stick them together. The Pipeline class is built exactly for this purpose and enables many search scenarios beyond QA. The core idea is that you can build a Directed Acyclic Graph (DAG) where each node is one building block (Reader, Retriever, Generator, and so on). Here's a simple example of a standard Open-Domain QA pipeline:

from haystack import Pipeline

# Assumes that `retriever` and `reader` have already been initialized
p = Pipeline()
p.add_node(component=retriever, name="ESRetriever1", inputs=["Query"])
p.add_node(component=reader, name="QAReader", inputs=["ESRetriever1"])
res = p.run(query="What did Einstein work on?", top_k_retriever=1)

Initializing a Pipeline

To start building your custom pipeline, you’ll need to initialize an object of the base Pipeline class:

from haystack import Pipeline
pipeline = Pipeline()

By default, a new pipeline receives a root node called Query, which is the entry point to the pipeline graph. From that point on, you need to manually define how the information flows from one node to the next.
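As a quick sanity check, you can inspect the pipeline graph right after initialization. This is a minimal sketch that assumes the underlying networkx graph is exposed as the pipeline's graph attribute (as in recent Haystack versions):

# A fresh pipeline should contain only the root node
print(pipeline.graph.nodes)
>>> ['Query']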

Adding Nodes to a Pipeline

Use the add_node() method to add new components to the pipeline graph. You may either initialize the modules before or during the call to add_node(). When you add a node to the pipeline, give it a name and a list of inputs that may contain one or more items. Note how the default Query node acts as the input node to the first explicitly defined node.

pipeline.add_node(component=retriever, name='Retriever', inputs=['Query'])

Here's an example of a node with several input sources:

pipeline.add_node(component=JoinNode(), name='Joiner',
                  inputs=['Retriever1', 'Retriever2'])

If the predecessor node has more than one output, you’ll need to specify the output number in the inputs list. For example:

pipeline.add_node(component=Branch1(), name='Branch1',
                  inputs=['TopicClassifier.output_1'])
pipeline.add_node(component=Branch2(), name='Branch2',
                  inputs=['TopicClassifier.output_2'])

Under the hood, the nodes are placed in a queue and executed one by one when the run() method is invoked. The output of the last node in the queue is the output of the entire pipeline.

When you create a custom pipeline, you need to take extra care that each node's output is compatible with the input of the next node in the chain. Otherwise, your system will throw an error at runtime.

Arguments

Whatever keyword arguments are passed into the Pipeline.run() method will be passed on to each node in the pipeline. For example, in the code snippet below, all nodes will receive query, top_k_retriever, and top_k_reader as arguments, even if they don't use them. When defining custom nodes, it is therefore very important that their keyword argument names do not clash with those of the other nodes in your pipeline.

res = pipeline.run(
    query="What did Einstein work on?",
    top_k_retriever=1,
    top_k_reader=5
)

Running a Pipeline

The run() function is the single command that triggers the execution of the entire pipeline:

query = "What's the history of Quidditch?"
pipeline.run(query=query)

Every node has its own run() method, and the pipeline run() call invokes each node, one after the other. When you run() a pipeline, all the function arguments are propagated to every node in the graph. To disambiguate, say, the top_k values of the retriever and the ranker, these parameters have aliases (such as top_k_retriever and top_k_ranker) that are automatically recognized by the respective modules. This lets you dynamically modify these parameters in each call to the pipeline:

pipeline.run(query=query, top_k_retriever=28, top_k_ranker=9)

Inspecting a Pipeline

Using draw()

The pipeline.draw() method generates a sketch of your pipeline. By looking at a drawing of your pipeline, you may be able to confirm that the graph is indeed structured in the way that you intended. This is especially true for customized graphs that may branch out at some point.
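For example, the following call renders the sketch to an image file (a minimal sketch; in recent Haystack versions, draw() accepts an optional path argument that defaults to pipeline.png):

# Render the pipeline graph to an image file for visual inspection
pipeline.draw(path="custom_pipeline.png")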

Accessing Pipeline Nodes

If your custom pipeline is not working as intended, try running your nodes in isolation. You may access any pipeline node by using the get_node() method and specifying the component's name:

retriever_node = pipeline.get_node('Retriever')

Running a Node in Isolation

When you execute a pipeline with run(), it successively invokes the run() methods of all nodes in the queue. However, you can also use a given node's run() method in isolation.

retriever_node.run(query=query, pipeline_type='Query')

What happens during an individual run depends entirely on the given node's definition. For example, the retriever's run() method calls run_query(), which in turn calls retrieve() and a few other methods. Once you have extracted your node from the pipeline with the get_node() method, you're free to run any one of that node's class methods:

retriever_node.run_query(query=query)
retriever_node.retrieve(query=query)

If you want to find out which class methods are called by a component's run() function, we recommend that you take a look at the node definitions in the source code.
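For a quick look from within Python, you can also print a method's source with the standard library's inspect module (plain Python, not a Haystack API):

import inspect

# Print the source code of the node's run() method
print(inspect.getsource(retriever_node.run))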

YAML File Definitions

For your convenience, there is also the option of defining and loading pipelines in YAML files. Having your pipeline available in a YAML is particularly useful when you move between experimentation and production environments. Just export the YAML from your notebook / IDE and import it into your production environment. It also helps with version control of pipelines, allows you to share your pipeline easily with colleagues, and simplifies the configuration of pipeline parameters in production.

For example, you can define and save a simple Retriever-Reader pipeline by saving the following to a file:

version: "0.7"
components: # define all the building-blocks for Pipeline
- name: MyReader # custom-name for the component; helpful for visualization & debugging
type: FARMReader # Haystack Class name for the component
params:
no_ans_boost: -10
model_name_or_path: deepset/roberta-base-squad2
- name: MyESRetriever
type: ElasticsearchRetriever
params:
document_store: MyDocumentStore # params can reference other components defined in the YAML
custom_query: null
- name: MyDocumentStore
type: ElasticsearchDocumentStore
params:
index: haystack_test
pipelines: # multiple Pipelines can be defined using the components from above
- name: my_query_pipeline # a simple extractive-qa Pipeline
nodes:
- name: MyESRetriever
inputs: [Query]
- name: MyReader
inputs: [MyESRetriever]

To load, simply call:

from pathlib import Path

pipeline = Pipeline.load_from_yaml(Path("sample.yaml"))

For another example YAML config, take a look at the example files in the Haystack repository.

Custom Nodes

Thanks to the modularity of pipelines, you can create your own nodes and comfortably integrate them into your system. You should define a run() function at the core of each node class that accepts a flexible number of keyword arguments in the form of a dictionary (i.e., **kwargs). That's where the entire functionality of your node will be defined. Let's look at a node class template:

class NodeTemplate:
    outgoing_edges = 1

    def run(self, **kwargs):
        # Insert code here to manipulate the variables in kwargs
        return (kwargs, "output_1")

Usually, your node will have one outgoing edge and thus one return value. Just like the **kwargs input to a node, a node's return value should also come in the form of a Python dictionary. That dictionary is returned within a tuple, which also contains the name of the outgoing edge, e.g., output_1.

It's also possible to have more than one outgoing edge, typically in a decision node. A decision node's run() method consists of a decision function that determines down which path in the graph to send its input. Such a function has more than one possible return value, and each of these is named accordingly, i.e., output_1, output_2, and so forth.

When defining your own custom nodes, you can either create them from scratch or let your new node class inherit from haystack.BaseComponent. This second method provides a lot of Node functionality, like methods to save and load arguments:

from haystack import BaseComponent

class CustomNode(BaseComponent):
    pass

To view the functionality of this new class, initialize a new node object and print its methods (the if-condition filters out dunder methods, which are less informative):

new_node = CustomNode()
[method for method in new_node.__dir__() if method[0] != '_']
>>> ['subclasses', 'pipeline_config', 'load_from_args', 'run', 'set_config']

Decision nodes

You can add decision nodes where only one "branch" is executed afterwards. This allows you, for example, to classify an incoming query and route it to different modules depending on the result. To find a ready-made example of a decision node, have a look at the page about the QueryClassifier.

If you'd like to define your own, you'll need to create a class that looks something like this:

class QueryClassifier:
    outgoing_edges = 2

    def run(self, **kwargs):
        if "?" in kwargs["query"]:
            return (kwargs, "output_1")
        else:
            return (kwargs, "output_2")

pipe = Pipeline()
pipe.add_node(component=QueryClassifier(), name="QueryClassifier", inputs=["Query"])
pipe.add_node(component=es_retriever, name="ESRetriever", inputs=["QueryClassifier.output_1"])
pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_2"])
pipe.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults",
              inputs=["ESRetriever", "DPRRetriever"])
pipe.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = pipe.run(query="What did Einstein work on?", top_k_retriever=1)

Evaluation nodes

There are nodes in Haystack that are used to evaluate the performance of readers, retrievers, and combined systems. To get hands-on experience with this kind of node, have a look at the evaluation tutorial.
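As a rough sketch of what such an evaluation pipeline can look like (node class names and import paths vary between Haystack versions; here we assume EvalDocuments and EvalAnswers from haystack.eval, plus an already initialized retriever and reader):

from haystack import Pipeline
from haystack.eval import EvalDocuments, EvalAnswers

eval_retriever = EvalDocuments()  # assumed node: collects retrieval metrics such as recall
eval_reader = EvalAnswers()       # assumed node: collects answer metrics such as EM and F1

eval_pipe = Pipeline()
eval_pipe.add_node(component=retriever, name="Retriever", inputs=["Query"])
eval_pipe.add_node(component=eval_retriever, name="EvalDocuments", inputs=["Retriever"])
eval_pipe.add_node(component=reader, name="Reader", inputs=["EvalDocuments"])
eval_pipe.add_node(component=eval_reader, name="EvalAnswers", inputs=["Reader"])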

Ready-Made Pipelines

Last but not least, we added some ready-made pipelines that allow you to run standard patterns with very few lines of code. See the ready-made pipelines page and pipelines API documentation to learn more about these.

Examples:

from haystack.pipeline import DocumentSearchPipeline, ExtractiveQAPipeline, GenerativeQAPipeline, FAQPipeline

# Extractive QA
qa_pipe = ExtractiveQAPipeline(reader=reader, retriever=retriever)
res = qa_pipe.run(query="When was Kant born?", top_k_retriever=3, top_k_reader=5)

# Document Search
doc_pipe = DocumentSearchPipeline(retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", top_k_retriever=1)

# Generative QA
gen_pipe = GenerativeQAPipeline(generator=rag_generator, retriever=retriever)
res = gen_pipe.run(query="Physics Einstein", top_k_retriever=1)

# FAQ based QA
faq_pipe = FAQPipeline(retriever=retriever)
res = faq_pipe.run(query="How can I change my address?", top_k_retriever=3)

Example: Multiple retrievers

You can now also use multiple Retrievers and join their results:

from haystack import Pipeline
p = Pipeline()
p.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"])
p.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["Query"])
p.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults", inputs=["ESRetriever", "DPRRetriever"])
p.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = p.run(query="What did Einstein work on?", top_k_retriever=1)

Example: Creating a Retriever-Ranker-Summarizer Pipeline

In this example, we'll look at how to establish a custom Retriever-Ranker-Summarizer pipeline. It's useful to add a Ranker to a summarization pipeline because the output of the Summarizer depends on the order of the documents that it receives.

from haystack import Pipeline
pipeline = Pipeline()

To create new pipeline nodes, we initialize the modules first. For our use case, we need a retriever, a ranker, and a summarizer. We tell the summarizer to return a single summary per query (instead of one summary for each document) and that its length should be somewhere between 10 and 300 words:

from haystack.retriever import ElasticsearchRetriever
from haystack.ranker import FARMRanker
from haystack.summarizer import TransformersSummarizer

retriever = ElasticsearchRetriever(document_store, top_k=10)
ranker = FARMRanker(model_name_or_path="sentence-transformers/distilbert-multilingual-nli-stsb-quora-ranking", top_k=10)
summarizer = TransformersSummarizer(model_name_or_path='t5-large', min_length=10, max_length=300, generate_single_summary=True)

We add the nodes to the pipeline:

pipeline.add_node(component=retriever, name='Retriever', inputs=['Query'])
pipeline.add_node(component=ranker, name='Ranker', inputs=['Retriever'])
pipeline.add_node(component=summarizer, name='Summarizer', inputs=['Ranker'])

Let's now run our custom pipeline on the Harry Potter Wiki dataset. A typical application for this pipeline would be a situation where we want some high-level information about our corpus that is not necessarily contained within one document. We therefore retrieve multiple documents, rank them, and let the summarizer return a single summary of all the texts.

query = "What's the history of Quidditch?"
result = pipeline.run(query=query)

The pipeline returns a dictionary that contains the query, the name of the last node, and a list of documents:

result.keys()
>>> dict_keys(['documents', 'query', 'node_id'])

Since we requested a single summary of all the texts we passed to the summarizer, the list of documents contains only one item. We access the summary through the text attribute:

result['documents'][0].text
>>> "the first record of a primitive form of Quidditch (''Kwidditch'') dates to c. 1050. the first known reference to wizards using broomsticks as a means of conveyance dates to A.D. 963. a variant of the game, Quodpot, was invented in the eighteenth century. in the middle of the 14th century it was made a protected species by the wizards council."

Example: Creating a Custom Translation Node

Let's say that we wanted to add a special translation module to our pipeline. Instead of just translating into one predefined language, our node should be able to return a summary in any language we want (i.e., for which we have a trained model). To that end, we define a CustomTranslator class. Since there's no decision function involved, we set outgoing_edges = 1:

class CustomTranslator:
    outgoing_edges = 1

Within a pipeline node, the run() function is where all the action happens. Our run function receives a language argument that tells the translator which translation model to initialize:

def run(self, language='fr', **kwargs):
    translator = TransformersTranslator(model_name_or_path=f'Helsinki-NLP/opus-mt-en-{language}')

We run the translator with the specified model and return its output.

    translation = translator.run(documents=kwargs['documents'])
    return translation
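Putting these pieces together, the whole node looks like this (a sketch; we assume TransformersTranslator can be imported from haystack.translator):

from haystack.translator import TransformersTranslator

class CustomTranslator:
    outgoing_edges = 1

    def run(self, language='fr', **kwargs):
        # Pick the Helsinki-NLP model for the requested target language
        translator = TransformersTranslator(model_name_or_path=f'Helsinki-NLP/opus-mt-en-{language}')
        # Translate the documents produced by the preceding node and pass on the result
        translation = translator.run(documents=kwargs['documents'])
        return translation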

We initialize this node directly when adding it to the pipeline. As usual, we specify a name and the inputs for this node:

pipeline.add_node(component=CustomTranslator(), name='CustomTranslator', inputs=['Summarizer'])

We can now call the pipeline with any Helsinki-NLP translation model from HuggingFace with English as a source language. Pipeline arguments are simply propagated through the pipeline. This means that if we want to pass a language value to our custom node, we simply specify it in our call to the pipeline. Let's look at the French summary of a popular wizard sport:

query = "What's the history of Quidditch?'
result = pipeline.run(query=query, top_k_retriever=30, top_k_ranker=20, language='fr')
result['documents'][0].text
>>> "''Quidditch'' a obtenu son nom du marais queerditch, l'emplacement du premier jeu enregistré. le jeu a été basé sur un jeu joué par une sorcière au 11ème siècle. un snitch d'or a été introduit à la suite d'un jeu 1269 joué en kent. on pense qu'une version balai du jeu peut avoir inspiré le mouvement du jeu moderne 'harlem shuffle'"

Now, how about Ukrainian?

result = pipeline.run(query=query, top_k_retriever=30, top_k_ranker=20, language='uk')
result['documents'][0].text
>>> '" Quuiditch " отримала свою назву від дивного болота, місця першої в історії записаної гри. Гру було засновано на грі, яку грала відьма у XI столітті. Золотий стукач було введено у гру 1269 гри в кенті. Вважається, що версія мітла у грі, можливо, надихнула сучасну гру на " заплутування " move " гри'