Advertisement

新冠肺炎患者图像分类

阅读量:

数据描述

来自卡塔尔多哈卡塔尔大学和孟加拉国达卡大学的一组研究人员,以及来自巴基斯坦和马来西亚的合作者与医生合作,建立了一个针对COVID-19阳性病例的胸部X射线图像数据库,以及正常和病毒性肺炎图像。

数据来源

https://www.heywhale.com/mw/dataset/6027caee891f960015c863d7

数据说明

COVID-19阳性病例的胸部X射线图像以及正常和病毒性肺炎图像的数据库。 数据包含有1200个COVID-19阳性图像,1341正常图像和1345病毒性肺炎图像。

复制代码
  
    
 #导入需要的包
    
 import os
    
 import math
    
 import zipfile
    
 import random
    
 import json
    
 import cv2
    
 import numpy as np
    
 from PIL import Image
    
 import paddle
    
 import paddle.fluid as fluid
    
 from paddle.fluid.dygraph import Linear,Conv2D,Pool2D
    
 import matplotlib.pyplot as plt
    
 paddle.enable_static() #转换为静态图
    
    
    
    

定义了一个参数字典configs,其中包含了各种参数的设置,输入图片的大小、分类数、数据路径、模型保存路径、学习率、批次大小和学习次数等。这些参数的设置将在后面的程序中被调用和使用。

复制代码
 #一些参数的设置

    
 configs = {
    
     "input_size": [3, 1024,1024],                           #输入图片的shape
    
     "class_dim":3,                                     #分类数
    
     'src_path':'data/data82373/input_data.rar',     #数据的路径
    
     'train_path':'input_data',                #解压路径
    
     'model_save_dir':'save_model',       # 模型保存路径
    
     'learning_rate':0.001,          #学习率
    
     'batch_size':32,               #批次大小
    
     'epoch':10                    #学习次数
    
 }
    
    
    
    

进行图片的预处理,获取训练集和测试集的数据。定义了三个空列表COVID、NORMAL和Viral_Pneumonia,分别用于存放新冠肺炎患者的胸透图片、正常人的胸透图片和病毒性肺炎患者的胸透图片。另外还定义了三个与之对应的标签列表COVID_label、NORMAL_label和Viral_Pneumonia_label。

通过遍历指定目录下的三个子目录,将每个图片的路径名存放到对应的列表中,并给相应的标签赋值。同时,展示了一个例子的图片和对应的标签。

接下来,将不同类型的图片路径和标签进行合并,并打乱顺序。然后将图片路径列表和标签列表转换为numpy数组,并进行随机打乱顺序。

最后,根据指定的比例(ratio),将全部样本分成训练集和测试集。并返回训练集和测试集的图片路径和标签。

复制代码
 #预处理图片,获取训练和测试的数据集

    
  
    
 COVID =[] #新冠肺炎患者的胸透图片
    
 COVID_label = []
    
  
    
 NORMAL = [] #正常人的胸透图片
    
 NORMAL_label = []
    
  
    
 Viral_Pneumonia = [] #病毒性肺炎患者的胸透图片
    
 Viral_Pneumonia_label = []
    
  
    
 # 获取所以图片的路径名
    
 # 对应的列表中,同时贴上标签,存放到label列表中
    
 def get_files(file_path, ratio):
    
     for file in os.listdir(file_path + '/COVID'):
    
     COVID.append(file_path + '/COVID' + '/' + file)
    
     COVID_label.append(0)   # 0为新冠肺炎患者
    
     for file in os.listdir(file_path + '/NORMAL'):
    
     NORMAL.append(file_path + '/NORMAL' + '/' + file)
    
     NORMAL_label.append(1)  # 1为正常人
    
     for file in os.listdir(file_path + '/Viral_Pneumonia'):
    
     Viral_Pneumonia.append(file_path + '/Viral_Pneumonia' + '/' + file)
    
     Viral_Pneumonia_label.append(2)     # 2为病毒性肺炎患者
    
  
    
     #检测是否读取成功
    
     for i in range(1):
    
     # 解决中文显示问题
    
     plt.rcParams['font.sans-serif'] = ['SimHei']
    
     plt.rcParams['axes.unicode_minus'] = False
    
     img = plt.imread(NORMAL[i+5])
    
     plt.title('图片的类型是:'+str(NORMAL_label[i+5]))
    
     plt.imshow(img)
    
     plt.show()
    
  
    
     #将新路径的图片进行打乱处理
    
     image_list = np.hstack((COVID, NORMAL, Viral_Pneumonia))
    
     label_list = np.hstack((COVID_label, NORMAL_label, Viral_Pneumonia_label))
    
  
    
     # 利用shuffle打乱顺序
    
     temp = np.array([image_list, label_list])
    
     temp = temp.transpose()
    
     np.random.shuffle(temp)
    
  
    
     # 将所有的img和lab转换成list
    
     all_image_list = list(temp[:, 0])
    
     all_label_list = list(temp[:, 1])
    
  
    
     # 将所得List分为两部分,一部分用来训练tra,一部分用来测试val
    
     # ratio是测试集的比例,看情况填入0-1的一个小数
    
     n_sample = len(all_label_list)
    
  
    
     n_val = int(math.ceil(n_sample * ratio))  # 测试样本数
    
     n_train = n_sample - n_val  # 训练样本数
    
  
    
     tra_images = all_image_list[0:n_train]
    
     tra_labels = all_label_list[0:n_train]
    
     tra_labels = [int(float(i)) for i in tra_labels]
    
  
    
     val_images = all_image_list[n_train:-1]
    
     val_labels = all_label_list[n_train:-1]
    
     val_labels = [int(float(i)) for i in val_labels]
    
  
    
     return tra_images, tra_labels, val_images, val_labels
    
    
    
    

