Helper Module for Deep Learning.
Source code for pynet.datasets.core
# -*- coding: utf-8 -*-
##########################################################################
# NSAp - Copyright (C) CEA, 2019 - 2020
# Distributed under the terms of the CeCILL-B license, as published by
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
# http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html
# for details.
##########################################################################
"""
Module that provides core functions to load and split a dataset.
"""
# Imports
from collections import namedtuple, OrderedDict, Counter
import progressbar
import inspect
import random
import logging
import numpy as np
import pandas as pd
import torch
from torch.utils.data import (
Dataset, DataLoader, WeightedRandomSampler, RandomSampler,
SequentialSampler, Sampler)
from sklearn.model_selection import (
KFold, StratifiedKFold, ShuffleSplit, StratifiedShuffleSplit)
from skimage.util.shape import view_as_blocks
# Global parameters
# Container returned by 'DataManager.get_dataloader': one slot per set, each
# holding a DataLoader or None when the set was not requested.
SetItem = namedtuple("SetItem", ["test", "train", "validation"])
# Container for one sample (returned by 'ArrayDataset.__getitem__') or for
# one mini-batch (returned by 'DataManager.collate_fn'); unused fields are
# set to None.
DataItem = namedtuple("DataItem", ["inputs", "outputs", "labels"])
# Logger shared by the code of this module, registered under the "pynet" name.
logger = logging.getLogger("pynet")
class DataManager(object):
    """ Data manager used to split a dataset in train, test and validation
    pytorch datasets.
    """
    def __init__(self, input_path, metadata_path, output_path=None,
                 labels=None, stratify_label=None, custom_stratification=None,
                 projection_labels=None, number_of_folds=10, batch_size=1,
                 sampler="random", input_transforms=None,
                 output_transforms=None, data_augmentation_transforms=None,
                 add_input=False, test_size=0.1, label_mapping=None,
                 patch_size=None, continuous_labels=False, sample_size=1,
                 **dataloader_kwargs):
        """ Splits an input numpy array using memory-mapping into three sets:
        test, train and validation. This function can stratify the data.

        The train/test indices are computed using a Stratified or not
        ShuffleSplit.

        TODO: In the case of custom stratification, enable the weighted random
        sampler.

        Parameters
        ----------
        input_path: str or dict
            the path to the numpy array containing the input tensor data
            that will be split/loaded, or the dataset itself: a dictionary
            with the 'train', 'test' and 'validation' keys. In the latter
            case the dictionary is stored as is and no split is performed.
        metadata_path: str
            the path to the metadata table in tsv format.
        output_path: str, default None
            the path to the numpy array containing the output tensor data
            that will be split/loaded.
        labels: list of str, default None
            in case of classification/regression, the name of the column(s)
            in the metadata table to be predicted.
        stratify_label: str, default None
            the name of the column in the metadata table containing the label
            used during the stratification (mutually exclusive with
            'custom_stratification').
        custom_stratification: dict, default None
            split the dataset into train/validation/test according to the
            defined stratification strategy: the 'train' and 'test' keys are
            mandatory, the 'validation' key is optional. The filtering is
            performed as for the labels projection (mutually exclusive with
            'stratify_label').
        projection_labels: dict, default None
            selects only the data that match the conditions. Use this
            dictionary to filter the input data from the metadata table:
            {<column_name>: <value>}.
        number_of_folds: int, default 10
            the number of folds that will be used in the cross validation.
        batch_size: int, default 1
            the size of each mini-batch.
        sampler: str or Sampler, default 'random'
            whether we use a sequential, random or weighted random sampler
            (to deal with imbalanced classes issue) during the generation of
            the mini-batches: None, 'random', 'weighted_random' or a custom
            Sampler class.
        input_transforms, output_transforms: list of callable, default None
            transforms a list of samples with pre-defined transformations.
        data_augmentation_transforms: list of callable, default None
            transforms the training dataset input with pre-defined
            transformations on the fly during the training.
        add_input: bool, default False
            if true concatenate the input tensor to the output tensor.
        test_size: float, default 0.1
            should be between 0.0 and 1.0 and represent the proportion of the
            dataset to include in the test split.
        label_mapping: dict, default None
            a mapping that can be used to convert labels to be predicted
            (string to int conversion).
        patch_size: tuple, default None
            the size of the patches that will be extracted from the
            input/output images.
        continuous_labels: bool, default False
            if set consider labels as continuous values, ie. floats, otherwise
            as discrete values, ie. integers.
        sample_size: float, default 1
            should be between 0.0 and 1.0 and represent the proportion of the
            dataset used by the manager (random selection that can be useful
            during testing).
        dataloader_kwargs: dict
            extra keyword arguments forwarded to each created DataLoader.

        Raises
        ------
        ValueError
            if two stratification strategies are specified, if the sampler
            is not supported, if the weighted sampler is requested without
            a stratification label, or if the custom stratification has no
            'train' or 'test' key.
        """
        # Checks
        if stratify_label is not None and custom_stratification is not None:
            raise ValueError("You specified two stratification strategies.")
        # A sampler is either a Sampler subclass or one of the known names.
        if inspect.isclass(sampler):
            if not issubclass(sampler, Sampler):
                raise ValueError("Unsupported sampler.")
        elif sampler not in (None, "random", "weighted_random"):
            raise ValueError("Unsupported sampler.")
        if sampler == "weighted_random" and stratify_label is None:
            raise ValueError(
                "Impossible to use the weighted sampler without a "
                "stratification label.")

        # Class parameters
        # We should only work with masked data but we want to preserve the
        # memory mapping so we are getting the right index at the end
        # (in __getitem__ of ArrayDataset)
        self.batch_size = batch_size
        self.number_of_folds = number_of_folds
        self.data_loader_kwargs = dataloader_kwargs
        self.sampler = sampler
        self.continuous_labels = continuous_labels
        self.multi_bloc = None
        # Defined before the early return below so that these attributes
        # always exist, whatever the way the manager was created.
        (self.stratify_labels, self.stratify_categories,
         self.sampler_weights) = (None, None, None)
        if isinstance(input_path, dict):
            # The datasets are already available: nothing to load or split.
            self.dataset = input_path
            return

        df = pd.read_csv(metadata_path, sep="\t")
        logger.debug("Metadata:\n{0}".format(df))
        mask = DataManager.get_mask(
            df=df, projection_labels=projection_labels,
            sample_size=sample_size)
        mask_indices = DataManager.get_mask_indices(mask)
        logger.debug("Projection labels: {0}".format(projection_labels))
        logger.debug("Mask: {0}".format(mask))
        logger.debug("Mask indices: {0}".format(mask_indices))
        self.inputs = np.load(input_path, mmap_mode='r')
        logger.debug("Inputs: {0}".format(self.inputs.shape))
        self.outputs, self.labels = (None, None)
        if output_path is not None:
            self.outputs = np.load(output_path, mmap_mode='r')
            logger.debug("Outputs: {0}".format(self.outputs.shape))
        if labels is not None:
            self.labels = df[labels].values.squeeze()
            logger.debug("Labels: {0}".format(self.labels.shape))
            assert len(self.labels) == len(self.inputs)
        self.metadata = df
        self.mask = mask
        self.test_size = test_size
        self.input_transforms = input_transforms or []
        self.output_transforms = output_transforms or []
        self.data_augmentation_transforms = data_augmentation_transforms or []
        self.add_input = add_input
        self.dataset = dict(
            (key, []) for key in ("train", "test", "validation"))

        # Split into train+validation/test: get only indices.
        val_indices, train_indices, test_indices = (None, None, None)
        if stratify_label is not None:
            self.stratify_labels = df[stratify_label].values
            self.stratify_categories = set(self.stratify_labels[mask])
            # Number of selected samples in each stratification category.
            self.sampler_weights = Counter(self.stratify_labels[mask])
        if self.test_size == 0:
            train_indices = mask_indices
            test_indices = None
        else:
            # The splitters only need an array with the right length.
            dummy_mask_like = np.ones(np.sum(mask))
            if custom_stratification is not None:
                for key in ("train", "test"):
                    if key not in custom_stratification:
                        raise ValueError("Unformed custom straitification.")
                train_mask = DataManager.get_mask(
                    df, custom_stratification["train"])
                test_mask = DataManager.get_mask(
                    df, custom_stratification["test"])
                train_mask &= mask
                test_mask &= mask
                train_indices = DataManager.get_mask_indices(train_mask)
                test_indices = DataManager.get_mask_indices(test_mask)
                if "validation" in custom_stratification:
                    val_mask = DataManager.get_mask(
                        df, custom_stratification["validation"])
                    val_mask &= mask
                    val_indices = DataManager.get_mask_indices(val_mask)
            elif stratify_label is not None:
                splitter = StratifiedShuffleSplit(
                    n_splits=1, random_state=0, test_size=self.test_size)
                train_mask, test_mask = next(splitter.split(
                    dummy_mask_like, self.stratify_labels[mask]))
                # Positions in the masked data -> indices in the full array.
                train_indices = mask_indices[train_mask]
                test_indices = mask_indices[test_mask]
            else:
                if test_size == 1:
                    train_indices, test_indices = (None, mask_indices)
                else:
                    splitter = ShuffleSplit(
                        n_splits=1, random_state=0, test_size=test_size)
                    train_indices, test_indices = next(splitter.split(
                        dummy_mask_like))
                    train_indices = mask_indices[train_indices]
                    test_indices = mask_indices[test_indices]
        logger.debug("Train+Validation indices: {0}-{1}".format(
            len(train_indices) if train_indices is not None else None,
            train_indices))
        logger.debug("Test indices: {0}-{1}".format(
            len(test_indices) if test_indices is not None else None,
            test_indices))
        if test_indices is None:
            self.dataset["test"] = None
        else:
            self.dataset["test"] = ArrayDataset(
                self.inputs, test_indices, labels=self.labels,
                outputs=self.outputs, add_input=self.add_input,
                input_transforms=self.input_transforms,
                output_transforms=self.output_transforms,
                label_mapping=label_mapping,
                patch_size=patch_size)
        if train_indices is None:
            return

        # Split the training set into K folds (K-1 for training, 1 for
        # validation, K times)
        dummy_train_like = np.ones(len(train_indices))
        if val_indices is not None:
            self.generator = [(train_indices, val_indices)]
        elif stratify_label is not None:
            kfold_splitter = StratifiedKFold(
                n_splits=self.number_of_folds)
            self.generator = kfold_splitter.split(
                dummy_train_like, self.stratify_labels[train_indices])
            self.generator = [(train_indices[train], train_indices[val])
                              for (train, val) in self.generator]
        else:
            kfold_splitter = KFold(n_splits=self.number_of_folds)
            self.generator = kfold_splitter.split(dummy_train_like)
            self.generator = [(train_indices[train], train_indices[val])
                              for (train, val) in self.generator]
        for fold_train_indices, fold_val_indices in self.generator:
            logger.debug("Fold train indices: {0}".format(fold_train_indices))
            logger.debug("Fold val indices: {0}".format(fold_val_indices))
            # Sanity checks: the two sets are disjoint and cover the
            # train+validation indices.
            assert len(set(fold_val_indices) & set(fold_train_indices)) == 0
            assert (len(set(fold_val_indices)) + len(set(fold_train_indices))
                    == len(set(train_indices)))
            # The data augmentation is only applied on the training set.
            train_dataset = ArrayDataset(
                self.inputs, fold_train_indices, labels=self.labels,
                outputs=self.outputs, add_input=self.add_input,
                input_transforms=(self.input_transforms +
                                  self.data_augmentation_transforms),
                output_transforms=(self.output_transforms +
                                   self.data_augmentation_transforms),
                label_mapping=label_mapping,
                patch_size=patch_size)
            val_dataset = ArrayDataset(
                self.inputs, fold_val_indices, labels=self.labels,
                outputs=self.outputs, add_input=self.add_input,
                input_transforms=self.input_transforms,
                output_transforms=self.output_transforms,
                label_mapping=label_mapping,
                patch_size=patch_size)
            self.dataset["train"].append(train_dataset)
            self.dataset["validation"].append(val_dataset)

    @classmethod
    def from_numpy(cls, test_inputs=None, test_outputs=None, test_labels=None,
                   train_inputs=None, train_outputs=None, train_labels=None,
                   validation_inputs=None, validation_outputs=None,
                   validation_labels=None, batch_size=1, sampler="random",
                   input_transforms=None, output_transforms=None,
                   data_augmentation_transforms=None, add_input=False,
                   label_mapping=None, patch_size=None,
                   continuous_labels=False):
        """ Create a data manager from numpy arrays.

        Parameters
        ----------
        *_inputs, *_outputs, *_labels: ndarrays
            the test, train and validation data. A set is created only if
            the corresponding inputs are specified.
        batch_size: int, default 1
            the size of each mini-batch.
        sampler: str or Sampler, default 'random'
            whether we use a sequential, random or weighted random sampler
            (to deal with imbalanced classes issue) during the generation of
            the mini-batches: None, 'random', 'weighted_random' or a custom
            Sampler class.
        input_transforms, output_transforms: list of callable, default None
            transforms a list of samples with pre-defined transformations.
        data_augmentation_transforms: list of callable, default None
            transforms the training dataset input with pre-defined
            transformations on the fly during the training.
        add_input: bool, default False
            if true concatenate the input tensor to the output tensor.
        label_mapping: dict, default None
            a mapping that can be used to convert labels to be predicted
            (string to int conversion).
        patch_size: tuple, default None
            the size of the patches that will be extracted from the
            input/output images.
        continuous_labels: bool, default False
            if set consider labels as continuous values, ie. floats, otherwise
            as discrete values, ie. integers.

        Returns
        -------
        ins: DataManager
            a data manager.
        """
        dataset = dict((key, None) for key in ("train", "test", "validation"))
        input_transforms = input_transforms or []
        output_transforms = output_transforms or []
        data_augmentation_transforms = data_augmentation_transforms or []
        if test_inputs is not None:
            test_dataset = ArrayDataset(
                inputs=test_inputs, indices=range(len(test_inputs)),
                labels=test_labels, outputs=test_outputs,
                input_transforms=input_transforms,
                output_transforms=output_transforms,
                add_input=add_input,
                label_mapping=label_mapping,
                patch_size=patch_size)
            dataset["test"] = test_dataset
        if train_inputs is not None:
            # The data augmentation is only applied on the training set.
            train_dataset = ArrayDataset(
                inputs=train_inputs,
                indices=range(len(train_inputs)),
                labels=train_labels,
                outputs=train_outputs,
                input_transforms=(input_transforms +
                                  data_augmentation_transforms),
                output_transforms=(output_transforms +
                                   data_augmentation_transforms),
                add_input=add_input,
                label_mapping=label_mapping,
                patch_size=patch_size)
            dataset["train"] = [train_dataset]
        if validation_inputs is not None:
            validation_dataset = ArrayDataset(
                inputs=validation_inputs,
                indices=range(len(validation_inputs)),
                labels=validation_labels,
                outputs=validation_outputs,
                input_transforms=input_transforms,
                output_transforms=output_transforms,
                add_input=add_input,
                label_mapping=label_mapping,
                patch_size=patch_size)
            dataset["validation"] = [validation_dataset]
        return cls(input_path=dataset,
                   metadata_path=None,
                   sampler=sampler,
                   batch_size=batch_size,
                   number_of_folds=1,
                   continuous_labels=continuous_labels)

    @classmethod
    def from_dataset(cls, test_dataset=None, train_dataset=None,
                     validation_dataset=None, batch_size=1, sampler="random",
                     multi_bloc=False):
        """ Create a data manager from torch datasets.

        Parameters
        ----------
        *_dataset: Dataset
            the train/validation/test datasets.
        batch_size: int, default 1
            the size of each mini-batch.
        sampler: str or Sampler, default 'random'
            whether we use a sequential, random or weighted random sampler
            (to deal with imbalanced classes issue) during the generation of
            the mini-batches: None, 'random', 'weighted_random' or a custom
            Sampler class.
        multi_bloc: bool, default False
            if set expect multi bloc datasets that return a list with
            N blocs of data.

        Returns
        -------
        ins: DataManager
            a data manager.
        """
        dataset = dict((key, None) for key in ("train", "test", "validation"))
        if test_dataset is not None:
            dataset["test"] = test_dataset
        # Train and validation sets are stored as one-fold lists.
        if train_dataset is not None:
            dataset["train"] = [train_dataset]
        if validation_dataset is not None:
            dataset["validation"] = [validation_dataset]
        manager = cls(input_path=dataset,
                      metadata_path=None,
                      sampler=sampler,
                      batch_size=batch_size,
                      number_of_folds=1)
        manager.multi_bloc = multi_bloc
        return manager

    def __getitem__(self, item):
        """ Return the requested item.

        Parameters
        ----------
        item: str
            the name of the requested set: 'train', 'test' or 'validation'.

        Returns
        -------
        item: Dataset or list of Dataset
            the requested set of data: test, train or validation.

        Raises
        ------
        ValueError
            if the requested set name is unknown.
        """
        if item not in ("train", "test", "validation"):
            raise ValueError(
                "Unknown set! Must be 'train', 'test' or 'validation'.")
        return self.dataset[item]

    def collate_fn(self, list_samples):
        """ After fetching a list of samples using the indices from sampler,
        the function passed as the collate_fn argument is used to collate lists
        of samples into batches.

        A custom collate_fn is used here to stack the samples in float
        tensors and to cast the labels.
        See https://pytorch.org/docs/stable/data.html#dataloader-collate-fn.

        Parameters
        ----------
        list_samples: list of DataItem
            the samples to be collated.

        Returns
        -------
        batch: DataItem
            the batch data: each field is None, a tensor, or a list of
            tensors (one per bloc) when the manager is in multi bloc mode.
        """
        data = OrderedDict()
        for key in ("inputs", "outputs", "labels"):
            # The last sample tells if the field is available.
            if (len(list_samples) == 0 or
                    getattr(list_samples[-1], key) is None):
                data[key] = None
            elif self.multi_bloc:
                n_blocs = len(getattr(list_samples[-1], key))
                data[key] = [torch.stack([
                    torch.as_tensor(getattr(sample, key)[bloc])
                    for sample in list_samples], dim=0).float()
                    for bloc in range(n_blocs)]
            else:
                data[key] = torch.stack([
                    torch.as_tensor(getattr(sample, key))
                    for sample in list_samples], dim=0).float()
        if data["labels"] is not None:
            if self.continuous_labels:
                label_type = torch.FloatTensor
            else:
                label_type = torch.LongTensor
            # In multi bloc mode the labels are a list of tensors.
            if isinstance(data["labels"], list):
                data["labels"] = [
                    bloc.type(label_type) for bloc in data["labels"]]
            else:
                data["labels"] = data["labels"].type(label_type)
        return DataItem(**data)

    def get_dataloader(self, train=False, validation=False, test=False,
                       fold_index=0):
        """ Generate a pytorch DataLoader.

        Parameters
        ----------
        train: bool, default False
            return the dataloader over the train set.
        validation: bool, default False
            return the dataloader over the validation set.
        test: bool, default False
            return the dataloader over the test set.
        fold_index: int, default 0
            the index of the fold to use for the training and the
            validation.

        Returns
        -------
        loaders: SetItem
            the requested data loaders stored in the 'test', 'train' and
            'validation' fields (None when not requested).

        Raises
        ------
        ValueError
            if the weighted random sampler is requested but no sampler
            weights are available.
        """
        _test, _train, _validation, sampler = (None, None, None, None)
        if test:
            _test = DataLoader(
                self.dataset["test"], batch_size=self.batch_size,
                collate_fn=self.collate_fn, **self.data_loader_kwargs)
        if train:
            if inspect.isclass(self.sampler):
                sampler = self.sampler(self.dataset["train"][fold_index])
            elif self.sampler == "weighted_random":
                if self.sampler_weights is None:
                    raise ValueError(
                        "Weighted random not yet supported with your input "
                        "parameters.")
                indices = self.dataset["train"][fold_index].indices
                # weights is a list of weights per data point in the data set
                # we are drawing from, NOT a weight per class. Each sample is
                # weighted by the inverse of its class frequency so that the
                # rare classes are drawn as often as the frequent ones.
                samples_weights = [
                    1. / self.sampler_weights[self.stratify_labels[idx]]
                    for idx in indices]
                sampler = WeightedRandomSampler(
                    samples_weights, len(indices), replacement=True)
            elif self.sampler == "random":
                sampler = RandomSampler(
                    self.dataset["train"][fold_index], replacement=False)
            # With no sampler (None) the DataLoader default order is used.
            _train = DataLoader(
                self.dataset["train"][fold_index], batch_size=self.batch_size,
                sampler=sampler, collate_fn=self.collate_fn,
                **self.data_loader_kwargs)
        if validation:
            _validation = DataLoader(
                self.dataset["validation"][fold_index],
                batch_size=self.batch_size, collate_fn=self.collate_fn,
                **self.data_loader_kwargs)
        return SetItem(test=_test, train=_train, validation=_validation)

    @staticmethod
    def get_mask(df, projection_labels=None, sample_size=1):
        """ Filter a table.

        Parameters
        ----------
        df: a pandas DataFrame
            a table data.
        projection_labels: dict, default None
            selects only the data that match the conditions in the dict
            {<column_name>: <value>}. A list value selects the rows matching
            any of its elements, a None value disables the condition.
        sample_size: float, default 1
            should be between 0.0 and 1.0 and represent the proportion of the
            dataset used by the manager (random selection that can be useful
            during testing).

        Returns
        -------
        mask: a numpy array of boolean values
            the selected rows of the table.
        """
        # Each row is kept with a probability of 'sample_size'.
        mask = np.random.choice(2, len(df), p=[1 - sample_size, sample_size])
        # The 'np.bool' alias does not exist anymore in recent numpy releases.
        mask = mask.astype(bool)
        if projection_labels is None:
            return mask
        for (col, val) in projection_labels.items():
            # Item access is used since a column name may not be a valid
            # attribute name or may clash with a DataFrame attribute.
            if isinstance(val, list):
                mask &= df[col].isin(val).values
            elif val is not None:
                mask &= df[col].eq(val).values
        return mask

    @staticmethod
    def get_mask_indices(mask):
        """ From an input mask vector, return the true indices.

        Parameters
        ----------
        mask: array of boolean values
            the input mask.

        Returns
        -------
        indices: numpy array of int
            the positions where the mask is true.
        """
        return np.arange(len(mask))[mask]
