1. logistic regression

Pytorch

steps: prepare and load data » split into train and test set » normalize » define NN » train and evaluate

🏛️ Preparing dataset

Data source and download

# open the iris dataset through NumPy's DataSource helper
ds = np.lib.DataSource()
# read everything inside a `with` block so the file handle is closed again
# instead of staying open for the rest of the session
with ds.open('http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data') as fp:
    raw_bytes = fp.read().encode()

# convert data into numpy array: only the first two columns of the first 100 rows are kept
x = np.genfromtxt(BytesIO(raw_bytes), delimiter=',', usecols=range(2), max_rows=100)

# create a binary classification problem: the first 50 rows get label 0, the last 50 get label 1
y = np.zeros(100)
y[50:] = 1

# fix the NumPy RNG seed so the shuffled split is reproducible
np.random.seed(1)

# build one index per sample and permute the indices in place
idx = np.arange(y.shape[0])
np.random.shuffle(idx)

# the first 25 shuffled indices form the test set, the remainder the training set
test_idx, train_idx = idx[:25], idx[25:]
X_test, y_test = x[test_idx], y[test_idx]
X_train, y_train = x[train_idx], y[train_idx]

# standardize both splits with the per-feature mean and std of the training set only
mu = np.mean(X_train, axis=0)
std = np.std(X_train, axis=0)
X_train = (X_train - mu) / std
X_test = (X_test - mu) / std

🦒 Low level implementation using manual gradients

# run on the first CUDA GPU when one is available, otherwise on the CPU
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# perform element-wise replacement based on a condition; used to extract labels from predictions
def custom_where(cond, x_1, x_2):
    """Select ``x_1`` where ``cond`` is 1 (or True) and ``x_2`` where it is 0 (or False).

    The selection is computed arithmetically as
    ``cond * x_1 + (1 - cond) * x_2``, so ``cond`` is expected to hold only
    0/1 (or boolean) entries.

    Boolean tensors are cast to float32 first, because PyTorch rejects
    ``1 - cond`` for ``torch.bool`` tensors.
    """
    if torch.is_tensor(cond) and cond.dtype == torch.bool:
        cond = cond.to(torch.float32)
    return (cond * x_1) + ((1 - cond) * x_2)

# logistic regression model: one linear layer (weights + bias) followed by a sigmoid, no hidden layer
class LogisticRegression1():
    """Binary logistic regression trained with manually derived gradients.

    The parameters are plain tensors created on the module-level ``device``.
    """

    def __init__(self, num_features):
        """Create zero-initialised weights of shape (num_features, 1) and a bias of shape (1,)."""
        self.num_features = num_features
        self.weights = torch.zeros(num_features, 1,
                                   dtype=torch.float32, device=device)
        self.bias = torch.zeros(1, dtype=torch.float32, device=device)

    # compute output given input x
    def forward(self, x):
        """Return ``sigmoid(x @ weights + bias)``: one probability per row of the 2-D input ``x``."""
        linear = torch.add(torch.mm(x, self.weights), self.bias)
        probas = self._sigmoid(linear)
        return probas

    # calculate errors based on true labels
    def backward(self, probas, y):
        """Return ``y - probas`` with ``probas`` flattened to 1-D.

        ``train`` uses these errors to build the parameter update.
        """
        errors = y - probas.view(-1)
        return errors

    # extract labels by thresholding the predicted probabilities
    def predict_labels(self, x):
        """Return 1.0 where the predicted probability is >= 0.5 and 0.0 elsewhere."""
        probas = self.forward(x)
        # The comparison yields a boolean mask; cast it to float right away
        # because arithmetic such as ``1 - mask`` is not defined for bool tensors.
        labels = (probas >= .5).float()
        return labels

    def evaluate(self, x, y):
        """Return the fraction of rows in ``x`` whose predicted label equals ``y`` (float tensor)."""
        labels = self.predict_labels(x)
        # Convert the count of correct predictions to float before dividing so
        # the ratio never depends on integer-division semantics.
        accuracy = torch.sum(labels.view(-1) == y).float() / y.size()[0]
        return accuracy

    # for forward prop
    def _sigmoid(self, z):
        """Element-wise logistic function ``1 / (1 + exp(-z))``."""
        return 1. / (1. + torch.exp(-z))

    def _logit_cost(self, y, proba):
        """Return the summed binary cross-entropy as a (1, 1) tensor.

        ``tmp1`` is ``-sum(y * log(p))`` and ``tmp2`` is
        ``sum((1 - y) * log(1 - p))``; their difference is the cost.
        """
        tmp1 = torch.mm(-y.view(1, -1), torch.log(proba))
        tmp2 = torch.mm((1 - y).view(1, -1), torch.log(1 - proba))
        return tmp1 - tmp2

    def train(self, x, y, num_epochs, learning_rate=0.01):
        """Run ``num_epochs`` full-batch updates and print accuracy and cost after each one."""
        for e in range(num_epochs):

            #### Compute outputs ####
            probas = self.forward(x)

            #### Compute gradients ####
            errors = self.backward(probas, y)
            # x^T (y - p): the direction in which the weights are moved
            neg_grad = torch.mm(x.transpose(0, 1), errors.view(-1, 1))

            #### Update weights ####
            self.weights += learning_rate * neg_grad
            self.bias += learning_rate * torch.sum(errors)

            #### Logging ####
            print('Epoch: %03d' % (e+1), end="")
            print(' | Train ACC: %.3f' % self.evaluate(x, y), end="")
            print(' | Cost: %.3f' % self._logit_cost(y, self.forward(x)))

