Apache Airflow is a powerful platform for managing and executing Extract, Transform, Load (ETL) data processes. It offers a scalable and extensible solution for automating complex workflows, eliminating repetitive tasks, and monitoring data pipelines. This article explores automating ETL pipelines with Apache Airflow on AWS EC2 and demonstrates how Airflow can be used to design, deploy, and manage end-to-end data pipelines efficiently. As a practical example, it integrates a weather API into an ETL pipeline, showcasing how Airflow orchestrates the retrieval, transformation, and loading of data from external sources.
Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows defined as Directed Acyclic Graphs (DAGs). Its core components are the Scheduler, Executor, Metadata Database, and Web Interface. The Scheduler decides when tasks run, the Executor runs them on workers, and the Metadata Database stores the state of DAGs and task instances. The Web Interface provides a user-friendly dashboard for monitoring pipeline status and managing workflows. This modular architecture lets data engineers build, automate, and scale data pipelines with flexibility and control.
Directed Acyclic Graphs or DAGs define the sequence of tasks and their dependencies. They represent the logical flow of data through the pipeline. Each node in the DAG represents a task, while the edges denote the dependencies between tasks. DAGs are acyclic, which means they have no cycles or loops, ensuring a clear and deterministic execution path. Airflow’s DAGs enable data engineers to model complex workflows with ease, orchestrating the execution of tasks in parallel or sequentially based on their dependencies and schedule. By leveraging DAGs, users can design robust and scalable data pipelines that automate the extraction, transformation, and loading of data with precision and efficiency.
Operators are fundamental building blocks within Apache Airflow that define the individual units of work to be executed within a DAG. Each operator represents a single task in the workflow and encapsulates the logic required to perform that task. Airflow provides a wide range of built-in operators, each tailored to specific use cases such as transferring data between systems, executing SQL queries, running Python scripts, sending emails, and more. Additionally, Airflow allows users to create custom operators to accommodate unique requirements not covered by the built-in options. Operators play a crucial role in defining the functionality and behavior of tasks within a DAG, enabling users to construct complex workflows by orchestrating a series of operations seamlessly.
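To make these concepts concrete, here is a minimal, hypothetical sketch of a DAG with two tasks, a BashOperator and a PythonOperator, chained with the `>>` dependency operator. The DAG ID, function name, and schedule are illustrative and not part of the pipeline built later in this article.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def say_hello():
    # Trivial Python task used only to illustrate the PythonOperator.
    print("Hello from Airflow!")

with DAG(
    dag_id="example_minimal_dag",      # illustrative DAG ID
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    print_date = BashOperator(task_id="print_date", bash_command="date")
    greet = PythonOperator(task_id="greet", python_callable=say_hello)

    # print_date runs first; greet runs only after it succeeds.
    print_date >> greet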
Launching an EC2 instance on AWS is a straightforward process, typically done through the AWS Management Console or the command-line interface. To ensure Apache Airflow's smooth operation, configure the instance's inbound rules to allow traffic on port 8080, the default port used by Airflow's web server. This exposes the Airflow web interface so you can monitor and manage workflows from a browser; in production, restrict access to trusted IP addresses. This setup balances resource allocation and functionality, laying the groundwork for efficient workflow orchestration with Apache Airflow on AWS EC2.
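If you prefer the command line, a rule like the following opens port 8080; this is a sketch with a placeholder security group ID, and in practice you should replace 0.0.0.0/0 with a narrower CIDR range such as your own IP:
aws ec2 authorize-security-group-ingress \
    --group-id sg-0123456789abcdef0 \
    --protocol tcp \
    --port 8080 \
    --cidr 0.0.0.0/0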
To install Apache Airflow on a running EC2 instance, follow these steps:
Update package lists to ensure you have the latest information on available packages:
sudo apt update
Install the Python 3 pip package manager to facilitate the installation of Python packages:
sudo apt install python3-pip
Install the Python 3 virtual environment package to isolate the Airflow environment from the system Python installation:
sudo apt install python3-venv
Create a virtual environment for Airflow and activate it:
python3 -m venv airflow_venv
source airflow_venv/bin/activate
Install pandas, s3fs (used later to write to S3), and Apache Airflow inside the virtual environment:
pip install pandas s3fs apache-airflow
Start Airflow in standalone mode, which initializes the metadata database, creates an admin user, and starts the scheduler and web server (on port 8080 by default):
airflow standalone
With these commands, you’ll have Apache Airflow installed and running on your EC2 instance. You can then access the Airflow web interface by navigating to the instance’s public IP address followed by port 8080 in a web browser.
After navigating to port 8080 on your EC2 instance’s public IP address, you’ll be directed to the Apache Airflow web interface. Upon your first visit, you’ll be prompted to enter the admin password. This password is generated and displayed in your terminal after running the `airflow standalone` command. Simply copy the password from your terminal and paste it into the password field on the Airflow web interface to authenticate as the admin user. Once logged in, you’ll have access to the full functionality of Apache Airflow, where you can manage and monitor your workflows with ease.
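If you would rather set your own credentials instead of copying the generated password, Airflow's CLI can create an admin user; this is a sketch in which the username, names, email, and password are placeholders:
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email [email protected] \
    --password your_password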
To make development on the instance easier, let's connect our EC2 instance to VS Code through the Remote - SSH extension.
If you haven’t already, download and install Visual Studio Code on your local machine from the official website: [Visual Studio Code](https://code.visualstudio.com/).
Open Visual Studio Code and install the Remote – SSH extension. This extension allows you to connect to remote machines over SSH directly from within VSCode.
Ensure that SSH is enabled on your EC2 instance. You can do this during the instance creation process or by modifying the security group settings in the AWS Management Console. Make sure you have the key pair (.pem file) that corresponds to your EC2 instance.
Log in to your AWS Management Console and navigate to the EC2 dashboard. Find your instance and note down its public IP address. You’ll need this to establish the SSH connection.
In VSCode, press `Ctrl+Shift+P` (Windows/Linux) or `Cmd+Shift+P` (Mac) to open the command palette. Type “Remote-SSH: Connect to Host” and select it. Then choose “Add New SSH Host” and enter your instance’s SSH connection details.
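The exact values depend on your setup; a typical SSH host entry, sketched here with a placeholder host alias, IP address, and key path (the default user on Ubuntu AMIs is ubuntu), looks like this:
Host airflow-ec2
    HostName <your-ec2-public-ip>
    User ubuntu
    IdentityFile ~/.ssh/your-key.pem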
After entering the required information, VSCode will attempt to connect to your EC2 instance over SSH. If prompted, choose “Continue” to trust the host. Once connected, VSCode will open a new window with access to your EC2 instance’s file system.
You can verify that you’re connected to your EC2 instance by checking the bottom-left corner of the VSCode window. It should display the name of your EC2 instance, indicating a successful connection.
Now that you’ve connected your EC2 instance to VSCode, you’re ready to start writing the code for your ETL pipeline using Apache Airflow. You can edit files directly on your EC2 instance using VSCode’s familiar interface, making development and debugging a breeze.
Next, we will sign up on the OpenWeatherMap website (https://openweathermap.org/api) to obtain an API key, which we will use to fetch weather data. Once we access our EC2 instance in VS Code, we'll see the airflow folder created during installation. Inside this directory, we'll create a "dags" folder (the default location where Airflow looks for DAG files) to organize our Directed Acyclic Graph (DAG) scripts. Here, we'll begin writing the Python script for our DAG, laying the foundation for our workflow orchestration.
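Assuming the default AIRFLOW_HOME of ~/airflow, creating the folder and an empty DAG file from the terminal might look like this (the file name weather_dag.py is illustrative):
mkdir -p ~/airflow/dags
touch ~/airflow/dags/weather_dag.py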
To verify the availability of the API, we’ll first navigate to the Airflow UI and access the admin section. From there, we’ll proceed to add a new connection by clicking on the “Connections” tab. Here, we’ll specify the connection ID as “weathermap_api” and set the type to “HTTP”. In the “Host” field, we’ll input “https://api.openweathermap.org”. With these settings configured, we’ll establish the connection, ensuring that our API is ready for use in our ETL pipeline.
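As an alternative to the UI, the same connection can be created from the terminal with Airflow's CLI; a minimal sketch:
airflow connections add 'weathermap_api' --conn-type 'http' --conn-host 'https://api.openweathermap.org'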
With the first step, checking whether the API is available, in place, the next step is to extract weather data from the API and store it in an S3 bucket. To write to S3 from the EC2 instance, we need to grant it the appropriate permissions. In the EC2 console, select your instance, click “Actions”, and choose “Security” from the dropdown menu. Here you’ll find the option to modify the instance’s security settings; click “Modify IAM Role” to attach an IAM role.
In the IAM role management page, select “Create new IAM role” if you haven’t already created a role for EC2 instances. Choose the “EC2” service as the type of trusted entity, then click “Next: Permissions”.
In the permissions step, select “Attach policies directly”. Search for and select the policies “AmazonS3FullAccess” to grant full access to S3, and “AmazonEC2FullAccess” to provide full access to EC2 resources.
Alternatively, you can attach a custom policy with the following JSON to grant equivalent permissions:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "ec2:*",
            "Resource": "*"
        }
    ]
}
Proceed to the next steps to review and name your IAM role. Once created, return to the EC2 instance’s security settings. In the IAM role dropdown, select the role you just created and click “Save”.
With the IAM role attached, your EC2 instance now has the necessary permissions to interact with S3 buckets and other AWS resources, enabling seamless data storage and retrieval as part of your ETL pipeline.
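If you prefer the CLI, the same attachment can be done with a command along these lines; the instance ID and profile name are placeholders, and the role must already have an associated instance profile (the console creates one automatically for EC2 roles):
aws ec2 associate-iam-instance-profile \
    --instance-id i-0123456789abcdef0 \
    --iam-instance-profile Name=airflow-ec2-role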
After granting the necessary permissions, you can proceed to create an S3 bucket to store the extracted weather data from the API.
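You can create the bucket in the console or with the AWS CLI; a minimal sketch with a placeholder bucket name and region:
aws s3 mb s3://your-weather-data-bucket --region us-east-1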
Once the bucket is created, you can use it in your DAG to store the extracted weather data from the API. In your DAG script, you’ll need to specify the S3 bucket name as well as the destination path where the weather data will be stored. With the bucket configured, your DAG can seamlessly interact with the S3 bucket to store and retrieve data as needed.
To obtain the necessary access token from AWS, navigate to the AWS Management Console and click on your account name or profile icon in the top-right corner. From the dropdown menu, select “My Security Credentials”. In the “Access keys” section, either generate a new access key or retrieve an existing one. Copy both the Access Key ID and Secret Access Key provided. These credentials will authenticate your requests to AWS services. Ensure to securely store the Secret Access Key as it will not be displayed again. With these credentials, you can seamlessly integrate AWS services, such as S3, into your Apache Airflow DAGs on your EC2 instance.
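Note that because the instance already has an IAM role with S3 access attached, s3fs can typically resolve credentials automatically from the instance metadata, so the explicit key and secret shown in the DAG below are optional. As a hedged alternative, the CSV upload can usually omit storage_options entirely:
df_data.to_csv(f"s3://YOUR_S3_NAME/{dt_string}.csv", index=False)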
Finally, here is the complete DAG file with all three steps combined.
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.http.sensors.http import HttpSensor
import json
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
import pandas as pd


# Function to convert temperature from Kelvin to Fahrenheit
def kelvin_to_fahrenheit(temp_in_kelvin):
    temp_in_fahrenheit = (temp_in_kelvin - 273.15) * (9 / 5) + 32
    return temp_in_fahrenheit


# Function to transform and load weather data to S3 bucket
def transform_load_data(task_instance):
    # Extract weather data from XCom (pushed by the extract_weather_data task)
    data = task_instance.xcom_pull(task_ids="extract_weather_data")

    # Extract relevant weather parameters
    city = data["name"]
    weather_description = data["weather"][0]["description"]
    temp_fahrenheit = kelvin_to_fahrenheit(data["main"]["temp"])
    feels_like_fahrenheit = kelvin_to_fahrenheit(data["main"]["feels_like"])
    min_temp_fahrenheit = kelvin_to_fahrenheit(data["main"]["temp_min"])
    max_temp_fahrenheit = kelvin_to_fahrenheit(data["main"]["temp_max"])
    pressure = data["main"]["pressure"]
    humidity = data["main"]["humidity"]
    wind_speed = data["wind"]["speed"]
    time_of_record = datetime.utcfromtimestamp(data["dt"] + data["timezone"])
    sunrise_time = datetime.utcfromtimestamp(data["sys"]["sunrise"] + data["timezone"])
    sunset_time = datetime.utcfromtimestamp(data["sys"]["sunset"] + data["timezone"])

    # Transform data into a DataFrame
    transformed_data = {
        "City": city,
        "Description": weather_description,
        "Temperature (F)": temp_fahrenheit,
        "Feels Like (F)": feels_like_fahrenheit,
        "Minimum Temp (F)": min_temp_fahrenheit,
        "Maximum Temp (F)": max_temp_fahrenheit,
        "Pressure": pressure,
        "Humidity": humidity,
        "Wind Speed": wind_speed,
        "Time of Record": time_of_record,
        "Sunrise (Local Time)": sunrise_time,
        "Sunset (Local Time)": sunset_time
    }
    transformed_data_list = [transformed_data]
    df_data = pd.DataFrame(transformed_data_list)

    # Store data in the S3 bucket as a timestamped CSV file
    aws_credentials = {"key": "xxxxxxxxx", "secret": "xxxxxxxxxx"}
    now = datetime.now()
    dt_string = now.strftime("%d%m%Y%H%M%S")
    dt_string = "current_weather_data_portland_" + dt_string
    df_data.to_csv(f"s3://YOUR_S3_NAME/{dt_string}.csv", index=False,
                   storage_options=aws_credentials)


# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 8),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2)
}

# Define the DAG
with DAG('weather_dag',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    # Check if the weather API is ready
    is_weather_api_ready = HttpSensor(
        task_id='is_weather_api_ready',
        http_conn_id='weathermap_api',
        endpoint='/data/2.5/weather?q=Portland&APPID=**********************'
    )

    # Extract weather data from the API
    extract_weather_data = SimpleHttpOperator(
        task_id='extract_weather_data',
        http_conn_id='weathermap_api',
        endpoint='/data/2.5/weather?q=Portland&APPID=**********************',
        method='GET',
        response_filter=lambda r: json.loads(r.text),
        log_response=True
    )

    # Transform and load weather data to the S3 bucket
    transform_load_weather_data = PythonOperator(
        task_id='transform_load_weather_data',
        python_callable=transform_load_data
    )

    # Set task dependencies
    is_weather_api_ready >> extract_weather_data >> transform_load_weather_data
This DAG automates the extraction, transformation, and loading of weather data from the API into an S3 bucket. Once the file is placed in the dags folder, the DAG appears in the Airflow UI, where you can monitor its status, trigger manual runs, and view task logs. To test it, expand the DAG's details and click the “Trigger DAG” button to start a manual run, then open the task logs to track individual tasks and diagnose any issues. The Airflow UI makes this orchestration and debugging straightforward.
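As a CLI alternative for debugging, Airflow can also run a single task or a full DAG run for a given date directly from the terminal; the dates below are illustrative:
airflow tasks test weather_dag is_weather_api_ready 2023-01-08
airflow dags test weather_dag 2023-01-08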
After the pipeline runs, it's important to verify the data stored in the S3 bucket. Navigate to the AWS Management Console, open the S3 service, and locate the bucket you configured for the weather data. Explore its contents, which should contain CSV files named according to the specified destination path, to confirm that the data was stored correctly. This confirms that the automated ETL pipeline works end to end and that the data is safely stored in the designated S3 bucket.
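A quick way to confirm the upload from the terminal (the bucket name is a placeholder):
aws s3 ls s3://your-weather-data-bucket/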
The integration of Apache Airflow with AWS EC2 presents a robust solution for automating ETL pipelines, facilitating efficient data processing and analysis. Through our exploration of automating ETL processes with Airflow and leveraging AWS resources, we’ve highlighted the transformative potential of these technologies in driving data-driven decision-making. By orchestrating complex workflows and seamlessly integrating with external services like the Weather API, Airflow empowers organizations to streamline data management and extract valuable insights with ease.