Using Neural Networks with PyTorch to Predict Failure of Automatic Recovery
This exercise is part of a project implemented on a hardware system. The system has automatic doors that the user can recover when they fail to operate (to cover the scenario of the mechanism getting stuck, for example). In some cases this recovery procedure fails, indicating that something deeper might be going on; at that point the user has to resort to a technician for support.
The original dataset was queried from AWS. To retrieve it, I devised the following query script (which is reusable):
import pandas as pd
import numpy as np
import os
import boto3 as aws
import awswrangler as wr

class QueryAthena:
    def __init__(self, query):
        self.database = 'database'
        self.folder = 'path_queries/'
        self.bucket = 'bucket_name'
        self.s3_output = 's3://' + self.bucket + '/' + self.folder
        #credentials and region are read from the environment
        self.aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
        self.aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
        self.region_name = os.environ.get('AWS_DEFAULT_REGION')
        self.aws_session_token = os.environ.get('AWS_SESSION_TOKEN')
        self.query = query

    def run_query(self):
        boto3_session = aws.Session(aws_access_key_id=self.aws_access_key_id,
                                    aws_secret_access_key=self.aws_secret_access_key,
                                    aws_session_token=self.aws_session_token,
                                    region_name=self.region_name)
        #run the query in Athena and return the result as a pandas DataFrame
        df = wr.athena.read_sql_query(sql=self.query, database=self.database, ctas_approach=False,
                                      s3_output=self.s3_output, boto3_session=boto3_session)
        return df
With this it is very easy to run a SQL-like query (Athena uses Presto) to retrieve data from the data lake. I won't go into the details of this class, since it is not the objective of this article.
df = QueryAthena("""
select * from table
""").run_query()
df.describe()
As seen here, we have 94 columns in the original dataset. Not all of them can be used as predictors, as some are metadata about the device, customer, timestamp, etc.
In the next step I exclude the unusable columns and rename the target variable to the standard name "Y".
#name of the target variable
Y_ = "target_"
#names of metadata columns that cannot be used as predictors
dropped = ["meta_1","meta_2","meta_3","meta_4","meta_5"]
clean_df = df.drop(dropped, axis=1)
#drop rows with missing values and shuffle the dataset
clean_df = clean_df.dropna()
clean_df = clean_df.sample(frac=1)
#copy the target into a column with the standard name "Y"
clean_df["Y"] = clean_df[Y_].values
In the next steps I split the dataset into train, validation, and test sets, and convert the data into tensors that PyTorch can consume.
Tensors, a concept borrowed from physics and mathematics, are a fairly generic way to arrange data, which is easiest to illustrate with examples: a tensor of dimension 0 is a single number, a tensor of dimension 1 is a vector (a collection of numbers), a tensor of dimension 2 is a matrix, a tensor of dimension 3 is a cube of data, and so on.
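To make this concrete, here is a small illustrative snippet (the values are arbitrary, it is not part of the pipeline):
import torch

t0 = torch.tensor(3.14)             # dimension 0: a single number
t1 = torch.tensor([1.0, 2.0, 3.0])  # dimension 1: a vector
t2 = torch.ones(3, 4)               # dimension 2: a matrix
t3 = torch.zeros(2, 3, 4)           # dimension 3: a cube of data
print(t0.dim(), t1.dim(), t2.dim(), t3.dim())  # 0 1 2 3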
The three datasets used here are for:
- train: the data the model learns from.
- validation: at every step, metrics are computed on this set; the results are used to decide the course of action (model selection, tuning).
- test: this dataset is left aside and used only at the end to assess the performance of the final model.
#due to the size of the dataset, it might be necessary to keep only a fraction of it, here 50%
clean_dfshort = clean_df.sample(frac=0.5)
#predictors
ins = clean_dfshort.drop([Y_,"Y"], axis=1)
#target: collection of 1 and 0
outs = clean_dfshort[[Y_,"Y"]]
X = ins.copy()
Y = outs["Y"]
#split train and test
from sklearn.utils import resample
from sklearn.model_selection import train_test_split
import math
import torch
X_2, X_test, y_2, y_test = train_test_split(X, Y, test_size=0.25, stratify=Y)
#split train and validation
X_train, X_val, y_train, y_val = train_test_split(X_2, y_2, test_size=0.25, stratify=y_2)
#upsample the training set
#this is done because the number of hits (failed recoveries) is very low
#so it is necessary to rebalance the classes
df_t = pd.concat([pd.DataFrame(X_train),pd.DataFrame(y_train)], axis=1)
df_majority = df_t[df_t[df_t.columns[-1]]<0.5]
df_minority = df_t[df_t[df_t.columns[-1]]>0.5]
df_minority_upsampled = resample(df_minority, replace=True, n_samples=math.floor(len(df_majority)*0.25))
df_upsampled = pd.concat([df_majority, df_minority_upsampled])
df_upsampled = df_upsampled.sample(frac=1).reset_index(drop=True)
X_train = df_upsampled.drop(df_upsampled.columns[-1], axis=1)
y_train = df_upsampled[df_upsampled.columns[-1]]
input_size = X_train.shape[1]
#convert to tensors
X_train = X_train.astype(float).to_numpy()
X_test = X_test.astype(float).to_numpy()
X_val = X_val.astype(float).to_numpy()
y_train = y_train.astype(float).to_numpy()
y_test = y_test.astype(float).to_numpy()
y_val = y_val.astype(float).to_numpy()
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.long)
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)
val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
#batch size for training, one of the parameters we can use for tuning
batch_size = 700
#DataLoaders package the datasets into batches
dataloaders = {'train': torch.utils.data.DataLoader(train_dataset, batch_size=batch_size),
'val': torch.utils.data.DataLoader(val_dataset, batch_size=batch_size),
'test': torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)}
dataset_sizes = {'train': len(train_dataset),
'val': len(val_dataset),
'test': len(test_dataset)}
print(f'dataset_sizes = {dataset_sizes}')
The output of this is the size of each of the datasets: train, validation, and test.
The next step is to define the neural network. This might take some time and effort, requiring several rounds of retraining and testing different parameters and configurations until the desired result is achieved.
The approach I recommend is to start with a simple model, check whether it has any predictive power, and then grow it wider (more neurons) and deeper (more layers). The objective at this stage is to end up with a model that overfits the data.
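As an illustration of that simple starting point (this is not the model used in this article, just a minimal sketch with a hypothetical name), a first iteration could be as small as this:
import torch.nn as nn

# a hypothetical first iteration: one hidden layer, widened/deepened in later rounds
class BaselineClassifier(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64, 2),   # two outputs for the binary target
        )

    def forward(self, x):
        return self.layers(x)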
Once we are successful in that, the next step is to reduce overfitting to improve the result metrics on the validation set.
We will see more about this in the next steps. The following class defines the multilayer perceptron used here.
import torch.nn as nn

#this class is the final one, after adding layers, training, and iterating to find the best result
class SimpleClassifier(nn.Module):
    def __init__(self):
        super(SimpleClassifier, self).__init__()
        #the dropout layer is introduced to reduce overfitting (as explained, set it to 0 or very low at first)
        #dropout tells the network to randomly drop data between layers to introduce variability
        self.dropout = nn.Dropout(0.1)
        #for the layers I recommend starting at a little over twice the number of columns,
        #increasing from one layer to the next, and then decreasing again down to 2,
        #since in this case the response is binary
        self.layers = nn.Sequential(
            nn.Linear(input_size, 250),
            nn.Linear(250, 500),
            nn.Linear(500, 1000),
            nn.Linear(1000, 1500),
            nn.ReLU(),
            self.dropout,
            nn.Linear(1500, 1500),
            nn.Sigmoid(),
            self.dropout,
            nn.Linear(1500, 1500),
            nn.ReLU(),
            self.dropout,
            nn.Linear(1500, 1500),
            nn.Sigmoid(),
            self.dropout,
            nn.Linear(1500, 1500),
            nn.ReLU(),
            self.dropout,
            nn.Linear(1500, 1500),
            nn.Sigmoid(),
            self.dropout,
            nn.Linear(1500, 1500),
            nn.ReLU(),
            self.dropout,
            nn.Linear(1500, 500),
            nn.Sigmoid(),
            self.dropout,
            nn.Linear(500, 500),
            nn.ReLU(),
            self.dropout,
            nn.Linear(500, 500),
            nn.Sigmoid(),
            self.dropout,
            #the last layer outputs 2 since the response variable is binary (0, 1)
            #for multiclass classification the output size should equal the number of classes
            nn.Linear(500, 2),
        )

    def forward(self, x):
        return self.layers(x)

#define model
model = SimpleClassifier()
The next block deals with the training of the model.
These are the training parameters:
- epochs: the number of complete passes over the training data. Set it low at first, then increase it as long as the model keeps learning.
- learning rate: how much the weights of the neurons are updated at each step. Without being too technical, training is about finding the minimum of the loss function by following its gradient (slope), and the learning rate scales how far each step moves along that slope. If it is too large, the value oscillates between both sides of the valley instead of descending gently to where the slope approaches 0 (the minimum); see the small sketch after this list.
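As a toy illustration of that oscillation (not part of the original pipeline), here is plain gradient descent on f(x) = x**2 with a small and an overly large learning rate:
# illustrative only: gradient descent on f(x) = x**2 with two learning rates
def gd_step(x, lr):
    grad = 2 * x              # derivative of x**2
    return x - lr * grad

x_small, x_big = 5.0, 5.0
for _ in range(5):
    x_small = gd_step(x_small, lr=0.1)   # shrinks steadily toward the minimum at 0
    x_big = gd_step(x_big, lr=1.1)       # overshoots and oscillates with growing amplitude
print(x_small, x_big)                    # ~1.64 vs ~-12.44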
I chose cross-entropy loss, as it is the typical loss function to minimize for binary classification problems.
But since the classes are highly unbalanced, metrics such as accuracy are not adequate to express how well the model performs: the model would drift toward labeling most or all cases with the negative class, which drives accuracy up without learning anything useful. To account for that effect, I use the F1 metric to select which model performs better.
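For reference, this is how F1 relates to the confusion-matrix counts; the numbers below are made up, only the formula matters:
# illustrative only: F1 from confusion-matrix counts (made-up numbers)
tp, fp, fn = 80, 20, 40
precision = tp / (tp + fp)                          # 0.8
recall = tp / (tp + fn)                             # ~0.667
f1 = 2 * precision * recall / (precision + recall)
# equivalent form used later in the training loop: 2*tp / (2*tp + fp + fn)
print(round(f1, 3))                                 # 0.727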
import copy

model = SimpleClassifier()
model.train()

#these are the training parameters
num_epochs = 100
learning_rate = 0.00001
regularization = 0.0000001

#loss function
criterion = nn.CrossEntropyLoss()
#the optimizer determines how the gradients update the weights
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=regularization)

best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
best_f1 = 0.0
best_epoch = 0
phases = ['train', 'val']
training_curves = {}
epoch_loss = 1
epoch_f1 = 0
epoch_acc = 0

for phase in phases:
    training_curves[phase+'_loss'] = []
    training_curves[phase+'_acc'] = []
    training_curves[phase+'_f1'] = []

for epoch in range(num_epochs):
    print(f'\nEpoch {epoch+1}/{num_epochs}')
    print('-' * 10)
    for phase in phases:
        if phase == 'train':
            model.train()
        else:
            model.eval()
        running_loss = 0.0
        running_corrects = 0
        running_fp = 0
        running_tp = 0
        running_tn = 0
        running_fn = 0
        # Iterate over the batches
        for inputs, labels in dataloaders[phase]:
            inputs = inputs.view(inputs.shape[0], -1)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward pass; gradients are only tracked in the training phase
            with torch.set_grad_enabled(phase == 'train'):
                outputs = model(inputs)
                _, predictions = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
                if phase == 'train':
                    loss.backward()
                    optimizer.step()
            # statistics used for the f1 metric
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(predictions == labels.data)
            running_fp += torch.sum((predictions != labels.data) & (predictions >= 0.5))
            running_tp += torch.sum((predictions == labels.data) & (predictions >= 0.5))
            running_fn += torch.sum((predictions != labels.data) & (predictions < 0.5))
            running_tn += torch.sum((predictions == labels.data) & (predictions < 0.5))
            print(f'Epoch {epoch+1}, {phase:5} Loss: {epoch_loss:.7f} F1: {epoch_f1:.7f} Acc: {epoch_acc:.7f} Partial loss: {loss.item():.7f} Best f1: {best_f1:.7f} ')
        epoch_loss = running_loss / dataset_sizes[phase]
        epoch_acc = running_corrects.double() / dataset_sizes[phase]
        epoch_f1 = (2*running_tp.double()) / (2*running_tp.double() + running_fp.double() + running_fn.double() + 0.0000000000000000000001)
        training_curves[phase+'_loss'].append(epoch_loss)
        training_curves[phase+'_acc'].append(epoch_acc)
        training_curves[phase+'_f1'].append(epoch_f1)
        print(f'Epoch {epoch+1}, {phase:5} Loss: {epoch_loss:.7f} F1: {epoch_f1:.7f} Acc: {epoch_acc:.7f} Best f1: {best_f1:.7f} ')
        # keep the weights of the model with the best validation f1 so far
        if phase == 'val' and epoch_f1 >= best_f1:
            best_epoch = epoch
            best_acc = epoch_acc
            best_f1 = epoch_f1
            best_model_wts = copy.deepcopy(model.state_dict())

print(f'Best val F1: {best_f1:5f}, Best val Acc: {best_acc:5f} at epoch {best_epoch}')
# load best model weights
model.load_state_dict(best_model_wts)
As we can see, with these settings I get to a good result in terms of F1.
The next step is to plot the training curves.
#plot training curves
import matplotlib.pyplot as plt

epochs = list(range(len(training_curves['train_loss'])))
for metric in ['loss', 'acc', 'f1']:
    plt.figure()
    plt.title(f'Training curves - {metric}')
    for phase in phases:
        key = phase + '_' + metric
        if key in training_curves:
            plt.plot(epochs, training_curves[key])
    plt.xlabel('epoch')
    plt.legend(labels=phases)
These are very good curves, because I have already dealt with the overfitting issues. If there is overfitting (as there should be before introducing the dropout regularization), the validation curves separate from the training curves: good results in training (high F1 and accuracy, low loss) combined with bad results in validation mean overfitting.
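If you want a quick numeric check instead of reading the plots, a small sketch like this (assuming the training_curves dictionary built in the loop above) compares the final train and validation F1:
# compare final train vs. validation F1; a large gap suggests overfitting
train_f1 = float(training_curves['train_f1'][-1])
val_f1 = float(training_curves['val_f1'][-1])
print(f'train F1 = {train_f1:.3f}, val F1 = {val_f1:.3f}, gap = {train_f1 - val_f1:.3f}')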
The next block plots the results on the validation dataset. Remember that the test set, the unseen data, is reserved for the very end.
#plot results on VALIDATION
# load best model weights
model.load_state_dict(best_model_wts)

import sklearn.metrics as metrics

class_labels = ['0', '1']

def classify_predictions(model, dataloader, cutpoint):
    model.eval()  # Set model to evaluate mode
    all_labels = torch.tensor([])
    all_scores = torch.tensor([])
    all_preds = torch.tensor([])
    for inputs, labels in dataloader:
        # probability of the positive class, taken from the softmax output
        outputs = torch.softmax(model(inputs), dim=1)
        scores = torch.div(outputs[:, 1], (outputs[:, 1] + outputs[:, 0]))
        preds = (scores >= cutpoint).float()
        all_labels = torch.cat((all_labels, labels), 0)
        all_scores = torch.cat((all_scores, scores), 0)
        all_preds = torch.cat((all_preds, preds), 0)
    return all_preds.detach(), all_labels.detach(), all_scores.detach()

def plot_metrics(model, dataloaders, phase='val', cutpoint=0.5):
    preds, labels, scores = classify_predictions(model, dataloaders[phase], cutpoint)
    fpr, tpr, thresholds = metrics.roc_curve(labels, scores)
    # AUC is computed from the continuous scores, not the thresholded predictions
    auc = metrics.roc_auc_score(labels, scores)
    disp = metrics.RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=auc)
    # mark the points closest to a set of candidate cut points
    ind = np.argmin(np.abs(thresholds - 0.5))
    ind2 = np.argmin(np.abs(thresholds - 0.1))
    ind3 = np.argmin(np.abs(thresholds - 0.25))
    ind4 = np.argmin(np.abs(thresholds - 0.75))
    ind5 = np.argmin(np.abs(thresholds - 0.9))
    ax = disp.plot().ax_
    ax.scatter(fpr[ind], tpr[ind], color='red')
    ax.scatter(fpr[ind2], tpr[ind2], color='green')
    ax.scatter(fpr[ind3], tpr[ind3], color='orange')
    ax.scatter(fpr[ind4], tpr[ind4], color='black')
    ax.scatter(fpr[ind5], tpr[ind5], color='blue')
    ax.set_title('ROC Curve (green=0.1, orange=0.25, red=0.5, black=0.75, blue=0.9)')
    f1sc = metrics.f1_score(labels, preds)
    cm = metrics.confusion_matrix(labels, preds)
    disp = metrics.ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
    ax = disp.plot().ax_
    ax.set_title('Confusion Matrix -- counts, f1: ' + str(f1sc))
    ncm = metrics.confusion_matrix(labels, preds, normalize='true')
    disp = metrics.ConfusionMatrixDisplay(confusion_matrix=ncm)
    ax = disp.plot().ax_
    ax.set_title('Confusion Matrix -- rates, f1: ' + str(f1sc))
    TN, FP, FN, TP = cm[0, 0], cm[0, 1], cm[1, 0], cm[1, 1]
    N, P = TN + FP, TP + FN
    ACC = (TP + TN) / (P + N)
    TPR, FPR, FNR, TNR = TP / P, FP / N, FN / P, TN / N
    print(f'\nAt the selected cut point:')
    print(f'TN = {TN:5}, FP = {FP:5} -> N = {N:5}')
    print(f'FN = {FN:5}, TP = {TP:5} -> P = {P:5}')
    print(f'TNR = {TNR:5.3f}, FPR = {FPR:5.3f}')
    print(f'FNR = {FNR:5.3f}, TPR = {TPR:5.3f}')
    print(f'ACC = {ACC:6.3f}')
    return cm, fpr, tpr, thresholds, auc, f1sc

res = plot_metrics(model, dataloaders, phase='val', cutpoint=0.5)
The first plot is the ROC curve, which I have made to display dots for the cut points 0.1, 0.25, 0.5, 0.75 and 0.9. The area under the curve is high, which means this is a good model, and the point closest to the elbow corresponds to the cut point 0.1. I will use that value later as the cut point when I evaluate the test set.
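Here the cut point is read off the plot; if you prefer to pick it programmatically, one common option (an assumption on my part, not what the original code does) is Youden's J statistic, which maximizes TPR minus FPR over the thresholds returned above:
# pick the threshold that maximizes TPR - FPR (Youden's J) from the validation ROC
cm, fpr, tpr, thresholds, auc, f1sc = res
best_idx = np.argmax(tpr - fpr)
print('suggested cut point:', thresholds[best_idx])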
The next two charts are the confusion matrices (counts and rates).
Now I want to run the model on the test set. This is data the model has never seen, so the performance here should be close to the real performance at inference time.
I use the cut point of 0.1 found in the previous step. The results are very promising.
#plot results on TEST
bestcut = 0.1
# load best model weights
model.load_state_dict(best_model_wts)
# reuse the classify_predictions and plot_metrics functions defined above, now on the test set
res = plot_metrics(model, dataloaders, phase='test', cutpoint=bestcut)
Finally, I save the model weights and a config dictionary for the model to our repository. The dictionary holds the information needed to validate any new dataset used for inference, along with the metrics and the chosen cut point.
f1onTest = res[5]
f1onVal = best_f1.item()
cutPoint = bestcut
modelDictionary = {"droppedCols": dropped, "Y": Y_, "f1onTest": f1onTest, "input_size": input_size, "f1onVal": f1onVal, "cutPoint": cutPoint}
torch.save(model.state_dict(), "./modelConfig.pth")

import pickle
with open('Model.pkl', 'wb') as f:
    pickle.dump(modelDictionary, f)
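For completeness, here is a sketch of how these artifacts could be loaded back for inference, assuming the SimpleClassifier class is importable and the file names used above:
# load the config dictionary and the model weights saved above
import pickle
import torch

with open('Model.pkl', 'rb') as f:
    modelDictionary = pickle.load(f)

model = SimpleClassifier()
model.load_state_dict(torch.load("./modelConfig.pth"))
model.eval()

# new data would be checked against modelDictionary["droppedCols"] and ["input_size"],
# scored with softmax, and classified using modelDictionary["cutPoint"]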