自定义的数据读取函数data_reader,用于读取图片数据和对应的标签。

在函数内部,通过一个生成器实现迭代地读取每个图片和标签。在循环中,首先获取当前图片的路径和标签,然后使用OpenCV的cv2.imread函数读取图片,并调整大小为3x256x256的尺寸。

接下来,将读取到的图片数据转换为numpy数组,并将像素值缩放到0到1之间。最后,使用yield关键字返回每个图片和标签。

最终,返回一个可以迭代获取图片和标签的生成器对象。

复制代码
 #自定义读取数据函数

    
  
    
 def data_reader(images,labels):
    
     '''
    
     自定义data_reader
    
     '''
    
     def reader():
    
         for item in range(len(images)):
    
             img_path = images[item]
    
             lab = labels[item] 
    
             img = cv2.imread(img_path)
    
             img = np.resize(img,(3,256,256))
    
             img = np.array(img).reshape(3,256,256).astype('float32') #要reshape一下,因为输入数据的格式是[256,256,3],而paddle接受数据格式是[3,256,256]
    
             img = img/255.0
    
             yield img, int(lab)
    
     return reader
    
    
    
    

定义一个卷积神经网络模型CNN_model。

模型的输入是tra_images,即训练数据的图片。模型的结构包括三个卷积-池化层和一个全连接输出层。

第一个卷积-池化层使用了fluid框架提供的fluid.nets.simple_img_conv_pool函数,传入输入图片tra_images,设置滤波器大小为5,过滤器数量为20,池化核大小为2×2,池化步长为2,激活函数为ReLU。之后使用fluid.layers.batch_norm函数进行批归一化操作。

第二个和第三个卷积-池化层的设置与第一个层类似,不同之处在于过滤器数量为50。

最后,使用fluid.layers.fc函数添加一个全连接层,输入为第三个卷积-池化层的输出,输出尺寸为3(因为是分成三类)。输出层的激活函数使用了softmax。

最终,返回模型的输出prediction。

复制代码
 # 定义训练的模型

    
  
    
 def CNN_model(tra_images):
    
     # 第一个卷积-池化层
    
     conv_pool_1 = fluid.nets.simple_img_conv_pool(
    
     input=tra_images,         # 输入图像
    
     filter_size=5,     # 滤波器的大小
    
     num_filters=20,    # filter 的数量。它与输出的通道相同
    
     pool_size=2,       # 池化核大小2*2
    
     pool_stride=2,     # 池化步长   
    
     act="relu")        # 激活类型
    
     conv_pool_1 = fluid.layers.batch_norm(conv_pool_1)
    
     # 第二个卷积-池化层
    
     conv_pool_2 = fluid.nets.simple_img_conv_pool(
    
     input=conv_pool_1,
    
     filter_size=5,
    
     num_filters=50,
    
     pool_size=2,
    
     pool_stride=2,
    
     act="relu")
    
     conv_pool_2 = fluid.layers.batch_norm(conv_pool_2)
    
     # 第三个卷积-池化层
    
     conv_pool_3 = fluid.nets.simple_img_conv_pool(
    
     input=conv_pool_2,
    
     filter_size=5,
    
     num_filters=50,
    
     pool_size=2,
    
     pool_stride=2,
    
     act="relu")
    
     # 以softmax为激活函数的全连接输出层,因为是分成三类,所以size是三
    
     prediction = fluid.layers.fc(input=conv_pool_3, size=3, act='softmax')
    
     return prediction
    
    
    
    

