
Semi-Supervised Learning for Image Classification


1. What Semi-Supervised Learning Means

Semi-supervised methods combine labeled and unlabeled data in the training set. The core idea: first train a model on the labeled data, then run it on the unlabeled samples and keep only the predictions that pass a confidence filter, assigning each kept sample its predicted class as a pseudo-label. These newly pseudo-labeled samples are added to the original training set, giving a batch of high-quality extra training data for the next rounds of model optimization.
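A minimal sketch of this idea, assuming a trained `model` and a DataLoader `unlabeled_loader` over unlabeled images (hypothetical names; the article's full implementation follows in section 2.9):

    import torch

    def pseudo_label(model, unlabeled_loader, threshold=0.99):
        """Keep only unlabeled samples the model predicts with high confidence."""
        model.eval()
        kept_x, kept_y = [], []
        with torch.no_grad():
            for x, _ in unlabeled_loader:
                probs = torch.softmax(model(x), dim=1)
                conf, pred = probs.max(dim=1)           # top probability and its class
                kept_x.append(x[conf > threshold])      # confident samples ...
                kept_y.append(pred[conf > threshold])   # ... and their pseudo-labels
        return torch.cat(kept_x), torch.cat(kept_y)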

2. Code Walkthrough: Semi-Supervised Image Classification

2.1 Importing Python Packages

    import random
    import os
    import time

    import numpy as np
    import torch
    import torch.nn as nn
    from PIL import Image  # read image data
    from torch.utils.data import Dataset, DataLoader
    from torchvision import transforms
    from tqdm import tqdm
    import matplotlib.pyplot as plt

    from model_utils.model import initialize_model

These are the required Python libraries: the random module for seeding, the PyTorch framework, NumPy, PIL for reading image data, the Dataset and DataLoader classes for data handling, the tqdm progress bar, torchvision's transforms for image preprocessing, matplotlib for plotting the training curves, and initialize_model from a local model_utils package (used only by the commented-out pretrained-model option in section 2.12).

2.2 Setting the Random Seed

    def seed_everything(seed):
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.benchmark = False
        torch.backends.cudnn.deterministic = True
        random.seed(seed)
        np.random.seed(seed)
        os.environ['PYTHONHASHSEED'] = str(seed)

The function is named seed_everything; it sets a global random seed so that results are consistent and reproducible. It seeds Python's random module, NumPy's generator, and PyTorch (including all CUDA devices), and it disables the cuDNN benchmark and enables deterministic mode so that GPU-based operations also behave consistently across runs.

2.3 Main Parameter Settings

    seed_everything(0)  # fix the random seed to 0

    HW = 224  # image height and width

This initializes the random seed to 0 via seed_everything and fixes both the image height and width at 224 pixels.

2.4 Data Preprocessing Transforms

    train_transform = transforms.Compose(
        [
            transforms.ToPILImage(),
            transforms.RandomResizedCrop(224),
            transforms.RandomRotation(50),
            transforms.ToTensor()
        ]
    )

    val_transform = transforms.Compose(
        [
            transforms.ToPILImage(),
            transforms.ToTensor()
        ]
    )

These define the preprocessing pipelines for training and validation samples. Training images are augmented with a random resized crop to 224×224 and a random rotation of up to ±50 degrees, then converted to tensors; validation images are only converted to tensors, with no augmentation.
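Because the images are later stored as uint8 NumPy arrays of shape (H, W, 3) (see section 2.7), ToPILImage must come first so the PIL-based augmentations can run. A quick sanity check of the pipeline, using a hypothetical dummy image:

    dummy = np.random.randint(0, 256, (HW, HW, 3), dtype=np.uint8)  # fake RGB image
    out = train_transform(dummy)
    print(out.shape, out.dtype)  # torch.Size([3, 224, 224]) torch.float32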

2.5 Defining the Dataset

    class food_Dataset(Dataset):

This declares a custom dataset class named food_Dataset, inheriting from torch.utils.data.Dataset.

2.6 Dataset Initialization

    def __init__(self, path, mode="train"):

The initializer takes a data path and a mode string specifying the dataset type: training ("train"), validation ("val"), or semi-supervised/unlabeled ("semi").

    self.mode = mode  # read_file (and __getitem__) branch on the mode
    if mode == "semi":
        self.X = self.read_file(path)
    else:
        self.X, self.Y = self.read_file(path)
        self.Y = torch.LongTensor(self.Y)  # cast labels to long integers

Data is read according to the given mode: in the semi-supervised case only the features X are loaded, since there are no labels; otherwise both the features X and the labels Y are loaded, and the labels are cast to a LongTensor. Storing self.mode is required because read_file (section 2.7) branches on it.

2.7 Reading the Files

    def read_file(self, path):

This defines the method that reads the image files from the given path.

Semi-supervised mode:

    if self.mode == "semi":
        file_list = os.listdir(path)
        xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
        for j, img_name in enumerate(file_list):
            img_path = os.path.join(path, img_name)
            img = Image.open(img_path)
            img = img.resize((HW, HW))
            xi[j, ...] = img
        print("Read %d samples" % len(xi))
        return xi

Labeled mode (training and validation):

    else:
        for i in tqdm(range(11)):
            file_dir = path + "/%02d" % i  # class folders are named 00 .. 10
            file_list = os.listdir(file_dir)

            xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
            yi = np.zeros(len(file_list), dtype=np.uint8)

            for j, img_name in enumerate(file_list):
                img_path = os.path.join(file_dir, img_name)
                img = Image.open(img_path)
                img = img.resize((HW, HW))
                xi[j, ...] = img
                yi[j] = i  # the folder index is the class label

            if i == 0:
                X = xi
                Y = yi
            else:
                X = np.concatenate((X, xi), axis=0)
                Y = np.concatenate((Y, yi), axis=0)
        print("Read %d samples" % len(Y))
        return X, Y
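The labeled branch assumes the food-11 folder layout: eleven class subdirectories named 00 through 10, with the folder index serving as the class label. Each class's images are resized to 224×224, collected into per-class arrays xi and yi, and then concatenated into a single feature array X and label array Y.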

2.8 Accessing the Data

    def __getitem__(self, item):

This method returns a single sample and, where applicable, its label. In semi-supervised mode it returns the transformed image together with the raw image data; in the labeled modes it returns the transformed image and its label.
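The original post omits the method body. A minimal sketch consistent with the description above, and with the fact that semiDataset later reads no_label_loder.dataset[index][1] to fetch the raw array, might be (which transform applies in each mode is an assumption here):

    def __getitem__(self, item):
        if self.mode == "semi":
            # transformed tensor for inference, plus the raw uint8 array that
            # semiDataset stores when it builds the pseudo-labeled set
            return val_transform(self.X[item]), self.X[item]
        elif self.mode == "train":
            return train_transform(self.X[item]), self.Y[item]
        else:  # "val"
            return val_transform(self.X[item]), self.Y[item]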

    def __len__(self):
        return len(self.X)

This returns the length of the dataset, i.e. the number of samples.

2.9 The Semi-Supervised Dataset

    class semiDataset(Dataset):
        def __init__(self, no_label_loder, model, device, thres=0.99):
            x, y = self.get_label(no_label_loder, model, device, thres)
            if x == []:
                self.flag = False
            else:
                self.flag = True
                self.X = np.array(x)
                self.Y = torch.LongTensor(y)
                self.transform = train_transform

        def get_label(self, no_label_loder, model, device, thres):
            model = model.to(device)
            model.eval()  # inference mode for BatchNorm/Dropout
            pred_prob = []
            labels = []
            x = []
            y = []
            soft = nn.Softmax(dim=1)  # softmax over the class dimension
            with torch.no_grad():
                for bat_x, _ in no_label_loder:
                    bat_x = bat_x.to(device)
                    pred = model(bat_x)
                    pred_soft = soft(pred)
                    pred_max, pred_value = pred_soft.max(1)  # top probability and its class index
                    pred_prob.extend(pred_max.cpu().numpy().tolist())
                    labels.extend(pred_value.cpu().numpy().tolist())

            for index, prob in enumerate(pred_prob):
                if prob > thres:
                    x.append(no_label_loder.dataset[index][1])  # raw image via food_Dataset.__getitem__
                    y.append(labels[index])
            return x, y

        def __getitem__(self, item):
            return self.transform(self.X[item]), self.Y[item]

        def __len__(self):
            return len(self.X)


    def get_semi_loader(no_label_loder, model, device, thres):
        semiset = semiDataset(no_label_loder, model, device, thres)
        if semiset.flag == False:
            return None
        else:
            semi_loader = DataLoader(semiset, batch_size=16, shuffle=False)
            return semi_loader
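semiDataset performs the actual pseudo-labeling: get_label runs the current model over the unlabeled loader under torch.no_grad(), applies softmax, and keeps only the samples whose highest class probability exceeds thres (0.99 by default), using the predicted class as the label. get_semi_loader wraps the result in a DataLoader, or returns None when no sample clears the threshold. Note that this relies on the unlabeled loader not shuffling, so the running index matches the dataset order.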

2.10 A Custom Model

    class myModel(nn.Module):
        def __init__(self, num_class):
            super(myModel, self).__init__()
            # 3*224*224 -> 512*7*7 -> flatten -> fully connected classifier
            self.conv1 = nn.Conv2d(3, 64, 3, 1, 1)    # 64*224*224
            self.bn1 = nn.BatchNorm2d(64)
            self.relu = nn.ReLU()
            self.pool1 = nn.MaxPool2d(2)   # 64*112*112

            self.layer1 = nn.Sequential(
                nn.Conv2d(64, 128, 3, 1, 1),   # 128*112*112
                nn.BatchNorm2d(128),
                nn.ReLU(),
                nn.MaxPool2d(2)    # 128*56*56
            )
            self.layer2 = nn.Sequential(
                nn.Conv2d(128, 256, 3, 1, 1),  # 256*56*56
                nn.BatchNorm2d(256),
                nn.ReLU(),
                nn.MaxPool2d(2)    # 256*28*28
            )
            self.layer3 = nn.Sequential(
                nn.Conv2d(256, 512, 3, 1, 1),  # 512*28*28
                nn.BatchNorm2d(512),
                nn.ReLU(),
                nn.MaxPool2d(2)    # 512*14*14
            )

            self.pool2 = nn.MaxPool2d(2)    # 512*7*7
            self.fc1 = nn.Linear(25088, 1000)   # 25088 = 512*7*7 -> 1000
            self.relu2 = nn.ReLU()
            self.fc2 = nn.Linear(1000, num_class)  # 1000 -> 11

        def forward(self, x):
            x = self.conv1(x)
            x = self.bn1(x)
            x = self.relu(x)
            x = self.pool1(x)
            x = self.layer1(x)
            x = self.layer2(x)
            x = self.layer3(x)
            x = self.pool2(x)
            x = x.view(x.size()[0], -1)  # flatten to (batch, 25088)
            x = self.fc1(x)
            x = self.relu2(x)
            x = self.fc2(x)
            return x
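The in-features of fc1 follow from the shape comments: 512 channels × 7 × 7 = 25088. A quick shape check (a sanity test added here, not part of the original post):

    m = myModel(11)
    m.eval()  # avoid updating BatchNorm running stats in a sanity check
    out = m(torch.zeros(1, 3, 224, 224))  # one fake RGB image
    print(out.shape)  # torch.Size([1, 11])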

2.11 Defining the Training and Validation Loop

    def train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path):
        model = model.to(device)
        semi_loader = None
        plt_train_loss = []
        plt_val_loss = []

        plt_train_acc = []
        plt_val_acc = []

        max_acc = 0.0

        for epoch in range(epochs):
            train_loss = 0.0
            val_loss = 0.0
            train_acc = 0.0
            val_acc = 0.0
            semi_loss = 0.0
            semi_acc = 0.0

            start_time = time.time()

            model.train()
            for batch_x, batch_y in train_loader:
                x, target = batch_x.to(device), batch_y.to(device)
                pred = model(x)
                train_bat_loss = loss(pred, target)
                train_bat_loss.backward()
                optimizer.step()       # update the parameters ...
                optimizer.zero_grad()  # ... then clear the gradients so they do not accumulate
                train_loss += train_bat_loss.cpu().item()
                train_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
            plt_train_loss.append(train_loss / train_loader.__len__())
            plt_train_acc.append(train_acc / train_loader.dataset.__len__())  # record accuracy

            if semi_loader is not None:
                for batch_x, batch_y in semi_loader:
                    x, target = batch_x.to(device), batch_y.to(device)
                    pred = model(x)
                    semi_bat_loss = loss(pred, target)
                    semi_bat_loss.backward()
                    optimizer.step()
                    optimizer.zero_grad()
                    semi_loss += semi_bat_loss.cpu().item()
                    semi_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
                print("Training accuracy on the pseudo-labeled set:", semi_acc / semi_loader.dataset.__len__())

            model.eval()
            with torch.no_grad():
                for batch_x, batch_y in val_loader:
                    x, target = batch_x.to(device), batch_y.to(device)
                    pred = model(x)
                    val_bat_loss = loss(pred, target)
                    val_loss += val_bat_loss.cpu().item()
                    val_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
            plt_val_loss.append(val_loss / val_loader.__len__())  # average per batch, matching the training curve
            plt_val_acc.append(val_acc / val_loader.dataset.__len__())

            if epoch % 3 == 0 and plt_val_acc[-1] > 0.6:
                semi_loader = get_semi_loader(no_label_loader, model, device, thres)

            if val_acc > max_acc:
                torch.save(model, save_path)
                max_acc = val_acc  # track the best validation accuracy so far

            # %2.2f formats a float with two decimal places
            print('[%03d/%03d] %2.2f sec(s) TrainLoss : %.6f | valLoss: %.6f Trainacc : %.6f | valacc: %.6f' %
                  (epoch, epochs, time.time() - start_time, plt_train_loss[-1], plt_val_loss[-1], plt_train_acc[-1], plt_val_acc[-1]))

        plt.plot(plt_train_loss)
        plt.plot(plt_val_loss)
        plt.title("loss")
        plt.legend(["train", "val"])
        plt.show()

        plt.plot(plt_train_acc)
        plt.plot(plt_val_acc)
        plt.title("acc")
        plt.legend(["train", "val"])
        plt.show()
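Each epoch first trains on the labeled set, then, if a pseudo-labeled loader exists, trains on it with the same loss and optimizer. Validation runs under torch.no_grad(). Every third epoch, once validation accuracy exceeds 0.6, the pseudo-labeled loader is regenerated with get_semi_loader so the improving model can confidently label more samples. The model with the best validation accuracy is saved to save_path, and the loss and accuracy curves are plotted after training finishes.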

2.12 Hyperparameter Setup

    train_path = r"C:\Users\AIERXUAN\PycharmProjects\machinelearning\deep learning\food-11_sample\training\labeled"
    val_path = r"C:\Users\AIERXUAN\PycharmProjects\machinelearning\deep learning\food-11_sample\validation"
    no_label_path = r"C:\Users\AIERXUAN\PycharmProjects\machinelearning\deep learning\food-11_sample\training\unlabeled\00"

    train_set = food_Dataset(train_path, "train")
    val_set = food_Dataset(val_path, "val")
    no_label_set = food_Dataset(no_label_path, "semi")

    train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=16, shuffle=True)
    no_label_loader = DataLoader(no_label_set, batch_size=16, shuffle=False)  # keep dataset order aligned with prediction order in get_label

    model = myModel(11)
    # model, _ = initialize_model("vgg", 11, use_pretrained=True)

    lr = 0.001
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    save_path = "model_save/best_model.pth"
    epochs = 15
    thres = 0.99

2.13 Running the Training

    train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path)
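Since train_val saves the whole module with torch.save(model, save_path), reloading the best checkpoint for inference could look like this sketch (note that recent PyTorch versions default torch.load to weights_only=True, which cannot unpickle a full module):

    best_model = torch.load(save_path, map_location=device)  # may need weights_only=False on newer PyTorch
    best_model.eval()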