class ArrayDataset(Dataset):
    """ A dataset based on numpy array.
    """
    def __init__(self, inputs, indices, labels=None, outputs=None,
                 add_input=False, input_transforms=None,
                 output_transforms=None, label_mapping=None,
                 patch_size=None):
        """ Initialize the class.

        Parameters
        ----------
        inputs: numpy array
            the input data.
        indices: iterable of int
            the list of indices that is considered in this dataset.
        labels: numpy array, default None
            the labels associated to the input data.
        outputs: numpy array, default None
            the output data.
        add_input: bool, default False
            if set concatenate the input data to the output (useful with
            auto-encoder).
        input_transforms, output_transforms: list of callable, default None
            transforms a list of samples with pre-defined transformations.
        label_mapping: dict, default None
            a mapping that can be used to convert labels to be predicted
            (string to int conversion).
        patch_size: tuple, default None
            the size of the patches that will be extracted from the
            input/output images.
        """
        # Checks
        if labels is not None:
            assert len(inputs) == len(labels)
        if outputs is not None:
            assert len(inputs) == len(outputs)

        # Class parameters
        self.inputs = inputs
        self.labels = labels
        self.outputs = outputs
        self.indices = indices
        self.add_input = add_input
        self.input_transforms = input_transforms or []
        self.output_transforms = output_transforms or []
        self.label_mapping = label_mapping
        self.patch_size = patch_size
        # The first two axes of the inputs are skipped (presumably the
        # sample and channel axes - to be confirmed with the callers).
        self.input_size = np.asarray(self.inputs.shape[2:])
        if self.patch_size is not None:
            self.patch_size = np.asarray(self.patch_size)
            logger.debug("Patch size: {0}".format(self.patch_size))
            logger.debug("Input size: {0}".format(self.input_size))
            assert self.patch_size.shape == self.input_size.shape
            self.patch_grid = self.input_size // self.patch_size
            logger.debug("Patch grid: {0}".format(self.patch_grid))
            # Plain int so that '__len__' returns a builtin integer.
            self.nb_patches_by_img = int(np.prod(self.patch_grid))
            logger.debug("Number patches: {0}".format(self.nb_patches_by_img))
        # Patches of the last loaded image (used only with 'patch_size').
        (self.input_cached, self.output_cached, self.label_cached,
         self.image_idx_cached) = (None, None, None, None)

    def __getitem__(self, item):
        """ Return the requested item.

        With patches, 'item' is a patch number: all the patches of the
        associated image are computed once and cached, so that the next
        patches of the same image are served from the cache.

        Parameters
        ----------
        item: int or slice
            the position(s) of the requested data in the dataset indices.

        Returns
        -------
        item: namedtuple
            a named tuple containing 'inputs', 'outputs', and 'labels' data.
        """
        logger.debug("Asked item: {0}".format(item))
        # A single sample has no leading sample axis.
        if isinstance(item, (int, np.integer)):
            concat_axis = 0
        else:
            concat_axis = 1
        use_patches = self.patch_size is not None
        patch_idx, image_idx = (None, None)
        if use_patches:
            patch_idx = item % self.nb_patches_by_img
            image_idx = item // self.nb_patches_by_img
            indices = self.indices[image_idx]
        else:
            indices = self.indices[item]

        if use_patches and self.image_idx_cached == image_idx:
            # The patches are already loaded.
            _labels = self.label_cached
        else:
            # Load the requested data
            logger.debug("Precomputed indices: {0}".format(indices))
            _inputs = self.inputs[indices]
            _labels, _outputs = (None, None)
            if self.labels is not None:
                _labels = self.labels[indices]
            if self.outputs is not None:
                _outputs = self.outputs[indices]

            # Apply the transformations to the data: the same seed is shared
            # by the input and output transforms.
            seed = random.getrandbits(30)
            for tf in self.input_transforms:
                if hasattr(tf, "seed"):
                    tf.seed = seed
                if hasattr(tf, "dtype"):
                    tf.dtype = "input"
                _inputs = tf(_inputs)
            if _outputs is not None:
                for tf in self.output_transforms:
                    if hasattr(tf, "seed"):
                        tf.seed = seed
                    if hasattr(tf, "dtype"):
                        tf.dtype = "output"
                    _outputs = tf(_outputs)
            if _labels is not None and self.label_mapping is not None:
                _labels = ArrayDataset._map_labels(
                    _labels, self.label_mapping)

            # Cache data patches
            if use_patches:
                self.image_idx_cached = image_idx
                logger.debug("Splitting input: {0}".format(_inputs.shape))
                self.input_cached = ArrayDataset._create_patches(
                    _inputs, self.patch_size)
                logger.debug("Cached: {0}".format(self.input_cached.shape))
                # Reset to avoid serving the patches of a previous image.
                self.output_cached = None
                if _outputs is not None:
                    logger.debug(
                        "Splitting output: {0}".format(_outputs.shape))
                    self.output_cached = ArrayDataset._create_patches(
                        _outputs, self.patch_size)
                    logger.debug(
                        "Cached: {0}".format(self.output_cached.shape))
                self.label_cached = _labels

        # Select the requested patch
        if use_patches:
            idx = tuple(np.unravel_index(patch_idx, self.patch_grid))
            logger.debug("Getting patch index item: {0}".format(idx))
            _inputs = self.input_cached[idx]
            if self.output_cached is not None:
                _outputs = self.output_cached[idx]
            else:
                _outputs = None

        # Add input: performed for all the returned items, including the
        # patches retrieved from the cache.
        if self.add_input:
            if _outputs is None:
                _outputs = _inputs
            else:
                _outputs = np.concatenate(
                    (_outputs, _inputs), axis=concat_axis)
        return DataItem(inputs=_inputs, outputs=_outputs, labels=_labels)

    @staticmethod
    def _map_labels(labels, label_mapping):
        """ Convert each label with the mapping.

        Parameters
        ----------
        labels: scalar or array
            the label(s) to be converted.
        label_mapping: dict
            the mapping used to convert each label.

        Returns
        -------
        mapped: numpy array
            the converted label(s), with the same shape as the input labels
            (a zero dimensional array for a scalar label).
        """
        labels = np.asarray(labels)
        # 'tolist' gives back builtin types that can be used as dict keys.
        mapped = [label_mapping[label] for label in labels.ravel().tolist()]
        return np.asarray(mapped).reshape(labels.shape)

    @staticmethod
    def _create_patches(arr, patch_size):
        """ Split each element of an array in non-overlapping patches.

        Parameters
        ----------
        arr: numpy array
            the data to be split: the elements obtained by iterating over
            the first axis (called channels here) are split independently.
        patch_size: array of int
            the size of a patch, forwarded to 'view_as_blocks'.

        Returns
        -------
        patches: numpy array
            the patches of all the channels concatenated along a new axis
            inserted at position 'len(patch_size)'.
        """
        channel_idx = len(patch_size)
        channels_cached = []
        for channel in arr:
            channel_patches = view_as_blocks(channel, tuple(patch_size))
            channel_patches = np.expand_dims(channel_patches, axis=channel_idx)
            channels_cached.append(channel_patches)
        return np.concatenate(channels_cached, axis=channel_idx)

    def __len__(self):
        """ Return the length of the dataset: the number of indices, times
        the number of patches by image when patches are requested.
        """
        if self.patch_size is not None:
            return len(self.indices) * self.nb_patches_by_img
        return len(self.indices)
Follow us
© 2019, pynet developers.
Inspired by AZMIND template.