Managing a data pipeline, such as transferring data from CSV to PostgreSQL, is like orchestrating a well-timed process where each step relies on the previous one. Apache Airflow streamlines this process by automating the workflow, making it easy to manage complex data tasks.
In this article, we’ll build a robust data pipeline using Apache Airflow, Docker, and PostgreSQL to automate reading data from CSV files and inserting it into a database. We’ll cover key Airflow concepts such as Directed Acyclic Graphs (DAGs), tasks, and operators, which will help you efficiently manage workflows.
The aim of this project is to demonstrate how to create a reliable data pipeline with Apache Airflow that reads data from CSV files and writes it into a PostgreSQL database. We will explore the integration of various Airflow components to ensure effective data handling and maintain data integrity.
This article was published as a part of the Data Science Blogathon.
Apache Airflow (or simply Airflow) is a platform to programmatically author, schedule, and monitor workflows. When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
Let us understand the key Airflow terms below:
In this DAG, A will run first, then split into two branches: one goes to B and then to D, and the other goes to C and then to E. Both branches can run independently after A finishes.
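To make this concrete, here is a minimal sketch of such a DAG, assuming five placeholder tasks named A through E built with BashOperator (the task names and echo commands are illustrative only and not part of this project):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG('branching_example', start_date=datetime(2024, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    a = BashOperator(task_id='A', bash_command='echo A')
    b = BashOperator(task_id='B', bash_command='echo B')
    c = BashOperator(task_id='C', bash_command='echo C')
    d = BashOperator(task_id='D', bash_command='echo D')
    e = BashOperator(task_id='E', bash_command='echo E')

    # A runs first, then the branches B -> D and C -> E proceed independently
    a >> [b, c]
    b >> d
    c >> e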
In simple terms: Operators define what your task will do, and XComs let tasks pass information to each other.
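As a small illustration of both ideas, the hypothetical DAG below uses two PythonOperator tasks: the first returns a value, which Airflow pushes to XCom automatically, and the second pulls that value back (the task and function names are made up for this example):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def produce_value():
    # The return value is pushed to XCom under the key 'return_value'
    return 42

def consume_value(ti):
    # Pull the value pushed by the upstream 'produce' task
    value = ti.xcom_pull(task_ids='produce')
    print(f'Received {value} from XCom')

with DAG('xcom_example', start_date=datetime(2024, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    produce = PythonOperator(task_id='produce', python_callable=produce_value)
    consume = PythonOperator(task_id='consume', python_callable=consume_value)
    produce >> consume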
In Airflow, you use connections to manage and store the credentials and details required for connecting to external systems and services. They allow Airflow to interact with various data sources, APIs, and services securely and consistently. For example, when you create a Spark or AWS S3 connection in Airflow, you enable Airflow to interact with Spark clusters or AWS S3 buckets, respectively, through tasks defined in your DAGs.
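For example, once a Postgres connection has been saved in Airflow, a task only needs to reference its connection ID. The sketch below assumes a hypothetical connection ID my_postgres and table some_table:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def fetch_row_count():
    # Airflow resolves host, port, login, and password from the stored connection
    hook = PostgresHook(postgres_conn_id='my_postgres')
    records = hook.get_records('SELECT COUNT(*) FROM some_table;')
    print(records)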
Now that we are clear about the basic terminology of Airflow, let’s start building our project!
Using Docker with Apache Airflow ensures an easy and reproducible environment setup.
Writing a Dockerfile
A Dockerfile is a script that contains a series of instructions to build a Docker image. Copy the instructions below into a file named Dockerfile.
Important: Don’t save the file as Dockerfile.txt or with any other extension. Simply save it as Dockerfile.
FROM apache/airflow:2.9.1-python3.9

# Install system dependencies as root
USER root
RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-17-jdk && \
    apt-get clean

# Switch back to the airflow user before installing Python packages
USER airflow

# Install Python dependencies
COPY requirements.txt /requirements.txt
RUN pip3 install --upgrade pip
RUN pip3 install --no-cache-dir -r /requirements.txt

# Install Airflow providers
RUN pip3 install apache-airflow-providers-apache-spark apache-airflow-providers-amazon
We start with a base image from the official Apache Airflow repository, which gives us a stable and reliable foundation. On top of it, we install the system dependencies (gcc, python3-dev, and OpenJDK 17) needed to compile Python packages and run Java-based tools such as Spark:
RUN apt-get update && \
apt-get install -y gcc python3-dev openjdk-17-jdk && \
apt-get clean
Now that we are done with setting up the Dockerfile, let’s move ahead!!
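If you want to sanity-check the image on its own before wiring it into Docker Compose, you can build it directly; the tag custom-airflow below is just an arbitrary name chosen for this check:

docker build -t custom-airflow .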
In addition to creating a custom Docker image with a Dockerfile, you can manage and orchestrate your Docker containers using Docker Compose. The docker-compose.yml file defines the services, networks, and volumes that make up your application. We point the Compose file at our Dockerfile so that Compose builds the custom image and manages all the services it needs to run together. Let’s see how to specify our custom Dockerfile in the Docker Compose setup:
This section defines common settings for all Airflow services.
version: '3.8'
x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/requirements.txt:/opt/airflow/requirements.txt
    - ${AIRFLOW_PROJ_DIR:-.}/sample_files:/opt/airflow/sample_files
    - ./spark_jobs:/opt/bitnami/spark_jobs
  user: ${AIRFLOW_UID:-50000}:0
  depends_on:
    postgres:
      condition: service_healthy
  networks:
    - confluent
After setting up x-airflow-common, we need to define the required services.
This service runs the web interface for Airflow, where users can manage and monitor workflows.
services:
  airflow-webserver:
    <<: *airflow-common
    ports:
      - "8080:8080"
    depends_on:
      postgres:
        condition: service_healthy
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
The scheduler is responsible for triggering tasks based on the defined workflows.
  airflow-scheduler:
    <<: *airflow-common
    networks:
      - confluent
    depends_on:
      postgres:
        condition: service_healthy
      airflow-webserver:
        condition: service_healthy
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:8974/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
This service triggers tasks that require external events or conditions to start. It runs in a similar manner to the scheduler and connects to the same PostgreSQL database.
  airflow-triggerer:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow triggerer"
    healthcheck:
      test:
        - CMD-SHELL
        - airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
This service allows command-line interface (CLI) operations on the Airflow environment. It can run various Airflow commands for debugging or management.
  airflow-cli:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - confluent
    profiles:
      - debug
    command:
      - bash
      - -c
      - airflow
This service initializes the database and creates the default admin user.
  airflow-init:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    command: >
      bash -c "
      airflow db init &&
      airflow users create \
      --username admin \
      --firstname admin \
      --lastname admin \
      --role Admin \
      --email admin@example.com \
      --password admin
      "
    networks:
      - confluent
This service hosts the PostgreSQL database that Airflow uses to store its metadata. Both the username and the password for connecting to Postgres are set to airflow.
  postgres:
    image: postgres:16.0
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    healthcheck:
      test:
        - CMD
        - pg_isready
        - -U
        - airflow
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - confluent
This block defines the network over which all services communicate. Every service is attached to the same confluent network, allowing them to interact seamlessly.
networks:
  confluent:
Putting all of these pieces together, the complete docker-compose.yml file looks like this:

x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/dags/sql:/opt/airflow/dags/sql
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/requirements.txt:/opt/airflow/requirements.txt
    - ${AIRFLOW_PROJ_DIR:-.}/sample_files:/opt/airflow/sample_files
    - ./spark_jobs:/opt/bitnami/spark_jobs
  user: ${AIRFLOW_UID:-50000}:0
  depends_on:
    postgres:
      condition: service_healthy
  networks:
    - confluent

services:
  airflow-webserver:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow webserver"
    ports:
      - 8080:8080
    healthcheck:
      test:
        - CMD
        - curl
        - --fail
        - http://localhost:8080/health
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow scheduler"
    healthcheck:
      test:
        - CMD
        - curl
        - --fail
        - http://localhost:8974/health
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-triggerer:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow triggerer"
    healthcheck:
      test:
        - CMD-SHELL
        - airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-init:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    command: >
      bash -c "
      airflow db init &&
      airflow users create \
      --username admin \
      --firstname admin \
      --lastname admin \
      --role Admin \
      --email admin@example.com \
      --password admin
      "
    networks:
      - confluent

  airflow-cli:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - confluent
    profiles:
      - debug
    command:
      - bash
      - -c
      - airflow

  postgres:
    image: postgres:16.0
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    healthcheck:
      test:
        - CMD
        - pg_isready
        - -U
        - airflow
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - confluent

networks:
  confluent:
We will now look at the steps for setting up and running our project.
The first step is to create a project folder and place the Dockerfile and docker-compose.yml files above inside it.
Create a requirements.txt file and list the necessary Python libraries, such as pandas and numpy.
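For reference, a minimal requirements.txt for this project could simply list the libraries mentioned above (no versions are prescribed here; pin them if you need reproducible builds):

pandas
numpy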
Start Docker Desktop. Then open your terminal and run docker-compose up -d.
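Assuming the terminal is opened in the project folder, the first command starts all services in the background, and the second (optional) one lets you confirm that the containers are up and healthy:

docker-compose up -d
docker-compose ps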
You should see something like the images above. After the command executes successfully, you should be able to see these files:
In your browser, go to http://localhost:8080. If your installation was successful, you should see:
Enter admin as both the username and password. After logging in, you should see:
We use postgres_conn_id to specify the connection to the PostgreSQL database within Airflow. You define this connection ID in the Airflow UI, where you configure database credentials such as the host, port, username, and password.
By using postgres_conn_id, Airflow knows which database to connect to when executing SQL commands. It abstracts away the need to hard-code connection details directly in the DAG code, improving security and flexibility.
Here, we provide the basic connection parameters that allow Airflow to connect to the Postgres server running in Docker.
NOTE: Enter the connection ID exactly as write_to_psql, since it will be referenced later in the DAG. The login and password for connecting to PostgreSQL are both set to airflow.
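If you prefer the command line over the UI, an equivalent connection can be created from inside the webserver container. The sketch below assumes the Postgres service is reachable under the hostname postgres and that the target database is the sample database created later in this article:

docker-compose exec airflow-webserver airflow connections add 'write_to_psql' \
    --conn-type 'postgres' \
    --conn-host 'postgres' \
    --conn-schema 'sample' \
    --conn-login 'airflow' \
    --conn-password 'airflow' \
    --conn-port '5432'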
Prepare a dummy input.csv file for the project and store it inside the sample_files folder that was created.
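Since the DAG inserts the columns id, name, and age, a minimal input.csv could look like this (the rows themselves are arbitrary sample data):

id,name,age
1,Alice,30
2,Bob,25
3,Charlie,35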
First, we import the necessary components: DAG (to create the workflow), PythonOperator (to run Python functions), and PostgresOperator (to interact with a PostgreSQL database). We also define default arguments like the owner of the workflow (airflow) and the start date of the tasks, ensuring the workflow begins on January 1, 2024. Finally, we import Pandas to handle data, enabling us to read CSV files efficiently.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
import pandas as pd
# Define default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}
This function is responsible for reading a CSV file using Pandas, then creating SQL insert queries to insert data into a PostgreSQL table. It loops through each row of the CSV, generating an SQL statement that inserts the id, name, and age values into a table. Finally, you save these queries to a file named insert_queries.sql inside the dags/sql folder, allowing Airflow to execute them later using a PostgresOperator.
# Function to read the CSV and generate insert queries
def generate_insert_queries():
    CSV_FILE_PATH = 'sample_files/input.csv'
    # Read the CSV file
    df = pd.read_csv(CSV_FILE_PATH)

    # Create a list of SQL insert queries
    insert_queries = []
    for index, row in df.iterrows():
        insert_query = f"INSERT INTO sample_table (id, name, age) VALUES ({row['id']}, '{row['name']}', {row['age']});"
        insert_queries.append(insert_query)

    # Save queries to a file for the PostgresOperator to execute
    with open('./dags/sql/insert_queries.sql', 'w') as f:
        for query in insert_queries:
            f.write(f"{query}\n")
This block defines the DAG (Directed Acyclic Graph), which represents the entire workflow. The parameters include the DAG ID (csv_to_postgres_dag), the default_args defined earlier, a schedule_interval of @once so the pipeline runs a single time, and catchup=False so no past runs are backfilled.
with DAG('csv_to_postgres_dag',
         default_args=default_args,
         schedule_interval='@once',
         catchup=False) as dag:
Each Airflow task receives a unique task_id, which serves as its identifying name within the DAG.
For example:
task_id='create_table'
The PostgresOperator allows you to run SQL commands in a PostgreSQL database using Airflow.
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='write_to_psql',  # Replace with your connection ID
        sql="""
        DROP TABLE IF EXISTS sample_table;
        CREATE TABLE sample_table (
            id SERIAL PRIMARY KEY,
            name VARCHAR(50),
            age INT
        );
        """
    )
The PythonOperator allows you to run Python functions as tasks. Here, it calls the generate_insert_queries function, which generates SQL queries from a CSV file.
    generate_queries = PythonOperator(
        task_id='generate_insert_queries',
        python_callable=generate_insert_queries
    )

A second PostgresOperator then executes the generated sql/insert_queries.sql file against the same database:

    run_insert_queries = PostgresOperator(
        task_id='run_insert_queries',
        postgres_conn_id='write_to_psql',  # Define this connection in Airflow UI
        sql='sql/insert_queries.sql'
    )
    create_table >> generate_queries >> run_insert_queries
The line create_table >> generate_queries >> run_insert_queries establishes the sequence of task execution in Apache Airflow: create_table runs first, generate_queries runs only after the table has been created, and run_insert_queries runs last, once the SQL file has been generated. In short, it defines a linear workflow where each task depends on the successful completion of the previous one.
In your VS Code, create a Python file named sample.py inside the automatically created dags folder.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
import pandas as pd
# Define default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

# Function to read the CSV and generate insert queries
def generate_insert_queries():
    CSV_FILE_PATH = 'sample_files/input.csv'
    # Read the CSV file
    df = pd.read_csv(CSV_FILE_PATH)

    # Create a list of SQL insert queries
    insert_queries = []
    for index, row in df.iterrows():
        insert_query = f"INSERT INTO sample_table (id, name, age) VALUES ({row['id']}, '{row['name']}', {row['age']});"
        insert_queries.append(insert_query)

    # Save queries to a file for the PostgresOperator to execute
    with open('./dags/sql/insert_queries.sql', 'w') as f:
        for query in insert_queries:
            f.write(f"{query}\n")

# Define the DAG
with DAG('csv_to_postgres_dag',
         default_args=default_args,
         schedule_interval='@once',
         catchup=False) as dag:

    # Task to create a PostgreSQL table
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='write_to_psql',  # Replace with your connection ID
        sql="""
        DROP TABLE IF EXISTS sample_table;
        CREATE TABLE sample_table (
            id SERIAL PRIMARY KEY,
            name VARCHAR(50),
            age INT
        );
        """
    )

    generate_queries = PythonOperator(
        task_id='generate_insert_queries',
        python_callable=generate_insert_queries
    )

    # Task to run the generated SQL queries using PostgresOperator
    run_insert_queries = PostgresOperator(
        task_id='run_insert_queries',
        postgres_conn_id='write_to_psql',  # Define this connection in Airflow UI
        sql='sql/insert_queries.sql'
    )

    create_table >> generate_queries >> run_insert_queries
    # Other tasks can follow here
NOTE: Place sample.py inside the dags folder only, because by default Airflow looks for DAG files in that folder.
Before running our code, we need to create a sample database inside our PostgreSQL container to write our CSV data.
On your Docker Desktop, navigate to the postgres container and go to the EXEC section. Run the following commands, which create a database called sample inside our Postgres instance.
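A typical sequence inside the container shell, assuming the airflow superuser defined in docker-compose.yml, would be:

psql -U airflow
CREATE DATABASE sample;
\q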
Now that you’ve built the foundation of your Airflow project, it’s time to see your hard work come to fruition! Running your code is where the magic happens; you’ll witness your CSV data being transformed and seamlessly inserted into your PostgreSQL database.
Upon clicking Graph, you can visualize your pipeline. The code section will show your latest code written in the file.
Upon clicking the play button in the upper-right corner (next to the “Next Run ID: None” marker), you can run the DAG. After running it, click on any task in the Graph section to see its details. Explore to find out more.
If there were no errors, you should see a green bar on the left side.
However, if there are any errors, click on the failed task and then on its Logs to understand the error:
This project successfully demonstrated the integration of Airflow with PostgreSQL to automate the process of reading data from a CSV file and inserting it into a database. Throughout the project, various operators were explored and implemented for efficient data handling techniques. This project showcases the power of Airflow in automating data workflows and lays the groundwork for further exploration in data engineering.
GitHub Repo: Project File
A. Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. It helps automate complex data pipelines by organizing tasks into Directed Acyclic Graphs (DAGs).
A. Docker simplifies the setup and deployment of Apache Airflow by creating isolated, reproducible environments. It ensures seamless configuration and operation of all necessary dependencies and services, such as PostgreSQL, within containers.
A. Airflow can connect to PostgreSQL using its built-in database operators. You can use these operators to execute SQL queries, manage database operations, and automate data pipelines that involve reading from or writing to PostgreSQL databases.
A. You can use Python scripts in Airflow tasks to read CSV files. The task can extract data from the CSV and, through a database operator, insert the data into PostgreSQL, automating the entire workflow.
A. Yes, Apache Airflow can scale easily. With Docker, you can run multiple worker nodes, and Airflow can distribute tasks across them. Additionally, integrating a database like PostgreSQL supports efficient handling of large-scale data.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.