Advertisement

3D-resnet 50 医学3D图像二分类python代码

阅读量:
复制代码
    [Resnet3D预训练网络......]( "Resnet3D预训练网络......")[参看]( "参看")[腾讯的MedicalNet](https://github.com/Tencent/MedicalNet "腾讯的MedicalNet")[腾讯微云](https://share.weiyun.com/55sZyIx "腾讯微云")

文件夹格式如下

复制代码
 文件夹结构如下  
    
     |--data/:
    
     |   |--images/:# 原始图像
    
     |   |    |--1.nii.gz
    
     |   |    |--......
    
     |   |    |--n.nii.gz
    
     |	|--rois/: # ROI
    
     |   |    |--1.nii.gz
    
     |   |    |--......
    
     |   |    |--n.nii.gz
    
     |   |--test.txt # 外部验证数据列表
    
     |   |--train.txt# 训练数据列表
    
     |   |--val.txt  # 内部验证数据列表
    
     |--pretrain/:# 预训练模型
    
     |   |--resnet_50.pth
    
     |--trails/:  # 训练(finturn)后最好的模型
    
     |   |--best.pth
    
     |--train.py: 
    
  
    
  
    
 train.txt等格式如下,三列分别为原始图像(绝对路径),ROI(绝对路径),以及标签(数字),空格分隔
    
 E:\...\1.nrrd E:\...\1.nrrd 0
    
 E:\...\2.nrrd E:\...\2.nrrd 1
    
    
    
    

代码

复制代码
 import torch

    
 from torch import nn
    
 import torchio as tio
    
 import os
    
 import numpy as np
    
 from torch.utils.data import Dataset
    
 from scipy import ndimage
    
 from torch import optim
    
 from torch.utils.data import DataLoader
    
 import time
    
 import nibabel
    
 from sklearn.metrics import roc_curve, auc
    
 '''
    
 以下是可修改的参数部分
    
 '''
    
 no_cuda = True # 这里设置为True是使用cpu训练和验证,设置为False是使用gpu。action='store_true', help='If true, cuda is not used.
    
 train_img_list = r'E:\...\data\train.txt'  # type=str, help='Path of image list file for train'
    
 val_img_list = r'E:...\data\val.txt'  # type=str, help='Path of image list file for validation
    
 test_img_list = r'E:\...\data\test.txt'  # type=str, help='Path of image list file for test'
    
 pretrain_path = r'E:\...\pretrain\resnet_50.pth'  # type=str, help='Path for pretrained model.'
    
 save_folder = r'E:\...\trails'  # type=str, help='Path for saving model.'
    
 best_model_path = r'E:\...\trails\best.pth' # type=str, help='the best trained model for test.'
    
 total_epochs = 200  # type=int, help='Number of total epochs to run' #
    
 save_intervals = 10  # type=int, help='Interation for saving model'
    
 learning_rate = 0.001  # set to 0.001 when finetune, type=float, help= 'Initial learning rate (divided by 10 while training by lr scheduler)'
    
 new_layer_names = ['conv_cls'] # type=list, help='New layer except for backbone, used for transforlearn'
    
 batch_size = 10  # type=int, help='Batch Size'
    
 input_D = 56; input_H = 56; input_W = 56  # type=int, help='Input size of depth,height,width'
    
 torch.manual_seed(1)
    
 '''
    
 以下不可修改
    
 '''
    
 class Bottleneck(nn.Module): # 瓶颈模块
    
     expansion = 4
    
     def __init__(self, inplanes, planes, stride=1, dilation=1, downsample=None):
    
     super(Bottleneck, self).__init__()
    
     self.conv1 = nn.Conv3d(inplanes, planes, kernel_size=1, bias=False)
    
     self.bn1 = nn.BatchNorm3d(planes)
    
     self.conv2 = nn.Conv3d(planes, planes, kernel_size=3, stride=stride, dilation=dilation, padding=dilation, bias=False)
    
     self.bn2 = nn.BatchNorm3d(planes)
    
     self.conv3 = nn.Conv3d(planes, planes * 4, kernel_size=1, bias=False)
    
     self.bn3 = nn.BatchNorm3d(planes * 4)
    
     self.relu = nn.ReLU(inplace=True)
    
     self.downsample = downsample
    
     self.stride = stride
    
     self.dilation = dilation
    
     def forward(self, x):
    
     residual = x
    
     out = self.conv1(x)
    
     out = self.bn1(out)
    
     out = self.relu(out)
    
     out = self.conv2(out)
    
     out = self.bn2(out)
    
     out = self.relu(out)
    
     out = self.conv3(out)
    
     out = self.bn3(out)
    
     if self.downsample is not None:
    
         residual = self.downsample(x)
    
     out += residual
    
     out = self.relu(out)
    
     return out #
    
 class ResNet(nn.Module):
    
     def __init__(self, block, layers, input_D, input_H, input_W):
    
     self.inplanes = 64
    
     super(ResNet, self).__init__()
    
     self.conv1 = nn.Conv3d(1, 64, kernel_size=7, stride=(2, 2, 2), padding=(3, 3, 3), bias=False)
    
     self.bn1 = nn.BatchNorm3d(64) # conv1的输出维度
    
     self.relu = nn.ReLU(inplace=True)
    
     self.maxpool = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=2, padding=1) # H/2,W/2。C不变
    
     self.layer1 = self._make_layer(block, 64, layers[0]) # H,W不变。downsample控制的shortcut,out_channel=64x4=256
    
     self.layer2 = self._make_layer(block, 128, layers[1], stride=2) # H/2, W/2。downsample控制的shortcut,out_channel=128x4=512
    
     self.layer3 = self._make_layer(block, 256, layers[2], stride=1, dilation=2) # H/2, W/2。downsample控制的shortcut,out_channel=256x4=1024
    
     self.layer4 = self._make_layer(block, 512, layers[3], stride=1, dilation=4) # H/2, W/2。downsample控制的shortcut,out_channel=512x4=2048
    
     # 最后的分类层--------------------------------------------------------------------------------------
    
     self.conv_cls = nn.Sequential(nn.AdaptiveAvgPool3d((1, 1, 1)), nn.Flatten(),nn.Dropout(0.1),
    
                              nn.Linear(in_features=512 * block.expansion, out_features=2, bias=True))
    
     for m in self.modules():
    
         if isinstance(m, nn.Conv3d):
    
             m.weight = nn.init.kaiming_normal_(m.weight, mode='fan_out')
    
         elif isinstance(m, nn.BatchNorm3d):
    
             m.weight.data.fill_(1)
    
             m.bias.data.zero_()
    
     def _make_layer(self, block, planes, blocks, stride=1, dilation=1):
    
     downsample = None
    
     if stride != 1 or self.inplanes != planes * block.expansion:
    
        downsample = nn.Sequential(
    
         nn.Conv3d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
    
         nn.BatchNorm3d(planes * block.expansion))
    
     layers = []
    
     layers.append(block(self.inplanes, planes, stride=stride, dilation=dilation, downsample=downsample))
    
     self.inplanes = planes * block.expansion # 在下一次调用_make_layer函数的时候,self.in_channel已经x4
    
     for i in range(1, blocks):
    
         layers.append(block(self.inplanes, planes, dilation=dilation))
    
     return nn.Sequential(*layers) # '*'的作用是将list转换为非关键字参数传入
    
     def forward(self, x):
    
     x = self.conv1(x)
    
     x = self.bn1(x)
    
     x = self.relu(x)
    
     x = self.maxpool(x)
    
     x = self.layer1(x)
    
     x = self.layer2(x)
    
     x = self.layer3(x)
    
     x = self.layer4(x)
    
     x = self.conv_cls(x)
    
     return x
    
 def generate_model(input_D, input_H, input_W, pretrain_path, best_model_path, train):
    
     model = ResNet(Bottleneck, [3, 4, 6, 3], input_W=input_W, input_H=input_H, input_D=input_D)
    
     if not no_cuda:
    
     model = model.cuda()
    
     net_dict = model.state_dict()
    
     if train:
    
     print('loading pretrained model {}'.format(pretrain_path))
    
     if not no_cuda:
    
         model_weights = torch.load(pretrain_path, weights_only=True)
    
     model_weights = torch.load(pretrain_path, map_location=torch.device('cpu'), weights_only=True)
    
     else:
    
     print('loading best model {}'.format(best_model_path))
    
     if not no_cuda:
    
         model_weights = torch.load(best_model_path, weights_only=True)
    
     model_weights = torch.load(best_model_path, map_location=torch.device('cpu'), weights_only=True)
    
     model_dict = {k.replace("module.", ""): v for k, v in model_weights['state_dict'].items() if k.replace("module.", "") in net_dict.keys()}
    
     net_dict.update(model_dict) # 字典 dict2 的键/值对更新到 dict 里。
    
     model.load_state_dict(net_dict) # model.load_state_dict()函数把加载的权重复制到模型的权重中去
    
     new_parameters = [] #---这里设置base_parameters(低学习率用于finetune)和new_parameters(高学习率用于训练最后的分类层)使用不同的学习率
    
     for pname, p in model.named_parameters():
    
     for layer_name in new_layer_names:
    
         if pname.find(layer_name) >= 0:
    
             new_parameters.append(p)
    
             break
    
     new_parameters_id = list(map(id, new_parameters))
    
     base_parameters = list(filter(lambda p: id(p) not in new_parameters_id, model.parameters()))
    
     parameters = {'base_parameters': base_parameters, 'new_parameters': new_parameters}
    
     return model, parameters
    
  
    
 model, parameters = generate_model(input_D, input_H, input_W, pretrain_path, best_model_path, train = True)
    
 params = [{'params': parameters['base_parameters'], 'lr': learning_rate },{'params': parameters['new_parameters'], 'lr': learning_rate*100}]
    
 optimizer = torch.optim.SGD(params, momentum=0.9, weight_decay=1e-3) #--使用SGD优化学习率,也可以使用下面的Adam
    
 # optimizer = torch.optim.Adam(params, weight_decay=1e-3)
    
 scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99)
    
 # 定义 3D 数据增强的变换操作
    
 transform = tio.Compose([
    
     # 随机旋转图像,旋转角度范围为 -15 到 15 度,保持纵横比一致 以50%的概率进行旋转
    
     tio.RandomAffine(degrees=15,isotropic=True, p=0.5),
    
     # 随机裁剪(在指定区域内裁剪图像)# 裁剪到128x128x128的大小# 以50%的概率裁剪
    
     # tio.RandomCrop(subject_size=(128, 128, 128), p=0.5),
    
     # 随机改变亮度,调整亮度的范围# 强度范围# 以20%的概率进行亮度调整
    
     tio.RandomBiasField(coefficients=0.5, p=0.2 ),
    
     # 随机进行仿射变换,变换范围为一个小区间# X, Y, Z方向的平移范围 # 缩放比例
    
     tio.RandomAffine( translation=(5, 5, 5), scales=(0.8, 1.2), p=0.5),
    
     # 随机噪声,添加噪声# 噪声标准差
    
     tio.RandomNoise(mean=0, std=0.05, p=0.2),
    
     # 随机镜像翻转# 如果有多个键值,则对对应的键进行翻转# 在左右(LR), 前后(AP), 上下(SI)三个方向上随机翻转
    
     tio.RandomFlip( keys=["image"], axes=("LR", "AP", "SI"),p=0.5),
    
     # 随机缩放
    
     tio.RandomElasticDeformation(num_control_points=5, max_displacement=3, locked_borders=2, p=0.2)
    
 ])
    
 class Dataset(Dataset):
    
     def __init__(self, img_list, input_D, input_H, input_W, train):
    
     with open(img_list, 'r') as f:
    
         self.img_list = [line.strip() for line in f]
    
     print("Processing {} datas".format(len(self.img_list)))
    
     self.input_D = input_D
    
     self.input_H = input_H
    
     self.input_W = input_W
    
     def __nii2tensorarray__(self, data):
    
     [z, y, x] = data.shape
    
     new_data = np.reshape(data, [1, z, y, x])
    
     new_data = new_data.astype("float32")
    
     return new_data
    
     def __len__(self):
    
     return len(self.img_list)
    
     def __getitem__(self, idx):
    
     ith_info = self.img_list[idx].split(" ")
    
     img_name = ith_info[0]
    
     mask_name = ith_info[1]
    
     label_name = ith_info[2]
    
     # print(img_name, mask_name, label_name)
    
     img = nibabel.load(img_name)
    
     assert img is not None
    
     mask = nibabel.load(mask_name)
    
     assert mask is not None
    
     assert img.get_fdata().shape == mask.get_fdata().shape, "img shape:{} is not equal to mask shape:{}".format(
    
            img.get_fdata().shape, mask.get_fdata().shape) # 判断 img和mask是否大小一致
    
     label = label_name
    
     assert label is not None
    
     img_array = self.__data_process__(img, mask)
    
     img_array = self.__nii2tensorarray__(img_array) # img to tensor array
    
     if train: # 训练集需要进行数据增强
    
         img_array = transform(img_array)
    
     label_array = torch.tensor([float(label)], dtype=torch.float32) # label to tensor array
    
     return img_array, label_array
    
     def __drop_invalid_range__(self, volume, mask):
    
     """
    
     Cut off the invalid area,then get volume from mask
    
     """
    
     masked_volume = np.zeros_like(volume)
    
     masked_volume[mask > 0] = volume[mask > 0]  # 将掩膜内部区域的图像数据复制到新数组中
    
     non_zeros_idx = np.where(volume == masked_volume)
    
     [max_z, max_h, max_w] = np.max(np.array(non_zeros_idx), axis=1)
    
     [min_z, min_h, min_w] = np.min(np.array(non_zeros_idx), axis=1)
    
     volume_dropped = masked_volume[min_z:max_z, min_h:max_h, min_w:max_w]
    
     return volume_dropped
    
     def __itensity_normalize_one_volume__(self, volume):
    
     """
    
     normalize the itensity of an nd volume based on the mean and std of nonzeor region
    
     """
    
     pixels = volume[volume > 0]
    
     mean = pixels.mean()
    
     std = pixels.std()
    
     out = (volume - mean) / std
    
     out_random = np.random.normal(0, 1, size=volume.shape)
    
     out[volume == 0] = out_random[volume == 0]
    
     return out
    
     def __resize_data__(self, data):
    
     """
    
     Resize the data to the input size
    
     """
    
     [depth, height, width] = data.shape
    
     scale = [self.input_D * 1.0 / depth, self.input_H * 1.0 / height, self.input_W * 1.0 / width]
    
     data = ndimage.zoom(data, scale, order=0)
    
     return data
    
     def __data_process__(self, data, mask):  # ------------------
    
     data = data.get_fdata()
    
     mask = mask.get_fdata()
    
     data = self.__drop_invalid_range__(data, mask)
    
     data = self.__resize_data__(data)
    
     data = self.__itensity_normalize_one_volume__(data)
    
     return data
    
 def train(train_data_loader, model, optimizer, scheduler, total_epochs, save_interval, save_folder):
    
     # settings
    
     batches_per_epoch = len(train_data_loader)
    
     # log.info()
    
     print('{} epochs in total, {} batches per epoch'.format(total_epochs, batches_per_epoch))
    
     loss_seg = nn.BCEWithLogitsLoss() # ----分类任务使用
    
     if not no_cuda:
    
     loss_seg = loss_seg.cuda()
    
     train_time_sp = time.time()
    
     for epoch in range(total_epochs):
    
     # log.info('Start epoch {}'.format(epoch))
    
     print('Start epoch {}'.format(epoch))
    
     model.train()
    
     num_correct = 0
    
     val_num_correct = 0
    
     for batch_id, batch_data in enumerate(train_data_loader):
    
         # getting data batch
    
         batch_id_sp = epoch * batches_per_epoch
    
         volumes, labels = batch_data
    
         if not no_cuda:
    
             volumes = volumes.cuda()
    
             labels = labels.cuda()
    
         # Backward and optimize
    
         optimizer.zero_grad()
    
         out_labels = model(volumes)
    
         loss_value_seg = loss_seg(out_labels, labels)
    
         loss = loss_value_seg
    
         loss.backward()
    
         optimizer.step()
    
  
    
         pred_labels = out_labels.argmax(dim=1)
    
         num_correct += torch.eq(pred_labels, labels).sum().float().item()
    
  
    
         avg_batch_time = (time.time() - train_time_sp) / (1 + batch_id_sp)
    
         print('Train Epoch: {}-{} ({}), loss = {:.3f}, loss_seg = {:.3f}, avg_batch_time = {:.3f}' \
    
                 .format(epoch, batch_id, batch_id_sp, loss.item(), loss_value_seg.item(), avg_batch_time))
    
         print('acc: {}' \
    
                  .format(num_correct / len(train_data_loader.dataset)))
    
          # save model
    
         if batch_id == 0 and batch_id_sp != 0 and batch_id_sp % save_interval == 0:
    
             model_save_path = '{}_epoch_{}_batch_{}.pth.tar'.format(save_folder, epoch, batch_id)
    
             model_save_dir = os.path.dirname(model_save_path)
    
             if not os.path.exists(model_save_dir):
    
                 os.makedirs(model_save_dir)
    
             print('Save checkpoints: epoch = {}, batch_id = {}'.format(epoch, batch_id))
    
             torch.save({'ecpoch': epoch,'batch_id': batch_id,'state_dict': model.state_dict(),
    
                         'optimizer': optimizer.state_dict()},model_save_path)
    
  
    
     model.eval()
    
     for batch_id, batch_data in enumerate(val_data_loader):
    
         volumes, labels = batch_data
    
         if not no_cuda:
    
             volumes = volumes.cuda()
    
             labels = labels.cuda()
    
         with torch.no_grad():
    
             out_labels = model(volumes)
    
             pred_labels = out_labels.argmax(dim=1)
    
         val_num_correct += torch.eq(pred_labels, labels).sum().float().item()
    
         print('acc: {}' \
    
                  .format(val_num_correct / len(val_data_loader.dataset)))
    
  
    
     scheduler.step()
    
     print('lr = {}'.format(scheduler.get_last_lr()))
    
     print('Finished training')
    
 def test(test_data_loader, model):
    
     out_labels = []
    
     labels = []
    
     model.eval() # for testing
    
     for batch_id, batch_data in enumerate(test_data_loader):
    
     volumes, label = batch_data
    
     if not no_cuda:
    
         volumes = volumes.cuda()
    
         label = label.cuda()
    
     with torch.no_grad():
    
         out_label = model(volumes)
    
     out_labels.append(out_label.cpu().item())
    
     labels.append(label.cpu().item())
    
     return labels, out_labels
    
  
    
 training_dataset = Dataset(train_img_list, input_D, input_H, input_W, train=True)
    
 val_dataset = Dataset(val_img_list, input_D, input_H, input_W, train=False)
    
 test_dataset = Dataset(test_img_list, input_D, input_H, input_W, train=False)
    
 train_data_loader = DataLoader(training_dataset, batch_size, shuffle=True, pin_memory=True)
    
 val_data_loader = DataLoader(val_dataset, batch_size, shuffle=True, pin_memory=True)
    
 test_data_loader = DataLoader(test_dataset, batch_size, shuffle=True, pin_memory=True)
    
  
    
 #------------start train-------------------------
    
 train(train_data_loader, model, optimizer, scheduler, total_epochs, save_interval=save_intervals, save_folder=save_folder)
    
 #------------start test-------------------------
    
 best_model, parameters = generate_model(input_D, input_H, input_W, pretrain_path, best_model_path, train = False)
    
 labels, out_labels = test(test_data_loader, best_model)
    
 fpr, tpr, thresholds = roc_curve(labels, out_labels)
    
 roc_auc = auc(fpr, tpr)
    
 print(roc_auc)
    
    
    
    

