I will admit, AWS Data Wrangler has become my go-to package for developing extract, transform, and load (ETL) data pipelines and other day-to-day scripts. AWS Data Wrangler integration with multiple big data AWS services like S3, Glue Catalog, Athena, Databases, EMR, and others makes life simple for engineers. It also provides the ability to import packages like Pandas and PyArrow to help writing transformations.
In this blog post, I will walk you through a hypothetical use-case to read data from the glue catalog table and obtain filter value to retrieve data from redshift. I would create a glue connection with redshift, use AWS Data Wrangler with AWS Glue 2.0 to read data from the Glue catalog table, retrieve filtered data from the redshift database, and write result data set to S3. Along the way, I will also mention troubleshooting Glue network connection issues.
AWS Glue is a fully managed extract, transform, and load (ETL) service to process a large number of datasets from various sources for analytics and data processing.
You will need a glue connection to connect to the redshift database via Glue job.
AWS Glue > Data catalog > connections > Add connection
Once you have created the connection, test it. During the test, I faced the below error –
Reason: Could not find S3 endpoint or NAT gateway for subnetId: subnet-630140c22a02f8cc2 in Vpc vpc-xxxxxxxxxxx.
The reason for connection failure is quite straight forward. The subnet used for the connection does not have an S3 endpoint or NAT gateway. To look into more details, navigate to the VPC page on the AWS console and select the subnet
VPC > Subnets > Filter > VPC: vpc-xxxxxxxxxxx > subnet
Review the route table the subnet is associated with. In this case, the subnet was associated with a route table that has a route to an Internet gateway making it a public subnet. To resolve this issue either associate the subnet to route table that has route to the NAT gateway or edit the glue connection to use the private subnet (using the NAT gateway).
The difference between public and private subnet is that the instances in the public subnet can send outbound traffic directly to the Internet whereas private subnet can access the Internet by using a network address translation (NAT) gateway that resides in the public subnet.
After making the changes, test the connection. This time the test connection will take some time but will succeed.
Let’s move ahead with creating a new Glue job. For this blog, I choose “A new script to be authored by you” and it gives an option Connections.
AWS Console > AWS Glue > ETL > Jobs > Add job
You can choose to enable “Continuous logging” in the Monitoring options sections. Continuing ahead, down on the same page there is an option to add job parameters. AWS Data Wrangler development team has made the package integration simple. When adding a new job with Glue Version 2.0 all you need to do is specify “--additional-python-modules
” as key in Job Parameters and ” awswrangler
” as value to use data wrangler.
AWS Console > AWS Glue > ETL > Jobs > Add job > Security configuration, script libraries, and job parameters (optional)
On the next page, choose the connection to be used by the job which in my case is “MyRedshift”.
Add the authored script –
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import awswrangler as wr
import pandas as pd
@params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
db_username = "admin"
db_password = "xxxxxxxxx"
def GetTableData():
## get total row count using aws data wrangler
getTotalRowCount = "SELECT count(*) as totalRowCount from orders"
df_count = wr.athena.read_sql_query(sql=getTotalRowCount, database="myredshift")
totalRowCount = df_count.iloc[0,0]
print("Total row count from Glue Table: ", totalRowCount)
## get max Order Date using aws data wrangler
maxOrderDateSql = "SELECT max(o_orderdate) as maxOrderDate FROM orders"
df = wr.athena.read_sql_query(sql=maxOrderDateSql, database="myredshift")
maxOrderDate = df.iloc[0,0]
print("MaxOrderdate from Glue Table: ", maxOrderDate)
return maxOrderDate
print("Get the max order date from glue table myredshift.orders to create redsfhit query")
maxOrderDate = GetTableData()
query = "SELECT * from admin.orders where o_orderdate > '{}'".format(maxOrderDate)
print("Query to be executed in redshift: ", query)
## define the redshift connection options
connection_options = {
"url": "jdbc:redshift://dwtest.paidfoobarrr.us-east-1.redshift.amazonaws.com:5439/dwtest",
"query": query,
"user": db_username,
"password": db_password,
"redshiftTmpDir": args["TempDir"],
"aws_iam_role": "arn:aws:iam::xxxxxxxxxxxx:role/dwtest-s3-access"
}
## create glue dynamicframe
df=glueContext.create_dynamic_frame_from_options(connection_type="redshift", connection_options=connection_options)
#get row count
print("Record count from redshift: ", df.toDF().count())
## write the data to S3 location using glue catalog
sink = glueContext.write_dynamic_frame_from_catalog(
frame=df,
database="myredshift",
table_name="orders")
print("Completed writing data to Glue table myredshift.orders")
print("Get the total row count and current max order from Glue table")
GetTableData()
job.commit()
Log –
Get the max order date from glue table myredshift.orders to create redsfhit query
Total row count from Glue Table: 69240242
MaxOrderdate from Glue Table: 1997-12-31
Query to be executed in redshift: SELECT * from admin.orders where o_orderdate > '1997-12-31'
Record count from redshift: 6759758
Completed writing data to Glue table myredshift.orders
Get the total row count and current max order from Glue table
Total row count from Glue Table: 76000000
MaxOrderdate from Glue Table: 1998-08-02
Walkthrough of the authored script –
In the background, Glue executes the UNLOAD command to retrieve the data from redshift.
UNLOAD ( 'SELECT "o_orderkey", "o_custkey", "o_orderstatus", "o_totalprice", "o_orderdate", "o_orderpriority", "o_clerk", "o_shippriority", "o_comment" FROM (SELECT * from admin.orders where o_orderdate > \'1997-12-31\') ' ) TO 's3://aws-glue-temporary-xxxxxxxxxxxx-us-east-1/admin/2c726fe4-6d85-4b81-92ee-69d9543640ae/' WITH CREDENTIALS '' ESCAPE MANIFEST
Even though the Glue connection succeeded you might still encounter job failure (happened to me at least) –
There could be few possible reasons for job failure but in my case, it boiled down to the Subnet used for the connection. So, make sure you are using a private subnet, with NAT gateway attached to route table associated with the subnet. Also, confirm you have a NAT gateway that resides in the public subnet.
To conclude, in this blog post we learned
5x AWS Certified | 5x Oracle Certified
Connect on Twitter @anandp86
The media shown in this article are not owned by Analytics Vidhya and is used at the Author’s discretion.
I followed the same steps but job is running continuosly it's stopping and also not generating the logs