A CNN-Based Pneumonia Image Classification System (Hands-On Image Classification)
1. Preface
I plan to use this project as my graduation design, and it is intended for personal use only. Its weakness is that it will look somewhat dated to experienced practitioners, since it covers mainly the basic principles and methods of image classification; that same scope, however, makes it a good base for learning the material step by step, and it gave me an excellent opportunity to understand the overall workflow and core stages of image classification.
2. Overview
Strip away the pneumonia-imaging wrapper and this project is essentially the same as the food-classification and flower-classification series of projects. I hope readers will study the underlying material in depth rather than simply copy-pasting.
The project consists of two main parts: data handling and model handling. Data handling covers collecting the target images with a web crawler up front and splitting them into training, validation, and test sets; the model is then trained with a semi-supervised learning strategy.
The model follows the standard image-classification pipeline. An input image is at heart a multi-dimensional matrix (three RGB channels). Features are extracted by four convolution blocks, each followed by a pooling step, plus one extra pooling layer: every convolution uses padding so the spatial size is preserved, while every pooling step halves it. Once the feature map has shrunk far enough (512*7*7 here) it is flattened into an n-dimensional vector (n = 25088), and fully connected layers (nn.Linear) map it down to the number of classes k (four in this project), so the final output is a k-dimensional vector. During training, the output is normalized with softmax and the cross-entropy loss is computed against the one-hot encoding of the true class; at test/prediction time, the index of the largest output value is taken as the predicted class label.
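As a quick sanity check on the shape arithmetic above (my own sketch, not part of the original code): five 2x2 max-pooling steps shrink 224 down to 7, and 512 channels of 7x7 features flatten to exactly 25088, matching the in_features of the first fully connected layer in Section 5.

# Sanity-check sketch for the shape arithmetic and the loss/prediction steps.
import torch
import torch.nn.functional as F

side = 224
for _ in range(5):                   # five 2x2 max-pool steps: 224 -> 112 -> 56 -> 28 -> 14 -> 7
    side //= 2
assert side == 7
assert 512 * side * side == 25088    # n, the length of the flattened vector

k = 4                                # number of classes in this project
logits = torch.randn(1, k)           # the k-dimensional output vector
probs = F.softmax(logits, dim=1)     # training: normalize the outputs
target = torch.tensor([2])           # true class index (equivalent to a one-hot vector)
loss = F.cross_entropy(logits, target)  # cross-entropy against the true class
label = logits.argmax(dim=1)         # prediction: index of the largest output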
3. Acquiring the Images
The images are collected with a web crawler. When searching for the categories used by this pneumonia classification system, some of the returned results are not what we need, and some images have no practical value; these irrelevant images have to be weeded out by hand. Running the crawler once per target category completes the dataset split: images of the same class are stored together in their own folder and serve as the labeled data for the supervised stage, while the unlabeled data is kept in a separate folder for the semi-supervised stage, as the layout sketch below shows.
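For reference, this is the folder layout the dataset code in Section 4 expects (paths taken from Section 6; the class-folder names 00 through 03 are my reading of the "/%02d" pattern in read_file):

archive/
    Training/
        00/  01/  02/  03/    # one folder per labeled class
        semi/                 # unlabeled images for the semi-supervised stage
    Validation/
        00/  01/  02/  03/
    Testing/
        00/  01/  02/  03/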
Here is the code snippet that implements the crawling:
import re
import os
import requests


def get_html(url, headers, params):
    response = requests.get(url, headers=headers, params=params)
    response.encoding = "utf-8"
    if response.status_code == 200:
        return response.text
    else:
        print("Failed to fetch the page source")


def parse_pic_url(html):
    # Extract every thumbnail URL from the response body
    result = re.findall('thumbURL":"(.*?)"', html, re.S)
    return result


def get_pic_content(url):
    response = requests.get(url)
    return response.content


def save_pic(fold_name, content, pic_name):
    with open(fold_name + "/" + str(pic_name) + ".jpg", "wb") as f:
        f.write(content)


def create_fold(fold_name):
    try:
        os.mkdir(fold_name)
    except FileExistsError:
        print("Folder already exists")


def main():
    # The folder name doubles as the search keyword
    fold_name = input("Enter the name of the images to crawl: ")
    # Create the target folder
    create_fold(fold_name)
    # Number of result pages to fetch (30 images per page)
    page_name = input("Enter the number of pages to crawl: ")
    pic_name = 0
    for i in range(int(page_name)):
        # The query parameters are passed separately via `params`,
        # so the bare endpoint is all the URL needs
        url = "https://image.baidu.com/search/acjson"
        headers = \
            {"Accept": "text/plain, */*; q=0.01",
             "Accept-Encoding": "gzip, deflate",
             "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
             "Connection": "keep-alive",
             "Cookie": "winWH=%5E6_1659x838; BDIMGISLOGIN=0; BDqhfp=%E5%A4%A7%E7%86%8A%E7%8C%AB%26%26-10-1undefined%26%268568%26%267; BIDUPSID=84AA588D485BC5D9748C16152F786E4A; PSTM=1664863489; BDUSS=9UelhFRmVxQ2FYRURpM2hnanRSb09DcE5BcDFIYmdhM25DSXd3bWFMLX5mbWhqRVFBQUFBJCQAAAAAAAAAAAEAAABc%7EUGiAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAL%7ExQGO%7E8UBjc2; BDUSS_BFESS=9UelhFRmVxQ2FYRURpM2hnanRSb09DcE5BcDFIYmdhM25DSXd3bWFMLX5mbWhqRVFBQUFBJCQAAAAAAAAAAAEAAABc%7EUGiAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAL%7ExQGO%7E8UBjc2; BAIDUID=AA120298DBC668808E941F202EDAFE7D:FG=1; BAIDUID_BFESS=AA120298DBC668808E941F202EDAFE7D:FG=1; ZFY=ZkM1wYgsnkzHUCE:B8RSn0l9c2wZElo2ztkkXles7ZEQ:C; BDRCVFR[dG2JNJb_ajR]=mk3SLVN4HKm; BDRCVFR[-pGxjrCMryR]=mk3SLVN4HKm; cleanHistoryStatus=0; BDRCVFR[Tp5-T0kH1pb]=mk3SLVN4HKm; BDRCVFR[tox4WRQ4-Km]=mk3SLVN4HKm; indexPageSugList=%5B%22%E5%A4%A7%E7%86%8A%E7%8C%AB%22%2C%22%E7%9C%BC%E9%95%9C%E6%A1%86%E7%A2%8E%22%5D; userFrom=null; ab_sr=1.0.1_ZjU4YWMxNDUwYzdmOTA5MzNlOTcwMzU1Y2Q2Yzg5N2EyNDAxYTJmY2E1NGU4MTFjZDYzMDllMmQ1ZTcyYzE2NmJhNTNmY2I3YzAyOWNkZDEzYzhiMmRlMWUxMWEzMTdiNGNkZTEzNTk3N2JiOGY2NjUxZTYyZGYwMTYwNTkzZWI3YWU1MmVmMThhNWU5ZWMwYThkYmIyY2UxNWFhM2RiZg==",
             "Host": "image.baidu.com",
             "Referer": "https://image.baidu.com/search/index?tn=baiduimage&ct=201326592&lm=-1&cl=2&ie=gb18030&word=%B4%F3%D0%DC%C3%A8&fr=ala&ala=1&alatpl=normal&pos=0&dyTabStr=MTEsMCwxLDYsMyw1LDQsMiw4LDcsOQ%3D%3D",
             "Sec-Ch-Ua": '"Microsoft Edge";v="117", "Not;A=Brand";v="8", "Chromium";v="117"',
             "Sec-Ch-Ua-Mobile": "?0",
             "Sec-Ch-Ua-Platform": '"Windows"',
             "Sec-Fetch-Dest": "empty",
             "Sec-Fetch-Mode": "cors",
             "Sec-Fetch-Site": "same-origin",
             "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.43",
             "X-Requested-With": "XMLHttpRequest", }
        params = {"tn": "resultjson_com",
                  "logid": "11836243366050550448",
                  "ipn": "rj",
                  "ct": "201326592",
                  "fp": "result",
                  "fr": "ala",
                  "word": fold_name,
                  "queryWord": fold_name,
                  "cl": "2",
                  "lm": "-1",
                  "ie": "utf-8",
                  "oe": "utf-8",
                  "pn": str(int(i + 1) * 30),
                  "rn": "30",
                  "gsm": "3c"
                  }
        html = get_html(url, headers, params)
        result = parse_pic_url(html)
        for item in result:
            pic_content = get_pic_content(item)
            save_pic(fold_name, pic_content, pic_name)
            pic_name += 1
            print("Saving image %d" % pic_name)


if __name__ == "__main__":
    main()
4. Building the Dataset
The core idea of the semi-supervised stage is as follows. We first train the model on the labeled portion of the data alone. Once the model passes roughly 60% validation accuracy, the unlabeled samples are gradually brought in: for each unlabeled image, the model produces a candidate class and a confidence. If that confidence exceeds the 0.99 threshold, the prediction is accepted as the image's label (a pseudo-label), and the image together with that label is added to the training data.
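The dataset and training code below leaves out its imports and uses a few names defined elsewhere in the project (HW, train_transform, val_transform). Here is a minimal sketch of plausible definitions, assuming HW = 224 to match the model's 3*224*224 input; the exact augmentations are my guess, not the original ones:

import os
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm

HW = 224  # assumed image side length, matching the model's 224x224 input

# Assumed transforms: light augmentation for training, plain conversion for validation.
# Both start from the HxWx3 uint8 numpy arrays stored by the datasets below.
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])
val_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])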
class pneumonia_Dataset(Dataset):
    def __init__(self, path, mode="train"):
        self.mode = mode
        if mode == "semi":
            self.X = self.read_file(path)
        else:
            self.X, self.Y = self.read_file(path)
            self.Y = torch.LongTensor(self.Y)  # labels must be LongTensor for CrossEntropyLoss
        if mode == "train":
            self.transform = train_transform
        else:
            self.transform = val_transform

    def read_file(self, path):
        if self.mode == "semi":
            # List every file in the unlabeled folder
            file_list = os.listdir(path)
            xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
            for j, img_name in enumerate(file_list):
                img_path = os.path.join(path, img_name)
                img = Image.open(img_path)
                img = img.resize((HW, HW)).convert('RGB')
                xi[j, ...] = img
            print("Read %d samples" % len(xi))
            return xi
        else:
            # Labeled data lives in one folder per class: 00, 01, 02, 03
            for i in tqdm(range(4)):
                file_dir = path + "/%02d" % i
                file_list = os.listdir(file_dir)
                xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
                yi = np.zeros(len(file_list), dtype=np.uint8)
                for j, img_name in enumerate(file_list):
                    img_path = os.path.join(file_dir, img_name)
                    img = Image.open(img_path)
                    img = img.resize((HW, HW)).convert('RGB')  # convert: medical images are often grayscale
                    xi[j, ...] = img
                    yi[j] = i
                if i == 0:
                    X = xi
                    Y = yi
                else:
                    X = np.concatenate((X, xi), axis=0)
                    Y = np.concatenate((Y, yi), axis=0)
            print("Read %d samples" % len(Y))
            return X, Y

    def __getitem__(self, item):
        if self.mode == "semi":
            # Return the transformed image plus the raw image, so pseudo-labeled
            # samples can later be re-transformed with train_transform
            return self.transform(self.X[item]), self.X[item]
        else:
            return self.transform(self.X[item]), self.Y[item]

    def __len__(self):
        return len(self.X)
class semiDataset(Dataset):
    def __init__(self, no_label_loder, model, device, thres=0.99):
        x, y = self.get_label(no_label_loder, model, device, thres)
        if not x:
            self.flag = False
        else:
            self.flag = True
            self.X = np.array(x)
            self.Y = torch.LongTensor(y)
            self.transform = train_transform

    def get_label(self, no_label_loder, model, device, thres):
        model = model.to(device)
        model.eval()  # inference only: no dropout / batch-norm updates
        pred_prob = []
        labels = []
        x = []
        y = []
        soft = nn.Softmax(dim=1)  # normalize over the class dimension
        with torch.no_grad():
            for bat_x, _ in no_label_loder:
                bat_x = bat_x.to(device)
                pred = model(bat_x)
                pred_soft = soft(pred)
                pred_max, pred_value = pred_soft.max(1)  # confidence and predicted class
                pred_prob.extend(pred_max.cpu().numpy().tolist())
                labels.extend(pred_value.cpu().numpy().tolist())
        for index, prob in enumerate(pred_prob):
            if prob > thres:
                x.append(no_label_loder.dataset[index][1])  # index 1: the raw image from the semi dataset's __getitem__
                y.append(labels[index])
        return x, y

    def __getitem__(self, item):
        return self.transform(self.X[item]), self.Y[item]

    def __len__(self):
        return len(self.X)


def get_semi_loader(no_label_loder, model, device, thres):
    semiset = semiDataset(no_label_loder, model, device, thres)
    if not semiset.flag:
        return None
    else:
        semi_loader = DataLoader(semiset, batch_size=16, shuffle=False)
        return semi_loader
5. Model and Training Procedure
The code is as follows:
class myModel(nn.Module):
    def __init__(self, num_class):
        super(myModel, self).__init__()
        # 3*224*224 -> 512*7*7 -> flatten -> fully connected classifier
        self.layer0 = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),  # 64*224*224
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2)  # 64*112*112
        )
        self.layer1 = nn.Sequential(
            nn.Conv2d(64, 128, 3, 1, 1),  # 128*112*112
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2)  # 128*56*56
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(128, 256, 3, 1, 1),  # 256*56*56
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2)  # 256*28*28
        )
        self.layer3 = nn.Sequential(
            nn.Conv2d(256, 512, 3, 1, 1),  # 512*28*28
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2)  # 512*14*14
        )
        self.pool2 = nn.MaxPool2d(2)  # 512*7*7
        self.fc1 = nn.Linear(25088, 1000)  # 25088 -> 1000
        self.relu2 = nn.ReLU()
        self.fc2 = nn.Linear(1000, num_class)  # 1000 -> num_class

    def forward(self, x):
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.pool2(x)
        x = x.view(x.size()[0], -1)  # flatten to (batch, 25088)
        x = self.fc1(x)
        x = self.relu2(x)
        x = self.fc2(x)
        return x
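As a quick wiring check (my own sketch, not part of the original post), a dummy batch of 3x224x224 images should come out as one num_class-dimensional logit vector per image:

# Sketch: verify the model's input/output shapes with random data.
model = myModel(4)                   # four classes in this project
dummy = torch.randn(2, 3, 224, 224)  # a fake batch of two RGB images
out = model(dummy)
print(out.shape)                     # expected: torch.Size([2, 4])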
def train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path):
    model = model.to(device)
    semi_loader = None
    plt_train_loss = []
    plt_val_loss = []
    plt_train_acc = []
    plt_val_acc = []
    max_acc = 0.0
    for epoch in range(epochs):
        train_loss = 0.0
        val_loss = 0.0
        train_acc = 0.0
        val_acc = 0.0
        semi_loss = 0.0
        semi_acc = 0.0
        start_time = time.time()
        model.train()
        for batch_x, batch_y in train_loader:
            x, target = batch_x.to(device), batch_y.to(device)
            pred = model(x)
            train_bat_loss = loss(pred, target)
            train_bat_loss.backward()
            optimizer.step()  # update the parameters, then zero the gradients so they do not accumulate
            optimizer.zero_grad()
            train_loss += train_bat_loss.cpu().item()
            train_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
        plt_train_loss.append(train_loss / train_loader.__len__())
        plt_train_acc.append(train_acc / train_loader.dataset.__len__())  # record the accuracy
        if semi_loader is not None:
            for batch_x, batch_y in semi_loader:
                x, target = batch_x.to(device), batch_y.to(device)
                pred = model(x)
                semi_bat_loss = loss(pred, target)
                semi_bat_loss.backward()
                optimizer.step()
                optimizer.zero_grad()
                semi_loss += semi_bat_loss.cpu().item()
                semi_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
            print("Training accuracy on the pseudo-labeled set:", semi_acc / semi_loader.dataset.__len__())
        model.eval()
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                x, target = batch_x.to(device), batch_y.to(device)
                pred = model(x)
                val_bat_loss = loss(pred, target)
                val_loss += val_bat_loss.cpu().item()
                val_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
        plt_val_loss.append(val_loss / val_loader.__len__())
        plt_val_acc.append(val_acc / val_loader.dataset.__len__())
        if epoch % 3 == 0 and plt_val_acc[-1] > 0.6:
            # Every third epoch, once validation accuracy exceeds 60%, rebuild the pseudo-labeled loader
            semi_loader = get_semi_loader(no_label_loader, model, device, thres)
        if plt_val_acc[-1] > max_acc:
            torch.save(model, save_path)
            max_acc = plt_val_acc[-1]
        print('[%03d/%03d] %2.2f sec(s) TrainLoss : %.6f | valLoss: %.6f Trainacc : %.6f | valacc: %.6f' % \
              (epoch, epochs, time.time() - start_time, plt_train_loss[-1], plt_val_loss[-1], plt_train_acc[-1],
               plt_val_acc[-1]))  # print the epoch summary

    # Plot the loss curves
    plt.plot(plt_train_loss)
    plt.plot(plt_val_loss)
    plt.title("loss")
    plt.legend(["train", "val"])
    plt.show()

    # Plot the accuracy curves
    plt.plot(plt_train_acc)
    plt.plot(plt_val_acc)
    plt.title("acc")
    plt.legend(["train", "val"])
    plt.show()
6. Setting the Parameters
train_path = r"C:\Users\PC\PycharmProjects\pneumoniaClassification\archive\Training"
val_path = r"C:\Users\PC\PycharmProjects\pneumoniaClassification\archive\Validation"
test_path = r"C:\Users\PC\PycharmProjects\pneumoniaClassification\archive\Testing"
no_label_path = r"C:\Users\PC\PycharmProjects\pneumoniaClassification\archive\Training\semi"
train_set = pneumonia_Dataset(train_path, "train")
val_set = pneumonia_Dataset(val_path, "val")
no_label_set = pneumonia_Dataset(no_label_path, "semi")
train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
val_loader = DataLoader(val_set, batch_size=16, shuffle=True)
no_label_loader = DataLoader(no_label_set, batch_size=16, shuffle=False)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = myModel(4)  # four classes, matching the dataset
lr = 0.001
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
save_path = "model_save/best_model.pth"
epochs = 3  # kept deliberately small for a quick demo run
thres = 0.99
train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path)
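Note that test_path is loaded above but never actually used. As the overview says, prediction simply takes the argmax of the model output; here is a minimal inference sketch of mine under that description, reusing pneumonia_Dataset in "val" mode and the model object saved by train_val:

# Sketch: evaluate the best saved model on the held-out test set.
test_set = pneumonia_Dataset(test_path, "val")  # labeled data, no augmentation
test_loader = DataLoader(test_set, batch_size=16, shuffle=False)

best_model = torch.load(save_path)  # train_val saves the whole model object
best_model = best_model.to(device)
best_model.eval()

correct = 0
with torch.no_grad():
    for batch_x, batch_y in test_loader:
        pred = best_model(batch_x.to(device))
        correct += (pred.argmax(dim=1).cpu() == batch_y).sum().item()
print("Test accuracy: %.4f" % (correct / len(test_set)))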
7. Results
Since deep-learning training runs painfully slowly, here is a quick demonstration with epochs set to 3.