Overview

This walkthrough demonstrates a use case for SQL Server 2016 R Services. We will see how customer and campaign data stored in SQL Server can be accessed from your favorite R IDE to do advanced analytics, build models, and write predictions back to SQL Server, all from the R client. To follow this post, you should be familiar with R, the RevoScaleR package, and T-SQL.

Introduction

As businesses are starting to acknowledge the power of data, leveraging machine learning techniques to grow has become a must. In particular, customer-oriented businesses can learn patterns from their data to intelligently design acquisition campaigns and convert the highest possible number of customers. Alternatively, businesses can retarget the customers based on their past interaction patterns.

Among the key variables to learn from data are the best communication channel (e.g. SMS, Email, Call), the day of the week, and the time of day at which a given potential customer should be targeted by a marketing campaign. This template provides a customer-oriented business with an analytics tool that helps determine the best combination of these three variables for each customer, based on (among other things) financial and demographic data.

This notebook takes advantage of the power of SQL Server and RevoScaleR (Microsoft R Server). The tables are all stored in SQL Server, and most of the computations are done without loading the data into memory. All the data is pre-simulated using R.

It does the following:

Step 0: Packages and Compute Contexts

Load all the packages that will be used throughout this experiment.

# Load packages.
library(RevoScaleR)
library(RODBC)
library(ggplot2)
library(reshape)
library(scales)

Read the 4 tables from disk into memory; they will then be written to a local instance of SQL Server.

file_path <- "C:/R/r-server-campaign-optimization/Data"

table_Campaign_Detail <- read.csv(file.path(file_path, "Campaign_Detail.csv")) 

table_Lead_Demography <- read.csv(file.path(file_path, "Lead_Demography.csv")) 

table_Market_Touchdown <- read.csv(file.path(file_path, "Market_Touchdown.csv")) 

table_Product <- read.csv(file.path(file_path, "Product.csv")) 

We create 2 compute contexts, local and sql, and switch between them depending on the task. Compute contexts are a feature of Microsoft R Server that lets the computation be taken to the data, rather than moving the data to the local environment to perform analytics.

connection_string <- "Driver=SQL Server;Server=DESKTOP-92V6LG1\\HOMESERVER;Database=Campaign;UID=;PWD="

sql <- RxInSqlServer(connectionString = connection_string)
local <- RxLocalSeq()

# Set the Compute Context to Local, to load files in-memory.
rxSetComputeContext(local)

Upload the 4 tables to SQL Server

Using RxSqlServerData, we can create a local R object that points to either a table or a view in SQL Server. The table or view can either already exist or be yet to be created. We then use the rxDataStep function to actually load the local R data.frame into SQL Server. With the help of these 2 functions, we will load our 4 tables into SQL Server.

Campaign_Detail <- RxSqlServerData(table = "Campaign_Detail", connectionString = connection_string)
rxDataStep(inData = table_Campaign_Detail, outFile = Campaign_Detail, overwrite = TRUE)
## Rows Read: 6, Total Rows Processed: 6
## Total Rows written: 6, Total time: 0.001
## , Total Chunk Time: 0.029 seconds
Lead_Demography <- RxSqlServerData(table = "Lead_Demography", connectionString = connection_string)
rxDataStep(inData = table_Lead_Demography, outFile = Lead_Demography, overwrite = TRUE)
## Rows Read: 100000, Total Rows Processed: 100000
## Total Rows written: 100000, Total time: 1.896
## , Total Chunk Time: 2.045 seconds
Market_Touchdown <- RxSqlServerData(table = "Market_Touchdown", connectionString = connection_string)
rxDataStep(inData = table_Market_Touchdown, outFile = Market_Touchdown, overwrite = TRUE)
## Rows Read: 500263, Total Rows Processed: 500263
## Total Rows written: 100000, Total time: 1.567
## Total Rows written: 200000, Total time: 3.153
## Total Rows written: 300000, Total time: 4.768
## Total Rows written: 400000, Total time: 6.355
## Total Rows written: 500000, Total time: 7.934
## Total Rows written: 500263, Total time: 7.956
## , Total Chunk Time: 8.080 seconds
Product <- RxSqlServerData(table = "Product",connectionString = connection_string)
rxDataStep(inData = table_Product, outFile = Product, overwrite = TRUE)
## Rows Read: 6, Total Rows Processed: 6
## Total Rows written: 6, Total time: 0.002
## , Total Chunk Time: 0.020 seconds

Step 1: Pre-Processing and Cleaning

Merging the tables

In this step we will merge all 4 tables to create multiple features for every customer. To run queries in SQL Server from the local R client, we first need to set up an ODBC connection using the RxOdbcData and rxOpen functions. Once the connection is established, we can run SQL queries on the server using rxExecuteSQLDDL.

The chunk below merges Campaign_Detail and Product into the Campaign_Product table. This is a new physical table in the database.

# Merge the 4 data sets into one with 3 inner joins on key variables.

## Open a connection with SQL Server to be able to write queries with the rxExecuteSQLDDL function.
outOdbcDS <- RxOdbcData(table = "xyz", connectionString = connection_string, useFastRead=TRUE)
rxOpen(outOdbcDS, "w")
## [1] TRUE
## Inner join of the tables Product and Campaign_Detail
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists Campaign_Product;"
, sep=""))
## [1] TRUE
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste(
  "SELECT Campaign_Detail.*, Term , No_Of_People_Covered, 
          Payment_Frequency, Net_Amt_Insured, Amt_On_Maturity_Bin,
          Product, Premium
  INTO Campaign_Product
  FROM Campaign_Detail JOIN Product
  ON Product.Product_Id = Campaign_Detail.Product_Id;"
  , sep=""))
## [1] TRUE
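For readers more used to R than T-SQL, the inner join above can be pictured with base R's merge() on toy data frames. This is only an illustrative sketch: the data values are hypothetical, and the real join runs inside SQL Server, not in R.

```r
# Toy stand-ins for the two SQL tables (hypothetical values).
campaign_detail <- data.frame(
  Campaign_Id = c(1L, 2L, 3L),
  Product_Id  = c(10L, 20L, 99L)
)
product <- data.frame(
  Product_Id = c(10L, 20L, 30L),
  Premium    = c(1000L, 1500L, 2000L)
)

# merge() performs an inner join by default: only Product_Id values
# present in both data frames are kept.
campaign_product <- merge(campaign_detail, product, by = "Product_Id")
campaign_product
```

The campaign with Product_Id 99 has no matching product, so it is dropped, just as an inner JOIN would drop it.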

We will create another new physical table Market_Lead by merging Market_Touchdown and Lead_Demography using the below chunk.

## Inner join of the tables Market_Touchdown and Lead_Demography
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists Market_Lead;"
, sep=""))
## [1] TRUE
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste(
"SELECT Lead_Demography.Lead_Id, Age, Phone_No, Annual_Income_Bucket, Credit_Score, Country, State,
        No_Of_Dependents, Highest_Education, Ethnicity,
        No_Of_Children, Household_Size, Gender, 
        Marital_Status, Channel, Time_Of_Day, Conversion_Flag, Campaign_Id, Day_Of_Week, Comm_Id, Time_Stamp
 INTO Market_Lead
 FROM Market_Touchdown JOIN Lead_Demography
 ON Market_Touchdown.Lead_Id = Lead_Demography.Lead_Id;"
, sep=""))
## [1] TRUE

However, we will use a slightly different approach to merge the 2 newly created tables, Campaign_Product and Market_Lead. Instead of creating a physical table, we will create an object that points to a query defining the merge. The merged table is then created on the fly whenever the object is used.

## Point to an inner join of the two previous tables. This table will not be materialized. It is created on the fly when removing NAs. 
## Numeric variables are converted to characters only to get their mode for NA cleaning. 

Merged_sql <- RxSqlServerData(  
  sqlQuery = 
"SELECT Lead_Id, Age, Phone_No, Annual_Income_Bucket, Credit_Score, Country, State,
        CAST(No_Of_Dependents AS char(1)) AS No_Of_Dependents, Highest_Education, Ethnicity,
        CAST(No_Of_Children AS char(1)) AS No_Of_Children, CAST(Household_Size AS char(1)) AS Household_Size, Gender, 
        Marital_Status, Channel, Time_Of_Day, Conversion_Flag, Market_Lead.Campaign_Id, Day_Of_Week, Comm_Id, Time_Stamp,
        Product, Category, Term, CAST(No_Of_People_Covered AS char(1)) AS No_Of_People_Covered,
        CAST(Premium AS varchar(4)) AS Premium, Payment_Frequency,
        Amt_On_Maturity_Bin, Sub_Category, Campaign_Drivers, Campaign_Name, Launch_Date, Call_For_Action, 
        Focused_Geography, Tenure_Of_Campaign, CAST(Net_Amt_Insured AS varchar(7)) AS Net_Amt_Insured , Product_Id
 FROM Campaign_Product JOIN Market_Lead 
 ON Campaign_Product.Campaign_Id = Market_Lead.Campaign_Id "
  ,connectionString = connection_string, stringsAsFactors = TRUE)

print("Datasets merged")
## [1] "Datasets merged"

Clean the Merged data set: replace NAs with the mode

This is the data cleaning step, where we replace the NAs in each column with the mode of that column. This is just one of many strategies to impute missing values.

First, we assume there are no missing values in some of the columns, such as unique identifiers and other customer-specific information. Using rxGetVarInfo, we can extract the column names of the final merged table. Remember that this table is not physically present in SQL Server.

## Assumption: 
## no NAs in the Id variables (Lead_Id, Product_Id, Campaign_Id, Comm_Id) and in (Phone_No, Launch_Date, Time_Stamp).

## Find the variables that have missing values (NA). 
colnames <- names(rxGetVarInfo(Merged_sql))
colnames
##  [1] "Lead_Id"              "Age"                  "Phone_No"            
##  [4] "Annual_Income_Bucket" "Credit_Score"         "Country"             
##  [7] "State"                "No_Of_Dependents"     "Highest_Education"   
## [10] "Ethnicity"            "No_Of_Children"       "Household_Size"      
## [13] "Gender"               "Marital_Status"       "Channel"             
## [16] "Time_Of_Day"          "Conversion_Flag"      "Campaign_Id"         
## [19] "Day_Of_Week"          "Comm_Id"              "Time_Stamp"          
## [22] "Product"              "Category"             "Term"                
## [25] "No_Of_People_Covered" "Premium"              "Payment_Frequency"   
## [28] "Amt_On_Maturity_Bin"  "Sub_Category"         "Campaign_Drivers"    
## [31] "Campaign_Name"        "Launch_Date"          "Call_For_Action"     
## [34] "Focused_Geography"    "Tenure_Of_Campaign"   "Net_Amt_Insured"     
## [37] "Product_Id"

Now we will exclude the column names as per our assumption and create a formula object for the rxSummary function.

var <- colnames[!colnames %in% c("Lead_Id", "Product_Id", "Campaign_Id", "Comm_Id", "Phone_No", "Launch_Date", "Time_Stamp")]

formula <- as.formula(paste("~", paste(var, collapse = "+")))

formula
## ~Age + Annual_Income_Bucket + Credit_Score + Country + State + 
##     No_Of_Dependents + Highest_Education + Ethnicity + No_Of_Children + 
##     Household_Size + Gender + Marital_Status + Channel + Time_Of_Day + 
##     Conversion_Flag + Day_Of_Week + Product + Category + Term + 
##     No_Of_People_Covered + Premium + Payment_Frequency + Amt_On_Maturity_Bin + 
##     Sub_Category + Campaign_Drivers + Campaign_Name + Call_For_Action + 
##     Focused_Geography + Tenure_Of_Campaign + Net_Amt_Insured

Now that we have the formula, we can use it in the rxSummary function to identify which of the columns have NAs. The $sDataFrame object in the summary gives us this information as can be seen from below.

summary <- rxSummary(formula, Merged_sql, byTerm = TRUE)
## Rows Read: 50000, Total Rows Processed: 50000, Total Chunk Time: 1.561 seconds
## Rows Read: 50000, Total Rows Processed: 100000, Total Chunk Time: 1.493 seconds
## Rows Read: 50000, Total Rows Processed: 150000, Total Chunk Time: 1.422 seconds
## Rows Read: 50000, Total Rows Processed: 200000, Total Chunk Time: 1.420 seconds
## Rows Read: 50000, Total Rows Processed: 250000, Total Chunk Time: 1.416 seconds
## Rows Read: 50000, Total Rows Processed: 300000, Total Chunk Time: 1.420 seconds
## Rows Read: 50000, Total Rows Processed: 350000, Total Chunk Time: 1.421 seconds
## Rows Read: 50000, Total Rows Processed: 400000, Total Chunk Time: 1.424 seconds
## Rows Read: 50000, Total Rows Processed: 450000, Total Chunk Time: 1.447 seconds
## Rows Read: 50000, Total Rows Processed: 500000, Total Chunk Time: 1.429 seconds
## Rows Read: 263, Total Rows Processed: 500263, Total Chunk Time: 0.084 seconds 
## 
## Elapsed time to compute low/high values and/or factor levels: 14.547 secs.
##  
## Rows Read: 50000, Total Rows Processed: 50000, Total Chunk Time: 1.446 seconds
## Rows Read: 50000, Total Rows Processed: 100000, Total Chunk Time: 1.522 seconds
## Rows Read: 50000, Total Rows Processed: 150000, Total Chunk Time: 1.465 seconds
## Rows Read: 50000, Total Rows Processed: 200000, Total Chunk Time: 1.552 seconds
## Rows Read: 50000, Total Rows Processed: 250000, Total Chunk Time: 1.548 seconds
## Rows Read: 50000, Total Rows Processed: 300000, Total Chunk Time: 1.555 seconds
## Rows Read: 50000, Total Rows Processed: 350000, Total Chunk Time: 1.529 seconds
## Rows Read: 50000, Total Rows Processed: 400000, Total Chunk Time: 1.446 seconds
## Rows Read: 50000, Total Rows Processed: 450000, Total Chunk Time: 1.451 seconds
## Rows Read: 50000, Total Rows Processed: 500000, Total Chunk Time: 1.442 seconds
## Rows Read: 263, Total Rows Processed: 500263, Total Chunk Time: 0.087 seconds 
## Computation time: 15.072 seconds.
head(summary$sDataFrame)
##                   Name Mean StdDev Min Max ValidObs MissingObs
## 1                  Age   NA     NA  NA  NA   500263          0
## 2 Annual_Income_Bucket   NA     NA  NA  NA   500263          0
## 3         Credit_Score   NA     NA  NA  NA   500263          0
## 4              Country   NA     NA  NA  NA   500263          0
## 5                State   NA     NA  NA  NA   500263          0
## 6     No_Of_Dependents   NA     NA  NA  NA   495430       4833

Using the MissingObs column, we extract the names and positions of the columns with NAs.

var_with_NA <- summary$sDataFrame[summary$sDataFrame$MissingObs > 0, 1] 
var_number_with_NA <- which(summary$sDataFrame$MissingObs > 0) 
var_number_with_NA
## [1]  6  7  9 10
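The same selection logic can be tried on a small in-memory data frame with base R. This is a sketch under stated assumptions: the toy data and the names toy, missing_obs, toy_var_with_NA and toy_var_number_with_NA are hypothetical, and colSums(is.na(...)) stands in for the MissingObs column that rxSummary computes in-database.

```r
# Toy data frame (hypothetical values), not the campaign data.
toy <- data.frame(
  No_Of_Dependents  = c(0L, NA, 2L, 0L),
  Highest_Education = c("Graduate School", "High School", NA, "Graduate School"),
  Gender            = c("F", "M", "M", "F"),
  stringsAsFactors  = FALSE
)

# Count missing values per column, mirroring the MissingObs column.
missing_obs <- colSums(is.na(toy))

# Names and positions of the columns that contain at least one NA.
toy_var_with_NA <- names(missing_obs)[missing_obs > 0]
toy_var_number_with_NA <- which(missing_obs > 0)

toy_var_with_NA
```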

Compute the mode of variables with missing values

Now we can compute the modes of the columns with missing values using the chunk below.

mode <- c()
k <- 0
for(n in var_number_with_NA ){
  k <- k + 1
  mode[k] <- as.character(summary$categorical[[n]][which.max(summary$categorical[[n]][,2]),1])
}

mode
## [1] "0"               "Graduate School" "0"               "1"
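The loop above picks, for each column, the level with the highest count in the rxSummary frequency table. The same idea can be sketched in base R with table() and which.max(). The helper name stat_mode and the toy vectors are hypothetical and only serve as an illustration.

```r
# Hypothetical helper: most frequent non-missing value, returned as a string.
stat_mode <- function(x) {
  counts <- table(x)                 # table() ignores NA by default
  names(counts)[which.max(counts)]   # on ties, the first level wins
}

education  <- c("Graduate School", "High School", NA, "Graduate School")
dependents <- c(0L, 0L, 1L, NA, 2L, 0L)

stat_mode(education)
stat_mode(dependents)
```

Note that the mode of the integer vector comes back as a string, which is why the imputation step has to convert it back to an integer for numeric columns.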

Point again to the merged table without stringsAsFactors = TRUE and with correct variable types

We will create another pointer to the virtual table, but this time without converting the numeric columns to characters. We will use this pointer to impute the missing values with the modes.

Merged_sql2 <- RxSqlServerData(  
  sqlQuery = 
"SELECT Market_Lead.*, Product, Category, Term, No_Of_People_Covered, Premium, Payment_Frequency,
        Amt_On_Maturity_Bin, Sub_Category, Campaign_Drivers, Campaign_Name, Launch_Date, Call_For_Action, 
        Focused_Geography, Tenure_Of_Campaign, Net_Amt_Insured, Product_Id
 FROM Campaign_Product JOIN Market_Lead
 ON Campaign_Product.Campaign_Id = Market_Lead.Campaign_Id "
  ,connectionString = connection_string)

Function to deal with NAs

The simple function below will replace the missing values with modes of the respective columns.

Mode_Replace <- function(data) {
  data <- data.frame(data)
  # var_with_NA_1 and mode_1 are supplied through transformObjects.
  integer_vars <- c("No_Of_Dependents", "No_Of_Children", "Household_Size",
                    "No_Of_People_Covered", "Premium", "Net_Amt_Insured")
  for (j in seq_along(var_with_NA_1)) {
    row_na <- which(is.na(data[, var_with_NA_1[j]]))
    if (var_with_NA_1[j] %in% integer_vars) {
      # Modes are stored as strings; convert back for integer columns.
      data[row_na, var_with_NA_1[j]] <- as.integer(mode_1[j])
    } else {
      data[row_na, var_with_NA_1[j]] <- mode_1[j]
    }
  }
  return(data)
}
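To see what this kind of replacement does to a single chunk of data, here is a small base R sketch on a toy data frame. The names chunk, toy_vars, toy_modes and integer_vars are hypothetical; in the actual pipeline the function is applied chunk by chunk by rxDataStep, which this sketch does not reproduce.

```r
# Toy chunk with missing values (hypothetical data).
chunk <- data.frame(
  No_Of_Dependents  = c(0L, NA, 2L),
  Highest_Education = c("High School", NA, "Graduate School"),
  stringsAsFactors  = FALSE
)

# Modes are kept as strings, one per column with missing values.
toy_vars     <- c("No_Of_Dependents", "Highest_Education")
toy_modes    <- c("0", "Graduate School")
integer_vars <- "No_Of_Dependents"

for (j in seq_along(toy_vars)) {
  v <- toy_vars[j]
  rows_na <- is.na(chunk[[v]])
  # Integer columns need the string mode converted back to a number.
  fill <- if (v %in% integer_vars) as.integer(toy_modes[j]) else toy_modes[j]
  chunk[rows_na, v] <- fill
}

chunk
```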

Create the CM_AD0 table by dealing with NAs in Merged_sql2 and save it to a SQL table

Now this is where the rxDataStep function shines. The function takes the following inputs:

inData = Merged_sql2: the virtual table that still contains NAs.

outFile = CM_AD0: an R object that points to a new table in SQL Server, where the clean data is written.

transformFunc = Mode_Replace: the function that takes each chunk of inData, replaces missing values with the modes, and passes the result on to outFile.

transformObjects: the local R objects used inside transformFunc (here, the names of the columns with NAs and their modes).

CM_AD0 <- RxSqlServerData(table = "CM_AD0", connectionString = connection_string)
rxDataStep(inData = Merged_sql2 , outFile = CM_AD0, overwrite = TRUE, transformFunc = Mode_Replace, 
           transformObjects = list(var_with_NA_1 = var_with_NA, mode_1 = mode))
## Total Rows written: 50000, Total time: 1.613
## Rows Read: 50000, Total Rows Processed: 50000, Total Chunk Time: 3.471 secondsTotal Rows written: 50000, Total time: 1.723
## Rows Read: 50000, Total Rows Processed: 100000, Total Chunk Time: 3.481 secondsTotal Rows written: 50000, Total time: 1.606
## Rows Read: 50000, Total Rows Processed: 150000, Total Chunk Time: 3.449 secondsTotal Rows written: 50000, Total time: 1.779
## Rows Read: 50000, Total Rows Processed: 200000, Total Chunk Time: 3.523 secondsTotal Rows written: 50000, Total time: 1.655
## Rows Read: 50000, Total Rows Processed: 250000, Total Chunk Time: 3.447 secondsTotal Rows written: 50000, Total time: 1.708
## Rows Read: 50000, Total Rows Processed: 300000, Total Chunk Time: 3.420 secondsTotal Rows written: 50000, Total time: 1.614
## Rows Read: 50000, Total Rows Processed: 350000, Total Chunk Time: 3.345 secondsTotal Rows written: 50000, Total time: 1.785
## Rows Read: 50000, Total Rows Processed: 400000, Total Chunk Time: 3.650 secondsTotal Rows written: 50000, Total time: 1.77
## Rows Read: 50000, Total Rows Processed: 450000, Total Chunk Time: 3.509 secondsTotal Rows written: 50000, Total time: 1.697
## Rows Read: 50000, Total Rows Processed: 500000, Total Chunk Time: 3.547 secondsTotal Rows written: 263, Total time: 0.009
## Rows Read: 263, Total Rows Processed: 500263, Total Chunk Time: 0.123 seconds

Let's validate that all the NAs have been replaced. As the output below shows, the MissingObs values sum to 0 across all columns.

summary1 <- rxSummary(formula, CM_AD0, byTerm = TRUE)
## Rows Read: 50000, Total Rows Processed: 50000, Total Chunk Time: 1.273 seconds
## Rows Read: 50000, Total Rows Processed: 100000, Total Chunk Time: 1.271 seconds
## Rows Read: 50000, Total Rows Processed: 150000, Total Chunk Time: 1.258 seconds
## Rows Read: 50000, Total Rows Processed: 200000, Total Chunk Time: 1.257 seconds
## Rows Read: 50000, Total Rows Processed: 250000, Total Chunk Time: 1.297 seconds
## Rows Read: 50000, Total Rows Processed: 300000, Total Chunk Time: 1.330 seconds
## Rows Read: 50000, Total Rows Processed: 350000, Total Chunk Time: 1.344 seconds
## Rows Read: 50000, Total Rows Processed: 400000, Total Chunk Time: 1.309 seconds
## Rows Read: 50000, Total Rows Processed: 450000, Total Chunk Time: 1.334 seconds
## Rows Read: 50000, Total Rows Processed: 500000, Total Chunk Time: 1.269 seconds
## Rows Read: 263, Total Rows Processed: 500263, Total Chunk Time: 0.061 seconds 
## Computation time: 13.069 seconds.
library(dplyr)
summary1$sDataFrame$MissingObs %>% sum()
## [1] 0

We can now drop the 2 intermediate tables. The calls below are commented out; uncomment them to actually drop the tables.

## Drop intermediate tables.
#rxSqlServerDropTable(table = "Campaign_Product", connectionString = connection_string)
#rxSqlServerDropTable(table = "Market_Lead", connectionString = connection_string)

print("Data cleaned")
## [1] "Data cleaned"

Step 2: Feature Engineering

In this step, we:

  1. Create SMS_Count, Email_Count, and Call_Count: number of communications through each channel.

  2. Create Previous_Channel: the previous channel used towards the Lead_Id in the campaign.

  3. Aggregate the data by Lead_Id, keeping the latest campaign activity each Lead_Id received.

Input: Data set with all the campaign activities received by each Lead_Id, CM_AD0.

Output: Data set with new features and the latest campaign activity each Lead_Id received, CM_AD.

SMS_Count, Email_Count, and Call_Count

We will again use the rxExecuteSQLDDL to compute the counts using the aggregate functions.

# Determine how many times each Lead_Id was contacted through SMS, Email and Call

rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists Intermediate;"
, sep=""))
## [1] TRUE
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste(
"SELECT Lead_Id, 
        coalesce(count(case when Channel = 'SMS' then 1 end), 0) as SMS_Count,
        coalesce(count(case when Channel = 'Cold Calling' then 1 end), 0) as Call_Count,
        coalesce(count(case when Channel = 'Email' then 1 end), 0) as Email_Count
 INTO Intermediate
 FROM CM_AD0
 GROUP BY Lead_Id;"
, sep=""))
## [1] TRUE
print("Counts computed")
## [1] "Counts computed"
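The conditional counts in the query above (one count per channel, grouped by Lead_Id) can be illustrated in base R with a two-way table. This is only a sketch on hypothetical toy data; the names touches and toy_intermediate are assumptions, and the real aggregation runs in SQL Server.

```r
# Toy communication log (hypothetical rows).
touches <- data.frame(
  Lead_Id = c("A", "A", "A", "B", "B"),
  Channel = c("SMS", "Email", "SMS", "Cold Calling", "Email"),
  stringsAsFactors = FALSE
)

# Fixing the factor levels guarantees a column for every channel,
# even one that a given lead never received (count 0).
channels <- c("SMS", "Cold Calling", "Email")
counts <- table(touches$Lead_Id, factor(touches$Channel, levels = channels))

toy_intermediate <- data.frame(
  Lead_Id     = rownames(counts),
  SMS_Count   = as.integer(counts[, "SMS"]),
  Call_Count  = as.integer(counts[, "Cold Calling"]),
  Email_Count = as.integer(counts[, "Email"])
)

toy_intermediate
```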

We can look at the top few rows of the Intermediate table to confirm the execution was successful.

display_head <- function(table_name, n_rows){
   table_sql <- RxSqlServerData(sqlQuery = sprintf("SELECT TOP(%s) * FROM %s", n_rows, table_name), connectionString = connection_string)
   table <- rxImport(table_sql)
   print(table)
}

#display_head("Intermediate", 10)
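Because display_head builds its query by pasting a table name into a string, it can be worth checking the inputs first. The sketch below is a hypothetical variant, not part of the original template: build_head_query is an assumed name, and the identifier pattern is a deliberately strict assumption about what a table name looks like.

```r
# Hypothetical helper: build the TOP(n) query only from safe inputs.
build_head_query <- function(table_name, n_rows) {
  # Accept only a simple identifier and a positive row count.
  stopifnot(
    grepl("^[A-Za-z_][A-Za-z0-9_]*$", table_name),
    is.numeric(n_rows), n_rows >= 1
  )
  sprintf("SELECT TOP(%d) * FROM %s", as.integer(n_rows), table_name)
}

build_head_query("Intermediate", 10)
```

The resulting string could then be passed as the sqlQuery argument of RxSqlServerData, as display_head does.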

Previous_Channel and aggregating

We will now use the window function LAG to compute, for every communication to a lead, the channel used in the previous one. In the same query, we merge in the SMS, Email, and Call counts and write the result to another intermediate table.

# Determine the previous channel used towards every Lead_Id for every campaign activity 
# The first record for each Lead_Id will be disregarded. 

## Create a lag variable corresponding to the previous channel, while performing an inner join to append the Counts. 
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists Intermediate2;", sep=""))
## [1] TRUE
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste(
"SELECT CM_AD0.*, Intermediate.SMS_Count, Intermediate.Email_Count, Intermediate.Call_Count, 
        LAG(Channel, 1,0) OVER (Partition by CM_AD0.Lead_Id ORDER BY CM_AD0.Lead_Id, Comm_Id ASC) AS Previous_Channel,
        ROW_NUMBER() OVER (PARTITION BY CM_AD0.Lead_Id ORDER BY CM_AD0.Comm_Id DESC) AS Row
 INTO Intermediate2
 FROM Intermediate JOIN CM_AD0 
 ON Intermediate.Lead_Id = CM_AD0.Lead_Id ;"
, sep=""))
## [1] TRUE
print("Previous_Channel computed")
## [1] "Previous_Channel computed"
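For intuition, the two window functions can be mimicked in base R with ave(), which applies a function within each group. This is a rough sketch on hypothetical toy data. It assumes the default value 0 passed to LAG ends up as the string "0" in a character column, which is an assumption about how SQL Server stores it.

```r
# Toy communication log (hypothetical rows).
touches <- data.frame(
  Lead_Id = c("A", "A", "A", "B", "B"),
  Comm_Id = c(1L, 2L, 3L, 1L, 2L),
  Channel = c("SMS", "Email", "SMS", "Cold Calling", "Email"),
  stringsAsFactors = FALSE
)

# Sort by lead and communication order, as the ORDER BY clauses do.
touches <- touches[order(touches$Lead_Id, touches$Comm_Id), ]

# LAG(Channel, 1, 0): previous channel within each lead,
# with "0" as the placeholder for the first record.
touches$Previous_Channel <- ave(
  touches$Channel, touches$Lead_Id,
  FUN = function(x) c("0", head(x, -1))
)

# ROW_NUMBER() ... ORDER BY Comm_Id DESC: 1 marks the latest communication.
touches$Row <- ave(
  touches$Comm_Id, touches$Lead_Id,
  FUN = function(x) rank(-x, ties.method = "first")
)

# Keeping Row == 1 retains one row per lead: the latest communication.
latest <- touches[touches$Row == 1, ]
latest
```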

CM_AD is the data set that will be used for modeling

We will now create a new table CM_AD from Intermediate2. Before doing this, we define the data types and constraints in SQL Server so that the R and SQL sides stay coherent.

# In order to ensure coherence between the SQL SP code and the R code, we specify here the types of the variables in CM_AD. 
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists CM_AD;", sep=""))
## [1] TRUE
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("CREATE TABLE CM_AD
(
  Lead_Id varchar(15) NOT NULL Primary Key
  ,Age varchar(30)
  ,Phone_No varchar(50)
  ,Annual_Income_Bucket varchar(15)
  ,Credit_Score  varchar(15)
  ,Country varchar(5)
  ,[State] char(2)
  ,No_Of_Dependents int
  ,Highest_Education varchar(30) 
  ,Ethnicity varchar(20)
  ,No_Of_Children int 
  ,Household_Size int 
  ,Gender char(1)
  ,Marital_Status char(1)
  ,Channel varchar(15)
  ,Time_Of_Day varchar(15)
  ,Conversion_Flag char(1)
  ,Campaign_Id char(1)
  ,Day_Of_Week char(1)
  ,Comm_Id char(1)
  ,Time_Stamp date
  ,Product varchar(50)
  ,Category varchar(15)
  ,Term char(2)
  ,No_Of_People_Covered int
  ,Premium int 
  ,Payment_Frequency varchar(50)
  ,Amt_On_Maturity_Bin varchar(50)
  ,Sub_Category varchar(15)
  ,Campaign_Drivers varchar(50)
  ,Campaign_Name varchar(50)
  ,Launch_Date date
  ,Call_For_Action char(1)
  ,Focused_Geography varchar(15)
  ,Tenure_Of_Campaign char(1)
  ,Net_Amt_Insured int
  ,Product_Id char(1)
  ,SMS_Count int
  ,Email_Count int
  ,Call_Count int 
  ,Previous_Channel varchar(15)
  ,[Row] int
);"
, sep=""))
## [1] TRUE

Now that the table is created, we insert the contents of Intermediate2 into CM_AD, keeping only the rows for the latest communication to each lead. We then remove the Row column, which was only used for the filtering.

# Keeping the last record for each Lead_Id. 
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste(
"INSERT INTO CM_AD
 SELECT *  
 FROM Intermediate2
 WHERE Row = 1  ;"
  , sep=""))
## [1] TRUE
# Removing the Row number variables. 
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("ALTER TABLE CM_AD DROP COLUMN Row;", sep=""))
## [1] TRUE
print("Aggregation done")
## [1] "Aggregation done"

Finally, we can drop the intermediate tables. As before, the calls are commented out; uncomment them to actually drop the tables.

# Drop intermediate tables. 
#rxSqlServerDropTable(table = "Intermediate", connectionString = connection_string)
#rxSqlServerDropTable(table = "Intermediate2", connectionString = connection_string)

Step 3: Training and Evaluating the Models

In this step we:

  1. Split CM_AD into a Training CM_AD_Train, and a Testing set CM_AD_Test.

  2. Train Random Forest (RF) and Gradient Boosting Trees (GBT) on CM_AD_Train, and save them to SQL.

  3. Score RF and GBT on CM_AD_Test.

  4. Select the best model based on AUC.

Input: Data set CM_AD.

Output: Random forest and GBT models saved to SQL. One of them is chosen based on AUC.

Point to the SQL table with the data set for modeling. Strings will be converted to factors.

CM_AD <- RxSqlServerData(table = "CM_AD", connectionString = connection_string, stringsAsFactors = TRUE)

Get the column info from the CM_AD which will be used later on.

# Get variable names, types, and levels for factors.
column_info <- rxCreateColInfo(CM_AD)
#column_info
print("Column information received")
## [1] "Column information received"

We will then randomly assign about 70% of the rows to the training set and the rest to the testing set.

# Randomly split the data into a training set and a testing set, with a splitting % p.
# p % goes to the training set, and the rest goes to the testing set. Default is 70%. 

p <- 70 

## Create the Train_Id table containing Lead_Id of training set. 
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists Train_Id;", sep=""))
## [1] TRUE

Create an R object that points to all the Lead IDs, and another one that points to a new table called Train_Id. This new table will contain the Lead IDs of the training data set.

Train_Id_R <- RxSqlServerData(sqlQuery = "SELECT Lead_ID FROM CM_AD", connectionString = connection_string)
Train_Id <- RxSqlServerData(table = "Train_Id", connectionString = connection_string)

rxDataStep(Train_Id_R, 
           Train_Id, 
           rowSelection = selVar, 
           transforms = list (
                   selVar = as.logical(rbinom(.rxNumRows,1,p1/100))
                   ), 
          transformObjects = list(p1 = p), 
           overwrite = TRUE)
## Total Rows written: 34800, Total time: 0.431
## Rows Read: 50000, Total Rows Processed: 50000, Total Chunk Time: 0.541 secondsTotal Rows written: 34935, Total time: 0.444
## Rows Read: 50000, Total Rows Processed: 100000, Total Chunk Time: 0.539 seconds
# Removing the selVar column
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("ALTER TABLE Train_Id DROP COLUMN selVar;", sep=""))
## [1] TRUE
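The split relies on one Bernoulli draw per row, so the training share is close to p % but not exact (the log above shows 34800 and 34935 rows kept out of two chunks of 50000). A minimal base R sketch of the same idea follows; the seed, the toy IDs, and the names p_toy, lead_ids, sel, train_ids and test_ids are all hypothetical.

```r
set.seed(5)  # hypothetical seed, only to make the toy example repeatable
p_toy <- 70
lead_ids <- sprintf("L%04d", 1:1000)

# One Bernoulli draw per row: TRUE sends the row to the training set.
sel <- as.logical(rbinom(length(lead_ids), 1, p_toy / 100))

train_ids <- lead_ids[sel]
test_ids  <- lead_ids[!sel]

# Share of rows in the training set: near 0.70, but not exactly 0.70.
length(train_ids) / length(lead_ids)
```

Every ID lands in exactly one of the two sets, which is what the IN / NOT IN queries on Train_Id rely on.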

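Now we define pointers to the training and testing sets. Neither is materialized as a table on the server; each query is evaluated on the fly when the models are trained or tested.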

## Point to the training set. It will be created on the fly when training models. 
CM_AD_Train <- RxSqlServerData(  
  sqlQuery = "SELECT *   
              FROM CM_AD 
              WHERE Lead_Id IN (SELECT Lead_Id from Train_Id)",
  connectionString = connection_string, colInfo = column_info)

## Point to the testing set. It will be created on the fly when testing models. 
CM_AD_Test <- RxSqlServerData(  
  sqlQuery = "SELECT *   
              FROM CM_AD 
              WHERE Lead_Id NOT IN (SELECT Lead_Id from Train_Id)",
  connectionString = connection_string, colInfo = column_info)


print("Splitting completed")
## [1] "Splitting completed"

In the step below we create the formula which will be used for the machine learning models. We exclude certain unique identifiers and redundant columns from the model training.

# Specify the variables to keep for the training by writing the formula.

variables_all <- rxGetVarNames(CM_AD_Train)
variables_to_remove <- c("Lead_Id", "Phone_No", "Country", "Comm_Id", "Time_Stamp", "Category", "Launch_Date", "Focused_Geography",
                         "Call_For_Action", "Product", "Campaign_Name")
traning_variables <- variables_all[!(variables_all %in% c("Conversion_Flag", variables_to_remove))]
formula <- as.formula(paste("Conversion_Flag ~", paste(traning_variables, collapse = "+")))
formula
## Conversion_Flag ~ Age + Annual_Income_Bucket + Credit_Score + 
##     State + No_Of_Dependents + Highest_Education + Ethnicity + 
##     No_Of_Children + Household_Size + Gender + Marital_Status + 
##     Channel + Time_Of_Day + Campaign_Id + Day_Of_Week + Term + 
##     No_Of_People_Covered + Premium + Payment_Frequency + Amt_On_Maturity_Bin + 
##     Sub_Category + Campaign_Drivers + Tenure_Of_Campaign + Net_Amt_Insured + 
##     Product_Id + SMS_Count + Email_Count + Call_Count + Previous_Channel

Finally, we set the compute context to SQL Server, as we want the training to take place in-database instead of on the local client.

# Compute Context is set to SQL for model training.
rxSetComputeContext(sql)

A1. Random Forest Training

forest_model <- rxDForest(formula = formula,
                          data = CM_AD_Train,
                          nTree = 40,
                          minSplit = 10,
                          minBucket = 5,
                          cp = 0.00005,
                          seed = 5)

print("Training RF done")
## [1] "Training RF done"

A2. Save the Random Forest in SQL.

The compute context is set to local in order to export the model.

rxSetComputeContext(local)

saveRDS(forest_model, file = "forest_model.rds")

## Read Binary data from the .rds file
forest_model_raw <- readBin("forest_model.rds", "raw", n = file.size("forest_model.rds"))
forest_model_char <- as.character(forest_model_raw)

forest_model_sql <- RxSqlServerData(table = "forest_model_sql", connectionString = connection_string)
rxDataStep(inData = data.frame(x = forest_model_char ), outFile = forest_model_sql, overwrite = TRUE)
## Rows Read: 1926551, Total Rows Processed: 1926551
## Total Rows written: 100000, Total time: 1.346
## Total Rows written: 200000, Total time: 2.584
## Total Rows written: 300000, Total time: 3.788
## Total Rows written: 400000, Total time: 5.033
## Total Rows written: 500000, Total time: 6.259
## Total Rows written: 600000, Total time: 7.485
## Total Rows written: 700000, Total time: 8.678
## Total Rows written: 800000, Total time: 9.87
## Total Rows written: 900000, Total time: 11.065
## Total Rows written: 1000000, Total time: 12.259
## Total Rows written: 1100000, Total time: 13.454
## Total Rows written: 1200000, Total time: 14.649
## Total Rows written: 1300000, Total time: 15.841
## Total Rows written: 1400000, Total time: 17.059
## Total Rows written: 1500000, Total time: 18.258
## Total Rows written: 1600000, Total time: 19.454
## Total Rows written: 1700000, Total time: 20.644
## Total Rows written: 1800000, Total time: 21.835
## Total Rows written: 1900000, Total time: 23.022
## Total Rows written: 1926551, Total time: 23.352
## , Total Chunk Time: 23.471 seconds
# Set back the compute context to SQL to run another model.

rxSetComputeContext(sql)
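The export above turns the .rds file into one two-character hex string per byte. A round trip in base R shows that this encoding is reversible. This is only a sketch: toy_model is a hypothetical stand-in (a small lm fit on the built-in cars data), and the step of reading the rows back from SQL is not covered. If the bytes were read back from a table, their original order would have to be preserved, which the sketch simply assumes.

```r
# Stand-in for a trained model (hypothetical; any R object behaves the same).
toy_model <- lm(dist ~ speed, data = cars)

# Export: .rds file -> raw bytes -> hex strings, one per byte.
rds_path <- tempfile(fileext = ".rds")
saveRDS(toy_model, file = rds_path)
model_raw  <- readBin(rds_path, "raw", n = file.size(rds_path))
model_char <- as.character(model_raw)

# Import: hex strings -> raw bytes -> .rds file -> R object.
restored_raw  <- as.raw(strtoi(model_char, base = 16L))
restored_path <- tempfile(fileext = ".rds")
writeBin(restored_raw, restored_path)
restored_model <- readRDS(restored_path)

# The restored model has the same coefficients as the original.
all.equal(coef(toy_model), coef(restored_model))
```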

B1. Gradient Boosted Trees Training and saving the model to SQL

# Train the GBT.

btree_model <- rxBTrees(formula = formula,
                        data = CM_AD_Train,
                        learningRate = 0.05,
                        minSplit = 10,
                        minBucket = 5,
                        cp = 0.0005,
                        nTree = 40,
                        seed = 5,
                        lossFunction = "multinomial")
## Warning in rxDeleteGuidDirOnHeadNodeHelper(GuidDirOnHeadNodeName): Failed
## to delete some files in C:\Users\Aditya\AppData\Local\Temp/RevoJobs/
## B4A22E5D731A45FEB65FC147F57C261C\
## Warning in rxDeleteGuidDirOnHeadNodeHelper(GuidDirOnHeadNodeName):
## Unable to delete temporary data directory: C:\Users\Aditya\AppData\Local
## \Temp/RevoJobs/B4A22E5D731A45FEB65FC147F57C261C\ Contact your system
## administrator.
# Save the GBT in SQL. The Compute Context is set to Local in order to export the model. 

rxSetComputeContext(local)
saveRDS(btree_model, file = "btree_model.rds")
btree_model_raw <- readBin("btree_model.rds", "raw", n = file.size("btree_model.rds"))
btree_model_char <- as.character(btree_model_raw)
btree_model_sql <- RxSqlServerData(table = "btree_model_sql", connectionString = connection_string) 
rxDataStep(inData = data.frame(x = btree_model_char ), outFile = btree_model_sql, overwrite = TRUE)
## Rows Read: 539360, Total Rows Processed: 539360
## Total Rows written: 100000, Total time: 1.189
## Total Rows written: 200000, Total time: 2.406
## Total Rows written: 300000, Total time: 3.596
## Total Rows written: 400000, Total time: 4.795
## Total Rows written: 500000, Total time: 6.078
## Total Rows written: 539360, Total time: 6.599
## , Total Chunk Time: 6.644 seconds
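
The character vector written to btree_model_sql holds one two-digit hexadecimal string per byte of the .rds file. The sketch below shows how that encoding can be reversed. It uses a small lm fit as a stand-in for btree_model, an assumption made so the example runs without RevoScaleR or a SQL Server connection. It works on local files only; rows read back from SQL would also have to arrive in their original order, which the sketch does not address.

```r
# Stand-in model: a small linear model replaces the boosted-tree object.
model <- lm(mpg ~ wt, data = mtcars)

# Serialize to disk and read the bytes back, as in the walkthrough.
rds_path <- tempfile(fileext = ".rds")
saveRDS(model, file = rds_path)
model_raw  <- readBin(rds_path, "raw", n = file.size(rds_path))
model_char <- as.character(model_raw)   # one two-digit hex string per byte

# Reverse the encoding: hex strings -> raw bytes -> file -> R object.
restored_raw  <- as.raw(strtoi(model_char, base = 16L))
restored_path <- tempfile(fileext = ".rds")
writeBin(restored_raw, restored_path)
restored_model <- readRDS(restored_path)

# Compare predictions from the original and the restored model.
all.equal(predict(model, mtcars), predict(restored_model, mtcars))
```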

Binary classification model evaluation metrics

Write a function that computes the AUC, Accuracy, Precision, Recall, and F-Score.

evaluate_model <- function(observed, predicted_probability, threshold, model_name) { 

  # Given the observed labels and the predicted probability, plot the ROC curve and determine the AUC.

  data <- data.frame(observed, predicted_probability)
  data$observed <- as.numeric(as.character(data$observed))
  # Plot the ROC curve, titled with the model name ("RF" or "GBT").
  rxRocCurve(actualVarName = "observed",
             predVarNames = "predicted_probability",
             data = data, numBreaks = 1000, title = model_name)

  ROC <- rxRoc(actualVarName = "observed", predVarNames = "predicted_probability", data = data, numBreaks = 1000)
  auc <- rxAuc(ROC)

  # Given the predicted probability and the threshold, determine the binary prediction.

  predicted <- ifelse(predicted_probability > threshold, 1, 0) 
  predicted <- factor(predicted, levels = c(0, 1)) 

  # Build the corresponding Confusion Matrix, then compute the Accuracy, Precision, Recall, and F-Score.

  confusion <- table(observed, predicted)
  print(model_name)
  print(confusion) 
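  # Rows are observed labels and columns are predicted labels; class 1 (converted) is the positive class.
  tn <- confusion[1, 1]
  fp <- confusion[1, 2]
  fn <- confusion[2, 1]
  tp <- confusion[2, 2]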
  accuracy <- (tp + tn) / (tp + fn + fp + tn) 
  precision <- tp / (tp + fp) 
  recall <- tp / (tp + fn) 
  fscore <- 2 * (precision * recall) / (precision + recall) 

  # Return the computed metrics.
  metrics <- list("Accuracy" = accuracy, 
                  "Precision" = precision, 
                  "Recall" = recall, 
                  "F-Score" = fscore,
                  "AUC" = auc) 
  return(metrics) 
} 

Random Forest Scoring

# Make Predictions, then import them into R. The observed Conversion_Flag is kept through the argument extraVarsToWrite.

Prediction_Table_RF <- RxSqlServerData(table = "Forest_Prediction", stringsAsFactors = T, connectionString = connection_string)


rxPredict(forest_model, data = CM_AD_Test, outData = Prediction_Table_RF, overwrite = T, type = "prob",
          extraVarsToWrite = c("Conversion_Flag"))
## Rows Read: 30265, Total Rows Processed: 30265, Total Chunk Time: 0.832 seconds
## Total Rows written: 30265, Total time: 0.387
## 
Prediction_RF <- rxImport(inData = Prediction_Table_RF, stringsAsFactors = T, outFile = NULL)
## Rows Read: 30265, Total Rows Processed: 30265, Total Chunk Time: 0.096 seconds
observed <- Prediction_RF$Conversion_Flag

# Assign the decision threshold to the median of the predicted probabilities.
threshold <- median(Prediction_RF$`1_prob`)

# Compute the performance metrics of the model.
Metrics_RF <- evaluate_model(observed = observed, predicted_probability = Prediction_RF$`1_prob`, threshold = threshold,
                             model_name = "RF")

## [1] "RF"
##         predicted
## observed     0     1
##        0 14976 12314
##        1   157  2818
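
The metrics can also be worked out by hand from the printed confusion matrix. This sketch assumes Conversion_Flag = 1 is the positive class, with rows as observed labels and columns as predicted labels. The counts are copied from the output above.

```r
# Random forest confusion matrix (rows = observed, columns = predicted).
confusion <- matrix(c(14976, 157, 12314, 2818), nrow = 2,
                    dimnames = list(observed = c("0", "1"),
                                    predicted = c("0", "1")))

# Name the four cells, treating class "1" as positive.
tn <- confusion["0", "0"]
fp <- confusion["0", "1"]
fn <- confusion["1", "0"]
tp <- confusion["1", "1"]

accuracy  <- (tp + tn) / sum(confusion)
precision <- tp / (tp + fp)
recall    <- tp / (tp + fn)
fscore    <- 2 * precision * recall / (precision + recall)

# Share of test leads flagged as likely converters by the median threshold.
flagged <- (tp + fp) / sum(confusion)

round(c(Accuracy = accuracy, Precision = precision, Recall = recall,
        `F-Score` = fscore, Flagged = flagged), 3)
```

Because the threshold is the median of the predicted probabilities, about half of the test leads are flagged as likely converters. That is why recall is high while precision is low.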

Gradient Boosted Trees Scoring

# Make Predictions, then import them into R. The observed Conversion_Flag is kept through the argument extraVarsToWrite.

Prediction_Table_GBT <- RxSqlServerData(table = "Boosted_Prediction", stringsAsFactors = T, connectionString = connection_string)
rxPredict(btree_model,data = CM_AD_Test, outData = Prediction_Table_GBT, overwrite = T, type="prob",
          extraVarsToWrite = c("Conversion_Flag"))
## Rows Read: 30265, Total Rows Processed: 30265, Total Chunk Time: 0.769 seconds
## Total Rows written: 30265, Total time: 0.388
## 
Prediction_GBT <- rxImport(inData = Prediction_Table_GBT, stringsAsFactors = T, outFile = NULL)
## Rows Read: 30265, Total Rows Processed: 30265, Total Chunk Time: 0.087 seconds
observed <- Prediction_GBT$Conversion_Flag

# Assign the decision threshold to the median of the predicted probabilities.
threshold <- median(Prediction_GBT$`1_prob`)

# Compute the performance metrics of the model.
Metrics_GBT <- evaluate_model(observed = observed, predicted_probability = Prediction_GBT$`1_prob`, threshold = threshold,
                              model_name = "GBT")

## [1] "GBT"
##         predicted
## observed     0     1
##        0 14691 12599
##        1   443  2532
# Select the best model based on AUC
if(Metrics_RF$AUC >= Metrics_GBT$AUC){
    best <- "RF"
    best_model <- forest_model
} else {
    best <- "GBT"
    best_model <- btree_model
}

print(best)
## [1] "RF"
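
For intuition about the quantity being compared, AUC can be read as the probability that a randomly chosen converter receives a higher score than a randomly chosen non-converter. The sketch below computes it with the rank-based (Mann-Whitney) formula on made-up scores. This is a different technique from rxRoc and rxAuc, which work from a curve evaluated at numBreaks thresholds; auc_rank, prob_a, and prob_b are hypothetical names.

```r
# Rank-based AUC: average ranks handle tied scores.
auc_rank <- function(observed, prob) {
  r     <- rank(prob)
  n_pos <- sum(observed == 1)
  n_neg <- sum(observed == 0)
  (sum(r[observed == 1]) - n_pos * (n_pos + 1) / 2) / (n_pos * n_neg)
}

# Made-up labels and scores from two hypothetical models.
observed <- c(0, 0, 1, 0, 1, 1)
prob_a   <- c(0.1, 0.2, 0.3, 0.4, 0.8, 0.9)
prob_b   <- c(0.2, 0.6, 0.5, 0.3, 0.4, 0.9)

auc_a <- auc_rank(observed, prob_a)
auc_b <- auc_rank(observed, prob_b)

# Same selection rule as above: keep the model with the larger AUC.
best_toy <- if (auc_a >= auc_b) "A" else "B"
```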

Step 4: Channel-Day-Time Recommendations

We determine a recommendation for each Lead_Id: the best combination of Day, Channel, and Time at which to target them. The best combination is the one with the highest conversion probability according to the selected model. This should help the business achieve a higher conversion rate.

In this step we:

  1. Create a full data table with all the unique combinations of Day_Of_Week, Channel, Time_Of_Day.

  2. Get the predicted probabilities for each Lead_Id, for all combinations of Day_Of_Week, Channel, Time_Of_Day, with best_model.

  3. For each Lead_Id, choose the combination that has the highest conversion probability.

Input: the data set CM_AD and the best prediction model, best_model. Output: the recommended Day_Of_Week, Channel, and Time_Of_Day for each Lead_Id.
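
The three steps can be sketched in base R on a toy data set. Here leads, toy_score, and the coefficients inside it are hypothetical stand-ins: in the walkthrough the cross join is done in SQL and the probabilities come from rxPredict with best_model.

```r
# Toy inputs: three leads and the 7 x 3 x 3 grid of contact options.
leads  <- data.frame(Lead_Id = c("A", "B", "C"), Age = c(25, 40, 60))
combos <- expand.grid(Day_Of_Week = 1:7,
                      Channel = c("Email", "Cold Calling", "SMS"),
                      Time_Of_Day = c("Morning", "Afternoon", "Evening"),
                      stringsAsFactors = FALSE)

# 1. Cross join: every lead paired with every combination (3 * 63 rows).
full <- merge(leads, combos, by = NULL)

# 2. Score each row. toy_score() is a hypothetical stand-in for the model.
toy_score <- function(d) {
  z <- -2 + 0.02 * d$Age + 0.1 * d$Day_Of_Week +
    0.5 * (d$Channel == "SMS") + 0.3 * (d$Time_Of_Day == "Evening")
  1 / (1 + exp(-z))
}
full$prob <- toy_score(full)

# 3. Keep the highest-probability row per lead.
full <- full[order(full$Lead_Id, -full$prob), ]
best <- full[!duplicated(full$Lead_Id), ]
best[, c("Lead_Id", "Day_Of_Week", "Channel", "Time_Of_Day", "prob")]
```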

# Create a table with all the unique combinations of Day_Of_Week, Channel, Time_Of_Day.
Day_of_Week_unique <- data.frame(seq(1, 7))
Channel_unique <- data.frame(c("Email", "Cold Calling", "SMS"))
Time_Of_Day_unique <- data.frame(c("Morning", "Afternoon", "Evening"))
Unique_Combos <- merge(merge(Day_of_Week_unique, Channel_unique), Time_Of_Day_unique)
colnames(Unique_Combos) <- c("Day_Of_Week", "Channel", "Time_Of_Day")

## Export it to SQL
Unique_Combos_sql <- RxSqlServerData(table = "Unique_Combos", connectionString = connection_string)
rxDataStep(inData = Unique_Combos, outFile = Unique_Combos_sql, overwrite = T)
## Rows Read: 63, Total Rows Processed: 63
## Total Rows written: 63, Total time: 0.002
## , Total Chunk Time: 0.019 seconds
## Define a data source that holds, for each Lead_Id and its variables (except Day_Of_Week, Channel, Time_Of_Day),
## one row for each possible combination of Day_Of_Week, Channel, and Time_Of_Day.
## The object is only a pointer to the query; the rows are generated on the fly during scoring.
## To keep the computation fast, only the first 10000 rows of CM_AD are used.
## For the full solution, remove TOP(10000) from the query below.

AD_full_merged_sql <- RxSqlServerData(
  sqlQuery = "SELECT * 
              FROM (
                    SELECT TOP(10000) Lead_Id, Age, Annual_Income_Bucket, Credit_Score, State, No_Of_Dependents, Highest_Education, Ethnicity,
                    No_Of_Children, Household_Size, Gender, Marital_Status, Campaign_Id, Product_Id, Term,
                    No_Of_People_Covered, Premium, Payment_Frequency, Amt_On_Maturity_Bin, Sub_Category, Campaign_Drivers,
                    Tenure_Of_Campaign, Net_Amt_Insured, SMS_Count, Email_Count,  Call_Count, 
                    Previous_Channel, Conversion_Flag
                    FROM CM_AD) a,
                    (SELECT * FROM Unique_Combos) b", 
  stringsAsFactors = T, connectionString = connection_string, colInfo = column_info)

print("Full table done")
## [1] "Full table done"
# Compute the predicted probabilities for each Lead_Id, for each combination of Day, Channel, Time, using best_model

## Score the full data by using the best model.
Prob_Id <- RxSqlServerData(table = "Prob_Id", stringsAsFactors = T, connectionString = connection_string)
rxPredict(best_model, data = AD_full_merged_sql, outData = Prob_Id, overwrite = T, type = "prob",
          extraVarsToWrite = c("Lead_Id", "Day_Of_Week", "Time_Of_Day", "Channel"), reportProgress = 0)
## Total Rows written: 50000, Total time: 0.723
## Total Rows written: 50000, Total time: 0.748
## Total Rows written: 50000, Total time: 0.726
## Total Rows written: 50000, Total time: 0.722
## Total Rows written: 50000, Total time: 0.721
## Total Rows written: 50000, Total time: 0.727
## Total Rows written: 50000, Total time: 0.835
## Total Rows written: 50000, Total time: 0.721
## Total Rows written: 50000, Total time: 0.72
## Total Rows written: 50000, Total time: 0.724
## Total Rows written: 50000, Total time: 0.735
## Total Rows written: 50000, Total time: 0.72
## Total Rows written: 30000, Total time: 0.432
print("Scoring done")
## [1] "Scoring done"
# For each Lead_Id, choose a combination of Day, Channel, and Time that has the highest conversion probability  

rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists Recommended_Combinations;"
, sep=""))
## [1] TRUE
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste(
"SELECT Lead_Id, Day_Of_Week, Channel, Time_Of_Day, MaxProb 
INTO Recommended_Combinations
FROM 
(
SELECT Lead_Id, Day_Of_Week, Channel, Time_Of_Day, [1_prob],
        MAX([1_prob]) OVER(PARTITION BY Lead_Id) AS MaxProb
FROM Prob_Id) AS T
WHERE T.[1_prob] = T.MaxProb"
, sep=""))
## [1] TRUE
print("Best combination per customer computed")
## [1] "Best combination per customer computed"
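
The query keeps every row whose probability equals the per-lead maximum, so a Lead_Id with tied combinations can appear more than once. The sketch below mirrors the windowed maximum in base R on made-up scores and shows one possible tie-break. The tie-break is an assumption, not part of the template.

```r
# Toy scored table: lead "B" has two combinations tied at the top probability.
scored <- data.frame(
  Lead_Id = c("A", "A", "B", "B", "B"),
  Channel = c("SMS", "Email", "SMS", "Email", "Cold Calling"),
  prob    = c(0.30, 0.10, 0.25, 0.25, 0.05)
)

# Equivalent of MAX([1_prob]) OVER (PARTITION BY Lead_Id).
scored$MaxProb <- ave(scored$prob, scored$Lead_Id, FUN = max)

# Equivalent of WHERE [1_prob] = MaxProb: all tied rows are kept.
with_ties <- scored[scored$prob == scored$MaxProb, ]

# One possible tie-break (an assumption): keep the first row per lead.
one_per_lead <- with_ties[!duplicated(with_ties$Lead_Id), ]
```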

Add demographics information to the recommendation table

rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists Recommendations;"
, sep=""))
## [1] TRUE
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("
SELECT Age, Annual_Income_Bucket, Credit_Score, Product, Campaign_Name, State,  
       CAST(Conversion_Flag AS int) AS Conversion_Flag, CM_AD.Day_Of_Week, CM_AD.Time_Of_Day,
       CM_AD.Channel, CM_AD.Lead_Id, Recommended_Combinations.Day_Of_Week as [Recommended_Day],
       Recommended_Combinations.Time_Of_Day as [Recommended_Time], Recommended_Combinations.MaxProb,
       Recommended_Combinations.Channel as [Recommended_Channel]
INTO Recommendations
FROM CM_AD JOIN Recommended_Combinations
ON CM_AD.Lead_Id = Recommended_Combinations.Lead_Id;"
, sep=""))
## [1] TRUE
print("Demographics information added")
## [1] "Demographics information added"
# Drop intermediate table.
rxExecuteSQLDDL(outOdbcDS, sSQLString = paste("DROP TABLE if exists Recommended_Combinations;"
, sep=""))
## [1] TRUE

Draw bar plots to visualize correlations

Recommendations <- RxSqlServerData(table = "Recommendations", stringsAsFactors = T, connectionString = connection_string)
Rec <- rxImport(Recommendations)
## Rows Read: 10068, Total Rows Processed: 10068, Total Chunk Time: 0.174 seconds
## Recommended Time
pt <- table(Rec$`Recommended_Time`, Rec$Age)

library(reshape)

datat <- melt(pt)
colnames(datat) <- c("Recommended_Time", "Age", "Percentage")

library(ggplot2)
library(scales)

ggplot(datat, aes(x = Age ,y = Percentage, fill =`Recommended_Time`)) + 
    geom_bar(position = "fill",stat = "identity") + 
    scale_y_continuous(labels = percent_format()) + 
    ggtitle("Recommended Time")

## Recommended Channel
pc <- table(Rec$`Recommended_Channel`, Rec$Age)

datac <- melt(pc)
colnames(datac) <- c("Recommended_Channel", "Age", "Percentage")

ggplot(datac, aes(x = Age ,y = Percentage, fill =`Recommended_Channel`)) + 
    geom_bar(position = "fill",stat = "identity") + 
    scale_y_continuous(labels = percent_format()) + 
    ggtitle("Recommended Channel")

## Recommended Day 
pd <- table(Rec$`Recommended_Day`, Rec$Annual_Income_Bucket)

datad <- melt(pd)
colnames(datad) <- c("Recommended_Day", "Annual_Income_Bucket", "Percentage")

ggplot(datad, aes(x = Annual_Income_Bucket ,y = Percentage, fill =`Recommended_Day`)) + 
    geom_bar(position = "fill",stat = "identity") + 
    scale_y_continuous(labels = percent_format()) + 
    ggtitle("Recommended Day")
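
With position = "fill", each bar is rescaled so that the counts within it sum to 100%. The same shares can be inspected numerically with prop.table. The data frame below is made up for illustration and is not the walkthrough's data.

```r
# Toy recommendations (hypothetical values).
rec <- data.frame(
  Recommended_Time = c("Morning", "Evening", "Evening",
                       "Morning", "Afternoon", "Evening"),
  Age              = c("Young", "Young", "Young",
                       "Senior", "Senior", "Senior")
)

# Counts of each recommended time per age group.
counts <- table(rec$Recommended_Time, rec$Age)

# Share of each recommended time within each age group (columns sum to 1).
shares <- prop.table(counts, margin = 2)
round(shares, 2)
```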

Conclusion

This experiment is a walkthrough of the Microsoft Campaign Optimization template. The code is adapted from that template for the purpose of learning and experimentation. There are several ways to improve the workflow, especially in model optimization, and I will update this post as I try them.

26-Feb, 2017