# convert the NumPy splits into float32 tensors on the selected device
tensor_opts = {'dtype': torch.float32, 'device': device}
X_train_tensor = torch.tensor(X_train, **tensor_opts)
y_train_tensor = torch.tensor(y_train, **tensor_opts)
X_test_tensor = torch.tensor(X_test, **tensor_opts)
y_test_tensor = torch.tensor(y_test, **tensor_opts)

# create the logistic regression model for the 2 input features
logr = LogisticRegression1(num_features=2)

# fit for 10 epochs with a learning rate of 0.1; train() prints progress per epoch
logr.train(X_train_tensor, y_train_tensor, num_epochs=10, learning_rate=0.1)

# accuracy on the held-out split, reported as a percentage
test_acc = logr.evaluate(X_test_tensor, y_test_tensor)
print('Test set accuracy: %.2f%%' % (test_acc*100))

# show the learned parameters
print('\nModel parameters:')
print('  Weights: %s' % logr.weights)
print('  Bias: %s' % logr.bias)

Test set accuracy: 100.00%

🦓 Low level implementation using Autograd

def custom_where(cond, x_1, x_2):
    """Select ``x_1`` where ``cond`` is 1 (or True) and ``x_2`` where it is 0 (or False).

    The selection is computed arithmetically as
    ``cond * x_1 + (1 - cond) * x_2``, so ``cond`` is expected to hold only
    0/1 (or boolean) entries.

    Boolean tensors are cast to float32 first, because PyTorch rejects
    ``1 - cond`` for ``torch.bool`` tensors.
    """
    if torch.is_tensor(cond) and cond.dtype == torch.bool:
        cond = cond.to(torch.float32)
    return (cond * x_1) + ((1 - cond) * x_2)


