3D-resnet 50 医学3D图像二分类python代码
发布时间
阅读量:
阅读量
[Resnet3D预训练网络......]( "Resnet3D预训练网络......")[参看]( "参看")[腾讯的MedicalNet](https://github.com/Tencent/MedicalNet "腾讯的MedicalNet")[腾讯微云](https://share.weiyun.com/55sZyIx "腾讯微云")
文件夹格式如下
文件夹结构如下
|--data/:
| |--images/:# 原始图像
| | |--1.nii.gz
| | |--......
| | |--n.nii.gz
| |--rois/: # ROI
| | |--1.nii.gz
| | |--......
| | |--n.nii.gz
| |--test.txt # 外部验证数据列表
| |--train.txt# 训练数据列表
| |--val.txt # 内部验证数据列表
|--pretrain/:# 预训练模型
| |--resnet_50.pth
|--trails/: # 训练(finturn)后最好的模型
| |--best.pth
|--train.py:
train.txt等格式如下,三列分别为原始图像(绝对路径),ROI(绝对路径),以及标签(数字),空格分隔
E:\...\1.nrrd E:\...\1.nrrd 0
E:\...\2.nrrd E:\...\2.nrrd 1
代码
import torch
from torch import nn
import torchio as tio
import os
import numpy as np
from torch.utils.data import Dataset
from scipy import ndimage
from torch import optim
from torch.utils.data import DataLoader
import time
import nibabel
from sklearn.metrics import roc_curve, auc
'''
以下是可修改的参数部分
'''
no_cuda = True # 这里设置为True是使用cpu训练和验证,设置为False是使用gpu。action='store_true', help='If true, cuda is not used.
train_img_list = r'E:\...\data\train.txt' # type=str, help='Path of image list file for train'
val_img_list = r'E:...\data\val.txt' # type=str, help='Path of image list file for validation
test_img_list = r'E:\...\data\test.txt' # type=str, help='Path of image list file for test'
pretrain_path = r'E:\...\pretrain\resnet_50.pth' # type=str, help='Path for pretrained model.'
save_folder = r'E:\...\trails' # type=str, help='Path for saving model.'
best_model_path = r'E:\...\trails\best.pth' # type=str, help='the best trained model for test.'
total_epochs = 200 # type=int, help='Number of total epochs to run' #
save_intervals = 10 # type=int, help='Interation for saving model'
learning_rate = 0.001 # set to 0.001 when finetune, type=float, help= 'Initial learning rate (divided by 10 while training by lr scheduler)'
new_layer_names = ['conv_cls'] # type=list, help='New layer except for backbone, used for transforlearn'
batch_size = 10 # type=int, help='Batch Size'
input_D = 56; input_H = 56; input_W = 56 # type=int, help='Input size of depth,height,width'
torch.manual_seed(1)
'''
以下不可修改
'''
class Bottleneck(nn.Module): # 瓶颈模块
expansion = 4
def __init__(self, inplanes, planes, stride=1, dilation=1, downsample=None):
super(Bottleneck, self).__init__()
self.conv1 = nn.Conv3d(inplanes, planes, kernel_size=1, bias=False)
self.bn1 = nn.BatchNorm3d(planes)
self.conv2 = nn.Conv3d(planes, planes, kernel_size=3, stride=stride, dilation=dilation, padding=dilation, bias=False)
self.bn2 = nn.BatchNorm3d(planes)
self.conv3 = nn.Conv3d(planes, planes * 4, kernel_size=1, bias=False)
self.bn3 = nn.BatchNorm3d(planes * 4)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
self.dilation = dilation
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out #
class ResNet(nn.Module):
def __init__(self, block, layers, input_D, input_H, input_W):
self.inplanes = 64
super(ResNet, self).__init__()
self.conv1 = nn.Conv3d(1, 64, kernel_size=7, stride=(2, 2, 2), padding=(3, 3, 3), bias=False)
self.bn1 = nn.BatchNorm3d(64) # conv1的输出维度
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=2, padding=1) # H/2,W/2。C不变
self.layer1 = self._make_layer(block, 64, layers[0]) # H,W不变。downsample控制的shortcut,out_channel=64x4=256
self.layer2 = self._make_layer(block, 128, layers[1], stride=2) # H/2, W/2。downsample控制的shortcut,out_channel=128x4=512
self.layer3 = self._make_layer(block, 256, layers[2], stride=1, dilation=2) # H/2, W/2。downsample控制的shortcut,out_channel=256x4=1024
self.layer4 = self._make_layer(block, 512, layers[3], stride=1, dilation=4) # H/2, W/2。downsample控制的shortcut,out_channel=512x4=2048
# 最后的分类层--------------------------------------------------------------------------------------
self.conv_cls = nn.Sequential(nn.AdaptiveAvgPool3d((1, 1, 1)), nn.Flatten(),nn.Dropout(0.1),
nn.Linear(in_features=512 * block.expansion, out_features=2, bias=True))
for m in self.modules():
if isinstance(m, nn.Conv3d):
m.weight = nn.init.kaiming_normal_(m.weight, mode='fan_out')
elif isinstance(m, nn.BatchNorm3d):
m.weight.data.fill_(1)
m.bias.data.zero_()
def _make_layer(self, block, planes, blocks, stride=1, dilation=1):
downsample = None
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
nn.Conv3d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm3d(planes * block.expansion))
layers = []
layers.append(block(self.inplanes, planes, stride=stride, dilation=dilation, downsample=downsample))
self.inplanes = planes * block.expansion # 在下一次调用_make_layer函数的时候,self.in_channel已经x4
for i in range(1, blocks):
layers.append(block(self.inplanes, planes, dilation=dilation))
return nn.Sequential(*layers) # '*'的作用是将list转换为非关键字参数传入
def forward(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.conv_cls(x)
return x
def generate_model(input_D, input_H, input_W, pretrain_path, best_model_path, train):
model = ResNet(Bottleneck, [3, 4, 6, 3], input_W=input_W, input_H=input_H, input_D=input_D)
if not no_cuda:
model = model.cuda()
net_dict = model.state_dict()
if train:
print('loading pretrained model {}'.format(pretrain_path))
if not no_cuda:
model_weights = torch.load(pretrain_path, weights_only=True)
model_weights = torch.load(pretrain_path, map_location=torch.device('cpu'), weights_only=True)
else:
print('loading best model {}'.format(best_model_path))
if not no_cuda:
model_weights = torch.load(best_model_path, weights_only=True)
model_weights = torch.load(best_model_path, map_location=torch.device('cpu'), weights_only=True)
model_dict = {k.replace("module.", ""): v for k, v in model_weights['state_dict'].items() if k.replace("module.", "") in net_dict.keys()}
net_dict.update(model_dict) # 字典 dict2 的键/值对更新到 dict 里。
model.load_state_dict(net_dict) # model.load_state_dict()函数把加载的权重复制到模型的权重中去
new_parameters = [] #---这里设置base_parameters(低学习率用于finetune)和new_parameters(高学习率用于训练最后的分类层)使用不同的学习率
for pname, p in model.named_parameters():
for layer_name in new_layer_names:
if pname.find(layer_name) >= 0:
new_parameters.append(p)
break
new_parameters_id = list(map(id, new_parameters))
base_parameters = list(filter(lambda p: id(p) not in new_parameters_id, model.parameters()))
parameters = {'base_parameters': base_parameters, 'new_parameters': new_parameters}
return model, parameters
model, parameters = generate_model(input_D, input_H, input_W, pretrain_path, best_model_path, train = True)
params = [{'params': parameters['base_parameters'], 'lr': learning_rate },{'params': parameters['new_parameters'], 'lr': learning_rate*100}]
optimizer = torch.optim.SGD(params, momentum=0.9, weight_decay=1e-3) #--使用SGD优化学习率,也可以使用下面的Adam
# optimizer = torch.optim.Adam(params, weight_decay=1e-3)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99)
# 定义 3D 数据增强的变换操作
transform = tio.Compose([
# 随机旋转图像,旋转角度范围为 -15 到 15 度,保持纵横比一致 以50%的概率进行旋转
tio.RandomAffine(degrees=15,isotropic=True, p=0.5),
# 随机裁剪(在指定区域内裁剪图像)# 裁剪到128x128x128的大小# 以50%的概率裁剪
# tio.RandomCrop(subject_size=(128, 128, 128), p=0.5),
# 随机改变亮度,调整亮度的范围# 强度范围# 以20%的概率进行亮度调整
tio.RandomBiasField(coefficients=0.5, p=0.2 ),
# 随机进行仿射变换,变换范围为一个小区间# X, Y, Z方向的平移范围 # 缩放比例
tio.RandomAffine( translation=(5, 5, 5), scales=(0.8, 1.2), p=0.5),
# 随机噪声,添加噪声# 噪声标准差
tio.RandomNoise(mean=0, std=0.05, p=0.2),
# 随机镜像翻转# 如果有多个键值,则对对应的键进行翻转# 在左右(LR), 前后(AP), 上下(SI)三个方向上随机翻转
tio.RandomFlip( keys=["image"], axes=("LR", "AP", "SI"),p=0.5),
# 随机缩放
tio.RandomElasticDeformation(num_control_points=5, max_displacement=3, locked_borders=2, p=0.2)
])
class Dataset(Dataset):
def __init__(self, img_list, input_D, input_H, input_W, train):
with open(img_list, 'r') as f:
self.img_list = [line.strip() for line in f]
print("Processing {} datas".format(len(self.img_list)))
self.input_D = input_D
self.input_H = input_H
self.input_W = input_W
def __nii2tensorarray__(self, data):
[z, y, x] = data.shape
new_data = np.reshape(data, [1, z, y, x])
new_data = new_data.astype("float32")
return new_data
def __len__(self):
return len(self.img_list)
def __getitem__(self, idx):
ith_info = self.img_list[idx].split(" ")
img_name = ith_info[0]
mask_name = ith_info[1]
label_name = ith_info[2]
# print(img_name, mask_name, label_name)
img = nibabel.load(img_name)
assert img is not None
mask = nibabel.load(mask_name)
assert mask is not None
assert img.get_fdata().shape == mask.get_fdata().shape, "img shape:{} is not equal to mask shape:{}".format(
img.get_fdata().shape, mask.get_fdata().shape) # 判断 img和mask是否大小一致
label = label_name
assert label is not None
img_array = self.__data_process__(img, mask)
img_array = self.__nii2tensorarray__(img_array) # img to tensor array
if train: # 训练集需要进行数据增强
img_array = transform(img_array)
label_array = torch.tensor([float(label)], dtype=torch.float32) # label to tensor array
return img_array, label_array
def __drop_invalid_range__(self, volume, mask):
"""
Cut off the invalid area,then get volume from mask
"""
masked_volume = np.zeros_like(volume)
masked_volume[mask > 0] = volume[mask > 0] # 将掩膜内部区域的图像数据复制到新数组中
non_zeros_idx = np.where(volume == masked_volume)
[max_z, max_h, max_w] = np.max(np.array(non_zeros_idx), axis=1)
[min_z, min_h, min_w] = np.min(np.array(non_zeros_idx), axis=1)
volume_dropped = masked_volume[min_z:max_z, min_h:max_h, min_w:max_w]
return volume_dropped
def __itensity_normalize_one_volume__(self, volume):
"""
normalize the itensity of an nd volume based on the mean and std of nonzeor region
"""
pixels = volume[volume > 0]
mean = pixels.mean()
std = pixels.std()
out = (volume - mean) / std
out_random = np.random.normal(0, 1, size=volume.shape)
out[volume == 0] = out_random[volume == 0]
return out
def __resize_data__(self, data):
"""
Resize the data to the input size
"""
[depth, height, width] = data.shape
scale = [self.input_D * 1.0 / depth, self.input_H * 1.0 / height, self.input_W * 1.0 / width]
data = ndimage.zoom(data, scale, order=0)
return data
def __data_process__(self, data, mask): # ------------------
data = data.get_fdata()
mask = mask.get_fdata()
data = self.__drop_invalid_range__(data, mask)
data = self.__resize_data__(data)
data = self.__itensity_normalize_one_volume__(data)
return data
def train(train_data_loader, model, optimizer, scheduler, total_epochs, save_interval, save_folder):
# settings
batches_per_epoch = len(train_data_loader)
# log.info()
print('{} epochs in total, {} batches per epoch'.format(total_epochs, batches_per_epoch))
loss_seg = nn.BCEWithLogitsLoss() # ----分类任务使用
if not no_cuda:
loss_seg = loss_seg.cuda()
train_time_sp = time.time()
for epoch in range(total_epochs):
# log.info('Start epoch {}'.format(epoch))
print('Start epoch {}'.format(epoch))
model.train()
num_correct = 0
val_num_correct = 0
for batch_id, batch_data in enumerate(train_data_loader):
# getting data batch
batch_id_sp = epoch * batches_per_epoch
volumes, labels = batch_data
if not no_cuda:
volumes = volumes.cuda()
labels = labels.cuda()
# Backward and optimize
optimizer.zero_grad()
out_labels = model(volumes)
loss_value_seg = loss_seg(out_labels, labels)
loss = loss_value_seg
loss.backward()
optimizer.step()
pred_labels = out_labels.argmax(dim=1)
num_correct += torch.eq(pred_labels, labels).sum().float().item()
avg_batch_time = (time.time() - train_time_sp) / (1 + batch_id_sp)
print('Train Epoch: {}-{} ({}), loss = {:.3f}, loss_seg = {:.3f}, avg_batch_time = {:.3f}' \
.format(epoch, batch_id, batch_id_sp, loss.item(), loss_value_seg.item(), avg_batch_time))
print('acc: {}' \
.format(num_correct / len(train_data_loader.dataset)))
# save model
if batch_id == 0 and batch_id_sp != 0 and batch_id_sp % save_interval == 0:
model_save_path = '{}_epoch_{}_batch_{}.pth.tar'.format(save_folder, epoch, batch_id)
model_save_dir = os.path.dirname(model_save_path)
if not os.path.exists(model_save_dir):
os.makedirs(model_save_dir)
print('Save checkpoints: epoch = {}, batch_id = {}'.format(epoch, batch_id))
torch.save({'ecpoch': epoch,'batch_id': batch_id,'state_dict': model.state_dict(),
'optimizer': optimizer.state_dict()},model_save_path)
model.eval()
for batch_id, batch_data in enumerate(val_data_loader):
volumes, labels = batch_data
if not no_cuda:
volumes = volumes.cuda()
labels = labels.cuda()
with torch.no_grad():
out_labels = model(volumes)
pred_labels = out_labels.argmax(dim=1)
val_num_correct += torch.eq(pred_labels, labels).sum().float().item()
print('acc: {}' \
.format(val_num_correct / len(val_data_loader.dataset)))
scheduler.step()
print('lr = {}'.format(scheduler.get_last_lr()))
print('Finished training')
def test(test_data_loader, model):
out_labels = []
labels = []
model.eval() # for testing
for batch_id, batch_data in enumerate(test_data_loader):
volumes, label = batch_data
if not no_cuda:
volumes = volumes.cuda()
label = label.cuda()
with torch.no_grad():
out_label = model(volumes)
out_labels.append(out_label.cpu().item())
labels.append(label.cpu().item())
return labels, out_labels
training_dataset = Dataset(train_img_list, input_D, input_H, input_W, train=True)
val_dataset = Dataset(val_img_list, input_D, input_H, input_W, train=False)
test_dataset = Dataset(test_img_list, input_D, input_H, input_W, train=False)
train_data_loader = DataLoader(training_dataset, batch_size, shuffle=True, pin_memory=True)
val_data_loader = DataLoader(val_dataset, batch_size, shuffle=True, pin_memory=True)
test_data_loader = DataLoader(test_dataset, batch_size, shuffle=True, pin_memory=True)
#------------start train-------------------------
train(train_data_loader, model, optimizer, scheduler, total_epochs, save_interval=save_intervals, save_folder=save_folder)
#------------start test-------------------------
best_model, parameters = generate_model(input_D, input_H, input_W, pretrain_path, best_model_path, train = False)
labels, out_labels = test(test_data_loader, best_model)
fpr, tpr, thresholds = roc_curve(labels, out_labels)
roc_auc = auc(fpr, tpr)
print(roc_auc)
会出现提示:
UserWarning: Using TorchIO images without a torchio.SubjectsLoader in PyTorch >= 2.3 might have unexpected consequences, e.g., the collated batches will be instances of torchio.Subject with 5D images. Replace your PyTorch DataLoader with a torchio.SubjectsLoader so that the collated batch becomes a dictionary, as expected. See https://github.com/fepegar/torchio/issues/1179 for more context about this issue.
warnings.warn(message, stacklevel=1)
是由于使用torchio对单独数据进行增强所导致,不影响使用
相关代码位于preProcess.py,第55-56行
if self.phase == 'train': # 训练集需要进行数据增强
img_array = transform(img_array)
后期有需要可以应用torchio对数据集进行增强
附加一个小工具,把imgs和rois分别从文件夹里读取出来,写到txt文件中
这里假设imgs和rois中的文件名是一一对应的
import os
def write_file_pairs_to_text(images_folder, roi_folder, output_file):
images_files = {f for f in os.listdir(images_folder) if os.path.isfile(os.path.join(images_folder, f))}
rois_files = {f for f in os.listdir(roi_folder) if os.path.isfile(os.path.join(rois_folder, f))}
print(images_files, rois_files)
unmatched_images = images_files - rois_files
unmatched_rois = rois_files - images_files
if unmatched_images or unmatched_rois:
raise ValueError(
f"Unmatched files found:\nImages not in ROI: {unmatched_images}\nROIs not in Images (or with different names/extensions): {unmatched_rois}")
# 由于接按文件名(包括扩展名)匹配,所以可以直接使用images_files(或roi_files,因为它们是相同的)
with open(output_file, 'w') as f:
for img_file in images_files:
img_path = os.path.join(images_folder, img_file)
# roi_files中也存在相同的文件名(包括扩展名),所以直接写入img_file两次
f.write(f"{img_path}\t{img_path}\n")
# 使用示例
images_folder = r"E:\...\data\images" #
rois_folder = r"E:\...\data\rois"
output_file = 'file_pairs.txt'
write_file_pairs_to_text(images_folder, rois_folder, output_file)
全部评论 (0)
还没有任何评论哟~
