AI Sentiment Analysis Pipeline using Airflow and AWS RDS


Machine learning is the foundation of AI where a machine can learn from data we want it trained on and it can do so without explicitly being programmed. Natural language processing (NLP) is branch of machine learning where machines can learn to understand human text. Sentiment analysis is a branch of NLP that makes use of an algorithm or model to measure positive or negative tone of text. For example, "the apple was rotten and had an awful taste" would have a negative sentiment. The use cases of sentiment analysis are endless but one common use case would be a business that wants to understand customer feedback on social media.

The goal of this project is to build a data store of news headlines by using the News API REST API and then perform sentiment analyis on the headlines. For the second part we can utilize the openai REST API and take advantage of AI that is already built. Imagine we are researchers wanted to understand the world of clickbait articles and media bias. Understanding the keywords of headlines, the sentiment of headlines, and which news sources are publishing these headlines are critical data points. In our case we are going to make use of a preexisting AI model to get sentiment data, rather than training our own model to do so. We can make use of openai's text-davinci-003 model, which is a GPT-3 model, and we can utilize the openai Completion REST API to prompt the model and get results.

So we are going to build a data pipeline using the following:

You can see the project under airflow demo sentiment repo on github.

Postgres Setup

If you are following along with your own setup then this section assumes you have Terraform and Make installed. So let's first build the Postgres schema for storing our data. I wanted to build this database on AWS so I decided to use terraform to do this so that we can easily build and destroy the VPC, security group and RDS. You can see this code on the github repo here. Let's take a look at the ./infratructure directory. Assuming you are starting this from scratch, you can remove the .terraform.lock.hci file. Then you want to create a secrets.tfvars file to set a postgres password as shown in secrets.tfvars.template. Also note that the main.tf file makes use of your default aws profile. If that is not the case you would need to update this. Once all that is done you can simple run commands terraform init then make plan and then make deploy. You will see the host name be printed to the screen. Then you are good to go to connect and move on.

Once the database is established we can build the schema a variety of ways, using dbeaver for example. Below is the script to create the tables we will be using throughout the poject, which is found from the project root in ./sql/schema.sql.

Here we have the ER diagram view of our database. We see that the .env.template table has the headline level data like title, source and url. The keyword table has a 1-M relationship with the articles table, in which each headline can have multiple keywords. And finally the sentiment_value table has a 1-1 relationship with the articles table in which each headline has one sentiment value.

Airflow Setup

In this project I ran airflow locally. So I utilized docker for this. Just a brief warning, running airflow locally with all its components like celery, redis and postgres can tax your machine. Airflow has documentation on running airflow locally here.

I modified their approach a bit by using a custom Dockerfile so that I could install additional python packages that we will use. You can see the Dockerfile and the docker-compose.yml file in the project root. Using my approach create a .env file that mimicks the .env.template file in my repo so that we can fulfill airflow requirements for running airflow and we can also have our AWS creds in our environmental variables.

Then once that is done we can run docker-compose up airflow-init and then docker-compose up.

Airflow Dag

The dag for this project is very simple.

The two tasks that call openai are independent of each other and can be run in parallel.

Below we have the base version of our dag ./dags/sentiment_demo.py. The job of the rest of this blog is to fill in the code for the three functions.

File Structure

If we just consider the ./dags directory in our repo, then our directory view looks like below. The dag is in to root of the dag directory and in the ./dags/demo directory we have our models.py with SQLAlchemy models and a utils.py file with some helper and utility functions that we will use.

SQLAlchemy Models

To work with our database in a pythonic way we want to create models of our tables. We can simply use an ORM, which will be SQLAlchemy. The models in python code are written out in ./demo/models.py.

In order to establish a connection to our database we must create an airflow connection to the database we created with our terraform process. Let's call this connection sentiment_rds. With the airflow UI up and running we can navigate to connections and create the connection similar to the below. An alternate way is to do it is by using the airflow cli inside any of the containers that are up and running for the project. For example you could use bash in the aiflow-webserver container by running docker-compose exec airflow-webserver bash and then create your connection with a command like airflow connections add 'my_prod_db' \ --conn-uri 'conn-type://login:password@host:port/schema where you would need to first fill in the relevant details in the command before executing.

