Menu

Helper Module for Deep Learning.

pynet: self supervised clusteringΒΆ

Credit: A Grigis

import os
import sys
if "CI_MODE" in os.environ:
    sys.exit()

# Imports
import collections
import logging
import pynet
from pynet.metrics import SKMetrics
from pynet.datasets import DataManager
from pynet.interfaces import DeepLearningInterface
from pynet.interfaces import DeepClusterClassifier
from pynet.models import BrainNetCNN
from pynet.utils import setup_logging
from pynet.plotting import Board, update_board
from pynet.models.deepcluster import update_pseudo_labels
import torch
import torch.nn as nn
from torch.utils.data.sampler import Sampler
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.cluster import MiniBatchKMeans, KMeans
from sklearn.metrics import classification_report
from sklearn.metrics import roc_curve, auc
try:
    import faiss
except:
    pass


# Global Parameters
OUTDIR = "/tmp/graph_connectome"
BATCH_SIZE = 5
N_EPOCHS = 20
N_CLUSTERS = 2
N_SAMPLES = 40
AVOID_EMPTY_CLUSTERS = False
UNIFORM_SAMPLING = True
setup_logging(level="info")


# Load the data
data = []
labels = []
for idx in range(N_CLUSTERS):

    x_train = np.ones((N_SAMPLES, 1, 90, 90)) * idx
    x_train += (np.random.rand(*x_train.shape) - 0.5) * 0.01
    y_train = np.asarray([idx] * N_SAMPLES)
    print("sub data: x {0} - y {1}".format(x_train.shape, y_train.shape))
    data.append(x_train)
    labels.extend([idx] * len(x_train))

    # Show example noisy training data that have the signatures applied.
    # It's not obvious to the human eye the subtle differences, but the cross
    # row and column above perturbed the below matrices with the y weights.
    # Show in the title how much each signature is weighted by.
    plt.figure(figsize=(16, 4))
    for idx in range(3):
        plt.subplot(1, 3, idx + 1)
        plt.imshow(np.squeeze(x_train[idx]), interpolation="None")
        plt.colorbar()
        plt.title(y_train[idx])

data = np.concatenate(data, axis=0)
labels = np.asarray(labels)
print("dataset: x {0} - y {1}".format(data.shape, labels.shape))


class UniformLabelSampler(Sampler):
    """ Samples elements uniformely accross pseudo labels.
    """
    def __init__(self, data_array):
        """ Init class.

        Parameters
        ----------
        data_array: ArrayDataset
            the train data array that contains the pseudo labels.
        """
        self.n_samples = len(data_array)
        self.data_array = data_array
        self.indexes = self.generate_indexes_epoch()

    def generate_indexes_epoch(self):
        """ Generate sampling indexes.
        """
        labels = self.data_array.labels
        clusters_to_images = self.get_clusters(labels)
        n_non_empty_clusters = len(clusters_to_images)
        size_per_pseudolabel = int(self.n_samples / n_non_empty_clusters) + 1
        res = np.array([])
        for name, cluster_indexes in clusters_to_images.items():
            indexes = np.random.choice(
                cluster_indexes, size_per_pseudolabel,
                replace=(len(cluster_indexes) <= size_per_pseudolabel))
            res = np.concatenate((res, indexes))
        np.random.shuffle(res)
        res = list(res.astype("int"))
        if len(res) >= self.n_samples:
            return res[:self.n_samples]
        res += res[: (self.n_samples - len(res))]
        return res

    def get_clusters(self, labels):
        """ Get indexes associated to each cluster.
        """
        tally = collections.defaultdict(list)
        for idx, item in enumerate(labels):
            tally[item].append(idx)
        return tally

    def __iter__(self):
        return iter(self.indexes)

    def __len__(self):
        return len(self.indexes)


# Create data manager
if UNIFORM_SAMPLING:
    sampler = UniformLabelSampler
else:
    sampler = "random"
manager = DataManager.from_numpy(
    train_inputs=data, train_labels=np.zeros(labels.shape),
    batch_size=BATCH_SIZE, sampler=sampler)


