Natural Language Processing (NLP) is a core activity for data science teams across the globe. As data volumes keep growing, most organizations have moved to big data platforms like Apache Hadoop and to cloud offerings like AWS, Azure, and GCP. These platforms handle big data comfortably and let organizations run analytics at scale on unstructured data, for example text categorization. When it comes to machine learning, however, there is still a gap between big data systems and machine learning tools.
Popular Python machine learning libraries like scikit-learn and Gensim are highly optimized for single-node machines and are not designed for distributed environments. Apache Spark MLlib is one of the tools that helps bridge this gap: it offers distributed implementations of common models such as Linear Regression, Logistic Regression, SVM, Random Forest, K-means, and LDA, which cover most common machine learning tasks.
If you are new to Spark, I strongly recommend working through an introductory Spark tutorial before continuing.
Apart from machine learning algorithms, Spark MLlib also offers a plethora of feature transformers like Tokenizer, StopWordsRemover, and NGram, and feature extractors like CountVectorizer, TF-IDF, and Word2Vec. These transformers and extractors are sufficient to build a basic NLP pipeline, but a more comprehensive, production-grade pipeline needs more advanced techniques like stemming, lemmatization, part-of-speech tagging, and Named Entity Recognition.
John Snow Labs Spark NLP offers a variety of annotators to perform advanced NLP tasks. For more information, check out the list of annotators and their usage on the website:
https://nlp.johnsnowlabs.com/docs/en/annotators
Let’s go ahead and see how to set up Spark NLP on AWS EMR.
1. Before we spin up the EMR cluster, we need to create a bootstrap action. Bootstrap actions are used to install additional software or customize the configuration of cluster nodes. The following bootstrap action can be used to set up Spark NLP on an EMR cluster:
#!/bin/bash
# Install Python 3.6 with pip, setuptools and virtualenv on every node
sudo yum install -y python36-devel python36-pip python36-setuptools python36-virtualenv
sudo python36 -m pip install --upgrade pip
# Python packages used in this article
sudo python36 -m pip install pandas
sudo python36 -m pip install boto3
# "re" is part of the Python standard library, so it needs no pip install
sudo python36 -m pip install spark-nlp==2.4.5
Once you have created the shell script, copy it to a location in AWS S3. You can also add further Python packages to the script as required.
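The copy to S3 can also be scripted. The following is a minimal sketch, assuming a boto3 S3 client is passed in; the helper name `upload_bootstrap_script` and the file, bucket, and key names in the comments are hypothetical and not part of the original setup.

```python
def upload_bootstrap_script(s3_client, local_path, bucket, key):
    """Upload the bootstrap script and return its s3:// URI.

    s3_client is expected to be a boto3 S3 client, e.g. boto3.client("s3").
    """
    # upload_file(Filename, Bucket, Key) copies a local file to S3.
    s3_client.upload_file(local_path, bucket, key)
    # This URI is the value to use as the bootstrap action 'Path'.
    return "s3://{}/{}".format(bucket, key)


# Example call (hypothetical names, requires AWS credentials):
#   import boto3
#   uri = upload_bootstrap_script(
#       boto3.client("s3"),
#       "install_dependencies.sh",
#       "my-emr-bucket",
#       "bootstrap/install_dependencies.sh",
#   )
```

The returned URI is what the bootstrap action path in the cluster definition should point to.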
2. We can spin up the EMR cluster using the AWS console, the API, or the boto3 library in Python. The advantage of using Python is that you can reuse the code whenever you want to instantiate the cluster, or add it to a workflow.
The following Python code instantiates an EMR cluster.
import boto3

region_name = 'region_name'  # update the value


def get_security_group_id(group_name, region_name):
    """Look up the ID of an EC2 security group by its name."""
    ec2 = boto3.client('ec2', region_name=region_name)
    response = ec2.describe_security_groups(GroupNames=[group_name])
    return response['SecurityGroups'][0]['GroupId']


emr = boto3.client('emr', region_name=region_name)

cluster_response = emr.run_job_flow(
    Name='cluster_name',  # update the value
    ReleaseLabel='emr-5.27.0',
    LogUri='s3_path_for_logs',  # update the value
    Instances={
        'InstanceGroups': [
            {
                'Name': "Master nodes",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.2xlarge',  # change according to the requirement
                'InstanceCount': 1  # for master node high availability, use 3 master nodes
            },
            {
                'Name': "Slave nodes",
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.2xlarge',  # change according to the requirement
                'InstanceCount': 2
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'Ec2KeyName': 'key_pair_name',  # update the value
        'EmrManagedMasterSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name),
        'EmrManagedSlaveSecurityGroup': get_security_group_id('ElasticMapReduce-slave', region_name=region_name)
    },
    BootstrapActions=[
        {
            'Name': 'install_dependencies',
            'ScriptBootstrapAction': {
                'Args': [],
                'Path': 'path_to_bootstrapaction_on_s3'  # update the value
            }
        }
    ],
    Steps=[],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Applications=[
        {'Name': 'hadoop'},
        {'Name': 'spark'},
        {'Name': 'hive'},
        {'Name': 'zeppelin'},
        {'Name': 'presto'}
    ],
    Configurations=[
        # YARN
        {
            "Classification": "yarn-site",
            "Properties": {
                "yarn.nodemanager.vmem-pmem-ratio": "4",
                "yarn.nodemanager.pmem-check-enabled": "false",
                "yarn.nodemanager.vmem-check-enabled": "false"
            }
        },
        # HADOOP
        {
            "Classification": "hadoop-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Configurations": [],
                    "Properties": {"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}
                }
            ],
            "Properties": {}
        },
        # SPARK
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Configurations": [],
                    "Properties": {
                        "PYSPARK_PYTHON": "/usr/bin/python3",
                        "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                    }
                }
            ],
            "Properties": {}
        },
        {
            "Classification": "spark",
            "Properties": {"maximizeResourceAllocation": "true"},
            "Configurations": []
        },
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.dynamicAllocation.enabled": "true"  # default is also true
            }
        }
    ]
)
Note: Ensure that you have proper access to S3 bucket(s) that are used for logging and for storing bootstrap action script.
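The run_job_flow call returns as soon as the request is accepted, while provisioning and the bootstrap action take several minutes. A minimal sketch of waiting for the cluster follows, assuming the `emr` client and `cluster_response` from the code above; the helper name `wait_until_running` and its default polling values are my own choices, not part of the original setup.

```python
def wait_until_running(emr_client, cluster_id, delay_seconds=30, max_attempts=60):
    """Block until the EMR cluster is up, then return (state, master DNS name).

    emr_client is expected to be a boto3 EMR client.
    """
    # The 'cluster_running' waiter polls describe_cluster until the cluster
    # reaches the RUNNING or WAITING state, and raises if it terminates.
    waiter = emr_client.get_waiter("cluster_running")
    waiter.wait(
        ClusterId=cluster_id,
        WaiterConfig={"Delay": delay_seconds, "MaxAttempts": max_attempts},
    )
    cluster = emr_client.describe_cluster(ClusterId=cluster_id)["Cluster"]
    return cluster["Status"]["State"], cluster.get("MasterPublicDnsName")


# Example call with the objects created earlier:
#   state, master_dns = wait_until_running(emr, cluster_response["JobFlowId"])
```

If the bootstrap script fails, the cluster terminates and the waiter raises an error, which is usually the first hint to check the bootstrap logs under the configured log location.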
Now that we have our cluster ready, let’s build a simple text categorization example on BBC data using Spark NLP and Spark MLlib.
We will import the required libraries and initialize a SparkSession with a set of configuration parameters. The configuration values below reflect my environment, so adjust them to yours.
# Import Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

# Start Spark Session with Spark NLP
# spark = sparknlp.start()
# Change spark.driver.memory and the other values according to your environment
spark = SparkSession.builder \
    .appName("BBC Text Categorization") \
    .config("spark.driver.memory", "8G") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "8G") \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5") \
    .config("spark.kryoserializer.buffer.max", "1000M") \
    .config("spark.network.timeout", "3600s") \
    .getOrCreate()
We will be using the BBC data. You can download the data from this link. After downloading it, load the data into Spark using the code below:
# File location and type
file_location = r'path\to\bbc-text.csv'
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(file_location)

df.count()
In plain Python we would typically split the data with scikit-learn; a Spark DataFrame has a built-in function called randomSplit() that performs the same operation.
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)
The randomSplit() function takes two parameters: an array of weights and an optional seed. For our example, we use a 70–30 split, where roughly 70% of the rows become training data and roughly 30% become test data.
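To build intuition for why the split sizes are only approximately 70–30, here is a simplified, single-machine sketch of a weighted random split in plain Python. It is an illustration of the idea, not Spark's implementation (Spark samples within each partition); the function name `random_split` is hypothetical.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed=None):
    """Assign each row to one split, with probability proportional to its weight."""
    total = float(sum(weights))
    # Normalized cumulative upper bounds, e.g. [0.7, 0.3] -> [0.7, 1.0]
    bounds = [c / total for c in accumulate(weights)]
    bounds[-1] = 1.0  # guard against floating-point rounding
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(1000), [0.7, 0.3], seed=100)
print(len(train), len(test))  # close to 700 and 300, but not exactly
```

Because every row is assigned by an independent random draw, the proportions hold only on average, and fixing the seed makes the assignment repeatable.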
Let’s go ahead and build the NLP pipeline using Spark NLP. One of the biggest advantages of Spark NLP is that it integrates natively with the Spark MLlib modules, which lets us build a comprehensive ML pipeline consisting of transformers and estimators. This pipeline can include feature extraction modules like CountVectorizer or HashingTF and IDF. We can also include a machine learning model in this pipeline. Below is an example of an NLP pipeline with feature extraction and a machine learning model:
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer, IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# convert text column to spark nlp document
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

# convert document to array of tokens
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# clean tokens
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

# remove stopwords
stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(["normalized"]) \
    .setOutputCol("cleanTokens") \
    .setCaseSensitive(False)

# stem tokens to reduce them to their root form
stemmer = Stemmer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("stem")

# convert the Spark NLP annotation structure to a plain array of tokens
finisher = Finisher() \
    .setInputCols(["stem"]) \
    .setOutputCols(["token_features"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

# generate term frequencies
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=1000)

# generate inverse document frequencies
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

# convert labels (string) to integers, which are easier to process than strings
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

# define a simple multinomial logistic regression model.
# Try different combinations of hyperparameters and see what suits your data.
# You can also try different algorithms and compare the scores.
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)

# convert the label index (integer) back to the corresponding class name
label_to_stringIdx = IndexToString(inputCol="label", outputCol="article_class")

# define the nlp pipeline
nlp_pipeline = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            stemmer,
            finisher,
            hashingTF,
            idf,
            label_stringIdx,
            lr,
            label_to_stringIdx])
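To make the HashingTF and IDF stages less of a black box, the following plain-Python sketch mimics what they compute on a toy corpus. It assumes the IDF formula documented for Spark MLlib, log((m + 1) / (df + 1)), and uses zlib.crc32 as a stand-in hash (Spark uses MurmurHash3), so the bucket indices will differ from Spark's. All function names and the toy documents are hypothetical.

```python
import math
import zlib


def hashing_tf(tokens, num_features):
    """Map tokens to a fixed-length term-frequency vector via hashing."""
    vec = [0.0] * num_features
    for tok in tokens:
        idx = zlib.crc32(tok.encode("utf-8")) % num_features  # stand-in hash
        vec[idx] += 1.0
    return vec


def fit_idf(tf_vectors, min_doc_freq=0):
    """Compute one IDF weight per hash bucket from a list of TF vectors."""
    m = len(tf_vectors)
    num_features = len(tf_vectors[0])
    doc_freq = [sum(1 for v in tf_vectors if v[j] > 0) for j in range(num_features)]
    # Buckets seen in fewer than min_doc_freq documents get weight 0
    return [
        math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for df in doc_freq
    ]


def tf_idf(tf_vector, idf):
    """Scale term frequencies by their IDF weights."""
    return [tf * w for tf, w in zip(tf_vector, idf)]


docs = [["stock", "market", "rise"], ["market", "fall"], ["football", "market"]]
tfs = [hashing_tf(d, 16) for d in docs]
idf = fit_idf(tfs)
features = [tf_idf(v, idf) for v in tfs]
print(features[0])
```

A token that occurs in every document, such as "market" here, ends up with an IDF weight of 0 and therefore contributes nothing to the features. A small numFeatures increases the chance that two different tokens share a bucket, which is the usual trade-off of the hashing trick.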
Now that our NLP pipeline is ready, let’s train our model on training data.
# fit the pipeline on training data
pipeline_model = nlp_pipeline.fit(trainingData)
Once the training is done, we can predict the class labels on test (unseen) data.
# perform predictions on test data
predictions = pipeline_model.transform(testData)
Evaluating the trained model is essential to understand how it performs on unseen data. We will look at three popular evaluation metrics: accuracy, precision, and recall (also called sensitivity).
Accuracy
# import evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))
Precision
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator.evaluate(predictions)
print("Weighted Precision = %g" % (precision))
Recall
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Weighted Recall = %g" % (recall))
Depending on the business use case, you can decide which metric to use for evaluating the model. For example, if a machine learning model is designed to detect cancer based on certain parameters, it is better to use recall (sensitivity), because false negatives (a person has cancer but the model does not detect it) are unacceptable. If, on the other hand, a model is designed to generate user recommendations, the company can tolerate missing some relevant items (false negatives) as long as most of what it does recommend is relevant, for example 8 out of 10 recommendations matching the user profile, and can therefore use precision as the evaluation metric.
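To make these metrics concrete, here is a small plain-Python sketch that computes accuracy, weighted precision, and weighted recall from two lists of class indices. It assumes the usual definition of the weighted variants, in which each class's score is weighted by its share of the true labels; the function name and the toy data are hypothetical.

```python
from collections import Counter


def weighted_precision_recall(labels, predictions):
    """Return (accuracy, weighted precision, weighted recall)."""
    n = len(labels)
    support = Counter(labels)         # true rows per class
    predicted = Counter(predictions)  # predicted rows per class
    correct = Counter(l for l, p in zip(labels, predictions) if l == p)

    accuracy = sum(correct.values()) / n
    w_precision = 0.0
    w_recall = 0.0
    for cls, count in support.items():
        # Precision: of the rows predicted as cls, how many really are cls
        precision = correct[cls] / predicted[cls] if predicted[cls] else 0.0
        # Recall: of the rows that really are cls, how many were found
        recall = correct[cls] / count
        weight = count / n
        w_precision += weight * precision
        w_recall += weight * recall
    return accuracy, w_precision, w_recall


labels      = [0, 0, 0, 0, 0, 0, 1, 1, 2, 2]
predictions = [0, 0, 0, 0, 0, 0, 0, 1, 0, 2]
acc, wp, wr = weighted_precision_recall(labels, predictions)
print("accuracy=%.2f weighted_precision=%.2f weighted_recall=%.2f" % (acc, wp, wr))
# prints: accuracy=0.80 weighted_precision=0.85 weighted_recall=0.80
```

Under this definition the weighted recall always equals the accuracy, because the per-class weights cancel the per-class denominators. If recall on one specific class matters, as in the cancer example, it is worth inspecting that class's recall on its own instead of relying on the weighted average.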
After successfully training, testing, and evaluating the model, you can save it to disk and use it in different Spark applications. To save the model to disk, use the code below:
pipeline_model.save('/path/to/storage_location')
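Loading the saved model in another Spark application can be sketched as follows. This assumes the application runs with the same Spark NLP package on its classpath and that the path matches the one used when saving; the helper name `load_pipeline_model` is hypothetical.

```python
def load_pipeline_model(path):
    """Load a fitted pipeline that was stored with pipeline_model.save(path)."""
    # Imported inside the function so the helper can be defined
    # even where PySpark is not installed.
    from pyspark.ml import PipelineModel

    return PipelineModel.load(path)


# Example call inside a running Spark application (new_data is a DataFrame
# with the same input columns as the training data):
#   loaded_model = load_pipeline_model('/path/to/storage_location')
#   new_predictions = loaded_model.transform(new_data)
```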
Spark NLP provides a plethora of annotators and transformers to build a production-grade data pre-processing pipeline. Spark NLP integrates seamlessly with Spark MLlib, which enables us to build an end-to-end Natural Language Processing project in a distributed environment. In this article, we looked at how to install Spark NLP on AWS EMR and implemented text categorization of BBC data. We also examined different evaluation metrics in Spark MLlib and saw how to store a model for further use.
I hope you enjoyed the article. Keep learning!
Satish Silveri
Former Data Engineer and an aspiring Data Scientist. I love to work with textual data and build state-of-the-art Natural Language Processing solutions. I like to research new technologies, and my current area of research is developing ML solutions using big data tools like Apache Spark and AWS SageMaker.