会出现提示:

复制代码
 UserWarning: Using TorchIO images without a torchio.SubjectsLoader in PyTorch >= 2.3 might have unexpected consequences, e.g., the collated batches will be instances of torchio.Subject with 5D images. Replace your PyTorch DataLoader with a torchio.SubjectsLoader so that the collated batch becomes a dictionary, as expected. See https://github.com/fepegar/torchio/issues/1179 for more context about this issue.

    
   warnings.warn(message, stacklevel=1)
    
    
    
    

是由于使用torchio对单独数据进行增强所导致,不影响使用

相关代码位于preProcess.py,第55-56行

复制代码
     if self.phase == 'train':  # 训练集需要进行数据增强

    
         img_array = transform(img_array)
    
    
    
    

后期有需要可以应用torchio对数据集进行增强

附加一个小工具,把imgs和rois分别从文件夹里读取出来,写到txt文件中

这里假设imgs和rois中的文件名是一一对应的

复制代码
 import os

    
  
    
 def write_file_pairs_to_text(images_folder, roi_folder, output_file):
    
     images_files = {f for f in os.listdir(images_folder) if os.path.isfile(os.path.join(images_folder, f))}
    
     rois_files = {f for f in os.listdir(roi_folder) if os.path.isfile(os.path.join(rois_folder, f))}
    
     print(images_files, rois_files)
    
     unmatched_images = images_files - rois_files
    
     unmatched_rois = rois_files - images_files
    
     if unmatched_images or unmatched_rois:
    
     raise ValueError(
    
         f"Unmatched files found:\nImages not in ROI: {unmatched_images}\nROIs not in Images (or with different names/extensions): {unmatched_rois}")
    
     # 由于接按文件名(包括扩展名)匹配,所以可以直接使用images_files(或roi_files,因为它们是相同的)
    
     with open(output_file, 'w') as f:
    
     for img_file in images_files:
    
         img_path = os.path.join(images_folder, img_file)
    
         # roi_files中也存在相同的文件名(包括扩展名),所以直接写入img_file两次
    
         f.write(f"{img_path}\t{img_path}\n")
    
  
    
 # 使用示例
    
 images_folder = r"E:\...\data\images" # 
    
 rois_folder = r"E:\...\data\rois"
    
 output_file = 'file_pairs.txt'
    
  
    
 write_file_pairs_to_text(images_folder, rois_folder, output_file)
    
    
    
    

全部评论 (0)

还没有任何评论哟~