class LogisticRegression2():
    """Binary logistic regression whose gradients are computed by autograd.

    The parameters are leaf tensors with ``requires_grad=True`` created on the
    module-level ``device``; the update step itself is still written by hand.
    """

    def __init__(self, num_features):
        """Create zero-initialised weights of shape (num_features, 1) and a bias of shape (1,)."""
        self.num_features = num_features

        self.weights = torch.zeros(num_features, 1,
                                   dtype=torch.float32,
                                   device=device,
                                   requires_grad=True) # req. for autograd!
        self.bias = torch.zeros(1,
                                dtype=torch.float32,
                                device=device,
                                requires_grad=True) # req. for autograd!

    def forward(self, x):
        """Return ``sigmoid(x @ weights + bias)``: one probability per row of the 2-D input ``x``."""
        linear = torch.add(torch.mm(x, self.weights), self.bias)
        probas = self._sigmoid(linear)
        return probas

    def predict_labels(self, x):
        """Return 1.0 where the predicted probability is >= 0.5 and 0.0 elsewhere."""
        probas = self.forward(x)
        # the float-cast mask already holds the 1/0 labels
        labels = (probas >= .5).float()
        return labels

    def evaluate(self, x, y):
        """Return the fraction of rows in ``x`` whose predicted label equals ``y`` (float tensor)."""
        labels = self.predict_labels(x)
        accuracy = (torch.sum(labels.view(-1) == y.view(-1))).float() / y.size()[0]
        return accuracy

    def _sigmoid(self, z):
        """Element-wise logistic function ``1 / (1 + exp(-z))``."""
        return 1. / (1. + torch.exp(-z))

    def _logit_cost(self, y, proba):
        """Return the summed binary cross-entropy as a (1, 1) tensor."""
        tmp1 = torch.mm(-y.view(1, -1), torch.log(proba))
        tmp2 = torch.mm((1 - y).view(1, -1), torch.log(1 - proba))
        return tmp1 - tmp2

    def train(self, x, y, num_epochs, learning_rate=0.01):
        """Run ``num_epochs`` full-batch gradient-descent steps and print accuracy and cost after each one."""

        for e in range(num_epochs):

            #### Compute outputs ####
            proba = self.forward(x)
            cost = self._logit_cost(y, proba)

            #### Compute gradients ####
            cost.backward()

            # Everything below only reads or overwrites parameter values, so
            # it runs with autograd switched off: the in-place updates are
            # not recorded and the logging passes build no graph.
            with torch.no_grad():

                #### manual update weights (gradient descent) ####
                self.weights -= learning_rate * self.weights.grad
                self.bias -= learning_rate * self.bias.grad

                #### Reset gradients to zero for next iteration ####
                self.weights.grad.zero_()
                self.bias.grad.zero_()

                #### Logging ####
                print('Epoch: %03d' % (e+1), end="")
                print(' | Train ACC: %.3f' % self.evaluate(x, y), end="")
                print(' | Cost: %.3f' % self._logit_cost(y, self.forward(x)))
            

🦄 High level implementation using nn.Module

class LogisticRegression(torch.nn.Module):
    """Logistic regression as an nn.Module: one linear layer followed by a sigmoid."""

    def __init__(self, num_features):
        super().__init__()
        # fully connected layer mapping num_features inputs to a single logit
        self.linear = torch.nn.Linear(num_features, 1)
        # replace the layer's default initial values with zeros, outside of autograd
        with torch.no_grad():
            self.linear.weight.zero_()
            self.linear.bias.zero_()

    def forward(self, x):
        """Return the sigmoid of the linear layer's output for ``x``."""
        return torch.sigmoid(self.linear(x))


def comp_accuracy(label_var, pred_probas):
    """Return the fraction of predictions that match ``label_var`` as a float tensor.

    A probability strictly greater than 0.5 counts as class 1.
    """
    above_threshold = (pred_probas > 0.5).float()
    predicted = custom_where(above_threshold, 1, 0).view(-1)
    expected = label_var.view(-1)
    n_correct = torch.sum(predicted == expected).float()
    return n_correct / label_var.size(0)

# build the model for 2 input features and move its parameters to the selected device
model = LogisticRegression(num_features=2)
model = model.to(device)

# binary cross-entropy, summed (not averaged) over the batch
cost_fn = torch.nn.BCELoss(reduction='sum')

# plain stochastic gradient descent over all model parameters
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

# convert the training split to float32 tensors; the labels are reshaped
# into a single column
X_train_tensor = torch.tensor(X_train, dtype=torch.float32, device=device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32, device=device)
y_train_tensor = y_train_tensor.view(-1, 1)

# number of full-batch passes; it has to be defined before the loop below can run
num_epochs = 10

# training loop
for epoch in range(num_epochs):

    #### Compute outputs ####
    out = model(X_train_tensor)

    #### Compute gradients ####
    cost = cost_fn(out, y_train_tensor)
    optimizer.zero_grad()
    cost.backward()

    #### Update weights ####
    optimizer.step()

    #### Logging ####
    # re-evaluate with the updated weights; no gradients are needed for reporting
    with torch.no_grad():
        pred_probas = model(X_train_tensor)
        acc = comp_accuracy(y_train_tensor, pred_probas)
        print('Epoch: %03d' % (epoch + 1), end="")
        print(' | Train ACC: %.3f' % acc, end="")
        print(' | Cost: %.3f' % cost_fn(pred_probas, y_train_tensor))
