Apache Airflow is one of the most popular tools for workflow management, and Data Engineers use it extensively to build and manage their ETL pipelines. But not all the pipelines you build in Airflow are straightforward. Some are complex and require running only one out of many tasks based on certain criteria. How would you do that? How would you choose which task to run without interrupting the flow of the pipeline? Well, that is where the BranchPythonOperator comes into the picture. This article will focus on how you can take the branching concept from programming and implement it while building pipelines in Airflow.
I encourage you to visit the following blogs before diving deep into the concept of BranchPythonOperator.
The complete code file for this article can be found here.
Assume we have a sample DAG as follows with the respective tasks.
Here, the lead_score_generator and lead_score_validator_branch are two tasks that run sequentially. But, whether potential_lead_process task should be run next or rejected_lead_process task should run depends on the task id returned from lead_score_validator_branch task. And the task id to be returned from lead_score_validator_branch is determined by the logic we code in this particular task.
This is the branching concept we need to implement in Airflow, and for that we have the BranchPythonOperator.
The BranchPythonOperator allows you to choose which task runs next based on certain criteria. It executes a task created from a Python function, and this function returns the task id of the next task to be run. Based on that task id, the corresponding task is executed, and hence the subsequent path to be followed in the pipeline is decided.
To understand it in simple terms, think of basic if-else logic. If certain criteria are met, a particular piece of code is run. Otherwise, a different piece of code is run. Once either of the two branches has run successfully, the program executes the code that follows the if-else block. The same is the case with the BranchPythonOperator.
Here, let’s say the lead_score_validator_branch task returns the task id of the potential_lead_process task. This means that Airflow will run potential_lead_process after the lead_score_validator_branch task, and the rejected_lead_process task will be skipped. Once the potential_lead_process task is executed, Airflow will execute the next task in the pipeline, which is the reporting task, and the pipeline run continues as usual.
Let’s understand the working of the BranchPythonOperator with the help of an example, starting with the problem statement.
Say we have a B2C company that sells products directly to customers. This company determines whether a user is a potential customer based on how they interact with its website. This interaction data is passed through an ML model, which predicts whether a user casually browsing the website is a potential customer for the business. This prediction is then passed on to the company’s sales executives, who subsequently contact the potential leads to pitch the business’s various products.
Through this hypothetical project, we will understand the working of the BranchPythonOperator.
Let’s start by importing the necessary libraries and defining the default DAG arguments.
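A minimal sketch of these imports and default arguments might look like the following. The module paths assume Airflow 2.x, and the specific owner, dates, and retry settings are illustrative, not taken from the original code:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator  # EmptyOperator in newer Airflow versions

# Illustrative default arguments applied to every task in the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}
```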
Now we will define the functions for the different tasks in this DAG.
First up is the function to generate a random lead score from the ML model. We will randomly generate this score as we do not want to delve deeper into machine learning here.
We are just generating a random user id and a random lead score indicating whether the user is a potential lead or not. This gets saved to a simple text file, dummy_lead.txt.
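A sketch of this generator function, assuming a hypothetical user id range and a simple comma-separated file layout (the original file format isn’t shown in the article):

```python
import random

def dummy_lead_score_generator():
    """Generate a random user id and lead score and save them to a text file."""
    user_id = random.randint(1000, 9999)         # hypothetical user id range
    lead_score = round(random.uniform(0, 1), 2)  # stand-in for an ML model score
    with open("dummy_lead.txt", "w") as f:
        f.write(f"{user_id},{lead_score}")
    return user_id, lead_score
```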
Now, we need to define two functions. The first one will run only when the lead score generated is greater than a certain threshold, say 0.65, indicating that the user is a potential lead. The other function runs when the score is below 0.65, indicating a user who is not a potential lead.
Let’s define the function that will run when the lead score is greater than 0.65.
Here, we will assign a random sales executive and a random preferred time slot at which the executive can call the potential user to pitch a product. All of this will be stored in a separate file, potential_lead.txt.
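A sketch of this function, with hypothetical executive names and time slots, assuming the comma-separated dummy_lead.txt layout from the previous step:

```python
import random

def dummy_potential_lead_process():
    """Assign a random sales executive and call slot to a potential lead."""
    executives = ["exec_a", "exec_b", "exec_c"]    # hypothetical executive names
    time_slots = ["10-11 AM", "2-3 PM", "5-6 PM"]  # hypothetical call slots
    with open("dummy_lead.txt") as f:
        user_id, lead_score = f.read().split(",")
    with open("potential_lead.txt", "w") as f:
        f.write(
            f"{user_id},{lead_score},"
            f"{random.choice(executives)},{random.choice(time_slots)}"
        )
```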
Next up, we define the function that will run when the lead score is below 0.65.
Here, we will just store the user id and the lead score in a separate file, rejected_lead.txt.
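A sketch of this function, again assuming the comma-separated file layout used above:

```python
def dummy_rejected_lead_process():
    """Store the user id and lead score of a rejected lead in a separate file."""
    with open("dummy_lead.txt") as f:
        user_id, lead_score = f.read().split(",")
    with open("rejected_lead.txt", "w") as f:
        f.write(f"{user_id},{lead_score}")
```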
Now let’s define the DAG for this project.
We will run this DAG every 2 minutes, and we will name it sample_lead_dag.
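A sketch of the DAG definition, assuming Airflow 2.x and illustrative default arguments:

```python
from datetime import datetime, timedelta

from airflow import DAG

# Illustrative default arguments; adjust to your environment
default_args = {"owner": "airflow", "start_date": datetime(2022, 1, 1)}

dag = DAG(
    dag_id="sample_lead_dag",
    default_args=default_args,
    schedule_interval=timedelta(minutes=2),  # run every 2 minutes
    catchup=False,
)
```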
Now let’s define the tasks for the above functions.
This task will call the dummy_lead_score_generator function, which will generate a random user id and lead score. We name this task1.
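A sketch of this task, assuming the `dag` object and the dummy_lead_score_generator function described earlier in the article:

```python
from airflow.operators.python import PythonOperator

# Calls the lead score generator defined earlier in the article
task1 = PythonOperator(
    task_id="lead_score_generator",
    python_callable=dummy_lead_score_generator,
    dag=dag,
)
```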
Next, let’s look at task3 and task4. These tasks refer to the two functions that will run based on criteria.
The potential_lead_process task will call dummy_potential_lead_process function for a successful user lead. While rejected_lead_process task will call dummy_rejected_lead_process function, which is for a rejected user lead.
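A sketch of these two tasks, assuming the `dag` object and the two lead-processing functions described earlier in the article:

```python
from airflow.operators.python import PythonOperator

# Runs when the branch chooses the potential-lead path
task3 = PythonOperator(
    task_id="potential_lead_process",
    python_callable=dummy_potential_lead_process,
    dag=dag,
)

# Runs when the branch chooses the rejected-lead path
task4 = PythonOperator(
    task_id="rejected_lead_process",
    python_callable=dummy_rejected_lead_process,
    dag=dag,
)
```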
Now let’s understand the criteria based on which these tasks will be run.
This lead_score_validator_branch function will return task id potential_lead_process when the score is above 0.65. Otherwise, it will return task id rejected_lead_process. Based on this, Airflow will determine which path to follow in the DAG.
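A sketch of this branching function. It reads the score saved by the generator task and returns the task id of the branch to follow; the comma-separated file layout is assumed from the earlier steps:

```python
def lead_score_validator_branch():
    """Return the task id of the next task based on the generated lead score."""
    with open("dummy_lead.txt") as f:
        _, lead_score = f.read().split(",")
    if float(lead_score) > 0.65:
        return "potential_lead_process"
    return "rejected_lead_process"
```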
Now let’s look at the BranchPythonOperator task that will run this function.
Here, you are already aware of the task_id, python_callable, and dag arguments. What is new is the trigger_rule argument. This determines when a task should run relative to its upstream tasks: only when all parent tasks have succeeded, when at least one has succeeded, or regardless of how the parents finished.
Here, we use the one_success trigger rule, as we only need either potential_lead_process or rejected_lead_process to run successfully.
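A sketch of this branch task, assuming the `dag` object and the lead_score_validator_branch function described earlier (the variable name task2 matches the dependency section below):

```python
from airflow.operators.python import BranchPythonOperator

# Decides which of the two downstream tasks to run,
# based on the task id returned by the callable
task2 = BranchPythonOperator(
    task_id="lead_score_validator_branch",
    python_callable=lead_score_validator_branch,
    trigger_rule="one_success",
    dag=dag,
)
```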
Now, we will define the last task of this DAG as a dummy task.
Finally, we define the task dependencies. Notice that we put task3 and task4 in square brackets because either of these tasks will run based on the output from task2, which is defined using the BranchPythonOperator.
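A sketch of the final dummy task and the dependency chain, assuming the `dag` object and the tasks defined earlier. Note that a task downstream of a branch needs a non-default trigger rule such as one_success; with the default all_success rule, it would be skipped along with the untaken branch:

```python
from airflow.operators.dummy import DummyOperator

# Placeholder reporting task that joins the two branches back together
task5 = DummyOperator(
    task_id="reporting",
    trigger_rule="one_success",  # run even though one branch is skipped
    dag=dag,
)

# Either task3 or task4 runs, depending on the task id returned by task2
task1 >> task2 >> [task3, task4] >> task5
```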
Now let’s have a look at the DAG run.
Here, notice that the rejected_lead_process task was run, based on the task id returned by the branch logic in the lead_score_validator_branch task, while the other task, potential_lead_process, was skipped. The downstream task still ran despite the skipped branch because we used the one_success trigger rule.
So I hope this article helped you learn about the BranchPythonOperator. It is a very powerful operator for complex pipelines, and it can greatly simplify your jobs. If you have any queries, you can leave a comment below.