Then we want to be able to create a connection to our database in code so that we can easily work with our SQLAlchemy models at runtime. To do this we use BaseHook to fetch the airflow connection and we establish the database connection through SQLAlchemy's create_engine() function. We can use the db_conn() function throughout our dag to work with our models now. We have this function available in utils.py The imports here will be used for additional functions that we introduce throughout as well.

News Headlines

The News API is a good source to get articles across many media companies and news categories. We will strictly use it for political articles since this would likely give us the greatest range of sentiment values.

First Let's create a couple of airflow variables. The first will be the news sources we want to use. I have this set as a variable, as opposed to hard-coded in our dag, as we may want to add or remove media sources. As we see here we have a json list of news sources, which you can find from the newsapi documentation.

Next, we need to set the newsapi API key.

Again, you do not have to use the UI for this and instead use the airflow cli in a running container but the UI is a convenient and less error-prone way of doing it.

We saw earlier from our dag that we have a function defined called get_articles(). Let's finish the imports in the dag that we will use to finish the dag, and let's finish the code for the get_articles() function. We see here that we fetch our variable data. We use the newsapi to get articles using the get_everything() method where it queries on "politics" and the news sources listed in our airflow variable. Then we loop through our results to access the necessary data for our articles table where we use a session to persist the data in the articles table. We see that we return the range of article IDs that were generated. This xcom will be used in the next two tasks.


To work with the openai REST API we need to put our API key in an airflow variable, much like we did for the newsapi key. This variable is called openai_key. Now let's create a function in our utils.py file that will allow us to prompt the openai text-davinci-003 model. In this function we receive a given headline. We can expect to prompt openai for either keywords or sentiment. This will establish what kind of query we send to the model (more on this in the next two sections). The meat of this code here is getting the prompt text, authenticating with openai and using the Completion object to prompt openai for a result. The result is then returned.

For more on the structure of the Completion API you can read this LinkedIn blog which I found to be really good at explaining the request and response details of using the API. For example, we see we are returning data in the choices key of the response, which is the collection of completion objects. Also note that we have n=1 which means we only want one completion. And finally, max_tokens is a way of controlling cost. The article also explans this.

Headline Keywords

Now we can finish the get_keywords() function our dag. But first we must know what we are prompting the openai model with. Let's create a function in our utils.py file called get_keyword_prompt. We already make reference to this function in get_open_ai_answer(). After some prompt engineering I was able to find a good bit of text that works consistently.

This task, as well as the next, will need to get the headline for each article ID. So we write a function in utils.py to help with this.

In the code below we complete the get_keywords() function. We see that we read in the xcom from the get_articles() task which is the range of article ids we want to work with. We establish a connection to the database, loop through the id's in order to prompt openai for keywords for the given headline in an iteration. Since the response is a string version of a list of keywords, we attempt to deserialize it and then loop this list and make a record for each value in the keyword table. If for some reason we get an error deserializing it then we just insert a NULL keyword value into the keyword table for that given headline.

Headline Sentiment

Similar to the keyword task, we need to get sentiment. The code isn't as taxing as there is no inner loop. And if we cannot convert the value to an integer then we assume something has gone wrong and simply insert a NULL sentiment value into the sentiment_value table for the given article id.

Data Results

Now that we have all this data we can do some analysis. And we see that for a given article headline we can get the headline sentiment (0 to 100) and the headline keywords with the below query.

Resulting in data that looks like the below where we see this headline has a positive sentiment. We can use this data to chart trends, get insights on news sources and their sentiments around keywords and other items.

id title source sentiment keywords
2 Field of presidential candidates for 2024 is already uniquely diverse usa-today 98 {presidential,candidates,2024,diverse}

We can see that we could do a lot of analysis on media sources, sentiment around these media sources as well as drill down to certain keywords Additionally we could introduce a time element for trending analysis. Looks like a lot of analytical fun.