Using AWS Data Wrangler with AWS Glue Job 2.0

Guest Blog | Last Updated: 08 Mar, 2021

I will admit, AWS Data Wrangler has become my go-to package for developing extract, transform, and load (ETL) data pipelines and other day-to-day scripts. AWS Data Wrangler's integration with multiple AWS big data services like S3, the Glue Catalog, Athena, databases, EMR, and others makes life simple for engineers. It also lets you import packages like Pandas and PyArrow to help write transformations.
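For example, a few lines are enough to query a Glue catalog table through Athena into a Pandas DataFrame and write the result back to S3 as a cataloged Parquet dataset. This is a minimal sketch; the database, table, and bucket names below are placeholders, not values from this post:

import awswrangler as wr

## query a Glue catalog table through Athena into a Pandas DataFrame
df = wr.athena.read_sql_query(
    sql="SELECT * FROM orders LIMIT 10",
    database="my_database"                   # placeholder Glue database
)

## write the DataFrame back to S3 as Parquet and register it in the Glue catalog
wr.s3.to_parquet(
    df=df,
    path="s3://my-bucket/orders_sample/",    # placeholder bucket
    dataset=True,
    database="my_database",
    table="orders_sample"
)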

In this blog post, I will walk you through a hypothetical use case: read data from a Glue catalog table, obtain a filter value, and use it to retrieve data from Redshift. I will create a Glue connection to Redshift, use AWS Data Wrangler with AWS Glue 2.0 to read data from the Glue catalog table, retrieve filtered data from the Redshift database, and write the result data set to S3. Along the way, I will also cover troubleshooting Glue network connection issues.

AWS Glue is a fully managed extract, transform, and load (ETL) service for processing large numbers of datasets from various sources for analytics and data processing.

AWS Glue Connection

You will need a Glue connection to connect to the Redshift database from the Glue job.

AWS Glue > Data catalog > connections > Add connection
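If you prefer to script this step, the same JDBC connection can be created with boto3. This is only a sketch; the connection name matches the “MyRedshift” connection used later in this post, but the endpoint, credentials, subnet, security group, and availability zone are placeholders:

import boto3

glue = boto3.client("glue")

## create a JDBC connection pointing at the Redshift cluster
glue.create_connection(
    ConnectionInput={
        "Name": "MyRedshift",
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:redshift://dwtest.xxxxxxxxxx.us-east-1.redshift.amazonaws.com:5439/dwtest",
            "USERNAME": "admin",
            "PASSWORD": "xxxxxxxxx"
        },
        ## the subnet chosen here is what the connection test below depends on
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-xxxxxxxx",
            "SecurityGroupIdList": ["sg-xxxxxxxx"],
            "AvailabilityZone": "us-east-1a"
        }
    }
)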

Once you have created the connection, test it. During the test, I faced the below error –

Reason: Could not find S3 endpoint or NAT gateway for subnetId: subnet-630140c22a02f8cc2 in Vpc vpc-xxxxxxxxxxx.

The reason for the connection failure is quite straightforward: the subnet used for the connection does not have an S3 endpoint or NAT gateway. To look into the details, navigate to the VPC page in the AWS console and select the subnet.

VPC > Subnets > Filter > VPC: vpc-xxxxxxxxxxx > subnet

Review the route table the subnet is associated with. In this case, the subnet was associated with a route table that has a route to an Internet gateway, making it a public subnet. To resolve the issue, either associate the subnet with a route table that has a route to a NAT gateway, or edit the Glue connection to use a private subnet (one that routes through a NAT gateway).
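You can also check this programmatically by listing the route tables associated with the connection's subnet and inspecting the route targets. A sketch, assuming a placeholder subnet ID (note that a subnet with no explicit association falls back to the VPC's main route table, which this filter will not return):

import boto3

ec2 = boto3.client("ec2")

## route tables explicitly associated with the subnet used by the Glue connection
resp = ec2.describe_route_tables(
    Filters=[{"Name": "association.subnet-id", "Values": ["subnet-xxxxxxxx"]}]
)

for rt in resp["RouteTables"]:
    for route in rt["Routes"]:
        if route.get("NatGatewayId"):
            print("Private subnet: routes through NAT gateway", route["NatGatewayId"])
        elif route.get("GatewayId", "").startswith("igw-"):
            print("Public subnet: routes through internet gateway", route["GatewayId"])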

The difference between a public and a private subnet is that instances in a public subnet can send outbound traffic directly to the Internet, whereas instances in a private subnet reach the Internet through a network address translation (NAT) gateway that resides in a public subnet.

After making the changes, test the connection again. This time the connection test will take some time, but it will succeed.

Add AWS Glue Job

Let’s move ahead with creating a new Glue job. For this blog, I chose “A new script to be authored by you”, which exposes a Connections option.

AWS Console > AWS Glue > ETL > Jobs > Add job

You can choose to enable “Continuous logging” in the Monitoring options section. Further down the same page, there is an option to add job parameters. The AWS Data Wrangler development team has made the package integration simple: when adding a new job with Glue version 2.0, all you need to do is specify “--additional-python-modules” as the key and “awswrangler” as the value in Job parameters to use Data Wrangler.

AWS Console > AWS Glue > ETL > Jobs > Add job > Security configuration, script libraries, and job parameters (optional)

On the next page, choose the connection to be used by the job, which in my case is “MyRedshift”.
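If you would rather script the job definition than click through the console, the boto3 equivalent looks roughly like the sketch below. The “--additional-python-modules” argument and the “MyRedshift” connection come from this post; the job name, IAM role, script location, and temporary directory are placeholders:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="redshift-incremental-load",                       # placeholder job name
    Role="arn:aws:iam::xxxxxxxxxxxx:role/GlueJobRole",       # placeholder IAM role
    GlueVersion="2.0",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-bucket/scripts/redshift_incremental.py",
        "PythonVersion": "3"
    },
    DefaultArguments={
        "--additional-python-modules": "awswrangler",        # pulls in AWS Data Wrangler at job start
        "--TempDir": "s3://my-glue-temp-bucket/admin/",      # used as redshiftTmpDir in the script
        "--enable-continuous-cloudwatch-log": "true"
    },
    Connections={"Connections": ["MyRedshift"]}
)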

Add the authored script –

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import awswrangler as wr
import pandas as pd

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

db_username = "admin"
db_password = "xxxxxxxxx"

def GetTableData():
    ## get total row count using aws data wrangler
    getTotalRowCount = "SELECT count(*) as totalRowCount from orders"
    df_count = wr.athena.read_sql_query(sql=getTotalRowCount, database="myredshift")
    totalRowCount = df_count.iloc[0,0]
    print("Total row count from Glue Table: ", totalRowCount)
    ## get max Order Date using aws data wrangler
    maxOrderDateSql = "SELECT max(o_orderdate) as maxOrderDate FROM orders"
    df = wr.athena.read_sql_query(sql=maxOrderDateSql, database="myredshift")
    maxOrderDate = df.iloc[0,0]
    print("MaxOrderdate from Glue Table: ", maxOrderDate)
    return maxOrderDate
 
print("Get the max order date from glue table myredshift.orders to create redsfhit query")
maxOrderDate = GetTableData()
query = "SELECT * from admin.orders where o_orderdate > '{}'".format(maxOrderDate)
print("Query to be executed in redshift: ", query)

## define the redshift connection options
connection_options = {  
    "url": "jdbc:redshift://dwtest.paidfoobarrr.us-east-1.redshift.amazonaws.com:5439/dwtest",
    "query": query,
    "user": db_username,
    "password": db_password,
    "redshiftTmpDir": args["TempDir"],
    "aws_iam_role": "arn:aws:iam::xxxxxxxxxxxx:role/dwtest-s3-access"
 }

## create glue dynamicframe
df = glueContext.create_dynamic_frame_from_options(connection_type="redshift", connection_options=connection_options)
#get row count
print("Record count from redshift: ", df.toDF().count())

## write the data to S3 location using glue catalog
sink = glueContext.write_dynamic_frame_from_catalog(
                 frame=df,
                 database="myredshift",
                 table_name="orders")
print("Completed writing data to Glue table myredshift.orders")
print("Get the total row count and current max order from Glue table")
GetTableData()

job.commit()

Log –

Get the max order date from glue table myredshift.orders to create redshift query
Total row count from Glue Table: 69240242 
MaxOrderdate from Glue Table: 1997-12-31 
Query to be executed in redshift: SELECT * from admin.orders where o_orderdate > '1997-12-31' 
Record count from redshift: 6759758 
Completed writing data to Glue table myredshift.orders 
Get the total row count and current max order from Glue table 
Total row count from Glue Table: 76000000 
MaxOrderdate from Glue Table: 1998-08-02

Walkthrough of the authored script –

  • import the awsglue libraries
  • import awswrangler and pandas
  • create the Glue context and Spark session
  • get the max(o_orderdate) value from the Glue catalog table using the wr.athena.read_sql_query function
  • use the max order date to query the Redshift database for all records after that date using create_dynamic_frame_from_options
  • write the data to S3 using write_dynamic_frame_from_catalog

In the background, Glue executes the UNLOAD command to retrieve the data from Redshift.

UNLOAD ( 'SELECT "o_orderkey", "o_custkey", "o_orderstatus", "o_totalprice", "o_orderdate", "o_orderpriority", "o_clerk", "o_shippriority", "o_comment" FROM (SELECT * from admin.orders where o_orderdate > \'1997-12-31\') ' ) TO 's3://aws-glue-temporary-xxxxxxxxxxxx-us-east-1/admin/2c726fe4-6d85-4b81-92ee-69d9543640ae/' WITH CREDENTIALS '' ESCAPE MANIFEST

Even though the Glue connection test succeeded, you might still encounter job failures (it happened to me, at least) –

  1. Glue job run fails with “Command failed with exit code 1”
  2. ModuleNotFoundError: No module named ‘awswrangler’

There could be a few possible reasons for job failure, but in my case it boiled down to the subnet used for the connection. So, make sure you are using a private subnet whose associated route table has a route to a NAT gateway, and confirm that the NAT gateway itself resides in a public subnet.
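To confirm that the VPC actually has a working NAT gateway, you can list the available ones with boto3 and then verify that the subnet each one sits in is public. A sketch, assuming a placeholder VPC ID:

import boto3

ec2 = boto3.client("ec2")

## available NAT gateways in the VPC used by the Glue connection
resp = ec2.describe_nat_gateways(
    Filters=[
        {"Name": "vpc-id", "Values": ["vpc-xxxxxxxxxxx"]},
        {"Name": "state", "Values": ["available"]}
    ]
)

for ngw in resp["NatGateways"]:
    print(ngw["NatGatewayId"], "resides in subnet", ngw["SubnetId"])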

To conclude, in this blog post we learned

  • how to set up a Glue connection to a Redshift database
  • the use of subnets and their importance
  • the difference between public and private subnets
  • creating a Glue job with the AWS Data Wrangler package
  • using AWS Data Wrangler to query a Glue catalog table
  • using the result of that query as a filter when querying the Redshift database
  • unloading the Redshift data to S3 using a Glue DynamicFrame
  • troubleshooting Glue connections and module-not-found errors

 

About the Author


An avid learner of technology solutions around databases, big data, and machine learning.

5x AWS Certified | 5x Oracle Certified
Connect on Twitter @anandp86

The media shown in this article are not owned by Analytics Vidhya and are used at the Author’s discretion.

Responses From Readers


Mohammad

I followed the same steps, but the job is running continuously; it's not stopping and also not generating the logs.