class FKmeans(object):
    def __init__(self, n_clusters):
        self.n_clusters = n_clusters

    def fit(self, data):
        n_data, d = data.shape
        self.clus = faiss.Kmeans(d, self.n_clusters)
        self.clus.seed = np.random.randint(1234)
        self.clus.niter = 20
        self.clus.max_points_per_centroid = 10000000
        self.clus.train(data)

    def predict(self, data):
        _, I = self.clus.index.search(data, 1)
        losses = self.clus.obj
        print("k-means loss evolution: {0}".format(losses))
        return np.asarray([int(n[0]) for n in I])


def my_loss(x, y):
    criterion = nn.CrossEntropyLoss()
    print("  x: {0} - {1}".format(x.shape, x.dtype))
    print(torch.argmax(x, dim=1))
    print("  y: {0} - {1}".format(y.shape, y.dtype))
    print(y)
    return criterion(x, y)


# Create model
train_loader = manager.get_dataloader(train=True, fold_index=0).train
if AVOID_EMPTY_CLUSTERS:
    kmeans = FKmeans(n_clusters=N_CLUSTERS)
else:
    kmeans = KMeans(
        n_clusters=N_CLUSTERS,
        random_state=None,
        # verbose=100,
        max_iter=20)
net = BrainNetCNN(
    input_shape=(90, 90),
    in_channels=1,
    num_classes=N_CLUSTERS,
    nb_e2e=32,
    nb_e2n=64,
    nb_n2g=30,
    dropout=0,
    leaky_alpha=0.1,
    twice_e2e=False,
    dense_sml=False)
net_params = pynet.NetParameters(
    network=net,
    clustering=kmeans,
    data_loader=train_loader,
    n_batchs=10,
    pca_dim=6,
    assignment_logfile=None,
    use_cuda=False)
model = DeepClusterClassifier(
    net_params,
    optimizer_name="SGD",
    learning_rate=0.001,
    momentum=0.9,
    weight_decay=10**-5,
    # loss=my_loss)
    loss_name="CrossEntropyLoss")
model.board = Board(port=8097, host="http://localhost", env="deepcluster")
model.add_observer("before_epoch", update_pseudo_labels)
model.add_observer("after_epoch", update_board)


# Train model
test_history, train_history = model.training(
    manager=manager,
    nb_epochs=N_EPOCHS,
    checkpointdir=None,
    fold_index=0,
    scheduler=None,
    with_validation=False)


# Test model
manager = DataManager.from_numpy(
    test_inputs=data, test_labels=labels, batch_size=BATCH_SIZE)
test_model = DeepLearningInterface(
    model=model.model.network,
    optimizer_name="SGD",
    learning_rate=0.01,
    momentum=0.9,
    weight_decay=10**-5,
    loss_name="CrossEntropyLoss")
y_pred, X, y_true, loss, values = test_model.testing(
    manager=manager,
    with_logit=True,
    # logit_function="sigmoid",
    predict=False)
print(y_pred.shape, X.shape, y_true.shape)


# Inspect results
result = pd.DataFrame.from_dict(collections.OrderedDict([
    ("pred", (np.argmax(y_pred, axis=1)).astype(int)),
    ("truth", y_true.squeeze()),
    ("prob_0", y_pred[:, 0].squeeze()),
    ("prob_1", y_pred[:, 1].squeeze())]))
print(result)
y_pred = np.argmax(y_pred, axis=1)
fig, ax = plt.subplots()
cmap = plt.get_cmap("Blues")
cm = SKMetrics("confusion_matrix", with_logit=False)(y_pred, y_true)
sns.heatmap(cm, cmap=cmap, annot=True, fmt="g", ax=ax)
ax.set_xlabel("predicted values")
ax.set_ylabel("actual values")
print(classification_report(y_true, y_pred >= 0.4))
fpr, tpr, _ = roc_curve(y_true, y_pred)
roc_auc = auc(fpr, tpr)
plt.figure()
plt.plot(fpr, tpr, color="darkorange", lw=2,
         label="ROC curve (area = %0.2f)" % roc_auc)
plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Receiver operating characteristic")
plt.legend(loc="lower right")

plt.show()

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery

Follow us

© 2019, pynet developers .
Inspired by AZMIND template.