使用 PaddlePaddle 框架实现了一个基于 CNN 的分类器模型,其中 tra_images 和 label 分别是输入的图片和标签数据,predict 是通过 CNN 模型对输入图片进行分类得到的结果。通过计算交叉熵损失函数和准确率,再使用 Adam 优化器对模型进行优化训练。

复制代码
 data_shape = [3,256,256]

    
 tra_images = fluid.layers.data(name='image', shape=data_shape, dtype='float32')
    
 label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    
 # 获取分类器,用cnn进行分类
    
  
    
 #model = paddle.vision.models.resnet50(pretrained=True,num_classes=3)
    
 #predict = model(tra_images)
    
 predict =  CNN_model(tra_images)
    
  
    
 print(np.shape(predict))
    
 # 获取损失函数和准确率
    
 cost = fluid.layers.cross_entropy(input=predict, label=label) # 交叉熵
    
 avg_cost = fluid.layers.mean(cost)                            # 计算cost中所有元素的平均值
    
 acc = fluid.layers.accuracy(input=predict, label=label)       #使用输入和标签计算准确率
    
  
    
 # 定义优化方法
    
 optimizer =fluid.optimizer.Adam(learning_rate=configs['learning_rate'])
    
 optimizer.minimize(avg_cost)
    
 print("完成")
    
    
    
    
复制代码
 # 定义使用CPU还是GPU,使用CPU时use_cuda = False,使用GPU时use_cuda = True

    
 use_cuda = False
    
 place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    
 # 创建执行器,初始化参数
    
 exe = fluid.Executor(place)
    
 exe.run(fluid.default_startup_program())
    
    
    
    

定义一个数据读取器 data_reader,并用 paddle.batch() 函数对训练、验证和测试数据进行划分和批处理。具体解释如下:

首先,我们从文件夹 configs['train_path'] 中获取文件路径,并使用函数 get_files 将数据随机分为训练集和验证集。其中,train_imageval_image 是训练集和验证集的图像路径列表,train_labelval_label 是训练集和验证集的标签列表。

接着,对于数据读取器 data_reader,它会读入一个批次的数据并将其转化为网络所需的格式。在这个过程中,我们需要完成数据的预处理工作,例如图像的归一化、裁剪、缩放等操作。

最后,我们使用 paddle.batch() 函数对训练、验证和测试数据进行划分和批处理。其中,batch_size 参数表示每个批次的样本数量,drop_last 参数表示是否丢弃最后一个不足一个批次的数据(如果为 True,则丢弃;如果为 False,则保留并进行处理)。函数最终返回一个批次数据的迭代器,可以使用这个迭代器逐一读取网络所需的训练数据、验证数据和测试数据。

复制代码
 feeder = fluid.DataFeeder(feed_list=[tra_images, label],place=place)

    
 all_train_iter=0
    
 all_train_iters=[]
    
 all_train_costs=[]
    
 all_train_accs=[]
    
  
    
  
    
  
    
 #定义数据读取
    
 data_shape = configs['input_size']
    
 train_image, train_label, val_image, val_label =get_files(configs['train_path'], 0.8)
    
 print(type(train_image[0]))
    
 train_reader = paddle.batch(data_reader(train_image,train_label),
    
                         batch_size=configs['batch_size'],
    
                         drop_last=True)
    
 eval_reader = paddle.batch(data_reader(val_image,val_label),
    
                         batch_size=configs['batch_size'],
    
                         drop_last=True)
    
 test_reader = paddle.batch(data_reader(val_image,val_label),
    
                         batch_size=1,
    
                         drop_last=True)
    
    
    
    

基于PaddlePaddle框架的深度学习模型训练。

首先,在每个训练轮次中,对训练数据进行遍历。通过运行主程序,将一个batch的数据喂入网络进行训练,然后获取训练损失和准确率。同时记录训练过程中的损失和准确率,并计算它们的均值。每训练10个batch后,打印出当前训练轮次、batch号、损失和准确率。

然后,在每个训练轮次结束后,对测试数据进行遍历。通过运行测试程序,将一个batch的数据喂入网络进行测试,然后获取测试损失和准确率。同时记录测试过程中的损失和准确率,并计算它们的均值。打印出当前训练轮次、测试损失和准确率。

最后,保存训练好的模型。