Epoch: 001 Train ACC: 0.987 Cost: 5.581
Epoch: 002 Train ACC: 0.987 Cost: 4.882
Epoch: 003 Train ACC: 1.000 Cost: 4.381
Epoch: 004 Train ACC: 1.000 Cost: 3.998
Epoch: 005 Train ACC: 1.000 Cost: 3.693
Epoch: 006 Train ACC: 1.000 Cost: 3.443
Epoch: 007 Train ACC: 1.000 Cost: 3.232
Epoch: 008 Train ACC: 1.000 Cost: 3.052
Epoch: 009 Train ACC: 1.000 Cost: 2.896
Epoch: 010 Train ACC: 1.000 Cost: 2.758

Model parameters: Weights: Parameter containing: tensor([[ 4.2267, -2.9613]], device='cuda:0', requires_grad=True) Bias: Parameter containing: tensor([0.0994], device='cuda:0', requires_grad=True)

Tensorflow

import tensorflow as tf

def iterate_minibatches(arrays, batch_size, shuffle=False, seed=None):
    """Yield successive mini-batches sliced in lockstep from several arrays.

    Args:
        arrays: sequence of arrays that can be indexed with an integer index
            array; the first-axis length of ``arrays[0]`` sets the number of
            samples.
        batch_size: number of samples per batch.
        shuffle: if True, the sample order is permuted once before batching.
        seed: seed for the ``np.random.RandomState`` that does the shuffling.

    Yields:
        A tuple with one slice per input array, all taken at the same indices.
        Trailing samples that do not fill a complete batch are skipped.
    """
    rgen = np.random.RandomState(seed)
    # array used to slice inputs to mini batches
    indices = np.arange(arrays[0].shape[0])

    if shuffle:
        rgen.shuffle(indices)

    # create mini batches
    for start_idx in range(0, indices.shape[0] - batch_size + 1, batch_size):
        index_slice = indices[start_idx:start_idx + batch_size]

        # Build the batch eagerly. A lazy generator expression would look up
        # ``index_slice`` only when consumed, so batches that are collected
        # first and read later would all return the rows of the last slice.
        yield tuple(ary[index_slice] for ary in arrays)

##########################
### SETTINGS
##########################

# dataset dimensions read from the feature matrix x: first axis = samples, second axis = features
n_samples = x.shape[0]
n_features = x.shape[1]

# optimisation settings for the TensorFlow run
learning_rate = 0.05
training_epochs = 15
batch_size = 10


##########################
### GRAPH DEFINITION
##########################

# Build the logistic-regression graph in its own tf.Graph. Placeholders, variables and
# key ops are given explicit names because the training code fetches them by name.
g = tf.Graph()
with g.as_default() as g:

    # Input data: a [batch, n_features] feature matrix and a 1-D vector of targets
    tf_x = tf.placeholder(dtype=tf.float32,
                          shape=[None, n_features], name='inputs')
    tf_y = tf.placeholder(dtype=tf.float32,
                          shape=[None], name='targets')
    
    # Model parameters: both start at zero
    params = {
        'weights': tf.Variable(tf.zeros(shape=[n_features, 1],
                                               dtype=tf.float32), name='weights'),
        'bias': tf.Variable([[0.]], dtype=tf.float32, name='bias')}

    # Logistic Regression
    # perform linear combination 
    linear = tf.matmul(tf_x, params['weights']) + params['bias']
    
    # apply sigmoid for probabilities
    pred_proba = tf.sigmoid(linear, name='predict_probas')

    # Loss and optimizer
    # flatten the probabilities so they line up element-wise with the 1-D targets
    r = tf.reshape(pred_proba, [-1])
    # binary cross-entropy summed over the batch
    # NOTE(review): reduce_sum is called without an axis, so the outer reduce_mean
    # presumably receives a single value and leaves it unchanged - confirm
    cost = tf.reduce_mean(tf.reduce_sum((-tf_y * tf.log(r)) - 
                                        ((1. - tf_y) * tf.log(1. - r))), name='cost')
    optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
    train = optimizer.minimize(cost, name='train')
                                                  
    # Class prediction: round the probabilities to 0/1 and compare them with the targets
    pred_labels = tf.round(tf.reshape(pred_proba, [-1]), name='predict_labels')
    correct_prediction = tf.equal(tf_y, pred_labels)
    # mean of the 0/1 matches = fraction of correct predictions
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name='accuracy')


