Preventing Customer Churn, Part 1: Connecting to Amazon Aurora MySQL, Loading and Extracting Data


Table of contents

Section 1. Setup

  1. Prepare the Amazon Aurora MySQL Database
  2. Download the Customer Churn Data
  3. Create Database, Table, Load Data in Amazon Aurora MySQL
  4. Load Customer Messages to Database

Section 2. Export data from Amazon Aurora to S3 for use in Machine Learning

  1. Export data from Amazon Aurora to S3

Begin by upgrading pip. To connect to the database we will use the mysql.connector module; MySQL Connector/Python enables Python programs to access MySQL databases.

In [1]:
import sys

# upgrade pip
!{sys.executable} -m pip install --upgrade pip 
# install mysql.connector
!{sys.executable} -m pip install mysql.connector
Requirement already up-to-date: pip in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (20.0.2)
Processing /home/ec2-user/.cache/pip/wheels/23/27/3e/72be437e6a950b8972728d2a62935ae7cd0d7c3c251fb2e737/mysql_connector-2.2.9-cp36-cp36m-linux_x86_64.whl
Installing collected packages: mysql.connector
Successfully installed mysql.connector

For this use case, the S3 bucket and appropriate IAM roles were created for you during the launch of the AWS CloudFormation stack. The bucket name was saved in a parameter file called "cloudformation_values.py" during creation of the notebook instance, along with the DB secret name and ML endpoint name.
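The parameter file is a plain Python module of constants. A hypothetical sketch of its contents is shown below: the names REGION, S3BUCKET and DBSECRET are the ones imported later in this notebook, while the values and the endpoint variable name are placeholders.

# hypothetical sketch of cloudformation_values.py; all values are placeholders
REGION = "us-east-1"                # AWS Region the stack was launched in
S3BUCKET = "aurora-blog-4-111111111111-s3-bucket-for-model-training"   # bucket for training data
DBSECRET = "aurora-mysql-database-secret"   # AWS Secrets Manager secret holding DB credentials
ENDPOINT = "xgboost-churn-endpoint"         # SageMaker endpoint name (not used in this notebook)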

In [2]:
# import installed module
import mysql.connector as mysql
import json
import os
import pandas as pd
import numpy as np
import boto3

# to write data stream to S3
from io import StringIO 

# import variables holding the secret name, region, S3 bucket and SageMaker endpoint name;
# this file is generated during the creation of the SageMaker notebook instance
import cloudformation_values as cfvalues

Next, we set up some parameters we'll use in the rest of the notebook.

In [3]:
s3 = boto3.resource('s3')
# get the session information
session = boto3.Session()
# get the region
region = cfvalues.REGION

# S3 bucket was created during the launch of the CloudFormation stack
bucket_name = cfvalues.S3BUCKET
prefix = 'sagemaker/xgboost-churn'
source_data = 'source_churn_data.csv'
source_data_file_name = prefix + '/' + source_data
ml_data = 'aurora/churn_data'

# AWS Secrets Manager stores our database credentials. 
db_secret_name = cfvalues.DBSECRET

Prepare the Amazon Aurora MySQL Database

We'll create some customer data in our Amazon Aurora database, for use during the rest of our scenario.

To do so, we'll take some publicly available "customer data", and load it into our database. We'll get the data from the Internet, write it out to S3, then load it into Aurora from S3. That will get us to the starting point of our scenario.

Here we are using administrative credentials for connecting to the database. The credentials were created during the database creation, and are stored in AWS Secrets Manager. We'll retrieve the secret, extract the credentials and the database endpoint name, and use them to connect to the database.
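The get_secret helper comes from a local utilities module created alongside the notebook instance. A minimal sketch of such a helper is shown below, assuming it simply wraps the AWS Secrets Manager GetSecretValue call through boto3 (the real version may add error handling):

import boto3

def get_secret(secret_name, region_name):
    # create a Secrets Manager client in the given region
    client = boto3.session.Session().client(service_name = 'secretsmanager',
                                            region_name = region_name)
    # return the full response; the caller reads the JSON document in 'SecretString'
    return client.get_secret_value(SecretId = secret_name)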

In [4]:
# Get the secret from AWS Secrets manager. Extract user, password, host.
from utilities import get_secret
get_secret_value_response = get_secret(db_secret_name, region)
creds = json.loads(get_secret_value_response['SecretString'])
db_user = creds['username']
db_password = creds['password']
# Writer endpoint
db_host = creds['host']
In [5]:
# create connection to the database
cnx = mysql.connect(user = db_user, 
                    password = db_password,
                    host = db_host)
In [6]:
# create cursor (allows traversal over the rows in the result set)
dbcursor = cnx.cursor()

Demonstrate the connection and functionality by showing the existing databases:

In [7]:
# send a query to show all existing databases and loop over the results to print:
dbcursor.execute("SHOW DATABASES")
for x in dbcursor:
  print(x)
('information_schema',)
('mysql',)
('performance_schema',)
('sys',)

To disconnect from the database:

In [8]:
cnx.close()

Download the Customer Churn Data

The dataset we use is publicly available and is mentioned in the book Discovering Knowledge in Data by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets. The content of each column in the data is described in another notebook here.

In [9]:
if not os.path.exists("DKD2e_data_sets.zip"):
    !wget http://dataminingconsultant.com/DKD2e_data_sets.zip
    !unzip -o DKD2e_data_sets.zip
else:
    print("File has been already downloaded")
File has been already downloaded
In [10]:
# read the customer churn data to pandas DataFrame
churn = pd.read_csv('./Data sets/churn.txt')
# review the top rows
churn.head()
Out[10]:
State Account Length Area Code Phone Int'l Plan VMail Plan VMail Message Day Mins Day Calls Day Charge ... Eve Calls Eve Charge Night Mins Night Calls Night Charge Intl Mins Intl Calls Intl Charge CustServ Calls Churn?
0 KS 128 415 382-4657 no yes 25 265.1 110 45.07 ... 99 16.78 244.7 91 11.01 10.0 3 2.70 1 False.
1 OH 107 415 371-7191 no yes 26 161.6 123 27.47 ... 103 16.62 254.4 103 11.45 13.7 3 3.70 1 False.
2 NJ 137 415 358-1921 no no 0 243.4 114 41.38 ... 110 10.30 162.6 104 7.32 12.2 5 3.29 0 False.
3 OH 84 408 375-9999 yes no 0 299.4 71 50.90 ... 88 5.26 196.9 89 8.86 6.6 7 1.78 2 False.
4 OK 75 415 330-6626 yes no 0 166.7 113 28.34 ... 122 12.61 186.9 121 8.41 10.1 3 2.73 3 False.

5 rows × 21 columns

In [11]:
# get number of rows and columns in the data
churn.shape
Out[11]:
(3333, 21)

We can see that the column names in this source data set have mixed case, spaces, and special characters, all of which can easily cause problems in databases and when transferring data between formats and systems. To avoid these issues, we'll simplify the column names before loading the data into Amazon Aurora.
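As an aside, such names could also be cleaned up programmatically. The helper below is a minimal sketch of that idea (hypothetical, and not used here; in the next cell we assign explicit names so they match the table definition created later):

import re

def sanitize_column_name(name):
    """Lowercase the name and replace runs of non-alphanumeric characters with '_'."""
    return re.sub(r'[^0-9a-z]+', '_', name.strip().lower()).strip('_')

# for example: sanitize_column_name("Int'l Plan") returns 'int_l_plan',
# and sanitize_column_name("CustServ Calls") returns 'custserv_calls'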

In [12]:
new_columns = ["state", 
               "acc_length", 
               "area_code", 
               "phone",
               "int_plan",
               "vmail_plan",
               "vmail_msg",
               "day_mins", 
               "day_calls",
               "day_charge",
               "eve_mins",
               "eve_calls",
               "eve_charge",
               "night_mins",
               "night_calls",
               "night_charge", 
               "int_mins",
               "int_calls",
               "int_charge",
               "cust_service_calls",
               "churn"]
# create a dictionary where keys are the old column names and the values are the new column names
renaming_dict = dict(list(zip(list(churn.columns), new_columns)))
# rename the columns
churn = churn.rename(columns = renaming_dict)
In [13]:
churn.head()
Out[13]:
state acc_length area_code phone int_plan vmail_plan vmail_msg day_mins day_calls day_charge ... eve_calls eve_charge night_mins night_calls night_charge int_mins int_calls int_charge cust_service_calls churn
0 KS 128 415 382-4657 no yes 25 265.1 110 45.07 ... 99 16.78 244.7 91 11.01 10.0 3 2.70 1 False.
1 OH 107 415 371-7191 no yes 26 161.6 123 27.47 ... 103 16.62 254.4 103 11.45 13.7 3 3.70 1 False.
2 NJ 137 415 358-1921 no no 0 243.4 114 41.38 ... 110 10.30 162.6 104 7.32 12.2 5 3.29 0 False.
3 OH 84 408 375-9999 yes no 0 299.4 71 50.90 ... 88 5.26 196.9 89 8.86 6.6 7 1.78 2 False.
4 OK 75 415 330-6626 yes no 0 166.7 113 28.34 ... 122 12.61 186.9 121 8.41 10.1 3 2.73 3 False.

5 rows × 21 columns

The resulting data frame looks much better!

Now we'll write our sample data out to S3. We'll then bulk load the data from S3 directly into Amazon Aurora.

In [14]:
csv_buffer = StringIO()
churn.to_csv(csv_buffer, index = False)
s3.Object(bucket_name, source_data_file_name).put(Body = csv_buffer.getvalue())
print('s3://' + bucket_name + '/' + source_data_file_name)
s3://aurora-blog-4-111111111111-s3-bucket-for-model-training/sagemaker/xgboost-churn/source_churn_data.csv

Create Database, Table, Load Data in Amazon Aurora MySQL

Now, we want to create the target database and table in Amazon Aurora, so we can load the data.

In [15]:
database_name = "telecom_customer_churn"
churn_table = "customers"
customer_msgs_table = "customer_message"

Connect to the database server and create a cursor object to traverse over the fetched results.

In [16]:
cnx = mysql.connect(user = db_user, 
                    password = db_password,
                    host = db_host)

dbcursor = cnx.cursor(buffered = True)

Create a database:

In [17]:
# send a query to create a database
dbcursor.execute("CREATE DATABASE IF NOT EXISTS {}".format(database_name))
In [18]:
# send a query to show all existing databases and fetch all results:
dbcursor.execute("SHOW DATABASES")
databases = dbcursor.fetchall()
print(databases)
[('information_schema',), ('mysql',), ('performance_schema',), ('sys',), ('telecom_customer_churn',)]
In [19]:
# switch to the database 'telecom_customer_churn'
dbcursor.execute("USE {}".format(database_name))

Now we will create a table to hold customer churn data. The column definition was taken from this blog.

In [20]:
# here we delete the table 'customers' if it already exists
dbcursor.execute("DROP TABLE IF EXISTS {}".format(churn_table))
# then, we define a new table:
dbcursor.execute("""CREATE TABLE {}
               (state VARCHAR(2048), 
               acc_length BIGINT(20),
               area_code BIGINT(20),
               phone VARCHAR(2048),
               int_plan VARCHAR(2048),
               vmail_plan VARCHAR(2048),
               vmail_msg BIGINT(20),
               day_mins DOUBLE, 
               day_calls BIGINT(20),
               day_charge DOUBLE,
               eve_mins DOUBLE,
               eve_calls BIGINT(20),
               eve_charge DOUBLE,
               night_mins DOUBLE,
               night_calls BIGINT(20),
               night_charge DOUBLE, 
               int_mins DOUBLE,
               int_calls BIGINT(20),
               int_charge DOUBLE,
               cust_service_calls BIGINT(20),
               churn VARCHAR(2048))""".format(churn_table))
In [21]:
# send a query to show all existing tables
dbcursor.execute("SHOW TABLES")
# fetch all results
tables = dbcursor.fetchall()

# print names of the tables in the database 'telecom_customer_churn'
for table in tables:
    print(table)
('customers',)

Here we print the list of columns in the new table, which will be populated when we load the data.

In [22]:
# send a query to retrieve the column names from the table 'customers' and fetch the results
dbcursor.execute("SHOW COLUMNS FROM {}".format(churn_table))
columns = dbcursor.fetchall()
cols = "','".join([x[0] for x in columns])

# print the column names as a comma-separated string built in the previous statement.
print("'" + cols + "'")
'state','acc_length','area_code','phone','int_plan','vmail_plan','vmail_msg','day_mins','day_calls','day_charge','eve_mins','eve_calls','eve_charge','night_mins','night_calls','night_charge','int_mins','int_calls','int_charge','cust_service_calls','churn'

Everything looks good so far! Now we're ready to bulk load our data into Amazon Aurora from S3.

In [23]:
print(source_data_file_name)
sagemaker/xgboost-churn/source_churn_data.csv
In [24]:
# send a query to load the data into the table 'customers' from the S3 bucket.
dbcursor.execute("""LOAD DATA FROM S3 's3://{bucket}/{filename}' INTO TABLE {tablename} 
                 FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n' IGNORE 1 LINES""".format(tablename = database_name + '.' + churn_table, 
                                                      bucket = bucket_name, 
                                                      filename = source_data_file_name))
In [25]:
# commit the transaction so the loaded data is visible to other database connections
cnx.commit()

Let's check for load errors and verify that the resulting data looks correct. The output shows the S3 source the data was loaded from, the file name, and the time the load completed.

In [26]:
# run a query to check the history of all data loads from S3 to the database
dbcursor.execute("SELECT * from mysql.aurora_s3_load_history WHERE load_prefix = 's3://{bucket}/{filename}'".format(
    tablename = churn_table,
    bucket = bucket_name,
    filename = source_data_file_name))
all_loads = dbcursor.fetchall()
for load in all_loads:
    print(load)
('s3://aurora-blog-4-111111111111-s3-bucket-for-model-training/sagemaker/xgboost-churn/source_churn_data.csv', 'sagemaker/xgboost-churn/source_churn_data.csv', '', 313211, datetime.datetime(2020, 4, 10, 22, 29, 52))
In [27]:
# run a query to preview the first 5 rows from the table:
dbcursor.execute("SELECT * FROM `{}` LIMIT 5".format(churn_table))
result = dbcursor.fetchall()
for i in result:
    print(i)
('KS', 128, 415, '382-4657', 'no', 'yes', 25, 265.1, 110, 45.07, 197.4, 99, 16.78, 244.7, 91, 11.01, 10.0, 3, 2.7, 1, 'False.')
('OH', 107, 415, '371-7191', 'no', 'yes', 26, 161.6, 123, 27.47, 195.5, 103, 16.62, 254.4, 103, 11.45, 13.7, 3, 3.7, 1, 'False.')
('NJ', 137, 415, '358-1921', 'no', 'no', 0, 243.4, 114, 41.38, 121.2, 110, 10.3, 162.6, 104, 7.32, 12.2, 5, 3.29, 0, 'False.')
('OH', 84, 408, '375-9999', 'yes', 'no', 0, 299.4, 71, 50.9, 61.9, 88, 5.26, 196.9, 89, 8.86, 6.6, 7, 1.78, 2, 'False.')
('OK', 75, 415, '330-6626', 'yes', 'no', 0, 166.7, 113, 28.34, 148.3, 122, 12.61, 186.9, 121, 8.41, 10.1, 3, 2.73, 3, 'False.')

We can see that the customer data is now in Aurora.

Load Customer Messages to Database

Now we'll create a second table, one with some messages from customer service calls. We'll use this table later, to test Amazon Comprehend integration with our database.

In [28]:
# here we are reusing the same cursor object created above and removing the table 'customer_message'
# if it already exists.
dbcursor.execute("DROP TABLE IF EXISTS `{}`".format(customer_msgs_table))
# next, we define a table with four columns: area code, phone number, the text of a message
# from a customer, and the time they called.
sql = """CREATE TABLE IF NOT EXISTS `{}` (
       area_code BIGINT(20) NOT NULL,
       phone VARCHAR(2048) NOT NULL,
       message VARCHAR(255) NOT NULL,
       calltime TIMESTAMP NOT NULL
);""".format(customer_msgs_table)
dbcursor.execute(sql)
In [29]:
# verify that the table was successfully created by showing all existing tables 
# in the database 'telecom_customer_churn'
dbcursor.execute("SHOW TABLES")
tables = dbcursor.fetchall()
In [30]:
for table in tables:
    print(table)
('customer_message',)
('customers',)
In [31]:
# here we request to see the format of the columns in the table 'customer_message'
dbcursor.execute("DESCRIBE `{}`;".format(customer_msgs_table))
dbcursor.fetchall()
Out[31]:
[('area_code', 'bigint(20)', 'NO', '', None, ''),
 ('phone', 'varchar(2048)', 'NO', '', None, ''),
 ('message', 'varchar(255)', 'NO', '', None, ''),
 ('calltime', 'timestamp', 'NO', '', None, '')]
In [32]:
# the following SQL statements insert 6 rows into the table 'customer_message' with the area code,
# phone number, generated messages, and the date/time of the call.
sql_inserts =["""
    INSERT INTO customer_message(area_code, phone, message, calltime)
    VALUES (415, "329-6603", "Thank you very much for resolving the issues with my bill!", '2020-01-01 10:10:10');""",
    """INSERT INTO customer_message(area_code, phone, message, calltime)
    VALUES (415, "351-7269", "I don't understand how I paid for 100 minutes and got only 90, you are ripping me off!",'2020-01-01 10:10:10');""",
    """INSERT INTO customer_message(area_code, phone, message, calltime)
    VALUES (408, "360-1596", "Please fix this issue! I am sick of sitting on a phone every single day with you people!",'2020-01-01 10:10:10');""",
    """INSERT INTO customer_message(area_code, phone, message, calltime)
    VALUES (415, "382-4657", "This is a really great feature, thank for helping me store all my phone numbers.", '2020-01-01 10:10:10');""",
    """INSERT INTO customer_message(area_code, phone, message, calltime)
    VALUES (415, "371-7191", "Why am I paying so much for my international minutes?", '2020-01-01 10:10:10');""",
    """INSERT INTO customer_message(area_code, phone, message, calltime)
    VALUES (415, "358-1921", "Why do I have to wait for the response from the customer service for so long? I don't have time for this.", '2020-01-01 10:10:10');"""
             ]

try:
    # execute each INSERT statement in turn
    for statement in sql_inserts:
        dbcursor.execute(statement)
except (mysql.Error, mysql.Warning) as e:
    # print any error or warning raised by MySQL Connector/Python during the inserts
    print(e)
In [33]:
cnx.commit()

Lastly, let's join the two tables and read the result into a pandas DataFrame to check that we can see the customer and message data as expected.

In [34]:
sql = """SELECT cu.state, cu.area_code, cu.phone, cu.int_plan, cu.vmail_plan, cu.churn, 
        calls.message 
        FROM {} cu, {} calls
        WHERE cu.area_code = calls.area_code AND cu.phone = calls.phone
        AND message is not null""".format(churn_table, customer_msgs_table)

df = pd.read_sql(sql, con = cnx)
df.head(5)
Out[34]:
state area_code phone int_plan vmail_plan churn message
0 KS 415 382-4657 no yes False. This is a really great feature, thank for help...
1 OH 415 371-7191 no yes False. Why am I paying so much for my international m...
2 NJ 415 358-1921 no no False. Why do I have to wait for the response from th...
3 IN 415 329-6603 no no True. Thank you very much for resolving the issues w...
4 NY 415 351-7269 no no True. I don't understand how I paid for 100 minutes ...
In [35]:
# close connection to the database
cnx.close()

Our Amazon Aurora database now contains our "production" data. Now we're finally at the starting point of our scenario!

Section 2. Export data from Amazon Aurora to S3 for use in Machine Learning

The DBA has just received the request: "Please export the customer data to S3, so the data scientist can explore the reasons for customer churn. Thanks!"

Luckily, there's a new Amazon Aurora feature that makes it easy: Saving Data from an Amazon Aurora MySQL DB Cluster into Text Files in an Amazon S3 Bucket. We'll use this feature to export our customer data to S3.

In [36]:
# create connection to the database
cnx = mysql.connect(user = db_user, 
                    password = db_password,
                    host = db_host)

# create cursor
dbcursor = cnx.cursor(buffered = True)
dbcursor.execute("USE {}".format(database_name))

We could also split the data into training, validation, and test sets and upload each to S3 separately, directly from our database (see the sketch below). But for now, we'll let the data scientists deal with that!
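For illustration only, a hypothetical way to do such a split straight from Aurora is to partition rows by a deterministic hash of the phone number and export each partition with the same SELECT INTO OUTFILE S3 statement used later in this section. The split boundaries and the 'splits/' S3 prefix are made up for this example.

# hypothetical train/validation/test split (~70/15/15) exported directly from Aurora;
# MOD(CRC32(phone), 100) assigns each row a stable bucket between 0 and 99
splits = {"train":      "MOD(CRC32(phone), 100) < 70",
          "validation": "MOD(CRC32(phone), 100) BETWEEN 70 AND 84",
          "test":       "MOD(CRC32(phone), 100) >= 85"}

for split_name, condition in splits.items():
    dbcursor.execute("""SELECT * FROM `{tablename}` WHERE {condition}
        INTO OUTFILE S3 's3://{bucket}/{prefix}/splits/{split}'
        FORMAT CSV HEADER""".format(tablename = churn_table,
                                    condition = condition,
                                    bucket = bucket_name,
                                    prefix = prefix,
                                    split = split_name))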

Performing queries against the Amazon SageMaker endpoint (which will be created shortly) requires SQL privileges to invoke Amazon SageMaker and to execute the related functions, as described in the documentation here, in the section "Granting SQL Privileges for Invoking Aurora Machine Learning Services".

Let's check that we have the right privileges; we should see 'SELECT INTO S3' listed. Because we are running these queries as the admin user, no additional grants are needed here. In normal circumstances, however, a SQL user would need these privileges granted explicitly, as sketched below.
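For reference, here is a hypothetical example of granting the relevant privileges to a less-privileged account (a made-up 'ml_user'@'%'); these statements are listed for illustration and are not executed in this notebook.

# illustrative GRANT statements (not executed): privileges used by the Aurora
# S3 and machine learning integrations, granted to a hypothetical user
example_grants = [
    "GRANT LOAD FROM S3 ON *.* TO 'ml_user'@'%'",
    "GRANT SELECT INTO S3 ON *.* TO 'ml_user'@'%'",
    "GRANT INVOKE SAGEMAKER ON *.* TO 'ml_user'@'%'",
    "GRANT INVOKE COMPREHEND ON *.* TO 'ml_user'@'%'",
]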

In [37]:
# this statement displays the privileges and roles that are assigned to a MySQL user account or role
dbcursor.execute("SHOW GRANTS")
dbcursor.fetchall()
Out[37]:
[("GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES, LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER, LOAD FROM S3, SELECT INTO S3, INVOKE LAMBDA, INVOKE SAGEMAKER, INVOKE COMPREHEND ON *.* TO 'admin'@'%' WITH GRANT OPTION",)]

There are several ways we can unload the data. We could choose to unload without using headers; this would be appropriate if we're unloading a large amount of data and are using a metadata catalog (such as AWS Glue) to store the column information.

Here, since this is a small amount of data, we keep the use case simple and avoid introducing errors by unloading the data in CSV format with a header, for use by the data engineer in Part 2. Alternatively, we could provide the data engineer with the column list directly.

In [38]:
dbcursor.execute("""SELECT * FROM `{tablename}` INTO OUTFILE S3 's3://{bucket}/{prefix}/{mldata}' 
    FORMAT CSV HEADER""".format(tablename = churn_table,
                                bucket = bucket_name,
                                prefix = prefix,
                                mldata = ml_data))
In [39]:
dbcursor.execute("SHOW COLUMNS FROM {}".format(churn_table))
columns = dbcursor.fetchall()
cols = "','".join([x[0] for x in columns])

print("'" + cols + "'")
'state','acc_length','area_code','phone','int_plan','vmail_plan','vmail_msg','day_mins','day_calls','day_charge','eve_mins','eve_calls','eve_charge','night_mins','night_calls','night_charge','int_mins','int_calls','int_charge','cust_service_calls','churn'

Now we have the column names, in case we need to pass this information to the data engineer so they know what the columns are in the unloaded data.
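If we wanted to hand over the column list alongside the data, one option (hypothetical, not part of the original workflow) is to write it to the same S3 bucket as a small text file; the object key below is made up for this example.

# hypothetical: save the column names next to the exported data so the data
# engineer can pick them up from S3
s3.Object(bucket_name, prefix + '/churn_data_columns.csv').put(
    Body = ','.join(x[0] for x in columns))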

In [40]:
cnx.close()

The DBA is done for now! The customer data has been unloaded into S3, and is ready for model training.

Move on to the next notebook, "Part 2", which covers training the machine learning model on this data.