Menu

Helper Module for Deep Learning.

Source code for pynet.core

# -*- coding: utf-8 -*-
##########################################################################
# NSAp - Copyright (C) CEA, 2019
# Distributed under the terms of the CeCILL-B license, as published by
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
# http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html
# for details.
##########################################################################

"""
Core classes.
"""

# System import
import re
import os
import copy
import types
import warnings
import logging
from collections import OrderedDict

# Third party import
import torch
import torch.nn.functional as func
import progressbar
import numpy as np

# Package import
from pynet.utils import checkpoint
from pynet.history import History
from pynet.observable import Observable
from pynet.utils import Metrics


# Global parameters
logger = logging.getLogger("pynet")


[docs]class Base(Observable): """ Base class for perform Deep Learning training. """
[docs] def __init__(self, optimizer_name="Adam", learning_rate=1e-3, loss_name="NLLLoss", metrics=None, use_cuda=False, pretrained=None, resume=False, add_labels=False, **kwargs): """ Class instantiation. Observers will be notified, allowed signals are: - 'before_epoch' - 'after_epoch' - 'kernel_regularizer' Parameters ---------- optimizer_name: str, default 'Adam' the name of the optimizer: see 'torch.optim' for a description of available optimizer. learning_rate: float, default 1e-3 the optimizer learning rate. loss_name: str, default 'NLLLoss' the name of the loss: see 'torch.nn' for a description of available loss. metrics: list of str a list of extra metrics that will be computed. use_cuda: bool, default False wether to use GPU or CPU. pretrained: path, default None path to the pretrained model or weights. resume: bool, default False if set to true, the code will restore the weights of the model but also restore the optimizer's state, as well as the hyperparameters used, and the scheduler. add_labels: bool, default False if set and labels are specified in the data manager, add the labels to the forward function parameters. kwargs: dict specify directly a custom 'model', 'optimizer' or 'loss'. Can also be used to set specific optimizer parameters. """ super().__init__( signals=["before_epoch", "after_epoch", "regularizer"]) self.optimizer = kwargs.get("optimizer") self.loss = kwargs.get("loss") self.resume = resume self.add_labels = add_labels for name in ("optimizer", "loss"): if name in kwargs: kwargs.pop(name) if "model" in kwargs: self.model = kwargs.pop("model") if self.optimizer is None: if optimizer_name not in dir(torch.optim): raise ValueError( "Optimizer '{0}' uknown: check available optimizer in " "'pytorch.optim'.".format(optimizer_name)) self.optimizer = getattr(torch.optim, optimizer_name)( self.model.parameters(), lr=learning_rate, **kwargs) if self.loss is None: if loss_name not in dir(torch.nn): raise ValueError("Loss '{0}' unknown: check available loss in " "'pytorch.nn'.".format(loss_name)) self.loss = getattr(torch.nn, loss_name)() self.metrics = {} for obj_or_name in (metrics or []): if isinstance(obj_or_name, types.FunctionType): self.metrics[obj_or_name.__name__] = obj_or_name continue if hasattr(obj_or_name, "__call__"): self.metrics[obj_or_name.__class__.__name__] = obj_or_name continue if obj_or_name not in Metrics.get_registry(): logger.info("Available metrics:\n{0}".format( Metrics.get_registry())) raise ValueError("Metric '{0}' not yet supported: you can try " "to fill the 'Metrics' factory, or ask for " "some help!".format(obj_or_name)) self.metrics[obj_or_name] = Metrics.get_registry()[obj_or_name] if use_cuda and not torch.cuda.is_available(): raise ValueError("No GPU found: unset 'use_cuda' parameter.") self.checkpoint = None if pretrained is not None: kwargs = {} if not use_cuda: kwargs["map_location"] = torch.device("cpu") self.checkpoint = torch.load(pretrained, **kwargs) if hasattr(self.checkpoint, "state_dict"): self.model.load_state_dict(self.checkpoint.state_dict()) elif isinstance(self.checkpoint, dict): if "model" in self.checkpoint: self.model.load_state_dict(self.checkpoint["model"]) if self.resume and "optimizer" in self.checkpoint: self.optimizer.load_state_dict( self.checkpoint["optimizer"]) else: self.model.load_state_dict(self.checkpoint) self.device = torch.device("cuda" if use_cuda else "cpu") self.model = self.model.to(self.device)
[docs] def training(self, manager, nb_epochs, checkpointdir=None, fold_index=None, scheduler=None, with_validation=True, save_after_epochs=1, add_labels=False): """ Train the model. Parameters ---------- manager: a pynet DataManager a manager containing the train and validation data. nb_epochs: int, default 100 the number of epochs. checkpointdir: str, default None a destination folder where intermediate models/histories will be saved. fold_index: int, default None the index of the fold to use for the training, default use all the available folds. scheduler: torch.optim.lr_scheduler, default None a scheduler used to reduce the learning rate. with_validation: bool, default True if set use the validation dataset. save_after_epochs: int, default 1 determines when the model is saved and represents the number of epochs before saving. Returns ------- train_history, valid_history: History the train/validation history. """ if self.resume and "scheduler" in self.checkpoint: scheduler.load_state_dict(self.checkpoint["scheduler"]) if checkpointdir is not None and not os.path.isdir(checkpointdir): os.mkdir(checkpointdir) train_history = History(name="train") if with_validation is not None: valid_history = History(name="validation") else: valid_history = None logger.info("Loss function {0}.".format(self.loss)) logger.info("Optimizer function {0}.".format(self.optimizer)) folds = range(manager.number_of_folds) if fold_index is not None: folds = [fold_index] init_optim_state = copy.deepcopy(self.optimizer.state_dict()) init_model_state = copy.deepcopy(self.model.state_dict()) if scheduler is not None: init_scheduler_state = copy.deepcopy(scheduler.state_dict()) for fold in folds: logger.debug("Running fold {0}...".format(fold)) self.optimizer.load_state_dict(init_optim_state) self.model.load_state_dict(init_model_state) if scheduler is not None: scheduler.load_state_dict(init_scheduler_state) loaders = manager.get_dataloader( train=True, validation=with_validation, fold_index=fold) for epoch in range(nb_epochs): logger.debug("Running epoch {0}:".format(fold)) logger.debug(" notify observers with signal 'before_epoch'.") self.notify_observers("before_epoch", epoch=epoch, fold=fold, scheduler=scheduler) observers_kwargs = {} logger.debug(" train.") loss, values = self.train(loaders.train) observers_kwargs["loss"] = loss observers_kwargs.update(values) if scheduler is not None: logger.debug(" update scheduler.") scheduler.step(loss) if hasattr(scheduler, "get_last_lr"): logger.debug(" - lr: {0}".format( scheduler.get_last_lr())) else: logger.debug(" - lr: {0}".format(scheduler._last_lr)) logger.debug(" update train history.") train_history.log((fold, epoch), loss=loss, **values) train_history.summary() if (checkpointdir is not None and epoch % save_after_epochs == 0): logger.debug(" create checkpoint.") checkpoint( model=self.model, epoch=epoch, fold=fold, outdir=checkpointdir, optimizer=self.optimizer, scheduler=scheduler) train_history.save( outdir=checkpointdir, epoch=epoch, fold=fold) if with_validation: logger.debug(" validation.") y_pred, loss, values = self.test( loaders.validation, is_validation=True) observers_kwargs["val_loss"] = loss observers_kwargs.update(dict( ("val_{0}".format(key), val) for key, val in values.items())) observers_kwargs["val_pred"] = y_pred logger.debug(" update validation history.") valid_history.log((fold, epoch), loss=loss, **values) valid_history.summary() if (checkpointdir is not None and epoch % save_after_epochs == 0): logger.debug(" create checkpoint.") valid_history.save( outdir=checkpointdir, epoch=epoch, fold=fold) logger.debug(" notify observers with signal 'after_epoch'.") self.notify_observers("after_epoch", epoch=epoch, fold=fold, scheduler=scheduler, **observers_kwargs) logger.debug("End epoch.".format(fold)) logger.debug("End fold.") return train_history, valid_history
[docs] def train(self, loader): """ Train the model on the trained data. Parameters ---------- loader: a pytorch Dataset the data laoder. Returns ------- loss: float the value of the loss function. values: dict the values of the metrics. """ logger.debug("Update model for training.") self.model.train() nb_batch = len(loader) values = {} loss = 0 pbar = progressbar.ProgressBar( max_value=nb_batch, redirect_stdout=True, prefix="Mini-batch ") pbar.start() for iteration, dataitem in enumerate(loader): logger.debug("Mini-batch {0}:".format(iteration)) pbar.update(iteration + 1) logger.debug(" transfer inputs to {0}.".format(self.device)) inputs = self._to_device(dataitem.inputs) logger.debug(" transfer targets to {0}.".format(self.device)) targets = [] for item in (dataitem.outputs, dataitem.labels): if item is not None: targets.append(self._to_device(item)) args = () if self.add_labels and dataitem.labels is not None: args = (targets[-1], ) logger.debug(" evaluate model.") self.optimizer.zero_grad() output_items = self.model(inputs, *args) outputs, layer_outputs = self._parse_outputs(output_items) logger.debug(" update loss.") if isinstance(outputs, list): logger.debug(" outputs: {0}".format(len(outputs))) elif isinstance(outputs, np.ndarray): logger.debug(" outputs: {0} - {1}".format( outputs.shape, outputs.dtype)) logger.debug(" targets: {0}".format(len(targets))) if hasattr(self.loss, "layer_outputs"): self.loss.layer_outputs = layer_outputs batch_items = self.loss(outputs, *targets) batch_loss, batch_loss_metrics = self._parse_outputs(batch_items) if batch_loss_metrics is not None: for name, metric in batch_loss_metrics.items(): if name not in values: values[name] = 0 values[name] += float(metric) / nb_batch regularizations = self.notify_observers( "regularizer", layer_outputs=layer_outputs) for reg in regularizations: batch_loss += reg logger.debug(" update model weights.") batch_loss.backward() self.optimizer.step() loss += batch_loss.item() / nb_batch for name, metric in self.metrics.items(): logger.debug(" compute metric '{0}'.".format(name)) if hasattr(metric, "layer_outputs"): metric.layer_outputs = layer_outputs if name not in values: values[name] = 0 values[name] += float(metric(outputs, *targets)) / nb_batch logger.debug("Mini-batch done.") pbar.finish() logger.debug("Loss {0} ({1})".format(loss, type(loss))) return loss, values
[docs] def testing(self, manager, with_logit=False, logit_function="softmax", predict=False, concat_layer_outputs=None): """ Evaluate the model. Parameters ---------- manager: a pynet DataManager a manager containing the test data. with_logit: bool, default False apply the logit function to the result. logit_function: str, default 'softmax' choose the logit function. predict: bool, default False take the argmax over the channels. concat_layer_outputs: list of str, default None the outputs of the intermediate layers to be merged with the predicted data (must be the same size). Returns ------- y: array-like the predicted data. X: array-like the input data. y_true: array-like the true data if available. loss: float the value of the loss function if true data availble. values: dict the values of the metrics if true data availble. """ loaders = manager.get_dataloader(test=True) y, loss, values = self.test( loaders.test, with_logit=with_logit, logit_function=logit_function, predict=predict, concat_layer_outputs=concat_layer_outputs) if loss == 0: loss, values, y_true, X = (None, None, None, None) else: y_true = [] X = [] targets = OrderedDict() for dataitem in loaders.test: for cnt, item in enumerate((dataitem.outputs, dataitem.labels)): if item is not None: targets.setdefault(cnt, []).append( item.cpu().detach().numpy()) X.append(dataitem.inputs.cpu().detach().numpy()) X = np.concatenate(X, axis=0) for key, _values in targets.items(): y_true.append(np.concatenate(_values, axis=0)) if len(y_true) == 1: y_true = y_true[0] return y, X, y_true, loss, values
[docs] def test(self, loader, with_logit=False, logit_function="softmax", predict=False, concat_layer_outputs=None, is_validation=False): """ Evaluate the model on the test or validation data. Parameters ---------- loader: a pytorch Dataset the data laoder. with_logit: bool, default False apply the logit function to the result. logit_funtction: str, default 'softmax' choose the logit function. predict: bool, default False take the argmax over the channels. concat_layer_outputs: list of str, default None the outputs of the intermediate layers to be merged with the predicted data (must be the same size). is_validation: bool default False specify if we are in the validation phase. Returns ------- y: array-like the predicted data. loss: float the value of the loss function. values: dict the values of the metrics. """ logger.debug("Update model for testing.") self.model.eval() nb_batch = len(loader) loss = 0 values = {} concate_out = True with torch.no_grad(): y = [] pbar = progressbar.ProgressBar( max_value=nb_batch, redirect_stdout=True, prefix="Mini-batch ") pbar.start() for iteration, dataitem in enumerate(loader): logger.debug("Mini-batch {0}:".format(iteration)) pbar.update(iteration + 1) logger.debug(" transfer inputs to {0}.".format(self.device)) inputs = self._to_device(dataitem.inputs) logger.debug(" transfer targets to {0}.".format(self.device)) targets = [] for item in (dataitem.outputs, dataitem.labels): if item is not None: targets.append(self._to_device(item)) args = () if self.add_labels and dataitem.labels is not None: args = (targets[-1], ) logger.debug(" evaluate model.") output_items = self.model(inputs, *args) extra_outputs = [] outputs, layer_outputs = self._parse_outputs(output_items) if (concat_layer_outputs is not None and layer_outputs is not None): for name in concat_layer_outputs: if name not in layer_outputs: raise ValueError( "Unknown layer output '{0}'. Check the " "network forward method.".format(name)) extra_outputs.append(layer_outputs[name]) if is_validation or len(targets) > 0: logger.debug(" update loss.") logger.debug(" layer outputs: {0}".format(layer_outputs)) if hasattr(self.loss, "layer_outputs"): self.loss.layer_outputs = layer_outputs batch_items = self.loss(outputs, *targets) batch_loss, batch_loss_metrics = self._parse_outputs( batch_items) if batch_loss_metrics is not None: for name, metric in batch_loss_metrics.items(): if name not in values: values[name] = 0 values[name] += float(metric) / nb_batch loss += float(batch_loss) / nb_batch for name, metric in self.metrics.items(): logger.debug(" compute metric '{0}'.".format(name)) if hasattr(metric, "layer_outputs"): metric.layer_outputs = layer_outputs if name not in values: values[name] = 0 values[name] += metric(outputs, *targets) / nb_batch if len(extra_outputs) > 0: y.append(torch.cat([outputs] + extra_outputs, 1)) else: if isinstance(outputs, list): concate_out = False y.append(outputs) logger.debug("Mini-batch done.") pbar.finish() if concate_out: try: y = torch.cat(y, 0) except: warnings.warn("Impossible to concatenate y!") if with_logit: logger.debug("Apply logit.") if logit_function == "softmax": y = func.softmax(y, dim=1) elif logit_function == "sigmoid": y = torch.sigmoid(y) else: raise ValueError("Unsupported logit function.") y = self._numpy(y) if predict: logger.debug("Apply predict.") y = np.argmax(y, axis=1) return y, loss, values
def _numpy(self, data): """ Transfer data to cpu as array data. Parameters ---------- data: tensor or list of tensor the data to transfer. Returns ------- out: array or list of array the transfered data. """ if isinstance(data, list): return [self._numpy(tensor) for tensor in data] elif torch.is_tensor(data): return data.cpu().detach().numpy() else: return data def _parse_outputs(self, output_items): """ Parse function/method outputs. Parameters ---------- output_items: object, list or tuple the data to be parsed. Returns ------- out: object the first output. extra_out: dict other outputs. """ if (not isinstance(output_items, tuple) and not isinstance(output_items, list)): out = output_items extra_out = None elif len(output_items) == 1: out = output_items[0] extra_out = None elif len(output_items) == 2: out, extra_out = output_items else: raise ValueError( "The function / method can only return one or " "two parameters: the main output (loss, forward result), " "and as an option specific outputs in a dictionary.") if extra_out is not None and not isinstance(extra_out, dict): raise ValueError("Specific outputs must be a dictionary.") return out, extra_out def _to_device(self, data): """ Transfer data to device. Parameters ---------- data: tensor or list of tensor the data to transfer. Returns ------- out: tensor or list of tensor the transfered data. """ if isinstance(data, list): return [tensor.to(self.device) for tensor in data] else: return data.to(self.device)

Follow us

© 2019, pynet developers .
Inspired by AZMIND template.