复制代码
 #克隆用于测试模型

    
 test_program = fluid.default_main_program().clone(for_test=True)
    
  
    
 for pass_id in range(configs['epoch']):
    
     # 开始训练
    
     train_costs = []
    
     train_accs = []
    
     for batch_id, data in enumerate(train_reader()):                        #遍历train_reader的迭代器,并为数据加上索引batch_id
    
     train_cost,train_acc = exe.run(program=fluid.default_main_program(),#运行主程序
    
                          feed=feeder.feed(data),                        #喂入一个batch的数据
    
                          fetch_list=[avg_cost, acc])                    #fetch均方误差和准确率
    
  
    
     
    
     all_train_iter=all_train_iter+configs['batch_size']
    
     all_train_iters.append(all_train_iter)
    
  
    
     train_costs.append(train_cost[0])
    
     train_accs.append(train_acc[0])
    
  
    
     train_cost = np.mean(train_costs)
    
     train_acc = np.mean(train_accs)
    
     #每10次batch打印一次训练、进行一次测试
    
     if batch_id % 10 == 0 and batch_id!= 0 :                                             
    
         print('Pass:%d, Batch:%d, Cost:%0.5f, Accuracy:%0.5f' % 
    
         (pass_id, batch_id, train_cost, train_acc))
    
         
    
  
    
     # 开始测试
    
     test_costs = []                                                         #测试的损失值
    
     test_accs = []                                                          #测试的准确率
    
     for batch_id, data in enumerate(eval_reader()):
    
     test_cost, test_acc = exe.run(program=test_program,                 #执行测试程序
    
                                   feed=feeder.feed(data),               #喂入数据
    
                                   fetch_list=[avg_cost, acc])           #fetch 误差、准确率
    
     test_costs.append(test_cost[0])                                     #记录每个batch的误差
    
     test_accs.append(test_acc[0])                                       #记录每个batch的准确率
    
     
    
     # 求测试结果的平均值
    
     test_cost = (sum(test_costs) / len(test_costs))                         #计算误差平均值(误差和/误差的个数)
    
     test_acc = (sum(test_accs) / len(test_accs))                            #计算准确率平均值( 准确率的和/准确率的个数)
    
     print('Test:%d, Cost:%0.5f, ACC:%0.5f' % (pass_id, test_cost, test_acc))
    
     
    
 #保存模型
    
 # 如果保存路径不存在就创建
    
 if not os.path.exists(configs['model_save_dir']):
    
     os.makedirs(configs['model_save_dir'])
    
 print ('save models to %s' % (configs['model_save_dir']))
    
 fluid.io.save_inference_model(configs['model_save_dir'],
    
                           ['image'],
    
                           [predict],
    
                           exe)
    
 print('训练模型保存完成!')
    
    
    
    

使用 PaddlePaddle 框架进行图像分类预测,给定一组图像,该代码通过执行推理程序来得到预测结果,并根据预测结果在图像上显示对应的标签。其中,变量 label_list 定义了预测分类的标签,变量 val_image 存储了待预测图像的路径,变量 val_label 存储了待预测图像对应的标签。代码使用了循环来遍历每张图像,对于每张图像,首先从数据读取器(eval_reader)中获取数据,然后使用 exe.run 函数执行预测程序,并得到预测结果。最后,使用 matplotlib 库将预测结果显示在图像上。

复制代码
     label_list = ['新冠肺炎患者','正常人','病毒性肺炎患者']

    
     for j in range(len(val_image)):
    
  
    
     for batch_id, data in enumerate(eval_reader()):
    
         results = exe.run(program=test_program,                 #运行预测程序
    
                             feed=feeder.feed(data),  #喂入要预测的img
    
                             fetch_list=predict)          #得到推测结果
    
         #print(results)
    
         results_list = results[0].tolist()
    
         #print(results_list)
    
         for i in range(len(results_list)):
    
             plt.rcParams['font.sans-serif'] = ['SimHei'] # 设置中文
    
             plt.rcParams['axes.unicode_minus'] = False
    
             img = plt.imread(val_image[i])
    
             plt.title('图片的类型是:'+str(val_label[i]))
    
             plt.imshow(img)
    
             plt.show()
    
  
    
             predict_result=label_list[np.argmax(results_list[i])]
    
             print("预测结果为: \n", predict_result)
    
             print('------------------------------------')
    
             if  i ==5: #根据自己想查看多少张,可以修改这个i的值
    
                 break
    
         break
    
     break
    
    
    
    

全部评论 (0)

还没有任何评论哟~