##########################
### TRAINING & EVALUATION
##########################
    
with tf.Session(graph=g) as sess:
    sess.run(tf.global_variables_initializer())

    # no batch has been processed before the first epoch, so there is no average cost yet
    avg_cost = np.nan

    for epoch in range(training_epochs):

        # accuracy before this epoch's updates, on the standardized train/test splits
        train_acc = sess.run('accuracy:0', feed_dict={tf_x: X_train,
                                                      tf_y: y_train})
        valid_acc = sess.run('accuracy:0', feed_dict={tf_x: X_test,
                                                      tf_y: y_test})

        print("Epoch: %03d | AvgCost: %.3f" % (epoch, avg_cost), end="")
        print(" | Train/Valid ACC: %.2f/%.2f" % (train_acc, valid_acc))

        # accumulate cost and batch count per epoch so that the printed value
        # really is the mean batch cost of the previous epoch
        epoch_cost = 0.
        count = 0
        for x_batch, y_batch in iterate_minibatches(arrays=[X_train, y_train],
                                                    batch_size=batch_size,
                                                    shuffle=True, seed=123):

            feed_dict = {'inputs:0': x_batch,
                         'targets:0': y_batch}
            _, c = sess.run(['train', 'cost:0'], feed_dict=feed_dict)

            epoch_cost += c
            count += 1

        avg_cost = epoch_cost / count if count else np.nan

    weights, bias = sess.run(['weights:0', 'bias:0'])
    print('\nWeights:\n', weights)
    print('\nBias:\n', bias)
Epoch: 000 AvgCost: nan Train/Valid ACC: 0.53/0.40
Epoch: 001 AvgCost: 4.221 Train/Valid ACC: 1.00/1.00
Epoch: 002 AvgCost: 1.225 Train/Valid ACC: 1.00/1.00
Epoch: 003 AvgCost: 0.610 Train/Valid ACC: 1.00/1.00
Epoch: 004 AvgCost: 0.376 Train/Valid ACC: 1.00/1.00
Epoch: 005 AvgCost: 0.259 Train/Valid ACC: 1.00/1.00
Epoch: 006 AvgCost: 0.191 Train/Valid ACC: 1.00/1.00
Epoch: 007 AvgCost: 0.148 Train/Valid ACC: 1.00/1.00
Epoch: 008 AvgCost: 0.119 Train/Valid ACC: 1.00/1.00
Epoch: 009 AvgCost: 0.098 Train/Valid ACC: 1.00/1.00
Epoch: 010 AvgCost: 0.082 Train/Valid ACC: 1.00/1.00
Epoch: 011 AvgCost: 0.070 Train/Valid ACC: 1.00/1.00
Epoch: 012 AvgCost: 0.061 Train/Valid ACC: 1.00/1.00
Epoch: 013 AvgCost: 0.053 Train/Valid ACC: 1.00/1.00
Epoch: 014 AvgCost: 0.047 Train/Valid ACC: 1.00/1.00

Weights: [[ 3.31176686] [-2.40808702]]

Bias: [[-0.01001291]]


2. Softmax regression (multinomial logistic regression)

Pytorch

Softmax regression, also known as multinomial logistic regression, is a generalization of logistic regression used for multi-class classification problems. It extends the binary classification capabilities of logistic regression to handle multiple classes.

Dataset

from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch

# run on the first CUDA GPU when one is available, otherwise on the CPU
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# hyperparameters
random_seed = 123
learning_rate = 0.1
num_epochs = 10
batch_size = 256

# model dimensions: input features per sample and number of output classes
num_features = 784
num_classes = 10

