Chapter 6. Building a Regression Model with Spark

In this chapter, we will build on what we covered in Chapter 5, Building a Classification Model with Spark. While classification models deal with outcomes that represent discrete classes, regression models are concerned with target variables that can take any real value. The underlying principle is very similar—we wish to find a model that maps input features to predicted target variables. Like classification, regression is also a form of supervised learning.

Regression models can be used to predict just about any variable of interest. A few examples include the following:

· Predicting stock returns and other economic variables

· Predicting loss amounts for loan defaults (this can be combined with a classification model that predicts the probability of default, while the regression model predicts the amount in the case of a default)

· Recommendations (the Alternating Least Squares factorization model from Chapter 4, Building a Recommendation Engine with Spark, uses linear regression in each iteration)

· Predicting customer lifetime value (CLTV) in a retail, mobile, or other business, based on user behavior and spending patterns

In the following sections, we will:

· Introduce the various types of regression models available in MLlib

· Explore feature extraction and target variable transformation for regression models

· Train a number of regression models using MLlib

· See how to make predictions using the trained models

· Investigate the impact on performance of various parameter settings for regression using cross-validation

Types of regression models

Spark's MLlib library offers two broad classes of regression models: linear models and decision tree regression models.

Linear models are essentially the same as their classification counterparts; the only difference is that linear regression models use a different loss function, related link function, and decision function. MLlib provides a standard least squares regression model (although other types of generalized linear models for regression are planned).

Decision trees can also be used for regression by changing the impurity measure.

Least squares regression

You might recall from Chapter 5, Building a Classification Model with Spark, that there are a variety of loss functions that can be applied to generalized linear models. The loss function used for least squares is the squared loss, which is defined as follows:

$$\frac{1}{2} (w^T x - y)^2$$

Here, as for the classification setting, y is the target variable (this time, real valued), w is the weight vector, and x is the feature vector.

The related link function is the identity link, and the decision function is also the identity function, as generally, no thresholding is applied in regression. So, the model's prediction is simply $y = w^T x$.

The standard least squares regression in MLlib does not use regularization. Looking at the squared loss function, we can see that the loss applied to incorrectly predicted points will be magnified since the loss is squared. This means that least squares regression is susceptible to outliers in the dataset and also to over-fitting. Generally, as for classification, we should apply some level of regularization in practice.

Linear regression with L2 regularization is commonly referred to as ridge regression, while applying L1 regularization is called the lasso.
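For reference, if λ denotes the regularization parameter, the two regularized least squares objectives can be summarized schematically as follows (up to constant factors in how MLlib scales the regularization term):

$$\text{ridge:} \quad \min_{w} \; \frac{1}{n} \sum_{i=1}^{n} \frac{1}{2} (w^T x_i - y_i)^2 + \lambda \|w\|_2^2$$

$$\text{lasso:} \quad \min_{w} \; \frac{1}{n} \sum_{i=1}^{n} \frac{1}{2} (w^T x_i - y_i)^2 + \lambda \|w\|_1$$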

Tip

See the section on linear least squares in the Spark MLlib documentation at http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-least-squares-lasso-and-ridge-regression for further information.

Decision trees for regression

Just like using linear models for regression tasks involves changing the loss function used, using decision trees for regression involves changing the measure of the node impurity used. The impurity metric is called variance and is defined in the same way as the squared loss for least squares linear regression.
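Concretely, for the N training examples that reach a given tree node, with target values $y_i$ and mean $\bar{y}$, the variance impurity takes the following standard form, which mirrors the squared error used for least squares regression:

$$\text{variance} = \frac{1}{N} \sum_{i=1}^{N} (y_i - \bar{y})^2$$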

Note

See the MLlib - Decision Tree section in the Spark documentation at http://spark.apache.org/docs/latest/mllib-decision-tree.html for further details on the decision tree algorithm and impurity measure for regression.

Now, we will plot a simple example of a regression problem with only one input variable shown on the x axis and the target variable on the y axis. The linear model prediction function is shown by a red dashed line, while the decision tree prediction function is shown by a green dashed line. We can see that the decision tree allows a more complex, nonlinear model to be fitted to the data.

Figure: Linear model and decision tree prediction functions for regression

Extracting the right features from your data

As the underlying models for regression are the same as those for the classification case, we can use the same approach to create input features. The only practical difference is that the target is now a real-valued variable, as opposed to a categorical one. The LabeledPoint class in MLlib already takes this into account, as the label field is of the Double type, so it can handle both cases.
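For example, a regression LabeledPoint looks just like the ones we used for classification, only with a real-valued label; here is a minimal, made-up illustration (the feature values are arbitrary):

from pyspark.mllib.regression import LabeledPoint
import numpy as np

# a real-valued target of 25.3 with a small, arbitrary feature vector
lp = LabeledPoint(25.3, np.array([1.0, 0.0, 3.5]))
print lp.label, lp.features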

Extracting features from the bike sharing dataset

To illustrate the concepts in this chapter, we will be using the bike sharing dataset. This dataset contains hourly records of the number of bicycle rentals in the Capital Bikeshare system in Washington, D.C. It also contains variables related to date and time, weather, and seasonal and holiday information.

Note

The dataset is available at http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset.

Click on the Data Folder link and then download the Bike-Sharing-Dataset.zip file.

The bike sharing data was enriched with weather and seasonal data by Hadi Fanaee-T at the University of Porto and used in the following paper:

Fanaee-T, Hadi, and Gama, Joao, Event labeling combining ensemble detectors and background knowledge, Progress in Artificial Intelligence, pp. 1-15, Springer Berlin Heidelberg, 2013.

The paper is available at http://link.springer.com/article/10.1007%2Fs13748-013-0040-3.

Once you have downloaded the Bike-Sharing-Dataset.zip file, unzip it. This will create a directory called Bike-Sharing-Dataset, which contains the day.csv, hour.csv, and the Readme.txt files.

The Readme.txt file contains information on the dataset, including the variable names and descriptions. Take a look at the file, and you will see that we have the following variables available:

· instant: This is the record ID

· dteday: This is the raw date

· season: This is the season of the year (spring, summer, fall, or winter)

· yr: This is the year (2011 or 2012)

· mnth: This is the month of the year

· hr: This is the hour of the day

· holiday: This is whether the day was a holiday or not

· weekday: This is the day of the week

· workingday: This is whether the day was a working day or not

· weathersit: This is a categorical variable that describes the weather at a particular time

· temp: This is the normalized temperature

· atemp: This is the normalized apparent temperature

· hum: This is the normalized humidity

· windspeed: This is the normalized wind speed

· cnt: This is the target variable, that is, the count of bike rentals for that hour

We will work with the hourly data contained in hour.csv. The first line of the dataset contains the column names as a header; you can view it by running the following command:

>head -1 hour.csv

This should output the following result:

instant,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt

Before we work with the data in Spark, we will again remove the header from the first line of the file using the same sed command that we used previously to create a new file called hour_noheader.csv:

>sed 1d hour.csv > hour_noheader.csv

Since we will be doing some plotting of our dataset later on, we will use the Python shell for this chapter. This also serves to illustrate how to use MLlib's linear model and decision tree functionality from PySpark.

Start up your PySpark shell from your Spark installation directory. If you want to use IPython, which we highly recommend, remember to include the IPYTHON=1 environment variable together with the pylab functionality:

>IPYTHON=1 IPYTHON_OPTS="--pylab" ./bin/pyspark

If you prefer to use IPython Notebook, you can start it with the following command:

>IPYTHON=1 IPYTHON_OPTS=notebook ./bin/pyspark

You can type all the code that follows for the remainder of this chapter directly into your PySpark shell (or into IPython Notebook if you wish to use it).

Tip

Recall that we used the IPython shell in Chapter 3, Obtaining, Processing, and Preparing Data with Spark. Take a look at that chapter and the code bundle for instructions to install IPython.

We'll start as usual by loading the dataset and inspecting it:

path = "/PATH/hour_noheader.csv"

raw_data = sc.textFile(path)

num_data = raw_data.count()

records = raw_data.map(lambda x: x.split(","))

first = records.first()

print first

print num_data

You should see the following output:

[u'1', u'2011-01-01', u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']

17379

So, we have 17,379 hourly records in our dataset. We have inspected the column names already. We will ignore the record ID and raw date columns. We will also ignore the casual and registered count target variables and focus on the overall count variable, cnt (which is the sum of the other two counts). We are left with 12 variables. The first eight are categorical, while the last four are normalized real-valued variables.

To deal with the eight categorical variables, we will use the binary encoding approach with which you should be quite familiar by now. The four real-valued variables will be left as is.

We will first cache our dataset, since we will be reading from it many times:

records.cache()

In order to extract each categorical feature into a binary vector form, we will need to know the feature mapping of each feature value to the index of the nonzero value in our binary vector. Let's define a function that will extract this mapping from our dataset for a given column:

def get_mapping(rdd, idx):
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

Our function first maps the field to its unique values and then uses the zipWithIndex transformation to zip the value up with a unique index such that a key-value RDD is formed, where the key is the variable and the value is the index. This index will be the index of the nonzero entry in the binary vector representation of the feature. We will finally collect this RDD back to the driver as a Python dictionary.

We can test our function on the third variable column (index 2):

print "Mapping of first categorical feasture column: %s" % get_mapping(records, 2)

The preceding line of code will give us the following output:

Mapping of first categorical feature column: {u'1': 0, u'3': 2, u'2': 1, u'4': 3}

Now, we can apply this function to each categorical column (that is, for variable indices 2 to 9):

mappings = [get_mapping(records, i) for i in range(2,10)]

cat_len = sum(map(len, mappings))

num_len = len(records.first()[10:14])

total_len = num_len + cat_len

We now have the mappings for each variable, and we can see how many values in total we need for our binary vector representation:

print "Feature vector length for categorical features: %d" % cat_len

print "Feature vector length for numerical features: %d" % num_len

print "Total feature vector length: %d" % total_len

The output of the preceding code is as follows:

Feature vector length for categorical features: 57

Feature vector length for numerical features: 4

Total feature vector length: 61

Creating feature vectors for the linear model

The next step is to use our extracted mappings to convert the categorical features to binary-encoded features. Again, it will be helpful to create a function that we can apply to each record in our dataset for this purpose. We will also create a function to extract the target variable from each record. We will need to import numpy for linear algebra utilities and MLlib's LabeledPoint class to wrap our feature vectors and target variables:

from pyspark.mllib.regression import LabeledPoint

import numpy as np

def extract_features(record):
    cat_vec = np.zeros(cat_len)
    i = 0
    step = 0
    for field in record[2:9]:
        m = mappings[i]
        idx = m[field]
        cat_vec[idx + step] = 1
        i = i + 1
        step = step + len(m)
    num_vec = np.array([float(field) for field in record[10:14]])
    return np.concatenate((cat_vec, num_vec))

def extract_label(record):
    return float(record[-1])

In the preceding extract_features function, we ran through each column in the row of data. We extracted the binary encoding for each variable in turn from the mappings we created previously. The step variable ensures that the nonzero feature index in the full feature vector is correct (and is somewhat more efficient than, say, creating many smaller binary vectors and concatenating them). The numeric vector is created directly by first converting the data to floating point numbers and wrapping these in a numpy array. The resulting two vectors are then concatenated. The extract_label function simply converts the last column variable (the count) into a float.
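To make the role of the step variable concrete, here is a small, made-up illustration with two toy mappings (two values for the first feature, three for the second); a record with the values (b, z) sets positions 1 and 4 in the combined binary vector:

# toy illustration (not from the dataset) of how step offsets each feature's block
toy_mappings = [{u'a': 0, u'b': 1}, {u'x': 0, u'y': 1, u'z': 2}]
toy_record = [u'b', u'z']
toy_vec = np.zeros(sum(map(len, toy_mappings)))
i = 0
step = 0
for field in toy_record:
    m = toy_mappings[i]
    toy_vec[m[field] + step] = 1
    i = i + 1
    step = step + len(m)
print toy_vec
# [ 0.  1.  0.  0.  1.]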

With our utility functions defined, we can proceed with extracting feature vectors and labels from our data records:

data = records.map(lambda r: LabeledPoint(extract_label(r), extract_features(r)))

Let's inspect the first record in the extracted feature RDD:

first_point = data.first()

print "Raw data: " + str(first[2:])

print "Label: " + str(first_point.label)

print "Linear Model feature vector:\n" + str(first_point.features)

print "Linear Model feature vector length: " + str(len(first_point.features))

You should see output similar to the following:

Raw data: [u'1', u'0', u'1', u'0', u'0', u'6', u'0', u'1', u'0.24', u'0.2879', u'0.81', u'0', u'3', u'13', u'16']

Label: 16.0

Linear Model feature vector: [1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.24,0.2879,0.81,0.0]

Linear Model feature vector length: 61

As we can see, we converted the raw data into a feature vector made up of the binary categorical and real numeric features, and we indeed have a total vector length of 61.
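As a quick sanity check (an aside, not part of the original walkthrough), we can also print the positions of the nonzero entries in the encoded vector for the first record; there should be one nonzero entry per encoded categorical feature, followed by the nonzero numeric values at the end of the vector:

print np.nonzero(extract_features(first))[0]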

Creating feature vectors for the decision tree

As we have seen, decision tree models typically work on raw features (that is, it is not required to convert categorical features into a binary vector encoding; they can, instead, be used directly). Therefore, we will create a separate function to extract the decision tree feature vector, which simply converts all the values to floats and wraps them in a numpy array:

def extract_features_dt(record):
    return np.array(map(float, record[2:14]))

data_dt = records.map(lambda r: LabeledPoint(extract_label(r), extract_features_dt(r)))

first_point_dt = data_dt.first()

print "Decision Tree feature vector: " + str(first_point_dt.features)

print "Decision Tree feature vector length: " + str(len(first_point_dt.features))

The following output shows the extracted feature vector, and we can see that we have a vector length of 12, which matches the number of raw variables we are using:

Decision Tree feature vector: [1.0,0.0,1.0,0.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.81,0.0]

Decision Tree feature vector length: 12

Training and using regression models

Training regression models using decision trees and linear models follows the same procedure as for classification models. We simply pass the training data contained in an RDD of LabeledPoint instances to the relevant train method. Note that in Scala, if we wanted to customize the various model parameters (such as regularization and step size for the SGD optimizer), we would be required to instantiate a new model instance and use the optimizer field to access the available parameter setters.

In Python, we are provided with a convenience method that gives us access to all the available model arguments, so we only have to use this one entry point for training. We can see the details of these convenience functions by importing the relevant modules and then calling the help function on the train methods:

from pyspark.mllib.regression import LinearRegressionWithSGD

from pyspark.mllib.tree import DecisionTree

help(LinearRegressionWithSGD.train)

Doing this for the linear model outputs the following documentation:

Figure: Linear regression help documentation

We can see from the linear regression documentation that we need to pass in the training data at a minimum, but we can set any of the other model parameters using this train method.

Similarly, for the decision tree model, which has a trainRegressor method (in addition to a trainClassifier method for classification models):

help(DecisionTree.trainRegressor)

The preceding code would display the following documentation:

Figure: Decision tree regression help documentation

Training a regression model on the bike sharing dataset

We're ready to use the features we have extracted to train our models on the bike sharing data. First, we'll train the linear regression model and take a look at the first few predictions that the model makes on the data:

linear_model = LinearRegressionWithSGD.train(data, iterations=10, step=0.1, intercept=False)

true_vs_predicted = data.map(lambda p: (p.label, linear_model.predict(p.features)))

print "Linear Model predictions: " + str(true_vs_predicted.take(5))

Note that we have not used the default settings for iterations and step here. We've changed the number of iterations so that the model does not take too long to train. As for the step size, you will see why this has been changed from the default a little later. You will see the following output:

Linear Model predictions: [(16.0, 119.30920003093595), (40.0, 116.95463511937379), (32.0, 116.57294610647752), (13.0, 116.43535423855654), (1.0, 116.221247828503)]

Next, we will train the decision tree model simply using the default arguments to the trainRegressor method (which equates to using a tree depth of 5). Note that we need to pass in the other form of the dataset, data_dt, that we created from the raw feature values (as opposed to the binary encoded features that we used for the preceding linear model).

We also need to pass in an argument for categoricalFeaturesInfo. This is a dictionary that maps the categorical feature index to the number of categories for the feature. If a feature is not in this mapping, it will be treated as continuous. For our purposes, we will leave this as is, passing in an empty mapping:

dt_model = DecisionTree.trainRegressor(data_dt,{})

preds = dt_model.predict(data_dt.map(lambda p: p.features))

actual = data.map(lambda p: p.label)

true_vs_predicted_dt = actual.zip(preds)

print "Decision Tree predictions: " + str(true_vs_predicted_dt.take(5))

print "Decision Tree depth: " + str(dt_model.depth())

print "Decision Tree number of nodes: " + str(dt_model.numNodes())

This should output these predictions:

Decision Tree predictions: [(16.0, 54.913223140495866), (40.0, 54.913223140495866), (32.0, 53.171052631578945), (13.0, 14.284023668639053), (1.0, 14.284023668639053)]

Decision Tree depth: 5

Decision Tree number of nodes: 63

Note

Treating the categorical features as continuous variables in this way is not as bad as it sounds. While we do not cover it here, the Python code included with this chapter's code bundle includes an example of using categoricalFeaturesInfo. It does not make a large difference to performance in this case.
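Although we do not use it in this chapter, a rough sketch of how such a mapping could be built from the mappings we extracted earlier is shown below. This is only an illustration, not the code from the bundle; note that MLlib expects categorical values to lie in the range 0 to numCategories - 1, so we use the maximum raw value plus one as a simple upper bound for the number of categories:

# hypothetical sketch: feature index i in data_dt corresponds to raw column i + 2
cat_features_info = dict([
    (i, int(max(float(v) for v in mappings[i].keys())) + 1)
    for i in range(len(mappings))
])
# dt_model_cat = DecisionTree.trainRegressor(data_dt, cat_features_info)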

From a quick glance at these predictions, it appears that the decision tree might do better, as the linear model is quite a way off in its predictions. However, we will apply more stringent evaluation methods to find out.

Evaluating the performance of regression models

We saw in Chapter 5, Building a Classification Model with Spark, that evaluation methods for classification models typically focus on measurements related to predicted class memberships relative to the actual class memberships. These are binary outcomes (either the predicted class is correct or incorrect), and it is less important whether the model just barely predicted correctly or not; what we care most about is the number of correct and incorrect predictions.

When dealing with regression models, it is very unlikely that our model will precisely predict the target variable, because the target variable can take on any real value. However, we would naturally like to understand how far away our predicted values are from the true values, so we will use a metric that takes the overall deviation into account.

Some of the standard evaluation metrics used to measure the performance of regression models include the Mean Squared Error (MSE) and Root Mean Squared Error (RMSE), the Mean Absolute Error (MAE), the R-squared coefficient, and many others.

Mean Squared Error and Root Mean Squared Error

MSE is the average of the squared error that is used as the loss function for least squares regression:

$$MSE = \frac{1}{n} \sum_{i=1}^{n} \left( \hat{y}_i - y_i \right)^2$$

It is the sum, over all the data points, of the square of the difference between the predicted and actual target variables, divided by the number of data points.

RMSE is the square root of MSE. MSE is measured in units that are the square of the target variable, while RMSE is measured in the same units as the target variable. Due to its formulation, MSE, just like the squared loss function that it derives from, effectively penalizes larger errors more severely.

In order to evaluate our model based on the mean of an error metric, we will first make a prediction for each input feature vector in an RDD of LabeledPoint instances and then compute the error for each record using a function that takes the prediction and the true target value as inputs. This will return a [Double] RDD that contains the error values. We can then find the average using the mean method of RDDs that contain Double values.

Let's define our squared error function as follows:

def squared_error(actual, pred):
    return (pred - actual)**2

Mean Absolute Error

MAE is the average of the absolute differences between the predicted and actual targets:

$$MAE = \frac{1}{n} \sum_{i=1}^{n} \left| \hat{y}_i - y_i \right|$$

MAE is similar in principle to MSE, but it does not punish large deviations as much.

Our function to compute MAE is as follows:

def abs_error(actual, pred):
    return np.abs(pred - actual)

Root Mean Squared Log Error

This measurement is not as widely used as MSE and MAE, but it is used as the metric for the Kaggle competition that uses the bike sharing dataset. It is effectively the RMSE of the log-transformed predicted and target values. This measurement is useful when there is a wide range in the target variable, and you do not necessarily want to penalize large errors when the predicted and target values are themselves high. It is also effective when you care about percentage errors rather than the absolute value of errors.
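Written out, with $\hat{y}_i$ denoting the prediction for the i-th data point and $y_i$ the actual value, RMSLE is defined as follows (this matches the squared_log_error function we define below):

$$RMSLE = \sqrt{ \frac{1}{n} \sum_{i=1}^{n} \left( \log(\hat{y}_i + 1) - \log(y_i + 1) \right)^2 }$$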

Note

The Kaggle competition evaluation page can be found at https://www.kaggle.com/c/bike-sharing-demand/details/evaluation.

The function to compute RMSLE is shown here:

def squared_log_error(pred, actual):
    return (np.log(pred + 1) - np.log(actual + 1))**2

The R-squared coefficient

The R-squared coefficient, also known as the coefficient of determination, is a measure of how well a model fits a dataset. It is commonly used in statistics. It measures the degree of variation in the target variable that is explained by the variation in the input features. An R-squared coefficient generally takes a value between 0 and 1, where 1 equates to a perfect fit of the model.
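We will not use R-squared in the evaluations that follow, but if we wanted it, a minimal sketch of computing it from an RDD of (true, predicted) pairs, such as the true_vs_predicted RDD we created earlier, might look like the following (the r_squared helper is our own, not an MLlib function):

def r_squared(true_vs_pred):
    # 1 - (residual sum of squares / total sum of squares)
    mean_actual = true_vs_pred.map(lambda (t, p): t).mean()
    ss_res = true_vs_pred.map(lambda (t, p): (t - p) ** 2).sum()
    ss_tot = true_vs_pred.map(lambda (t, p): (t - mean_actual) ** 2).sum()
    return 1.0 - ss_res / ss_tot

print "Linear Model - R-squared: %2.4f" % r_squared(true_vs_predicted)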

Computing performance metrics on the bike sharing dataset

Given the functions we defined earlier, we can now compute the various evaluation metrics on our bike sharing data.

Linear model

Our approach will be to apply the relevant error function to each record in the RDD we computed earlier, which is true_vs_predicted for our linear model:

mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean()

mae = true_vs_predicted.map(lambda (t, p): abs_error(t, p)).mean()

rmsle = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())

print "Linear Model - Mean Squared Error: %2.4f" % mse

print "Linear Model - Mean Absolute Error: %2.4f" % mae

print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle

This outputs the following metrics:

Linear Model - Mean Squared Error: 28166.3824

Linear Model - Mean Absolute Error: 129.4506

Linear Model - Root Mean Squared Log Error: 1.4974

Decision tree

We will use the same approach for the decision tree model, using the true_vs_predicted_dt RDD:

mse_dt = true_vs_predicted_dt.map(lambda (t, p): squared_error(t, p)).mean()

mae_dt = true_vs_predicted_dt.map(lambda (t, p): abs_error(t, p)).mean()

rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda (t, p): squared_log_error(t, p)).mean())

print "Decision Tree - Mean Squared Error: %2.4f" % mse_dt

print "Decision Tree - Mean Absolute Error: %2.4f" % mae_dt

print "Decision Tree - Root Mean Squared Log Error: %2.4f" % rmsle_dt

You should see output similar to this:

Decision Tree - Mean Squared Error: 11560.7978

Decision Tree - Mean Absolute Error: 71.0969

Decision Tree - Root Mean Squared Log Error: 0.6259

Looking at the results, we can see that our initial guess about the decision tree model being the better performer is indeed true.

Note

The Kaggle competition leaderboard lists the Mean Value Benchmark score on the test set at about 1.58. So, we see that our linear model performance is not much better. However, the decision tree with default settings achieves a performance of 0.63.

The winning score at the time of writing this book is listed as 0.29504.

Improving model performance and tuning parameters

In Chapter 5, Building a Classification Model with Spark, we showed how feature transformation and selection can make a large difference to the performance of a model. In this chapter, we will focus on another type of transformation that can be applied to a dataset: transforming the target variable itself.

Transforming the target variable

Recall that many machine learning models, including linear models, make assumptions regarding the distribution of the input data as well as target variables. In particular, linear regression assumes that the target variable follows a normal distribution (more precisely, that the model errors are normally distributed).

In many real-world cases, the distributional assumptions of linear regression do not hold. In this case, for example, we know that the number of bike rentals can never be negative. This alone should indicate that the assumption of normality might be problematic. To get a better idea of the target distribution, it is often a good idea to plot a histogram of the target values.

In this section, if you are using IPython Notebook, enter the magic function, %pylab inline, to import pylab (that is, the numpy and matplotlib plotting functions) into the workspace. This will also create any figures and plots inline within the Notebook cell.

If you are using the standard IPython console, you can use %pylab to import the necessary functionality (your plots will appear in a separate window).

We will now create a plot of the target variable distribution in the following piece of code:

targets = records.map(lambda r: float(r[-1])).collect()

hist(targets, bins=40, color='lightblue', normed=True)

fig = matplotlib.pyplot.gcf()

fig.set_size_inches(16, 10)

Looking at the histogram plot, we can see that the distribution is highly skewed and certainly does not follow a normal distribution:

Figure: Distribution of raw target variable values

One way in which we might deal with this situation is by applying a transformation to the target variable, such that we take the logarithm of the target value instead of the raw value. This is often referred to as log-transforming the target variable (this transformation can also be applied to feature values).

We will apply a log transformation to the following target variable and plot a histogram of the log-transformed values:

log_targets = records.map(lambda r: np.log(float(r[-1]))).collect()

hist(log_targets, bins=40, color='lightblue', normed=True)

fig = matplotlib.pyplot.gcf()

fig.set_size_inches(16, 10)

Figure: Distribution of log-transformed target variable values

A second type of transformation that is useful in the case of target values that do not take on negative values and, in addition, might take on a very wide range of values, is to take the square root of the variable.

We will apply the square root transform in the following code, once more plotting the resulting target variable distribution:

sqrt_targets = records.map(lambda r: np.sqrt(float(r[-1]))).collect()

hist(sqrt_targets, bins=40, color='lightblue', normed=True)

fig = matplotlib.pyplot.gcf()

fig.set_size_inches(16, 10)

From the plots of the log and square root transformations, we can see that both result in a more even distribution relative to the raw values. While they are still not normally distributed, they are a lot closer to a normal distribution when compared to the original target variable.

Figure: Distribution of square-root-transformed target variable values

Impact of training on log-transformed targets

So, does applying these transformations have any impact on model performance? Let's evaluate the various metrics we used previously on log-transformed data as an example.

We will do this first for the linear model by applying the numpy log function to the label field of each LabeledPoint RDD. Here, we will only transform the target variable, and we will not apply any transformations to the features:

data_log = data.map(lambda lp: LabeledPoint(np.log(lp.label), lp.features))
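As an aside (not from the original text), np.log is undefined at zero. The hourly counts in this dataset are all positive, so this is safe here, but for a target variable that can be zero, the log1p and expm1 pair is a safer choice:

# hypothetical alternative that handles zero labels safely
data_log1p = data.map(lambda lp: LabeledPoint(np.log1p(lp.label), lp.features))
# predictions on this scale would be mapped back with np.expm1 instead of np.exp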

We will then train a model on this transformed data and form the RDD of predicted versus true values:

model_log = LinearRegressionWithSGD.train(data_log, iterations=10, step=0.1)

Note that now that we have transformed the target variable, the predictions of the model will be on the log scale, as will the target values of the transformed dataset. Therefore, in order to use our model and evaluate its performance, we must first transform the log data back into the original scale by taking the exponent of both the predicted and true values using the numpy exp function. We will show you how to do this in the code here:

true_vs_predicted_log = data_log.map(lambda p: (np.exp(p.label), np.exp(model_log.predict(p.features))))

Finally, we will compute the MSE, MAE, and RMSLE metrics for the model:

mse_log = true_vs_predicted_log.map(lambda (t, p): squared_error(t, p)).mean()

mae_log = true_vs_predicted_log.map(lambda (t, p): abs_error(t, p)).mean()

rmsle_log = np.sqrt(true_vs_predicted_log.map(lambda (t, p): squared_log_error(t, p)).mean())

print "Mean Squared Error: %2.4f" % mse_log

print "Mean Absolue Error: %2.4f" % mae_log

print "Root Mean Squared Log Error: %2.4f" % rmsle_log

print "Non log-transformed predictions:\n" + str(true_vs_predicted.take(3))

print "Log-transformed predictions:\n" + str(true_vs_predicted_log.take(3))

You should see output similar to the following:

Mean Squared Error: 38606.0875

Mean Absolute Error: 135.2726

Root Mean Squared Log Error: 1.3516

Non log-transformed predictions:

[(16.0, 119.30920003093594), (40.0, 116.95463511937378), (32.0, 116.57294610647752)]

Log-transformed predictions:

[(15.999999999999998, 45.860944832110015), (40.0, 43.255903592233274), (32.0, 42.311306147884252)]

If we compare these results to the results on the raw target variable, we see that while we did not improve the MSE or MAE, we improved the RMSLE.

We will perform the same analysis for the decision tree model:

data_dt_log = data_dt.map(lambda lp: LabeledPoint(np.log(lp.label), lp.features))

dt_model_log = DecisionTree.trainRegressor(data_dt_log,{})

preds_log = dt_model_log.predict(data_dt_log.map(lambda p: p.features))

actual_log = data_dt_log.map(lambda p: p.label)

true_vs_predicted_dt_log = actual_log.zip(preds_log).map(lambda (t, p): (np.exp(t), np.exp(p)))

mse_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): squared_error(t, p)).mean()

mae_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): abs_error(t, p)).mean()

rmsle_log_dt = np.sqrt(true_vs_predicted_dt_log.map(lambda (t, p): squared_log_error(t, p)).mean())

print "Mean Squared Error: %2.4f" % mse_log_dt

print "Mean Absolue Error: %2.4f" % mae_log_dt

print "Root Mean Squared Log Error: %2.4f" % rmsle_log_dt

print "Non log-transformed predictions:\n" + str(true_vs_predicted_dt.take(3))

print "Log-transformed predictions:\n" + str(true_vs_predicted_dt_log.take(3))

From the results here, we can see that we actually made our metrics slightly worse for the decision tree:

Mean Squared Error: 14781.5760

Mean Absolute Error: 76.4131

Root Mean Squared Log Error: 0.6406

Non log-transformed predictions:

[(16.0, 54.913223140495866), (40.0, 54.913223140495866), (32.0, 53.171052631578945)]

Log-transformed predictions:

[(15.999999999999998, 37.530779787154508), (40.0, 37.530779787154508), (32.0, 7.2797070993907287)]

Tip

It is probably not surprising that the log transformation results in a better RMSLE performance for the linear model. As we are minimizing the squared error, once we have transformed the target variable to log values, we are effectively minimizing a loss function that is very similar to the RMSLE.

This is good for Kaggle competition purposes, since we can more directly optimize against the competition-scoring metric.

It might or might not be as useful in a real-world situation. This depends on how important larger absolute errors are (recall that RMSLE essentially penalizes relative errors rather than absolute magnitude of errors).
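To make the first point of this tip explicit: after the transformation, the squared error on the log scale for a prediction $\hat{y}$ and true value $y$ is $(\log \hat{y} - \log y)^2$, which differs from the corresponding RMSLE term only by the +1 inside the logarithms:

$$(\log \hat{y} - \log y)^2 \approx (\log(\hat{y} + 1) - \log(y + 1))^2 \quad \text{for large } y$$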

Tuning model parameters

So far in this chapter, we have illustrated the concepts of model training and evaluation for MLlib's regression models by training and testing on the same dataset. We will now use a similar cross-validation approach that we used previously to evaluate the effect on performance of different parameter settings for our models.

Creating training and testing sets to evaluate parameters

The first step is to create a test and training set for cross-validation purposes. Spark's Python API does not yet provide the randomSplit convenience method that is available in Scala. Hence, we will need to create a training and test dataset manually.

One relatively easy way to do this is by first taking a random sample of, say, 20 percent of our data as our test set. We will then define our training set as the elements of the original RDD that are not in the test set RDD.

We can achieve this using the sample method to take a random sample for our test set, followed by using the subtractByKey method, which takes care of returning the elements in one RDD where the keys do not overlap with the other RDD.

Note that subtractByKey, as the name suggests, works on the keys of the RDD elements that consist of key-value pairs. Therefore, here we will use zipWithIndex on our RDD of extracted training examples. This creates an RDD of (LabeledPoint, index) pairs.

We will then reverse the keys and values so that we can operate on the index keys:

data_with_idx = data.zipWithIndex().map(lambda (k, v): (v, k))

test = data_with_idx.sample(False, 0.2, 42)

train = data_with_idx.subtractByKey(test)

Once we have the two RDDs, we will recover just the LabeledPoint instances we need for training and test data, using map to extract the value from the key-value pairs:

train_data = train.map(lambda (idx, p): p)

test_data = test.map(lambda (idx, p) : p)

train_size = train_data.count()

test_size = test_data.count()

print "Training data size: %d" % train_size

print "Test data size: %d" % test_size

print "Total data size: %d " % num_data

print "Train + Test size : %d" % (train_size + test_size)

We can confirm that we now have two distinct datasets that add up to the original dataset in total:

Training data size: 13934

Test data size: 3445

Total data size: 17379

Train + Test size : 17379

The final step is to apply the same approach to the features extracted for the decision tree model:

data_with_idx_dt = data_dt.zipWithIndex().map(lambda (k, v): (v, k))

test_dt = data_with_idx_dt.sample(False, 0.2, 42)

train_dt = data_with_idx_dt.subtractByKey(test_dt)

train_data_dt = train_dt.map(lambda (idx, p): p)

test_data_dt = test_dt.map(lambda (idx, p) : p)

The impact of parameter settings for linear models

Now that we have prepared our training and test sets, we are ready to investigate the impact of different parameter settings on model performance. We will first carry out this evaluation for the linear model. We will create a convenience function to evaluate the relevant performance metric by training the model on the training set and evaluating it on the test set for different parameter settings.

We will use the RMSLE evaluation metric, as it is the one used in the Kaggle competition with this dataset, and this allows us to compare our model results against the competition leaderboard to see how we perform.

The evaluation function is defined here:

def evaluate(train, test, iterations, step, regParam, regType, intercept):
    model = LinearRegressionWithSGD.train(train, iterations, step, regParam=regParam, regType=regType, intercept=intercept)
    tp = test.map(lambda p: (p.label, model.predict(p.features)))
    rmsle = np.sqrt(tp.map(lambda (t, p): squared_log_error(t, p)).mean())
    return rmsle

Tip

Note that in the following sections, you might get slightly different results due to some random initialization for SGD. However, your results will be comparable.

Iterations

As we saw when evaluating our classification models, we generally expect that a model trained with SGD will achieve better performance as the number of iterations increases, although the increase in performance will slow down as the number of iterations goes above some minimum number. Note that here, we will set the step size to 0.01 to better illustrate the impact at higher iteration numbers:

params = [1, 5, 10, 20, 50, 100]

metrics = [evaluate(train_data, test_data, param, 0.01, 0.0, 'l2', False) for param in params]

print params

print metrics

The output shows that the error metric indeed decreases as the number of iterations increases. It also does so at a decreasing rate, again as expected. What is interesting is that eventually, the SGD optimization tends to overshoot the optimal solution, and the RMSLE eventually starts to increase slightly:

[1, 5, 10, 20, 50, 100]

[2.3532904530306888, 1.6438528499254723, 1.4869656275309227, 1.4149741941240344, 1.4159641262731959, 1.4539667094611679]

Here, we will use the matplotlib library to plot a graph of the RMSLE metric against the number of iterations. We will use a log scale for the x axis to make the output easier to visualize:

plot(params, metrics)

fig = matplotlib.pyplot.gcf()

pyplot.xscale('log')

Figure: Metrics for varying number of iterations

Step size

We will perform a similar analysis for step size in the following code:

params = [0.01, 0.025, 0.05, 0.1, 1.0]

metrics = [evaluate(train_data, test_data, 10, param, 0.0, 'l2', False) for param in params]

print params

print metrics

The output of the preceding code:

[0.01, 0.025, 0.05, 0.1, 1.0]

[1.4869656275309227, 1.4189071944747715, 1.5027293911925559, 1.5384660954019973, nan]

Now, we can see why we avoided using the default step size when training the linear model originally. The default is set to 1.0, which, in this case, results in a nan output for the RMSLE metric. This typically means that the SGD optimization has diverged rather than converged to a good solution. This can happen when the step size is relatively large, as it is easier for the optimization algorithm to overshoot good solutions.

We can also see that for low step sizes and a relatively low number of iterations (we used 10 here), the model performance is slightly poorer. However, in the preceding Iterations section, we saw that for the lower step-size setting, a higher number of iterations will generally converge to a better solution.

Generally speaking, setting step size and number of iterations involves a trade-off. A lower step size means that convergence is slower but slightly more assured. However, it requires a higher number of iterations, which is more costly in terms of computation and time, in particular at a very large scale.

Tip

Selecting the best parameter settings can be an intensive process that involves training a model on many combinations of parameter settings and selecting the best outcome. Each instance of model training involves a number of iterations, so this process can be very expensive and time consuming when performed on very large datasets.

The output is plotted here, again using a log scale for the step-size axis:

Figure: Metrics for varying values of step size
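As an illustration of the approach mentioned in the preceding tip (our own sketch, not code from the original text), a simple grid search over step size and number of iterations can reuse the evaluate function we defined earlier:

# hypothetical grid search over step size and iterations using our evaluate function
step_sizes = [0.01, 0.025, 0.05, 0.1]
iteration_counts = [10, 50, 100]
grid_results = []
for step_size in step_sizes:
    for num_iterations in iteration_counts:
        rmsle = evaluate(train_data, test_data, num_iterations, step_size, 0.0, 'l2', False)
        grid_results.append((step_size, num_iterations, rmsle))
best_step, best_iters, best_rmsle = min(grid_results, key=lambda x: x[2])
print "Best RMSLE: %2.4f (step size %s, %d iterations)" % (best_rmsle, best_step, best_iters)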

L2 regularization

In Chapter 5, Building a Classification Model with Spark, we saw that regularization has the effect of penalizing model complexity in the form of an additional loss term that is a function of the model weight vector. L2 regularization penalizes the L2-norm of the weight vector, while L1 regularization penalizes the L1-norm.

We expect training set performance to deteriorate with increasing regularization, as the model cannot fit the dataset as well. However, we would also expect that some amount of regularization will give the best generalization performance, as evidenced by the best performance on the test set.

We will evaluate the impact of different levels of L2 regularization in this code:

params = [0.0, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0]

metrics = [evaluate(train_data, test_data, 10, 0.1, param, 'l2', False) for param in params]

print params

print metrics

plot(params, metrics)

fig = matplotlib.pyplot.gcf()

pyplot.xscale('log')

As expected, there is an optimal setting of the regularization parameter with respect to the test set RMSLE:

[0.0, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0]

[1.5384660954019971, 1.5379108106882864, 1.5329809395123755, 1.4900275345312988, 1.4016676336981468, 1.40998359211149, 1.5381771283158705]

This is easiest to see in the following plot (where we once more use the log scale for the regularization parameter axis):

Figure: Metrics for varying levels of L2 regularization

L1 regularization

We can apply the same approach for differing levels of L1 regularization:

params = [0.0, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0]

metrics = [evaluate(train_data, test_data, 10, 0.1, param, 'l1', False) for param in params]

print params

print metrics

plot(params, metrics)

fig = matplotlib.pyplot.gcf()

pyplot.xscale('log')

Again, the results are more clearly seen when plotted in the following graph. We see that there is a much more subtle decline in RMSLE, and it takes a very high value to cause a jump back up. Here, the level of L1 regularization required is much higher than that for the L2 form; however, the overall performance is poorer:

[0.0, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0]

[1.5384660954019971, 1.5384518080419873, 1.5383237472930684, 1.5372017600929164, 1.5303809928601677, 1.4352494587433793, 4.7551250073268614]

Figure: Metrics for varying levels of L1 regularization

Using L1 regularization can encourage sparse weight vectors. Does this hold true in this case? We can find out by examining the number of entries in the weight vector that are zero, with increasing levels of regularization:

model_l1 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=1.0, regType='l1', intercept=False)

model_l1_10 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=10.0, regType='l1', intercept=False)

model_l1_100 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=100.0, regType='l1', intercept=False)

print "L1 (1.0) number of zero weights: " + str(sum(model_l1.weights.array == 0))

print "L1 (10.0) number of zeros weights: " + str(sum(model_l1_10.weights.array == 0))

print "L1 (100.0) number of zeros weights: " + str(sum(model_l1_100.weights.array == 0))

We can see from the results that as we might expect, the number of zero feature weights in the model weight vector increases as greater levels of L1 regularization are applied:

L1 (1.0) number of zero weights: 4

L1 (10.0) number of zero weights: 20

L1 (100.0) number of zero weights: 55

Intercept

The final parameter option for the linear model is whether to use an intercept or not. An intercept is a constant term that is added to the weight vector and effectively accounts for the mean value of the target variable. If the data is already centered or normalized, an intercept is not necessary; however, it often does not hurt to use one in any case.

We will evaluate the effect of adding an intercept term to the model here:

params = [False, True]

metrics = [evaluate(train_data, test_data, 10, 0.1, 1.0, 'l2', param) for param in params]

print params

print metrics

bar(params, metrics, color='lightblue')

fig = matplotlib.pyplot.gcf()

We can see from the result and plot that adding the intercept term results in a very slight increase in RMSLE:

[False, True]

[1.4900275345312988, 1.506469812020645]

Figure: Metrics without and with an intercept

The impact of parameter settings for the decision tree

Decision trees provide two main parameters: maximum tree depth and the maximum number of bins. We will now perform the same evaluation of the effect of parameter settings for the decision tree model. Our starting point is to create an evaluation function for the model, similar to the one used for the linear regression earlier. This function is provided here:

def evaluate_dt(train, test, maxDepth, maxBins):
    model = DecisionTree.trainRegressor(train, {}, impurity='variance', maxDepth=maxDepth, maxBins=maxBins)
    preds = model.predict(test.map(lambda p: p.features))
    actual = test.map(lambda p: p.label)
    tp = actual.zip(preds)
    rmsle = np.sqrt(tp.map(lambda (t, p): squared_log_error(t, p)).mean())
    return rmsle

Tree depth

We would generally expect performance to increase with more complex trees (that is, trees of greater depth). Having a lower tree depth acts as a form of regularization, and it might be the case that as with L2 or L1 regularization in linear models, there is a tree depth that is optimal with respect to the test set performance.

Here, we will try to increase the depths of trees to see what impact they have on test set RMSLE, keeping the number of bins at the default level of 32:

params = [1, 2, 3, 4, 5, 10, 20]

metrics = [evaluate_dt(train_data_dt, test_data_dt, param, 32) for param in params]

print params

print metrics

plot(params, metrics)

fig = matplotlib.pyplot.gcf()

In this case, it appears that the decision tree starts over-fitting at deeper tree levels. An optimal tree depth appears to be around 10 on this dataset.

Note

Notice that our best RMSLE of 0.42 is now quite close to the Kaggle winner of around 0.29!

The output of the tree depth is as follows:

[1, 2, 3, 4, 5, 10, 20]

[1.0280339660196287, 0.92686672078778276, 0.81807794023407532, 0.74060228537329209, 0.63583503599563096, 0.42851360418692447, 0.45500008049779139]

Figure: Metrics for different tree depths

Maximum bins

Finally, we will perform our evaluation on the impact of setting the number of bins for the decision tree. As with the tree depth, a larger number of bins should allow the model to become more complex and might help performance with larger feature dimensions. After a certain point, it is unlikely that it will help any more and might, in fact, hinder performance on the test set due to over-fitting:

params = [2, 4, 8, 16, 32, 64, 100]

metrics = [evaluate_dt(train_data_dt, test_data_dt, 5, param) for param in params]

print params

print metrics

plot(params, metrics)

fig = matplotlib.pyplot.gcf()

Here, we will show the output and plot to vary the number of bins (while keeping the tree depth at the default level of 5). In this case, using a small number of bins hurts performance, while there is no impact when we use around 32 bins (the default setting) or more. There seems to be an optimal setting for test set performance at around 16-20 bins:

[2, 4, 8, 16, 32, 64, 100]

[1.3069788763726049, 0.81923394899750324, 0.75745322513058744, 0.62328384445223795, 0.63583503599563096, 0.63583503599563096, 0.63583503599563096]

Figure: Metrics for different maximum bins

Summary

In this chapter, you saw how to use MLlib's linear model and decision tree functionality in Python within the context of regression models. We explored categorical feature extraction and the impact of applying transformations to the target variable in a regression problem. Finally, we implemented various performance-evaluation metrics and used them to implement a cross-validation exercise that explores the impact of the various parameter settings available in both linear models and decision trees on test set model performance.

In the next chapter, we will cover a different approach to machine learning, that is, unsupervised learning, specifically clustering models.