# download (train split only requests the download) and transform datasets
train_dataset = datasets.MNIST(root='data', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.MNIST(root='data', train=False, transform=transforms.ToTensor())

# training batches are reshuffled by the loader
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
# `shuffle` has to be passed as a keyword argument; the test split is read in a
# fixed order because accuracy does not depend on the order of the samples
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

class SoftmaxRegression(torch.nn.Module):
    """Softmax regression: a single linear layer whose outputs are turned into class probabilities."""

    def __init__(self, num_features, num_classes):
        super().__init__()
        self.linear = torch.nn.Linear(num_features, num_classes)

        # start from all-zero parameters instead of the layer's default initial values
        with torch.no_grad():
            self.linear.weight.zero_()
            self.linear.bias.zero_()

    def forward(self, x):
        """Return ``(logits, probas)``; the softmax is taken over dim 1."""
        logits = self.linear(x)
        return logits, F.softmax(logits, dim=1)

# build the softmax model and move its parameters to the selected device
model = SoftmaxRegression(num_features=num_features, num_classes=num_classes).to(device)

##########################
### COST AND OPTIMIZER
##########################

# plain SGD over all model parameters
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

# Manual seed for deterministic data loader
torch.manual_seed(random_seed)


def compute_accuracy(model, data_loader):
    """Return the classification accuracy of ``model`` over ``data_loader`` in percent.

    Each batch of features is flattened to rows of 28*28 values and moved to
    the module-level ``device`` before it is passed to the model; the class
    with the highest probability is taken as the prediction.

    Returns:
        A float tensor holding ``100 * correct / total``.

    Raises:
        ValueError: if ``data_loader`` yields no examples.
    """
    correct_pred, num_examples = 0, 0

    # predictions only: no gradients are needed, whatever the caller's grad mode is
    with torch.no_grad():
        for features, targets in data_loader:
            features = features.view(-1, 28*28).to(device)
            targets = targets.to(device)
            logits, probas = model(features)
            _, predicted_labels = torch.max(probas, 1)
            num_examples += targets.size(0)
            correct_pred += (predicted_labels == targets).sum()

    # without any example the ratio is undefined; fail with a clear message
    if num_examples == 0:
        raise ValueError("compute_accuracy received no examples from data_loader")

    return correct_pred.float() / num_examples * 100
    
# training loop
for epoch in range(num_epochs):
    for batch_idx, (features, targets) in enumerate(train_loader):

        # flatten every sample to a row of 28*28 values and move the batch to the device
        features = features.view(-1, 28*28).to(device)
        targets = targets.to(device)

        ### FORWARD AND BACK PROP
        # F.cross_entropy works with logits, not probabilities,
        # so the softmax output is not used for the cost
        logits, probas = model(features)
        cost = F.cross_entropy(logits, targets)

        optimizer.zero_grad()
        cost.backward()

        ### UPDATE MODEL PARAMETERS
        optimizer.step()

        ### LOGGING
        # report every 50th batch
        if batch_idx % 50 == 0:
            progress = (epoch+1, num_epochs, batch_idx,
                        len(train_dataset)//batch_size, cost)
            print ('Epoch: %03d/%03d | Batch %03d/%03d | Cost: %.4f' % progress)

    # accuracy over the whole training set, computed without tracking gradients
    with torch.no_grad():
        train_acc = compute_accuracy(model, train_loader)
        print('Epoch: %03d/%03d training accuracy: %.2f%%' % (
              epoch+1, num_epochs, train_acc))

3. RNNs

Recurrent Neural Networks (RNNs) are a type of neural network architecture designed to process sequential data, where the output at each step depends on both the current input and the previous hidden state. RNNs have a unique architecture that allows them to maintain an internal memory or “hidden state” which carries information from one time step to the next, enabling them to model temporal dependencies in data.

The basic building blocks of an RNN include:

  1. Input Layer: This layer consists of input units that take a single data point (feature vector) x_t as input at each time step t.

  2. Hidden State and Cell or Gating Vector: The hidden state h_t is the internal memory of an RNN: it summarizes what the network has seen up to and including time step t and is passed on to the next time step. Gated variants such as the GRU (used in the code below) and the LSTM additionally compute a cell or gating vector g_t, which determines how much of the previous hidden state should be retained for the next time step and how much new information from the current input should be incorporated.

  3. Output Layer: This layer produces an output y_t from the current hidden state h_t, which is itself computed from a weighted sum of the current input x_t and the previous hidden state h_(t-1). In many applications, the output y_t represents a probability distribution over classes or a prediction for the next time step.

  4. Weights: RNNs use weights to determine the connections between different layers. The weights are updated during training using backpropagation through time (BPTT).

  5. Activation Functions: The activation functions applied at each layer help introduce non-linearity into the model, enabling it to learn complex representations. Common choices for activation functions in RNNs include sigmoid, tanh, and ReLU.

download data: !wget http://www.gutenberg.org/files/98/98-0.txt

# settings and parameters
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)  # seed PyTorch's global RNG

# use CUDA when it is available, otherwise the CPU
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# number of characters per training portion
TEXT_PORTION_SIZE = 200

# optimisation settings
NUM_ITER = 20000
LEARNING_RATE = 0.005

# model sizes
EMBEDDING_DIM = 100
HIDDEN_DIM = 100
NUM_HIDDEN = 1

# Read the downloaded book. The encoding is passed explicitly so that decoding
# does not depend on the platform's default locale.
# NOTE(review): assumes the downloaded file is UTF-8 encoded - confirm
with open('./98-0.txt', 'r', encoding='utf-8') as f:
    textfile = f.read()

# convert special characters
textfile = unidecode.unidecode(textfile)

# strip extra whitespaces: collapse every run of spaces into a single space
textfile = re.sub(' +',' ', textfile)

TEXT_LENGTH = len(textfile)

def random_portion(textfile, portion_size=None):
    """Return a random slice of ``textfile`` that is ``portion_size + 1`` characters long.

    Args:
        textfile: the text to sample from.
        portion_size: portion length before the one extra character is added;
            defaults to the module-level ``TEXT_PORTION_SIZE``.

    Raises:
        ValueError: if ``textfile`` has fewer than ``portion_size + 1`` characters.
    """
    if portion_size is None:
        portion_size = TEXT_PORTION_SIZE

    # random.randint includes its upper bound, so the last valid start index is
    # the one that still leaves portion_size + 1 characters after it
    last_start = len(textfile) - portion_size - 1
    if last_start < 0:
        raise ValueError(
            'text of length %d is too short for a portion of %d characters'
            % (len(textfile), portion_size + 1))

    start_index = random.randint(0, last_start)
    end_index = start_index + portion_size + 1
    return textfile[start_index:end_index]

# show one randomly drawn excerpt of the text
print(random_portion(textfile))

# convert chars to tensor for training
def char_to_tensor(text):
    """Encode ``text`` as a 1-D int64 tensor of each character's position in ``string.printable``.

    A character that is not part of ``string.printable`` makes ``str.index``
    raise ValueError.
    """
    positions = list(map(string.printable.index, text))
    return torch.tensor(positions, dtype=torch.long)

# demo: print the index tensor for a short sample string
print(char_to_tensor('abcDEF'))

### Model ###
class RNN(torch.nn.Module):
    """Character-level model: embedding -> GRU -> linear layer that scores the next character.

    ``forward`` handles one character per call; its views collapse the input
    into a single time step with batch size 1.
    """

    def __init__(self, input_size, embed_size,
                 hidden_size, output_size, num_layers):
        """
        Args:
            input_size: number of distinct input indices (rows of the embedding table).
            embed_size: width of each embedding vector and of the GRU input.
            hidden_size: width of the GRU hidden state.
            output_size: number of scores produced per step.
            num_layers: number of stacked GRU layers.
        """
        super(RNN, self).__init__()

        self.num_layers = num_layers
        self.hidden_size = hidden_size

        # The embedding width has to equal the GRU's input_size, so it is
        # embed_size (not hidden_size); otherwise the model only works when
        # both sizes happen to be equal.
        self.embed = torch.nn.Embedding(input_size, embed_size)
        self.gru = torch.nn.GRU(input_size=embed_size,
                                hidden_size=hidden_size,
                                num_layers=num_layers)
        self.fc = torch.nn.Linear(hidden_size, output_size)
        # Registered parameter that forward() and init_zero_state() do not
        # read; it is kept so the set of parameter names stays the same.
        self.init_hidden = torch.nn.Parameter(torch.zeros(
                                              num_layers, 1, hidden_size))

    def forward(self, features, hidden):
        """Process one character index and return ``(output, hidden)``.

        ``output`` has shape (1, output_size); ``hidden`` is the updated GRU state.
        """
        embedded = self.embed(features.view(1, -1))
        output, hidden = self.gru(embedded.view(1, 1, -1), hidden)
        output = self.fc(output.view(1, -1))
        return output, hidden

    def init_zero_state(self):
        """Return an all-zero hidden state of shape (num_layers, 1, hidden_size)."""
        # allocate the state on the device that holds the model's own parameters
        init_hidden = torch.zeros(self.num_layers, 1, self.hidden_size,
                                  device=self.fc.weight.device)
        return init_hidden


# re-seed so the parameter initialisation is reproducible
torch.manual_seed(RANDOM_SEED)

# input indices and output scores both range over the characters of string.printable
vocab_size = len(string.printable)
model = RNN(vocab_size, EMBEDDING_DIM, HIDDEN_DIM, vocab_size, NUM_HIDDEN).to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

def evaluate(model, prime_str='A', predict_len=100, temperature=0.8):
    """Generate ``predict_len`` characters that continue ``prime_str``.

    The model's scores are divided by ``temperature`` and exponentiated, and
    the next character is sampled from the resulting weights. Returns
    ``prime_str`` followed by the generated characters.
    """
    ## based on https://github.com/spro/practical-pytorch/
    ## blob/master/char-rnn-generation/char-rnn-generation.ipynb

    hidden = model.init_zero_state()
    prime_input = char_to_tensor(prime_str)

    # Use priming string to "build up" hidden state: every character except
    # the last one is fed through the model and only the hidden state is kept
    for char_index in prime_input[:-1]:
        _, hidden = model(char_index.to(DEVICE), hidden.to(DEVICE))
    inp = prime_input[-1]

    pieces = [prime_str]
    for _ in range(predict_len):
        output, hidden = model(inp.to(DEVICE), hidden.to(DEVICE))

        # Sample from the network as a multinomial distribution
        output_dist = output.data.view(-1).div(temperature).exp()
        top_i = torch.multinomial(output_dist, 1)[0]

        # Add predicted character to the result and use it as the next input
        predicted_char = string.printable[top_i]
        pieces.append(predicted_char)
        inp = char_to_tensor(predicted_char)

    return ''.join(pieces)

# The loop below calls draw_random_sample(), so it has to exist before the
# loop starts.
def draw_random_sample(textfile):
    """Return ``(inputs, targets)`` index tensors for one random text portion.

    ``targets`` is ``inputs`` shifted by one character, i.e. ``targets[c]`` is
    the character that follows ``inputs[c]``.
    """
    text_long = char_to_tensor(random_portion(textfile))
    return text_long[:-1], text_long[1:]


start_time = time.time()
for iteration in range(NUM_ITER):

    ### FORWARD AND BACK PROP
    hidden = model.init_zero_state()
    optimizer.zero_grad()

    loss = 0.
    inputs, targets = draw_random_sample(textfile)
    inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
    # feed the portion one character at a time and accumulate the loss
    # of predicting the following character
    for c in range(TEXT_PORTION_SIZE):
        outputs, hidden = model(inputs[c], hidden)
        loss += F.cross_entropy(outputs, targets[c].view(1))

    # average over the characters of the portion
    loss /= TEXT_PORTION_SIZE
    loss.backward()

    ### UPDATE MODEL PARAMETERS
    optimizer.step()

    ### LOGGING
    # every 1000 iterations: report timing and loss and print a generated sample
    with torch.set_grad_enabled(False):
        if iteration % 1000 == 0:
            print(f'Time elapsed: {(time.time() - start_time)/60:.2f} min')
            print(f'Iteration {iteration} | Loss {loss.item():.2f}\n\n')
            print(evaluate(model, 'Th', 200), '\n')
            print(50*'=')